diff --git a/booth.spec b/booth.spec index 8da407e..8b10c14 100644 --- a/booth.spec +++ b/booth.spec @@ -1,189 +1,190 @@ %if 0%{?suse_version} %global booth_docdir %{_defaultdocdir}/%{name} %else # newer fedora distros have _pkgdocdir, rely on that when # available %{!?_pkgdocdir: %global _pkgdocdir %%{_docdir}/%{name}-%{version}} # Directory where we install documentation %global booth_docdir %{_pkgdocdir} %endif %global test_path %{_datadir}/booth/tests %if 0%{?suse_version} %define _libexecdir %{_libdir} %endif %define with_extra_warnings 0 %define with_debugging 0 %define without_fatal_warnings 1 %define _fwdefdir /etc/sysconfig/SuSEfirewall2.d/services %if 0%{?fedora} || 0%{?centos} || 0%{?rhel} %define pkg_group System Environment/Daemons %else %define pkg_group Productivity/Clustering/HA %endif Name: booth Url: https://github.com/ClusterLabs/booth Summary: Ticket Manager for Multi-site Clusters License: GPL-2.0+ Group: %{pkg_group} Version: 0.2.0 Release: 0 Source: booth.tar.bz2 Source1: %name-rpmlintrc BuildRoot: %{_tmppath}/%{name}-%{version}-build BuildRequires: asciidoc BuildRequires: autoconf BuildRequires: automake BuildRequires: glib2-devel +BuildRequires: mhash-devel %if 0%{?fedora} || 0%{?centos} || 0%{?rhel} BuildRequires: cluster-glue-libs-devel BuildRequires: pacemaker-libs-devel %else BuildRequires: libglue-devel BuildRequires: libpacemaker-devel %endif BuildRequires: libxml2-devel BuildRequires: pkgconfig %if 0%{?fedora} || 0%{?centos} || 0%{?rhel} Requires: pacemaker >= 1.1.8 Requires: cluster-glue-libs >= 1.0.6 %else Requires: pacemaker-ticket-support >= 2.0 %endif %description Booth manages tickets which authorize cluster sites located in geographically dispersed locations to run resources. It facilitates support of geographically distributed clustering in Pacemaker. %prep %setup -q -n %{name} %build ./autogen.sh %configure \ --with-initddir=%{_initrddir} \ --docdir=%{booth_docdir} make #except check #%check #make check %install make DESTDIR=$RPM_BUILD_ROOT install docdir=%{booth_docdir} mkdir -p %{buildroot}/%{_mandir}/man8/ gzip < docs/boothd.8 > %{buildroot}/%{_mandir}/man8/booth.8.gz ln %{buildroot}/%{_mandir}/man8/booth.8.gz %{buildroot}/%{_mandir}/man8/boothd.8.gz %if %{defined _unitdir} # systemd mkdir -p %{buildroot}/%{_unitdir} cp -a conf/booth@.service %{buildroot}/%{_unitdir}/booth@.service ln -s /usr/sbin/service %{buildroot}%{_sbindir}/rcbooth-arbitrator %else # sysV init ln -s ../../%{_initddir}/booth-arbitrator %{buildroot}%{_sbindir}/rcbooth-arbitrator %endif #install test-parts mkdir -p %{buildroot}/%{test_path} cp -a unit-tests/ script/unit-test.py test conf %{buildroot}/%{test_path}/ chmod +x %{buildroot}/%{test_path}/test/booth_path chmod +x %{buildroot}/%{test_path}/test/live_test.sh mkdir -p %{buildroot}/%{test_path}/src/ ln -s %{_sbindir}/boothd %{buildroot}/%{test_path}/src/ rm -f %{buildroot}/%{test_path}/test/*.pyc %if 0%{?suse_version} #SUSE firewall rule mkdir -p $RPM_BUILD_ROOT/%{_fwdefdir} install -m 644 %{S:2} $RPM_BUILD_ROOT/%{_fwdefdir}/booth %endif %check %if 0%{?run_build_tests} echo "%%run_build_tests set to %run_build_tests; including tests" make check %else echo "%%run_build_tests set to %run_build_tests; skipping tests" %endif %clean rm -rf %{buildroot} %files %defattr(-,root,root,-) %{_sbindir}/booth %{_sbindir}/boothd %{_mandir}/man8/booth.8.gz %{_mandir}/man8/boothd.8.gz %dir /usr/lib/ocf %dir /usr/lib/ocf/resource.d %dir /usr/lib/ocf/resource.d/pacemaker %dir %{_sysconfdir}/booth %{_sbindir}/rcbooth-arbitrator /usr/lib/ocf/resource.d/pacemaker/booth-site %config %{_sysconfdir}/booth/booth.conf.example %if 0%{?suse_version} %config %{_fwdefdir}/booth %endif %if %{defined _unitdir} %{_unitdir}/booth@.service %exclude %{_initddir}/booth-arbitrator %else %{_initddir}/booth-arbitrator %endif %dir %{_datadir}/booth %{_datadir}/booth/service-runnable %doc AUTHORS README COPYING %doc README.upgrade-from-v0.1 # this should be preun, but... %pre # new installation? test -x %{_sbindir}/booth || exit 0 # stop the arbitrator if it's the previous paxos version 1.0 if [ "`booth version | awk '{print $2}'`" = "1.0" ]; then echo "booth v0.1 found" if grep -qs 'ticket.*;' /etc/booth/booth.conf; then echo "Convert the booth configuration in /etc/booth/booth.conf!" fi if ps -o pid,cmd -e | grep -qs "[b]oothd arbitrator"; then rcbooth-arbitrator stop fi fi exit 0 %package test Summary: Test scripts for Booth Group: %{pkg_group} Requires: booth Requires: python %description test This package contains automated tests for Booth, the Cluster Ticket Manager for Pacemaker. %files test %defattr(-,root,root) %doc README-testing %{test_path} %dir /usr/lib/ocf %dir /usr/lib/ocf/resource.d %dir /usr/lib/ocf/resource.d/booth /usr/lib/ocf/resource.d/booth/sharedrsc %changelog diff --git a/docs/boothd.8.txt b/docs/boothd.8.txt index 4f46005..7310ae7 100644 --- a/docs/boothd.8.txt +++ b/docs/boothd.8.txt @@ -1,490 +1,502 @@ BOOTHD(8) =========== :doctype: manpage NAME ---- boothd - The Booth Cluster Ticket Manager. SYNOPSIS -------- *boothd* 'daemon' [-SD] [-c 'config'] [-l 'lockfile'] *booth* 'list' [-s 'site'] [-c 'config'] *booth* 'grant' [-s 'site'] [-c 'config'] [-FCw] 'ticket' *booth* 'revoke' [-s 'site'] [-c 'config'] [-w] 'ticket' *booth* 'status' [-D] [-c 'config'] DESCRIPTION ----------- Booth manages tickets which authorizes one of the cluster sites located in geographically dispersed distances to run certain resources. It is designed to be extend Pacemaker to support geographically distributed clustering. It is based on the RAFT protocol, see eg. for details. SHORT EXAMPLES -------------- --------------------- # boothd daemon -D # booth list # booth grant ticket-nfs # booth revoke ticket-nfs --------------------- OPTIONS ------- *-c* 'configfile':: Configuration to use. + Can be a full path to a configuration file, or a short name; in the latter case, the directory '/etc/booth' and suffix '.conf' are added. Per default 'booth' is used, which results in the path '/etc/booth/booth.conf'. + The configuration name also determines the name of the PID file - for the defaults, '/var/run/booth/booth.pid'. *-s*:: Site address or name. *-F*:: 'immediate grant': Don't wait for unreachable sites to relinquish the ticket. See the 'Booth ticket management' section below for more details. + This option may be DANGEROUS. It makes booth grant the ticket even though it cannot ascertain that unreachable sites don't hold the same ticket. It is up to the user to make sure that unreachable sites don't have this ticket as granted. *-w*:: 'wait for the request outcome': The client waits for the final outcome of grant or revoke request. *-C*:: 'wait for ticket commit to CIB': The client waits for the ticket commit to CIB (only for grant requests). If one or more sites are unreachable, this takes the ticket expire time (plus, if defined, the 'acquire-after' time). *-h*, *--help*:: Give a short usage output. *--version*:: Report version information. *-S*:: 'systemd' mode: don't fork. This is like '-D' but without the debug output. *-D*:: Debug output/don't daemonize. Increases the debug output level; booth daemon remains in the foreground. *-l* 'lockfile':: Use another lock file. By default, the lock file name is inferred from the configuration file name. Normally not needed. COMMANDS -------- Whether the binary is called as 'boothd' or 'booth' doesn't matter; the first argument determines the mode of operation. *'daemon'*:: Tells 'boothd' to serve a site. The locally configured interfaces are searched for an IP address that is defined in the configuration. booth then runs in either /arbitrator/ or /site/ mode. *'client'*:: Booth clients can list the ticket information (see also 'crm_ticket -L'), and revoke or grant tickets to a site. + The grant and, under certain circumstances, revoke operations may take a while to return a definite operation's outcome. The client will wait up to the network timeout value (by default 5 seconds) for the result. Unless the '-w' option was set, in which case the client waits indefinitely. + In this mode the configuration file is searched for an IP address that is locally reachable, ie. matches a configured subnet. This allows to run the client commands on another node in the same cluster, as long as the config file and the service IP is locally reachable. + For instance, if the booth service IP is 192.168.55.200, and the local node has 192.168.55.15 configured on one of its network interfaces, it knows which site it belongs to. + Use '-s' to direct client to connect to a different site. *'status'*:: 'boothd' looks for the (locked) PID file and the UDP socket, prints some output to stdout (for use in shell scripts) and returns an OCF-compatible return code. With '-D', a human-readable message is printed to STDERR as well. CONFIGURATION FILE ------------------ The configuration file must be identical on all sites and arbitrators. A minimal file may look like this: ----------------------- site="192.168.201.100" site="192.168.202.100" arbitrator="192.168.203.100" ticket="ticket-db8" ----------------------- Comments start with a hash-sign (''#''). Whitespace at the start and end of the line, and around the ''='', are ignored. The following key/value pairs are defined: *'port'*:: The UDP/TCP port to use. Default is '9929'. *'transport'*:: The transport protocol to use for Raft exchanges. Currently only UDP is supported. + Clients use TCP to communicate with a daemon; Booth will always bind and listen to both UDP and TCP ports. +*'authfile'*:: + File containing the authentication key. The key can be either + binary or text. If the latter, then both leading and trailing + white space, including new lines, is ignored. This key is a + shared secret and used to authenticate both clients and + servers. The key must be between 8 and 64 characters long and + be readable only by the file owner. + *'site'*:: Defines a site Raft member with the given IP. Sites can acquire tickets. The sites' IP should be managed by the cluster. *'arbitrator'*:: Defines an arbitrator Raft member with the given IP. Arbitrators help reach consensus in elections and cannot hold tickets. Booth needs at least three members for normal operation. Odd number of members provides more redundancy. *'site-user'*, *'site-group'*, *'arbitrator-user'*, *'arbitrator-group'*:: These define the credentials 'boothd' will be running with. + On a (Pacemaker) site the booth process will have to call 'crm_ticket', so the default is to use 'hacluster':'haclient'; for an arbitrator this user and group might not exists, so there we default to 'nobody':'nobody'. *'ticket'*:: Registers a ticket. Multiple tickets can be handled by single Booth instance. + Use the special ticket name '__defaults__' to modify the defaults. The '__defaults__' stanza must precede all the other ticket specifications. All times are in seconds. *'expire'*:: The lease time for a ticket. After that time the ticket can be acquired by another site if the ticket holder is not reachable. + The default is '600'. *'acquire-after'*:: Once a ticket is lost, wait this time in addition before acquiring the ticket. + This is to allow for the site that lost the ticket to relinquish the resources, by either stopping them or fencing a node. + A typical delay might be 60 seconds, but ultimately it depends on the protected resources and the fencing configuration. + The default is '0'. *'renewal-freq'*:: Set the ticket renewal frequency period. + If the network reliability is often reduced over prolonged periods, it is advisable to try to renew more often. + Before every renewal, if defined, the command specified in 'before-acquire-handler' is run. In that case the 'renewal-freq' parameter is effectively also the local cluster monitoring interval. *'timeout'*:: After that time 'booth' will re-send packets if there was an insufficient number of replies. This should be long enough to allow packets to reach other members. + The default is '5'. *'retries'*:: Defines how many times to retry sending packets before giving up waiting for acks from other members. + Default is '10'. Values lower than 3 are illegal. + Ticket renewals should allow for this number of retries. Hence, the total retry time must be shorter than the renewal time (either half the expire time or *'renewal-freq'*): timeout*(retries+1) < renewal *'weights'*:: A comma-separated list of integers that define the weight of individual Raft members, in the same order as the 'site' and 'arbitrator' lines. + Default is '0' for all; this means that the order in the configuration file defines priority for conflicting requests. *'before-acquire-handler'*:: If set, this command will be called before 'boothd' tries to acquire or renew a ticket. On exit code other than 0, 'boothd' relinquishes the ticket. + Thus it is possible to ensure whether the services and its dependencies protected by the ticket are in good shape at this site. For instance, if a service in the dependency-chain has a failcount of 'INFINITY' on all available nodes, the service will be unable to run. In that case, it is of no use to claim the ticket. + 'boothd' waits synchronously for the result of the handler, so make sure that the program returns quickly. + See below for details about booth specific environment variables. The distributed 'service-runnable' script is an example which may be used to test whether a pacemaker resource can be started. One example of a booth configuration file: ----------------------- transport = udp port = 9930 # D-85774 site="192.168.201.100" # D-90409 site="::ffff:192.168.202.100" # A-1120 arbitrator="192.168.203.100" ticket="ticket-db8" expire = 600 acquire-after = 60 timeout = 10 retries = 5 renewal-freq = 60 before-acquire-handler = /usr/share/booth/service-runnable db8 ----------------------- BOOTH TICKET MANAGEMENT ----------------------- The booth cluster guarantees that every ticket is owned by only one site at the time. Tickets must be initially granted with the 'booth client grant' command. Once it gets granted, the ticket is managed by the booth cluster. Hence, only granted tickets are managed by 'booth'. If the ticket gets lost, i.e. that the other members of the booth cluster do not hear from the ticket owner in a sufficiently long time, one of the remaining sites will acquire the ticket. This is what is called _ticket failover_. If the remaining members cannot form a majority, then the ticket cannot fail over. A ticket may be revoked at any time with the 'booth client revoke' command. For revoke to succeed, the site holding the ticket must be reachable. Once the ticket is administratively revoked, it is not managed by the booth cluster anymore. For the booth cluster to start managing the ticket again, it must be again granted to a site. The grant operation, in case not all sites are reachable, may get delayed for the ticket expire time (and, if defined, the 'acquire-after' time). The reason is that the other booth members may not know if the ticket is currently granted at the unreachable site. This delay may be disabled with the '-F' option. In that case, it is up to the administrator to make sure that the unreachable site is not holding the ticket. When the ticket is managed by 'booth', it is dangerous to modify it manually using either `crm_ticket` command or `crm site ticket`. Neither of these tools is aware of 'booth' and, consequently, 'booth' itself may not be aware of any ticket status changes. A notable exception is setting the ticket to standby which is typically done before a planned failover. NOTES ----- Tickets are not meant to be moved around quickly, the default 'expire' time is 600 seconds (10 minutes). 'booth' works with both IPv4 and IPv6 addresses. 'booth' renews a ticket before it expires, to account for possible transmission delays. The renewal time, unless explicitly set, is set to half the 'expire' time. HANDLERS -------- Currently, there's only one external handler defined (see the 'before-acquire-handler' configuration item above). The following environment variables are exported to the handler: *'BOOTH_TICKET':: The ticket name, as given in the configuration file. (See 'ticket' item above.) *'BOOTH_LOCAL':: The local site name, as defined in 'site'. *'BOOTH_CONF_PATH':: The path to the active configuration file. *'BOOTH_CONF_NAME':: The configuration name, as used by the '-c' commandline argument. *'BOOTH_TICKET_EXPIRES':: When the ticket expires (in seconds since 1.1.1970), or '0'. The handler is invoked with positional arguments specified after it. FILES ----- *'/etc/booth/booth.conf'*:: The default configuration file name. See also the '-c' argument. +*'/etc/booth/authkey'*:: + There is no default, but this is a typical location for the + shared secret (authentication key). + *'/var/run/booth/'*:: Directory that holds PID/lock files. See also the 'status' command. RAFT IMPLEMENTATION ------------------- In essence, every ticket corresponds to a separate Raft cluster. A ticket is granted to the Raft _Leader_ which then owns (or keeps) the ticket. ARBITRATOR MANAGEMENT --------------------- The booth daemon for an arbitrator which typically doesn't run the cluster stack, may be started through systemd or with '/etc/init.d/booth-arbitrator', depending on which init system the platform supports. The SysV init script starts a booth arbitrator for every configuration file found in '/etc/booth'. Platforms running systemd can enable and start every configuration separately using 'systemctl': ----------- # systemctl enable booth@ # systemctl start booth@ ----------- 'systemctl' requires the configuration name, even for the default name 'booth'. EXIT STATUS ----------- *0*:: Success. For the 'status' command: Daemon running. *1* (PCMK_OCF_UNKNOWN_ERROR):: General error code. *7* (PCMK_OCF_NOT_RUNNING):: No daemon process for that configuration active. BUGS ---- Booth is tested regularly. See the `README-testing` file for more information. Please report any bugs either at GitHub: Or, if you prefer bugzilla, at openSUSE bugzilla (component "High Availability"): https://bugzilla.opensuse.org/enter_bug.cgi?product=openSUSE%20Factory AUTHOR ------ 'boothd' was originally written (mostly) by Jiaju Zhang. In 2013 and 2014 Philipp Marek took over maintainership. Since April 2014 it has been mainly developed by Dejan Muhamedagic. Many people contributed (see the `AUTHORS` file). RESOURCES --------- GitHub: Documentation: COPYING ------- Copyright (C) 2011 Jiaju Zhang Copyright (C) 2013-2014 Philipp Marek Copyright (C) 2014 Dejan Muhamedagic Free use of this software is granted under the terms of the GNU General Public License (GPL). // vim: set ft=asciidoc : diff --git a/src/auth.c b/src/auth.c new file mode 100644 index 0000000..b419400 --- /dev/null +++ b/src/auth.c @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2015 Dejan Muhamedagic + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include "auth.h" + +/* calculate the HMAC of the message in data and store it in result + * it is up to the caller to make sure that there's enough space + * at result for the MAC + */ +int calc_hmac(const void *data, size_t datalen, + hashid hid, unsigned char *result, char *key, int keylen) +{ + MHASH td; + size_t block_size; + + block_size = mhash_get_hash_pblock(hid); + if (!block_size) + return -1; + + td = mhash_hmac_init(hid, key, keylen, block_size); + if (!td) + return -1; + + (void)mhash(td, data, datalen); + if (mhash_hmac_deinit(td, result)) + return -1; + + return 0; +} + +/* test HMAC + */ +int verify_hmac(const void *data, size_t datalen, + hashid hid, unsigned char *hmac, char *key, int keylen) +{ + MHASH td; + unsigned char *our_hmac = NULL; + int rc = -1; + + td = mhash_hmac_init(hid, key, keylen, + mhash_get_hash_pblock(hid)); + if (!td) + return -1; + + our_hmac = malloc(mhash_get_block_size(hid)); + if (!our_hmac) + return -1; + + (void)mhash(td, data, datalen); + if (mhash_hmac_deinit(td, our_hmac)) + goto out_free; + + rc = memcmp(our_hmac, hmac, mhash_get_block_size(hid)); + +out_free: + if (our_hmac) + free(our_hmac); + return rc; +} diff --git a/src/auth.h b/src/auth.h new file mode 100644 index 0000000..49a7007 --- /dev/null +++ b/src/auth.h @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2015 Dejan Muhamedagic + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +#include "b_config.h" +#include + +#if HAVE_LIBMHASH +#include + +#define BOOTH_HASH MHASH_SHA1 + +int calc_hmac(const void *data, size_t datalen, + hashid hid, unsigned char *result, char *key, int keylen); +int verify_hmac(const void *data, size_t datalen, + hashid hid, unsigned char *hmac, char *key, int keylen); +#endif diff --git a/src/booth.h b/src/booth.h index 71ff722..91a1967 100644 --- a/src/booth.h +++ b/src/booth.h @@ -1,288 +1,308 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef _BOOTH_H #define _BOOTH_H #include #include #include #include #include #define BOOTH_RUN_DIR "/var/run/booth/" #define BOOTH_LOG_DIR "/var/log" #define BOOTH_LOGFILE_NAME "booth.log" #define BOOTH_DEFAULT_CONF_DIR "/etc/booth/" #define BOOTH_DEFAULT_CONF_NAME "booth" #define BOOTH_DEFAULT_CONF_EXT ".conf" #define BOOTH_DEFAULT_CONF \ BOOTH_DEFAULT_CONF_DIR BOOTH_DEFAULT_CONF_NAME BOOTH_DEFAULT_CONF_EXT #define DAEMON_NAME "boothd" #define BOOTH_PATH_LEN 127 +#define BOOTH_MAX_KEY_LEN 64 +#define BOOTH_MIN_KEY_LEN 8 +/* hash size is 160 bits (sha1), but add a bit more space in case + * stronger hashes are required */ +#define BOOTH_MAC_SIZE 24 #define BOOTH_DEFAULT_PORT 9929 /* TODO: remove */ #define BOOTH_PROTO_FAMILY AF_INET #define BOOTHC_MAGIC 0x5F1BA08C #define BOOTHC_VERSION 0x00010003 /** Timeout value for poll(). * Determines frequency of periodic jobs, eg. when send-retries are done. * See process_tickets(). */ #define POLL_TIMEOUT 100 /** @{ */ /** The on-network data structures and constants. */ #define BOOTH_NAME_LEN 64 #define CHAR2CONST(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d) /* Says that the ticket shouldn't be active anywhere. * NONE wouldn't be specific enough. */ #define NO_ONE ((uint32_t)-1) /* Says that another one should recover. */ #define TICKET_LOST CHAR2CONST('L', 'O', 'S', 'T') typedef unsigned char boothc_site [BOOTH_NAME_LEN]; typedef unsigned char boothc_ticket[BOOTH_NAME_LEN]; struct boothc_header { /** Authentication data; not used now. */ uint32_t iv; uint32_t auth1; uint32_t auth2; /** BOOTHC_MAGIC */ uint32_t magic; /** BOOTHC_VERSION */ uint32_t version; /** Packet source; site_id. See add_site(). */ uint32_t from; /** Length including header */ uint32_t length; /** The command respectively protocol state. See cmd_request_t. */ uint32_t cmd; /** The matching request (what do we reply to). See cmd_request_t. */ uint32_t request; /** Command options. */ uint32_t options; /** The reason for this RPC. */ uint32_t reason; /** Result of operation. 0 == OK */ uint32_t result; char data[0]; } __attribute__((packed)); struct ticket_msg { /** Ticket name. */ boothc_ticket id; /** Current leader. May be NO_ONE. See add_site(). * For a OP_REQ_VOTE this is */ uint32_t leader; /** Current term. */ uint32_t term; uint32_t term_valid_for; /* Perhaps we need to send a status along, too - like * starting, running, stopping, error, ...? */ } __attribute__((packed)); +struct hmac { + /** hash id, currently set to constant BOOTH_HASH */ + uint32_t hid; + + /** the calculated hash, BOOTH_MAC_SIZE is big enough to + * accommodate the hash of type hid */ + unsigned char hash[BOOTH_MAC_SIZE]; +} __attribute__((packed)); + +struct boothc_hdr_msg { + struct boothc_header header; + struct hmac hmac; +} __attribute__((packed)); struct boothc_ticket_msg { struct boothc_header header; struct ticket_msg ticket; + struct hmac hmac; } __attribute__((packed)); typedef enum { /* 0x43 = "C"ommands */ CMD_LIST = CHAR2CONST('C', 'L', 's', 't'), CMD_GRANT = CHAR2CONST('C', 'G', 'n', 't'), CMD_REVOKE = CHAR2CONST('C', 'R', 'v', 'k'), /* Replies */ CL_RESULT = CHAR2CONST('R', 's', 'l', 't'), CL_LIST = CHAR2CONST('R', 'L', 's', 't'), CL_GRANT = CHAR2CONST('R', 'G', 'n', 't'), CL_REVOKE = CHAR2CONST('R', 'R', 'v', 'k'), /* get status from another server */ OP_STATUS = CHAR2CONST('S', 't', 'a', 't'), OP_MY_INDEX = CHAR2CONST('M', 'I', 'd', 'x'), /* reply to status */ /* Raft */ OP_REQ_VOTE = CHAR2CONST('R', 'V', 'o', 't'), /* start election */ OP_VOTE_FOR = CHAR2CONST('V', 't', 'F', 'r'), /* reply to REQ_VOTE */ OP_HEARTBEAT= CHAR2CONST('H', 'r', 't', 'B'), /* Heartbeat */ OP_ACK = CHAR2CONST('A', 'c', 'k', '.'), /* Ack for heartbeats and revokes */ OP_UPDATE = CHAR2CONST('U', 'p', 'd', 'E'), /* Update ticket */ OP_REVOKE = CHAR2CONST('R', 'e', 'v', 'k'), /* Revoke ticket */ OP_REJECTED = CHAR2CONST('R', 'J', 'C', '!'), } cmd_request_t; typedef enum { /* for compatibility with other functions */ RLT_SUCCESS = 0, RLT_ASYNC = CHAR2CONST('A', 's', 'y', 'n'), RLT_MORE = CHAR2CONST('M', 'o', 'r', 'e'), RLT_SYNC_SUCC = CHAR2CONST('S', 'c', 'c', 's'), RLT_SYNC_FAIL = CHAR2CONST('F', 'a', 'i', 'l'), RLT_INVALID_ARG = CHAR2CONST('I', 'A', 'r', 'g'), RLT_CIB_PENDING = CHAR2CONST('P', 'e', 'n', 'd'), RLT_EXT_FAILED = CHAR2CONST('X', 'P', 'r', 'g'), RLT_TICKET_IDLE = CHAR2CONST('T', 'i', 'd', 'l'), RLT_OVERGRANT = CHAR2CONST('O', 'v', 'e', 'r'), RLT_PROBABLY_SUCCESS = CHAR2CONST('S', 'u', 'c', '?'), RLT_BUSY = CHAR2CONST('B', 'u', 's', 'y'), RLT_TERM_OUTDATED = CHAR2CONST('T', 'O', 'd', 't'), RLT_TERM_STILL_VALID = CHAR2CONST('T', 'V', 'l', 'd'), RLT_YOU_OUTDATED = CHAR2CONST('O', 'u', 't', 'd'), RLT_REDIRECT = CHAR2CONST('R', 'e', 'd', 'r'), } cmd_result_t; typedef enum { /* for compatibility with other functions */ OR_JUST_SO = 0, OR_AGAIN = CHAR2CONST('A', 'a', 'a', 'a'), OR_TKT_LOST = CHAR2CONST('T', 'L', 's', 't'), OR_REACQUIRE = CHAR2CONST('R', 'a', 'c', 'q'), OR_ADMIN = CHAR2CONST('A', 'd', 'm', 'n'), OR_LOCAL_FAIL = CHAR2CONST('L', 'o', 'c', 'F'), OR_STEPDOWN = CHAR2CONST('S', 'p', 'd', 'n'), OR_SPLIT = CHAR2CONST('S', 'p', 'l', 't'), } cmd_reason_t; /* bitwise command options */ typedef enum { OPT_IMMEDIATE = 1, /* immediate grant */ OPT_WAIT = 2, /* wait for the elections' outcome */ OPT_WAIT_COMMIT = 4, /* wait for the ticket commit to CIB */ } cmd_options_t; /** @} */ /** @{ */ struct booth_site { /** Calculated ID. See add_site(). */ int site_id; int type; int local; /** Roles, like ACCEPTOR, PROPOSER, or LEARNER. Not really used ATM. */ int role; char addr_string[BOOTH_NAME_LEN]; int tcp_fd; int udp_fd; /* 0-based, used for indexing into per-ticket weights */ int index; uint64_t bitmask; unsigned short family; union { struct sockaddr_in sa4; struct sockaddr_in6 sa6; }; int saddrlen; int addrlen; } __attribute__((packed)); extern struct booth_site *local; extern struct booth_site * no_leader; /** @} */ struct booth_transport; struct client { int fd; const struct booth_transport *transport; void (*workfn)(int); void (*deadfn)(int); }; extern struct client *clients; extern struct pollfd *pollfds; int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)); int find_client_by_fd(int fd); int do_read(int fd, void *buf, size_t count); int do_write(int fd, void *buf, size_t count); void process_connection(int ci); void safe_copy(char *dest, char *value, size_t buflen, const char *description); +int update_authkey(void); struct command_line { int type; /* ACT_ */ int op; /* OP_ */ int options; /* OPT_ */ char configfile[BOOTH_PATH_LEN]; char lockfile[BOOTH_PATH_LEN]; char site[BOOTH_NAME_LEN]; struct boothc_ticket_msg msg; }; extern struct command_line cl; /* http://gcc.gnu.org/onlinedocs/gcc/Typeof.html */ #define min(a__,b__) \ ({ typeof (a__) _a = (a__); \ typeof (b__) _b = (b__); \ _a < _b ? _a : _b; }) #define max(a__,b__) \ ({ typeof (a__) _a = (a__); \ typeof (b__) _b = (b__); \ _a > _b ? _a : _b; }) #endif /* _BOOTH_H */ diff --git a/src/config.c b/src/config.c index 01847d8..a9e32d7 100644 --- a/src/config.c +++ b/src/config.c @@ -1,798 +1,809 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include #include #include #include #include #include #include #include #include +#include "b_config.h" #include "booth.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "log.h" static int ticket_size = 0; static int ticket_realloc(void) { const int added = 5; int had, want; void *p; had = booth_conf->ticket_allocated; want = had + added; p = realloc(booth_conf->ticket, sizeof(struct ticket_config) * want); if (!p) { log_error("can't alloc more tickets"); return -ENOMEM; } booth_conf->ticket = p; memset(booth_conf->ticket + had, 0, sizeof(struct ticket_config) * added); booth_conf->ticket_allocated = want; return 0; } int add_site(char *address, int type); int add_site(char *addr_string, int type) { int rv; struct booth_site *site; uLong nid; uint32_t mask; int i; rv = 1; if (booth_conf->site_count == MAX_NODES) { log_error("too many nodes"); goto out; } if (strlen(addr_string)+1 >= sizeof(booth_conf->site[0].addr_string)) { log_error("site address \"%s\" too long", addr_string); goto out; } site = booth_conf->site + booth_conf->site_count; site->family = BOOTH_PROTO_FAMILY; site->type = type; /* Make site_id start at a non-zero point. * Perhaps use hash over string or address? */ strcpy(site->addr_string, addr_string); site->index = booth_conf->site_count; site->bitmask = 1 << booth_conf->site_count; /* Catch site overflow */ assert(site->bitmask); booth_conf->all_bits |= site->bitmask; if (type == SITE) booth_conf->sites_bits |= site->bitmask; site->tcp_fd = -1; booth_conf->site_count++; rv = 0; memset(&site->sa6, 0, sizeof(site->sa6)); nid = crc32(0L, NULL, 0); /* Using the ASCII representation in site->addr_string (both sizeof() * and strlen()) gives quite a lot of collisions; a brute-force run * from 0.0.0.0 to 24.0.0.0 gives ~4% collisions, and this tends to * increase even more. * Whether there'll be a collision in real-life, with 3 or 5 nodes, is * another question ... but for now get the ID from the binary * representation - that had *no* collisions up to 32.0.0.0. */ if (inet_pton(AF_INET, site->addr_string, &site->sa4.sin_addr) > 0) { site->family = AF_INET; site->sa4.sin_family = site->family; site->sa4.sin_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa4); site->addrlen = sizeof(site->sa4.sin_addr); site->site_id = crc32(nid, (void*)&site->sa4.sin_addr, site->addrlen); } else if (inet_pton(AF_INET6, site->addr_string, &site->sa6.sin6_addr) > 0) { site->family = AF_INET6; site->sa6.sin6_family = site->family; site->sa6.sin6_flowinfo = 0; site->sa6.sin6_port = htons(booth_conf->port); site->saddrlen = sizeof(site->sa6); site->addrlen = sizeof(site->sa6.sin6_addr); site->site_id = crc32(nid, (void*)&site->sa6.sin6_addr, site->addrlen); } else { log_error("Address string \"%s\" is bad", site->addr_string); rv = EINVAL; } /* Make sure we will never collide with NO_ONE, * or be negative (to get "get_local_id() < 0" working). */ mask = 1 << (sizeof(site->site_id)*8 -1); assert(NO_ONE & mask); site->site_id &= ~mask; /* Test for collisions with other sites */ for(i=0; iindex; i++) if (booth_conf->site[i].site_id == site->site_id) { log_error("Got a site-ID collision. Please file a bug on https://github.com/ClusterLabs/booth/issues/new, attaching the configuration file."); exit(1); } out: return rv; } inline static char *skip_while_in(const char *cp, int (*fn)(int), const char *allowed) { /* strchr() returns a pointer to the terminator if *cp == 0. */ while (*cp && (fn(*cp) || strchr(allowed, *cp))) cp++; /* discard "const" qualifier */ return (char*)cp; } inline static char *skip_while(char *cp, int (*fn)(int)) { while (fn(*cp)) cp++; return cp; } inline static char *skip_until(char *cp, char expected) { while (*cp && *cp != expected) cp++; return cp; } static inline int is_end_of_line(char *cp) { char c = *cp; return c == '\n' || c == 0 || c == '#'; } static int add_ticket(const char *name, struct ticket_config **tkp, const struct ticket_config *def) { int rv; struct ticket_config *tk; if (booth_conf->ticket_count == booth_conf->ticket_allocated) { rv = ticket_realloc(); if (rv < 0) return rv; } tk = booth_conf->ticket + booth_conf->ticket_count; booth_conf->ticket_count++; tk->last_valid_tk = malloc(sizeof(struct ticket_config)); if (!tk->last_valid_tk) { log_error("out of memory"); return -ENOMEM; } memset(tk->last_valid_tk, 0, sizeof(struct ticket_config)); if (!check_max_len_valid(name, sizeof(tk->name))) { log_error("ticket name \"%s\" too long.", name); return -EINVAL; } if (find_ticket_by_name(name, NULL)) { log_error("ticket name \"%s\" used again.", name); return -EINVAL; } if (* skip_while_in(name, isalnum, "-/")) { log_error("ticket name \"%s\" invalid; only alphanumeric names.", name); return -EINVAL; } strcpy(tk->name, name); tk->timeout = def->timeout; tk->term_duration = def->term_duration; tk->retries = def->retries; memcpy(tk->weight, def->weight, sizeof(tk->weight)); if (tkp) *tkp = tk; return 0; } static int postproc_ticket(struct ticket_config *tk) { if (!tk) return 1; if (!tk->renewal_freq) { tk->renewal_freq = tk->term_duration/2; } if (tk->timeout*(tk->retries+1) >= tk->renewal_freq) { log_error("%s: total amount of time to " "retry sending packets cannot exceed " "renewal frequency " "(%d*(%d+1) >= %d)", tk->name, tk->timeout, tk->retries, tk->renewal_freq); return 0; } return 1; } /* returns number of weights, or -1 on bad input. */ static int parse_weights(const char *input, int weights[MAX_NODES]) { int i, v; char *cp; for(i=0; iproto = UDP; booth_conf->port = BOOTH_DEFAULT_PORT; + booth_conf->authkey[0] = '\0'; /* Provide safe defaults. -1 is reserved, though. */ booth_conf->uid = -2; booth_conf->gid = -2; strcpy(booth_conf->site_user, "hacluster"); strcpy(booth_conf->site_group, "haclient"); strcpy(booth_conf->arb_user, "nobody"); strcpy(booth_conf->arb_group, "nobody"); parse_weights("", defaults.weight); defaults.ext_verifier = NULL; defaults.term_duration = DEFAULT_TICKET_EXPIRY; defaults.timeout = DEFAULT_TICKET_TIMEOUT; defaults.retries = DEFAULT_RETRIES; defaults.acquire_after = 0; error = ""; log_debug("reading config file %s", path); while (fgets(line, sizeof(line), fp)) { lineno++; s = skip_while(line, isspace); if (is_end_of_line(s)) continue; key = s; /* Key */ end_of_key = skip_while_in(key, isalnum, "-_"); if (end_of_key == key) { error = "No key"; goto err; } if (!*end_of_key) goto exp_equal; /* whitespace, and something else but nothing more? */ s = skip_while(end_of_key, isspace); if (*s != '=') { exp_equal: error = "Expected '=' after key"; goto err; } s++; /* It's my buffer, and I terminate if I want to. */ /* But not earlier than that, because we had to check for = */ *end_of_key = 0; /* Value tokenizing */ s = skip_while(s, isspace); switch (*s) { case '"': case '\'': val = s+1; s = skip_until(val, *s); /* Terminate value */ if (!*s) { error = "Unterminated quoted string"; goto err; } /* Remove and skip quote */ *s = 0; s++; if (* skip_while(s, isspace)) { error = "Surplus data after value"; goto err; } *s = 0; break; case 0: no_value: error = "No value"; goto err; break; default: val = s; /* Rest of line. */ i = strlen(s); /* i > 0 because of "case 0" above. */ while (i > 0 && isspace(s[i-1])) i--; s += i; *s = 0; } if (val == s) goto no_value; if (strlen(key) > BOOTH_NAME_LEN || strlen(val) > BOOTH_NAME_LEN) { error = "key/value too long"; goto err; } if (strcmp(key, "transport") == 0) { if (got_transport) { error = "config file has multiple transport lines"; goto err; } if (strcasecmp(val, "UDP") == 0) booth_conf->proto = UDP; else if (strcasecmp(val, "SCTP") == 0) booth_conf->proto = SCTP; else { error = "invalid transport protocol"; goto err; } got_transport = 1; continue; } if (strcmp(key, "port") == 0) { booth_conf->port = atoi(val); continue; } if (strcmp(key, "name") == 0) { safe_copy(booth_conf->name, val, BOOTH_NAME_LEN, "name"); continue; } +#if HAVE_LIBMHASH + if (strcmp(key, "authfile") == 0) { + safe_copy(booth_conf->authfile, + val, BOOTH_PATH_LEN, + "authfile"); + continue; + } +#endif + if (strcmp(key, "site") == 0) { if (add_site(val, SITE)) goto out; continue; } if (strcmp(key, "arbitrator") == 0) { if (add_site(val, ARBITRATOR)) goto out; continue; } if (strcmp(key, "site-user") == 0) { safe_copy(booth_conf->site_user, optarg, BOOTH_NAME_LEN, "site-user"); continue; } if (strcmp(key, "site-group") == 0) { safe_copy(booth_conf->site_group, optarg, BOOTH_NAME_LEN, "site-group"); continue; } if (strcmp(key, "arbitrator-user") == 0) { safe_copy(booth_conf->arb_user, optarg, BOOTH_NAME_LEN, "arbitrator-user"); continue; } if (strcmp(key, "arbitrator-group") == 0) { safe_copy(booth_conf->arb_group, optarg, BOOTH_NAME_LEN, "arbitrator-group"); continue; } if (strcmp(key, "debug") == 0) { if (type != CLIENT) debug_level = max(debug_level, atoi(val)); continue; } if (strcmp(key, "ticket") == 0) { if (current_tk && strcmp(current_tk->name, "__defaults__")) { if (!postproc_ticket(current_tk)) { goto out; } } if (!strcmp(val, "__defaults__")) { current_tk = &defaults; } else if (add_ticket(val, ¤t_tk, &defaults)) { goto out; } continue; } /* current_tk must be allocated at this point, otherwise * we don't know to which ticket the key refers */ if (!current_tk) { error = "Unexpected keyword"; goto err; } if (strcmp(key, "expire") == 0) { current_tk->term_duration = read_time(val); if (current_tk->term_duration <= 0) { error = "Expected time >0 for expire"; goto err; } continue; } if (strcmp(key, "timeout") == 0) { current_tk->timeout = read_time(val); if (current_tk->timeout <= 0) { error = "Expected time >0 for timeout"; goto err; } if (!min_timeout) { min_timeout = current_tk->timeout; } else { min_timeout = min(min_timeout, current_tk->timeout); } continue; } if (strcmp(key, "retries") == 0) { current_tk->retries = strtol(val, &s, 0); if (*s || s == val || current_tk->retries<3 || current_tk->retries > 100) { error = "Expected plain integer value in the range [3, 100] for retries"; goto err; } continue; } if (strcmp(key, "renewal-freq") == 0) { current_tk->renewal_freq = read_time(val); if (current_tk->renewal_freq <= 0) { error = "Expected time >0 for renewal-freq"; goto err; } continue; } if (strcmp(key, "acquire-after") == 0) { current_tk->acquire_after = read_time(val); if (current_tk->acquire_after < 0) { error = "Expected time >=0 for acquire-after"; goto err; } continue; } if (strcmp(key, "before-acquire-handler") == 0) { if (current_tk->ext_verifier) { free(current_tk->ext_verifier); } current_tk->ext_verifier = strdup(val); if (!current_tk->ext_verifier) { error = "Out of memory"; goto err; } continue; } if (strcmp(key, "weights") == 0) { if (parse_weights(val, current_tk->weight) < 0) goto out; continue; } error = "Unknown keyword"; goto err; } if ((booth_conf->site_count % 2) == 0) { log_warn("Odd number of nodes is strongly recommended!"); } /* Default: make config name match config filename. */ if (!booth_conf->name[0]) { cp = strrchr(path, '/'); cp = cp ? cp+1 : (char *)path; cp2 = strrchr(cp, '.'); if (!cp2) cp2 = cp + strlen(cp); if (cp2-cp >= BOOTH_NAME_LEN) { log_error("booth config file name too long"); goto err; } strncpy(booth_conf->name, cp, cp2-cp); *(booth_conf->name+(cp2-cp)) = '\0'; } if (!postproc_ticket(current_tk)) { goto out; } poll_timeout = min(POLL_TIMEOUT, min_timeout/10); return 0; err: out: log_error("%s in config file line %d", error, lineno); free(booth_conf); booth_conf = NULL; return -1; } int check_config(int type) { struct passwd *pw; struct group *gr; char *cp, *input; if (!booth_conf) return -1; input = (type == ARBITRATOR) ? booth_conf->arb_user : booth_conf->site_user; if (!*input) goto u_inval; if (isdigit(input[0])) { booth_conf->uid = strtol(input, &cp, 0); if (*cp != 0) { u_inval: log_error("User \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { pw = getpwnam(input); if (!pw) goto u_inval; booth_conf->uid = pw->pw_uid; } input = (type == ARBITRATOR) ? booth_conf->arb_group : booth_conf->site_group; if (!*input) goto g_inval; if (isdigit(input[0])) { booth_conf->gid = strtol(input, &cp, 0); if (*cp != 0) { g_inval: log_error("Group \"%s\" cannot be resolved into a UID.", input); return ENOENT; } } else { gr = getgrnam(input); if (!gr) goto g_inval; booth_conf->gid = gr->gr_gid; } /* TODO: check whether uid or gid is 0 again? * The admin may shoot himself in the foot, though. */ return 0; } int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type) { struct booth_site *n; int i; if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if ((n->type == SITE || any_type) && strcmp(n->addr_string, site) == 0) { *node = n; return 1; } } return 0; } int find_site_by_id(uint32_t site_id, struct booth_site **node) { struct booth_site *n; int i; if (site_id == NO_ONE) { *node = no_leader; return 1; } if (!booth_conf) return 0; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (n->site_id == site_id) { *node = n; return 1; } } return 0; } const char *type_to_string(int type) { switch (type) { case ARBITRATOR: return "arbitrator"; case SITE: return "site"; case CLIENT: return "client"; } return "??invalid-type??"; } diff --git a/src/config.h b/src/config.h index 7c121b4..0303e2a 100644 --- a/src/config.h +++ b/src/config.h @@ -1,247 +1,255 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef _CONFIG_H #define _CONFIG_H #include +#include #include "booth.h" #include "timer.h" #include "raft.h" #include "transport.h" /** @{ */ /** Definitions for in-RAM data. */ #define MAX_NODES 16 #define TICKET_ALLOC 16 struct ticket_config { /** \name Configuration items. * @{ */ /** Name of ticket. */ boothc_ticket name; /** How long a term lasts if not refreshed (in ms) */ int term_duration; /** Network related timeouts (in ms) */ int timeout; /** Retries before giving up. */ int retries; /** If >0, time to wait for a site to get fenced. * The ticket may be acquired after that timespan by * another site. */ int acquire_after; /* TODO: needed? */ /* How often to renew the ticket (in ms) */ int renewal_freq; /* Program to ask whether it makes sense to * acquire the ticket */ char *ext_verifier; /** Node weights. */ int weight[MAX_NODES]; /** @} */ /** \name Runtime values. * @{ */ /** Current state. */ server_state_e state; /** Next state. Used at startup. */ server_state_e next_state; /** When something has to be done */ timetype next_cron; /** Current leader. This is effectively the log[] in Raft. */ struct booth_site *leader; /** Leader that got lost. */ struct booth_site *lost_leader; /** Is the ticket granted? */ int is_granted; /** Timestamp of leadership expiration */ timetype term_expires; /** End of election period */ timetype election_end; struct booth_site *voted_for; /** Who the various sites vote for. * NO_OWNER = no vote yet. */ struct booth_site *votes_for[MAX_NODES]; /* bitmap */ uint64_t votes_received; /** Last voting round that was seen. */ uint32_t current_term; /** Do ticket updates whenever we get enough heartbeats. * But do that only once. * This is reset to 0 whenever we broadcast heartbeat and set * to 1 once enough acks are received. * Increased to 2 when the ticket is commited to the CIB (see * delay_commit). */ uint32_t ticket_updated; /** Outcome of whatever ticket request was processed. * Can also be an intermediate stage. */ uint32_t outcome; /** @} */ /** */ uint32_t last_applied; uint32_t next_index[MAX_NODES]; uint32_t match_index[MAX_NODES]; /* Why did we start the elections? */ cmd_reason_t election_reason; /* if it is potentially dangerous to grant the ticket * immediately, then this is set to some point in time, * usually (now + term_duration + acquire_after) */ timetype delay_commit; /* the last request RPC we sent */ uint32_t last_request; /* if we expect some acks, then set this to the id of * the RPC which others will send us; it is cleared once all * replies were received */ uint32_t acks_expected; /* bitmask of servers which sent acks */ uint64_t acks_received; /* timestamp of the request */ timetype req_sent_at; /* we need to wait for MY_INDEX from other servers, * hold the ticket processing for a while until they reply */ int start_postpone; /** Last renewal time */ timetype last_renewal; /* Do we need to update the copy in the CIB? * Normally, the ticket is written only when it changes via * the UPDATE RPC (for followers) and on expiration update * (for leaders) */ int update_cib; /* Is this ticket in election? */ int in_election; /* don't log warnings unnecessarily */ int expect_more_rejects; /** \name Needed while proposals are being done. * @{ */ /* Need to keep the previous valid ticket in case we moved to * start new elections and another server asks for the ticket * status. It would be wrong to send our candidate ticket. */ struct ticket_config *last_valid_tk; /** Whom to vote for the next time. * Needed to push a ticket to someone else. */ #if 0 /** Bitmap of sites that acknowledge that state. */ uint64_t proposal_acknowledges; /** When an incompletely acknowledged proposal gets done. * If all peers agree, that happens sooner. * See switch_state_to(). */ struct timeval proposal_switch; /** Timestamp of proposal expiration. */ time_t proposal_expires; #endif /** Number of send retries left. * Used on the new owner. * Starts at 0, counts up. */ int retry_number; /** @} */ }; struct booth_config { char name[BOOTH_NAME_LEN]; + /** File containing the authentication file. */ + char authfile[BOOTH_PATH_LEN]; + struct stat authstat; + unsigned char authkey[BOOTH_MAX_KEY_LEN]; + int authkey_len; + transport_layer_t proto; uint16_t port; /** Stores the OR of sites bitmasks. */ uint64_t sites_bits; /** Stores the OR of all members' bitmasks. */ uint64_t all_bits; char site_user[BOOTH_NAME_LEN]; char site_group[BOOTH_NAME_LEN]; char arb_user[BOOTH_NAME_LEN]; char arb_group[BOOTH_NAME_LEN]; uid_t uid; gid_t gid; int site_count; struct booth_site site[MAX_NODES]; int ticket_count; int ticket_allocated; struct ticket_config *ticket; }; - extern struct booth_config *booth_conf; +#define is_auth_req() (booth_conf->authkey[0] != '\0') + int read_config(const char *path, int type); int check_config(int type); int find_site_by_name(unsigned char *site, struct booth_site **node, int any_type); int find_site_by_id(uint32_t site_id, struct booth_site **node); const char *type_to_string(int type); #endif /* _CONFIG_H */ diff --git a/src/inline-fn.h b/src/inline-fn.h index 84ad666..5521e28 100644 --- a/src/inline-fn.h +++ b/src/inline-fn.h @@ -1,285 +1,285 @@ /* * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef _INLINE_FN_H #define _INLINE_FN_H #include #include #include #include #include "timer.h" #include "config.h" #include "transport.h" inline static uint32_t get_local_id(void) { return local ? local->site_id : -1; } inline static uint32_t get_node_id(struct booth_site *node) { return node ? node->site_id : 0; } /** Returns number of seconds left, if any. */ inline static int term_time_left(struct ticket_config *tk) { int left = 0; if (is_time_set(&tk->term_expires)) { left = time_left(&tk->term_expires); } return (left < 0) ? 0 : left; } inline static int leader_and_valid(struct ticket_config *tk) { if (tk->leader != local) return 0; return term_time_left(tk); } /** Is this some leader? */ inline static int is_owned(const struct ticket_config *tk) { return (tk->leader && tk->leader != no_leader); } inline static int is_resend(struct ticket_config *tk) { timetype now; get_time(&now); return time_sub_int(&now, &tk->req_sent_at) >= tk->timeout; } static inline void init_header_bare(struct boothc_header *h) { assert(local && local->site_id); h->magic = htonl(BOOTHC_MAGIC); h->version = htonl(BOOTHC_VERSION); h->from = htonl(local->site_id); h->iv = htonl(0); h->auth1 = htonl(0); h->auth2 = htonl(0); } +/* get the _real_ message length out of the header + */ +#define sendmsglen(msg) ntohl((msg)->header.length) + static inline void init_header(struct boothc_header *h, int cmd, int request, int options, int result, int reason, int data_len) { init_header_bare(h); - h->length = htonl(data_len); + h->length = htonl(data_len - + (is_auth_req() ? 0 : sizeof(struct hmac))); h->cmd = htonl(cmd); h->request = htonl(request); h->options = htonl(options); h->result = htonl(result); h->reason = htonl(reason); } -static inline void init_ticket_site_header(struct boothc_ticket_msg *msg, int cmd) -{ - init_header(&msg->header, cmd, 0, 0, 0, 0, sizeof(*msg)); -} - #define my_last_term(tk) \ (((tk)->state == ST_CANDIDATE && (tk)->last_valid_tk->current_term) ? \ (tk)->last_valid_tk->current_term : (tk)->current_term) extern int TIME_RES, TIME_MULT; #define msg_term_time(msg) \ ntohl((msg)->ticket.term_valid_for)*TIME_RES/TIME_MULT #define set_msg_term_time(msg, tk) \ (msg)->ticket.term_valid_for = htonl(term_time_left(tk)*TIME_MULT/TIME_RES) static inline void init_ticket_msg(struct boothc_ticket_msg *msg, int cmd, int request, int rv, int reason, struct ticket_config *tk) { assert(sizeof(msg->ticket.id) == sizeof(tk->name)); init_header(&msg->header, cmd, request, 0, rv, reason, sizeof(*msg)); if (!tk) { memset(&msg->ticket, 0, sizeof(msg->ticket)); } else { memcpy(msg->ticket.id, tk->name, sizeof(msg->ticket.id)); msg->ticket.leader = htonl(get_node_id( (tk->leader && tk->leader != no_leader) ? tk->leader : (tk->voted_for ? tk->voted_for : no_leader))); msg->ticket.term = htonl(tk->current_term); set_msg_term_time(msg, tk); } } static inline struct booth_transport const *transport(void) { return booth_transport + booth_conf->proto; } static inline const char *site_string(struct booth_site *site) { return site ? site->addr_string : "NONE"; } static inline const char *ticket_leader_string(struct ticket_config *tk) { return site_string(tk->leader); } /* We allow half of the uint32_t to be used; * half of that below, half of that above the current known "good" value. * 0 UINT32_MAX * |--------------------------+----------------+------------| * | | | * |--------+-------| allowed range * | * current commit index * * So, on overflow it looks like that: * UINT32_MAX 0 * |--------------------------+-----------||---+------------| * | | | * |--------+-------| allowed range * | * current commit index * * This should be possible by using the same datatype and relying * on the under/overflow semantics. * * * Having 30 bits available, and assuming an expire time of * one minute and a (high) commit index step of 64 == 2^6 (because * of weights), we get 2^24 minutes of range - which is ~750 * years. "Should be enough for everybody." */ static inline int index_is_higher_than(uint32_t c_high, uint32_t c_low) { uint32_t diff; if (c_high == c_low) return 0; diff = c_high - c_low; if (diff < UINT32_MAX/4) return 1; diff = c_low - c_high; if (diff < UINT32_MAX/4) return 0; assert(!"commit index out of range - invalid"); } static inline uint32_t index_max2(uint32_t a, uint32_t b) { return index_is_higher_than(a, b) ? a : b; } static inline uint32_t index_max3(uint32_t a, uint32_t b, uint32_t c) { return index_max2( index_max2(a, b), c); } /* only invoked when ticket leader */ static inline void get_next_election_time(struct ticket_config *tk, timetype *next) { assert(tk->leader == local); /* if last_renewal is not set, which is unusual, it may mean * that the ticket never got updated, i.e. nobody acked * ticket updates (say, due to a temporary connection * problem) * we may try a bit later again */ if (!is_time_set(&tk->last_renewal)) { time_reset(next); } else { interval_add(&tk->last_renewal, tk->renewal_freq, next); } /* if delay_commit is earlier than next, then set next to * delay_commit */ if (is_time_set(&tk->delay_commit) && time_cmp(next, &tk->delay_commit, >)) { copy_time(&tk->delay_commit, next); } } static inline void expect_replies(struct ticket_config *tk, int reply_type) { tk->retry_number = 0; tk->acks_expected = reply_type; tk->acks_received = local->bitmask; get_time(&tk->req_sent_at); } static inline void no_resends(struct ticket_config *tk) { tk->retry_number = 0; tk->acks_expected = 0; } static inline struct booth_site *my_vote(struct ticket_config *tk) { return tk->votes_for[ local->index ]; } static inline int count_bits(uint64_t val) { return __builtin_popcount(val); } static inline int majority_of_bits(struct ticket_config *tk, uint64_t val) { /* Use ">" to get majority decision, even for an even number * of participants. */ return count_bits(val) * 2 > booth_conf->site_count; } static inline int all_replied(struct ticket_config *tk) { return !(tk->acks_received ^ booth_conf->all_bits); } static inline int all_sites_replied(struct ticket_config *tk) { return !((tk->acks_received & booth_conf->sites_bits) ^ booth_conf->sites_bits); } #endif diff --git a/src/main.c b/src/main.c index 605eb96..0ef48be 100644 --- a/src/main.c +++ b/src/main.c @@ -1,1453 +1,1528 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include +#include #include #include #include #include #include #include #include #include #include "b_config.h" #include "log.h" #include "booth.h" #include "config.h" #include "transport.h" #include "inline-fn.h" #include "pacemaker.h" #include "ticket.h" #define RELEASE_VERSION "0.2.0" #define RELEASE_STR RELEASE_VERSION " (build " BOOTH_BUILD_VERSION ")" #define CLIENT_NALLOC 32 int daemonize = 0; int enable_stderr = 0; timetype start_time; /** Structure for "clients". * Filehandles with incoming data get registered here (and in pollfds), * along with their callbacks. * Because these can be reallocated with every new fd, addressing * happens _only_ by their numeric index. */ struct client *clients = NULL; struct pollfd *pollfds = NULL; static int client_maxi; static int client_size = 0; static const struct booth_site _no_leader = { .addr_string = "none", .site_id = NO_ONE, }; struct booth_site *no_leader = (struct booth_site*)& _no_leader; typedef enum { BOOTHD_STARTED=0, BOOTHD_STARTING } BOOTH_DAEMON_STATE; int poll_timeout; struct booth_config *booth_conf; struct command_line cl; int do_read(int fd, void *buf, size_t count) { int rv, off = 0; while (off < count) { rv = read(fd, (char *)buf + off, count - off); if (rv == 0) return -1; if (rv == -1 && errno == EINTR) continue; if (rv == -1) return -1; off += rv; } return 0; } int do_write(int fd, void *buf, size_t count) { int rv, off = 0; retry: rv = write(fd, (char *)buf + off, count); if (rv == -1 && errno == EINTR) goto retry; /* If we cannot write _any_ data, we'd be in an (potential) loop. */ if (rv <= 0) { log_error("write failed: %s (%d)", strerror(errno), errno); return rv; } if (rv != count) { count -= rv; off += rv; goto retry; } return 0; } static void client_alloc(void) { int i; if (!clients) { clients = malloc(CLIENT_NALLOC * sizeof(struct client)); pollfds = malloc(CLIENT_NALLOC * sizeof(struct pollfd)); } else { clients = realloc(clients, (client_size + CLIENT_NALLOC) * sizeof(struct client)); pollfds = realloc(pollfds, (client_size + CLIENT_NALLOC) * sizeof(struct pollfd)); } if (!clients || !pollfds) { log_error("can't alloc for client array"); exit(1); } for (i = client_size; i < client_size + CLIENT_NALLOC; i++) { clients[i].workfn = NULL; clients[i].deadfn = NULL; clients[i].fd = -1; pollfds[i].fd = -1; pollfds[i].revents = 0; } client_size += CLIENT_NALLOC; } static void client_dead(int ci) { if (clients[ci].fd != -1) close(clients[ci].fd); clients[ci].fd = -1; clients[ci].workfn = NULL; pollfds[ci].fd = -1; } int client_add(int fd, const struct booth_transport *tpt, void (*workfn)(int ci), void (*deadfn)(int ci)) { int i; struct client *c; if (client_size + 2 >= client_maxi ) { client_alloc(); } for (i = 0; i < client_size; i++) { c = clients + i; if (c->fd != -1) continue; c->workfn = workfn; if (deadfn) c->deadfn = deadfn; else c->deadfn = client_dead; c->transport = tpt; c->fd = fd; pollfds[i].fd = fd; pollfds[i].events = POLLIN; if (i > client_maxi) client_maxi = i; return i; } assert(!"no client"); } int find_client_by_fd(int fd) { int i; for (i = 0; i < client_size; i++) { if (clients[i].fd == fd) return i; } return -1; } -/* Only used for client requests, TCP ???*/ +/* Only used for client requests (tcp) */ void process_connection(int ci) { struct boothc_ticket_msg *msg; + struct boothc_hdr_msg err_reply; int rv, len, expr, fd; void (*deadfn) (int ci); - msg = calloc(sizeof(struct boothc_ticket_msg), 1); if (!msg) { rv = -ENOMEM; log_error("out of memory for client messages"); goto kill; } fd = clients[ci].fd; rv = do_read(fd, &msg->header, sizeof(msg->header)); if (rv < 0) { if (errno == ECONNRESET) log_debug("client %d connection reset for fd %d", ci, clients[ci].fd); goto kill; } if (check_boothc_header(&msg->header, -1) < 0) goto kill; /* Basic sanity checks already done. */ len = ntohl(msg->header.length); if (len) { - if (len != sizeof(struct boothc_ticket_msg)) { - log_error("got message of wrong length %u from client", len); - return; - } expr = len - sizeof(msg->header); rv = do_read(clients[ci].fd, msg->header.data, expr); if (rv < 0) { log_error("connection %d read data error %d, wanted %d", ci, rv, expr); goto kill; } } + if (check_auth(NULL, msg, sizeof(*msg))) { + log_error("client connection %d failed to authenticate", ci); + goto kill; + } /* For CMD_GRANT and CMD_REVOKE: * Don't close connection immediately, but send * result a second later? */ switch (ntohl(msg->header.cmd)) { case CMD_LIST: ticket_answer_list(fd, msg); goto kill; case CMD_GRANT: case CMD_REVOKE: if (process_client_request(&clients[ci], msg) == 1) goto kill; /* request processed definitely, close connection */ else return; default: log_error("connection %d cmd %x unknown", ci, ntohl(msg->header.cmd)); - init_header(&msg->header, CL_RESULT, 0, 0, RLT_INVALID_ARG, 0, sizeof(msg->header)); - send_header_only(fd, &msg->header); + init_header(&err_reply.header, CL_RESULT, 0, 0, RLT_INVALID_ARG, 0, sizeof(err_reply)); + send_client_msg(fd, &err_reply); goto kill; } assert(0); return; kill: deadfn = clients[ci].deadfn; if(deadfn) { deadfn(ci); } if (msg) { free(msg); } return; } /** Callback function for the listening TCP socket. */ static void process_listener(int ci) { int fd, i; fd = accept(clients[ci].fd, NULL, NULL); if (fd < 0) { log_error("process_listener: accept error for fd %d: %s (%d)", clients[ci].fd, strerror(errno), errno); if (clients[ci].deadfn) clients[ci].deadfn(ci); return; } i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("add client connection %d fd %d", i, fd); } +/* trim trailing spaces if the key is ascii + */ +static void trim_key() +{ + unsigned char *p; + int i; + + for (i=0, p=booth_conf->authkey; i < booth_conf->authkey_len; i++, p++) + if (!isascii(*p)) + return; + + p = booth_conf->authkey; + while (booth_conf->authkey_len > 0 && isspace(*p)) { + p++; + booth_conf->authkey_len--; + } + memmove(booth_conf->authkey, p, booth_conf->authkey_len); + + p = booth_conf->authkey + booth_conf->authkey_len - 1; + while (booth_conf->authkey_len > 0 && isspace(*p)) { + booth_conf->authkey_len--; + p--; + } +} + +static int read_authkey() +{ + int fd; + + booth_conf->authkey[0] = '\0'; + if (stat(booth_conf->authfile, &booth_conf->authstat) < 0) { + log_error("cannot stat authentication file %s: %s", + booth_conf->authfile, strerror(errno)); + return -1; + } + if (booth_conf->authstat.st_mode & (S_IRGRP | S_IROTH)) { + log_error("%s: file can be readable only for the owner", booth_conf->authfile); + return -1; + } + fd = open(booth_conf->authfile, O_RDONLY); + if (fd < 0) { + log_error("cannot open %s: %s", + booth_conf->authfile, strerror(errno)); + return -1; + } + booth_conf->authkey_len = read(fd, booth_conf->authkey, BOOTH_MAX_KEY_LEN); + close(fd); + trim_key(); + log_debug("read key of size %d in authfile %s", + booth_conf->authkey_len, booth_conf->authfile); + /* make sure that the key is of minimum length */ + return (booth_conf->authkey_len >= BOOTH_MIN_KEY_LEN) ? 0 : -1; +} + +int update_authkey() +{ + struct stat statbuf; + + if (stat(booth_conf->authfile, &statbuf) < 0) { + log_error("cannot stat authentication file %s: %s", + booth_conf->authfile, strerror(errno)); + return -1; + } + if (statbuf.st_mtime > booth_conf->authstat.st_mtime) { + return read_authkey(); + } + return 0; +} + static int setup_config(int type) { int rv; rv = read_config(cl.configfile, type); if (rv < 0) goto out; + if (booth_conf->authfile[0] != '\0') { + rv = read_authkey(); + if (rv < 0) + goto out; + } /* Set "local" pointer, ignoring errors. */ if (cl.type == DAEMON && cl.site[0]) { if (!find_site_by_name(cl.site, &local, 1)) { log_error("Cannot find \"%s\" in the configuration.", cl.site); return -EINVAL; } local->local = 1; } else find_myself(NULL, type == CLIENT); rv = check_config(type); if (rv < 0) goto out; /* Per default the PID file name is derived from the * configuration name. */ if (!cl.lockfile[0]) { snprintf(cl.lockfile, sizeof(cl.lockfile)-1, "%s/%s.pid", BOOTH_RUN_DIR, booth_conf->name); } out: return rv; } static int setup_transport(void) { int rv; rv = transport()->init(message_recv); if (rv < 0) { log_error("failed to init booth_transport %s", transport()->name); goto out; } rv = booth_transport[TCP].init(NULL); if (rv < 0) { log_error("failed to init booth_transport[TCP]"); goto out; } out: return rv; } static int write_daemon_state(int fd, int state) { char buffer[1024]; int rv, size; size = sizeof(buffer) - 1; rv = snprintf(buffer, size, "booth_pid=%d " "booth_state=%s " "booth_type=%s " "booth_cfg_name='%s' " "booth_id=%d " "booth_addr_string='%s' " "booth_port=%d\n", getpid(), ( state == BOOTHD_STARTED ? "started" : state == BOOTHD_STARTING ? "starting" : "invalid"), type_to_string(local->type), booth_conf->name, local->site_id, local->addr_string, booth_conf->port); if (rv < 0 || rv == size) { log_error("Buffer filled up in write_daemon_state()."); return -1; } size = rv; rv = ftruncate(fd, 0); if (rv < 0) { log_error("lockfile %s truncate error %d: %s", cl.lockfile, errno, strerror(errno)); return rv; } rv = lseek(fd, 0, SEEK_SET); if (rv < 0) { log_error("lseek set fd(%d) offset to 0 error, return(%d), message(%s)", fd, rv, strerror(errno)); rv = -1; return rv; } rv = write(fd, buffer, size); if (rv != size) { log_error("write to fd(%d, %d) returned %d, errno %d, message(%s)", fd, size, rv, errno, strerror(errno)); return -1; } return 0; } static int loop(int fd) { void (*workfn) (int ci); void (*deadfn) (int ci); int rv, i; rv = setup_transport(); if (rv < 0) goto fail; rv = setup_ticket(); if (rv < 0) goto fail; client_add(local->tcp_fd, booth_transport + TCP, process_listener, NULL); rv = write_daemon_state(fd, BOOTHD_STARTED); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTED, cl.lockfile, strerror(errno)); goto fail; } log_info("BOOTH %s daemon started, node id is 0x%08X (%d).", type_to_string(local->type), local->site_id, local->site_id); while (1) { rv = poll(pollfds, client_maxi + 1, poll_timeout); if (rv == -1 && errno == EINTR) continue; if (rv < 0) { log_error("poll failed: %s (%d)", strerror(errno), errno); goto fail; } for (i = 0; i <= client_maxi; i++) { if (clients[i].fd < 0) continue; if (pollfds[i].revents & POLLIN) { workfn = clients[i].workfn; if (workfn) workfn(i); } if (pollfds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { deadfn = clients[i].deadfn; if (deadfn) deadfn(i); } } process_tickets(); } return 0; fail: return -1; } static int query_get_string_answer(cmd_request_t cmd) { struct booth_site *site; - struct boothc_header reply; + struct boothc_hdr_msg reply; char *data; int data_len; int rv; struct booth_transport const *tpt; data = NULL; init_header(&cl.msg.header, cmd, 0, cl.options, 0, 0, sizeof(cl.msg)); if (!*cl.site) site = local; else if (!find_site_by_name(cl.site, &site, 1)) { log_error("cannot find site \"%s\"", cl.site); rv = ENOENT; goto out; } tpt = booth_transport + TCP; rv = tpt->open(site); if (rv < 0) goto out_free; - rv = tpt->send(site, &cl.msg, sizeof(cl.msg)); + rv = tpt->send(site, &cl.msg, sendmsglen(&cl.msg)); if (rv < 0) goto out_free; - rv = tpt->recv(site, &reply, sizeof(reply)); + rv = tpt->recv_auth(site, &reply, sizeof(reply)); if (rv < 0) goto out_free; - data_len = ntohl(reply.length) - sizeof(reply); + data_len = ntohl(reply.header.length) - rv; + /* no tickets? */ if (!data_len) { rv = 0; goto out_close; } data = malloc(data_len); if (!data) { rv = -ENOMEM; goto out_free; } rv = tpt->recv(site, data, data_len); if (rv < 0) goto out_free; do_write(STDOUT_FILENO, data, data_len); rv = 0; out_free: free(data); out_close: tpt->close(site); out: return rv; } static int test_reply(int reply_code, cmd_request_t cmd) { int rv = 0; const char *op_str = ""; if (cmd == CMD_GRANT) op_str = "grant"; else if (cmd == CMD_REVOKE) op_str = "revoke"; else { log_error("internal error reading reply result!"); return -1; } switch (reply_code) { case RLT_OVERGRANT: log_info("You're granting a granted ticket. " "If you wanted to migrate a ticket, " "use revoke first, then use grant."); rv = -1; break; case RLT_TICKET_IDLE: log_info("ticket is not owned"); rv = 0; break; case RLT_ASYNC: log_info("%s command sent, result will be returned " "asynchronously. Please use \"booth list\" to " "see the outcome.", op_str); rv = 0; break; case RLT_CIB_PENDING: log_info("%s succeeded (CIB commit pending)", op_str); /* wait for the CIB commit? */ rv = (cl.options & OPT_WAIT_COMMIT) ? 3 : 0; break; case RLT_MORE: rv = 2; break; case RLT_SYNC_SUCC: case RLT_SUCCESS: log_info("%s succeeded!", op_str); rv = 0; break; case RLT_SYNC_FAIL: log_info("%s failed!", op_str); rv = -1; break; case RLT_INVALID_ARG: log_error("ticket \"%s\" does not exist", cl.msg.ticket.id); break; case RLT_EXT_FAILED: log_error("before-acquire-handler for ticket \"%s\" failed, grant denied", cl.msg.ticket.id); break; case RLT_REDIRECT: /* talk to another site */ rv = 1; break; default: log_error("got an error code: %x", rv); rv = -1; } return rv; } static int do_command(cmd_request_t cmd) { struct booth_site *site; struct boothc_ticket_msg reply; struct booth_transport const *tpt; uint32_t leader_id; int rv; int reply_cnt = 0, msg_logged = 0; const char *op_str = ""; if (cmd == CMD_GRANT) op_str = "grant"; else if (cmd == CMD_REVOKE) op_str = "revoke"; rv = 0; site = NULL; if (!*cl.site) site = local; else { if (!find_site_by_name(cl.site, &site, 1)) { log_error("Site \"%s\" not configured.", cl.site); goto out_close; } } if (site->type == ARBITRATOR) { if (site == local) { log_error("We're just an arbitrator, cannot grant/revoke tickets here."); } else { log_error("%s is just an arbitrator, cannot grant/revoke tickets there.", cl.site); } goto out_close; } assert(site->type == SITE); /* We don't check for existence of ticket, so that asking can be * done without local configuration, too. * Although, that means that the UDP port has to be specified, too. */ if (!cl.msg.ticket.id[0]) { /* If the loaded configuration has only a single ticket defined, use that. */ if (booth_conf->ticket_count == 1) { strcpy(cl.msg.ticket.id, booth_conf->ticket[0].name); } else { log_error("No ticket given."); goto out_close; } } redirect: init_header(&cl.msg.header, cmd, 0, cl.options, 0, 0, sizeof(cl.msg)); /* Always use TCP for client - at least for now. */ tpt = booth_transport + TCP; rv = tpt->open(site); if (rv < 0) goto out_close; - rv = tpt->send(site, &cl.msg, sizeof(cl.msg)); + rv = tpt->send(site, &cl.msg, sendmsglen(&cl.msg)); if (rv < 0) goto out_close; read_more: - rv = tpt->recv(site, &reply, sizeof(reply)); + rv = tpt->recv_auth(site, &reply, sizeof(reply)); if (rv < 0) goto out_close; rv = test_reply(ntohl(reply.header.result), cmd); if (rv == 1) { local_transport->close(site); leader_id = ntohl(reply.ticket.leader); if (!find_site_by_id(leader_id, &site)) { log_error("Message with unknown redirect site %x received", leader_id); rv = -1; goto out_close; } goto redirect; } else if (rv == 2 || rv == 3) { /* the server has more to say */ /* don't wait too long */ if (reply_cnt > 1 && !(cl.options & OPT_WAIT)) { rv = 0; log_info("Giving up on waiting for the definite result. " "Please use \"booth list\" later to " "see the outcome."); goto out_close; } if (reply_cnt == 0) { log_info("%s request sent, " "waiting for the result ...", op_str); msg_logged++; } else if (rv == 3 && msg_logged < 2) { log_info("waiting for the CIB commit ..."); msg_logged++; } reply_cnt++; goto read_more; } out_close: if (site) local_transport->close(site); return rv; } static int do_grant(void) { return do_command(CMD_GRANT); } static int do_revoke(void) { return do_command(CMD_REVOKE); } static int _lockfile(int mode, int *fdp, pid_t *locked_by) { struct flock lock; int fd, rv; /* After reboot the directory may not yet exist. * Try to create it, but ignore errors. */ if (strncmp(cl.lockfile, BOOTH_RUN_DIR, strlen(BOOTH_RUN_DIR)) == 0) mkdir(BOOTH_RUN_DIR, 0775); if (locked_by) *locked_by = 0; *fdp = -1; fd = open(cl.lockfile, mode, 0664); if (fd < 0) return errno; *fdp = fd; lock.l_type = F_WRLCK; lock.l_start = 0; lock.l_whence = SEEK_SET; lock.l_len = 0; lock.l_pid = 0; if (fcntl(fd, F_SETLK, &lock) == 0) return 0; rv = errno; if (locked_by) if (fcntl(fd, F_GETLK, &lock) == 0) *locked_by = lock.l_pid; return rv; } static inline int is_root(void) { /* TODO: getuid()? Better way to check? */ return geteuid() == 0; } static int create_lockfile(void) { int rv, fd; fd = -1; rv = _lockfile(O_CREAT | O_WRONLY, &fd, NULL); if (fd == -1) { log_error("lockfile %s open error %d: %s", cl.lockfile, rv, strerror(rv)); return -1; } if (rv < 0) { log_error("lockfile %s setlk error %d: %s", cl.lockfile, rv, strerror(rv)); goto fail; } rv = write_daemon_state(fd, BOOTHD_STARTING); if (rv != 0) { log_error("write daemon state %d to lockfile error %s: %s", BOOTHD_STARTING, cl.lockfile, strerror(errno)); goto fail; } if (is_root()) { if (fchown(fd, booth_conf->uid, booth_conf->gid) < 0) log_error("fchown() on lockfile said %d: %s", errno, strerror(errno)); } return fd; fail: close(fd); return -1; } static void unlink_lockfile(int fd) { unlink(cl.lockfile); close(fd); } static void print_usage(void) { printf( "Usage:\n" " booth list [options]\n" " booth {grant|revoke} [options] \n" " booth status [options]\n" "\n" " list: List all tickets\n" " grant: Grant ticket to site\n" " revoke: Revoke ticket\n" "\n" "Options:\n" " -c FILE Specify config file [default " BOOTH_DEFAULT_CONF "]\n" " Can be a path or just a name without \".conf\" suffix\n" " -s Connect/grant to a different site\n" " -F Try to grant the ticket immediately\n" " even if not all sites are reachable\n" " -w Wait forever for the outcome of the request\n" " -C Wait until the ticket is committed to the CIB (grant only)\n" " -h Print this help\n" "\n" "Examples:\n" "\n" " # booth list (list tickets)\n" " # booth grant ticket-A (grant ticket here)\n" " # booth grant -s 10.121.8.183 ticket-A (grant ticket to site 10.121.8.183)\n" " # booth revoke ticket-A (revoke ticket)\n" "\n" "See the booth(8) man page for more details.\n" ); } #define OPTION_STRING "c:Dl:t:s:FhSwC" void safe_copy(char *dest, char *value, size_t buflen, const char *description) { int content_len = buflen - 1; if (strlen(value) >= content_len) { fprintf(stderr, "'%s' exceeds maximum %s length of %d\n", value, description, content_len); exit(EXIT_FAILURE); } strncpy(dest, value, content_len); dest[content_len] = 0; } static int host_convert(char *hostname, char *ip_str, size_t ip_size) { struct addrinfo *result = NULL, hints = {0}; int re = -1; memset(&hints, 0, sizeof(hints)); hints.ai_family = BOOTH_PROTO_FAMILY; hints.ai_socktype = SOCK_DGRAM; re = getaddrinfo(hostname, NULL, &hints, &result); if (re == 0) { struct in_addr addr = ((struct sockaddr_in *)result->ai_addr)->sin_addr; const char *re_ntop = inet_ntop(BOOTH_PROTO_FAMILY, &addr, ip_str, ip_size); if (re_ntop == NULL) { re = -1; } } freeaddrinfo(result); return re; } static int read_arguments(int argc, char **argv) { int optchar; char *arg1 = argv[1]; char *op = NULL; char *cp; char site_arg[INET_ADDRSTRLEN] = {0}; int left; if (argc < 2 || !strcmp(arg1, "help") || !strcmp(arg1, "--help") || !strcmp(arg1, "-h")) { print_usage(); exit(EXIT_SUCCESS); } if (!strcmp(arg1, "version") || !strcmp(arg1, "--version") || !strcmp(arg1, "-V")) { printf("%s %s\n", argv[0], RELEASE_STR); exit(EXIT_SUCCESS); } if (strcmp(arg1, "arbitrator") == 0 || strcmp(arg1, "site") == 0 || strcmp(arg1, "start") == 0 || strcmp(arg1, "daemon") == 0) { cl.type = DAEMON; optind = 2; } else if (strcmp(arg1, "status") == 0) { cl.type = STATUS; optind = 2; } else if (strcmp(arg1, "client") == 0) { cl.type = CLIENT; if (argc < 3) { print_usage(); exit(EXIT_FAILURE); } op = argv[2]; optind = 3; } else { cl.type = CLIENT; op = argv[1]; optind = 2; } if (cl.type == CLIENT) { if (!strcmp(op, "list")) cl.op = CMD_LIST; else if (!strcmp(op, "grant")) cl.op = CMD_GRANT; else if (!strcmp(op, "revoke")) cl.op = CMD_REVOKE; else { fprintf(stderr, "client operation \"%s\" is unknown\n", op); exit(EXIT_FAILURE); } } while (optind < argc) { optchar = getopt(argc, argv, OPTION_STRING); switch (optchar) { case 'c': if (strchr(optarg, '/')) { safe_copy(cl.configfile, optarg, sizeof(cl.configfile), "config file"); } else { /* If no "/" in there, use with default directory. */ strcpy(cl.configfile, BOOTH_DEFAULT_CONF_DIR); cp = cl.configfile + strlen(BOOTH_DEFAULT_CONF_DIR); assert(cp > cl.configfile); assert(*(cp-1) == '/'); /* Write at the \0, ie. after the "/" */ safe_copy(cp, optarg, (sizeof(cl.configfile) - (cp - cl.configfile) - strlen(BOOTH_DEFAULT_CONF_EXT)), "config name"); /* If no extension, append ".conf". * Space is available, see -strlen() above. */ if (!strchr(cp, '.')) strcat(cp, BOOTH_DEFAULT_CONF_EXT); } break; case 'D': debug_level++; enable_stderr = 1; /* Fall through */ case 'S': daemonize = 1; break; case 'l': safe_copy(cl.lockfile, optarg, sizeof(cl.lockfile), "lock file"); break; case 't': if (cl.op == CMD_GRANT || cl.op == CMD_REVOKE) { safe_copy(cl.msg.ticket.id, optarg, sizeof(cl.msg.ticket.id), "ticket name"); } else { print_usage(); exit(EXIT_FAILURE); } break; case 's': /* For testing and debugging: allow "-s site" also for * daemon start, so that the address that should be used * can be set manually. * This makes it easier to start multiple processes * on one machine. */ if (cl.type == CLIENT || (cl.type == DAEMON && debug_level)) { int re = host_convert(optarg, site_arg, INET_ADDRSTRLEN); if (re == 0) { safe_copy(cl.site, site_arg, sizeof(cl.site), "site name"); } else { safe_copy(cl.site, optarg, sizeof(cl.site), "site name"); } } else { log_error("\"-s\" not allowed in daemon mode."); exit(EXIT_FAILURE); } break; case 'F': if (cl.type != CLIENT || cl.op != CMD_GRANT) { log_error("use \"-F\" only for client grant"); exit(EXIT_FAILURE); } cl.options |= OPT_IMMEDIATE; break; case 'w': if (cl.type != CLIENT || (cl.op != CMD_GRANT && cl.op != CMD_REVOKE)) { log_error("use \"-w\" only for grant and revoke"); exit(EXIT_FAILURE); } cl.options |= OPT_WAIT; break; case 'C': if (cl.type != CLIENT || cl.op != CMD_GRANT) { log_error("use \"-C\" only for grant"); exit(EXIT_FAILURE); } cl.options |= OPT_WAIT | OPT_WAIT_COMMIT; break; case 'h': print_usage(); exit(EXIT_SUCCESS); break; case ':': case '?': fprintf(stderr, "Please use '-h' for usage.\n"); exit(EXIT_FAILURE); break; case -1: /* No more parameters on cmdline, only arguments. */ goto extra_args; default: goto unknown; }; } return 0; extra_args: if (cl.type == CLIENT && !cl.msg.ticket.id[0]) { /* Use additional argument as ticket name. */ safe_copy(cl.msg.ticket.id, argv[optind], sizeof(cl.msg.ticket.id), "ticket name"); optind++; } if (optind == argc) return 0; left = argc - optind; fprintf(stderr, "Superfluous argument%s: %s%s\n", left == 1 ? "" : "s", argv[optind], left == 1 ? "" : "..."); exit(EXIT_FAILURE); unknown: fprintf(stderr, "unknown option: %s\n", argv[optind]); exit(EXIT_FAILURE); } static void set_scheduler(void) { struct sched_param sched_param; struct rlimit rlimit; int rv; rlimit.rlim_cur = RLIM_INFINITY; rlimit.rlim_max = RLIM_INFINITY; setrlimit(RLIMIT_MEMLOCK, &rlimit); rv = mlockall(MCL_CURRENT | MCL_FUTURE); if (rv < 0) { log_error("mlockall failed"); } rv = sched_get_priority_max(SCHED_RR); if (rv != -1) { sched_param.sched_priority = rv; rv = sched_setscheduler(0, SCHED_RR, &sched_param); if (rv == -1) log_error("could not set SCHED_RR priority %d: %s (%d)", sched_param.sched_priority, strerror(errno), errno); } else { log_error("could not get maximum scheduler priority err %d", errno); } } static int set_procfs_val(const char *path, const char *val) { int rc = -1; FILE *fp = fopen(path, "w"); if (fp) { if (fprintf(fp, "%s", val) > 0) rc = 0; fclose(fp); } return rc; } static int do_status(int type) { pid_t pid; int rv, lock_fd, ret; const char *reason = NULL; char lockfile_data[1024], *cp; ret = PCMK_OCF_NOT_RUNNING; /* TODO: query all, and return quit only if it's _cleanly_ not * running, ie. _neither_ of port/lockfile/process is available? * * Currently a single failure says "not running", even if "only" the * lockfile has been removed. */ rv = setup_config(type); if (rv) { reason = "Error reading configuration."; ret = PCMK_OCF_UNKNOWN_ERROR; goto quit; } if (!local) { reason = "No Service IP active here."; goto quit; } rv = _lockfile(O_RDWR, &lock_fd, &pid); if (rv == 0) { reason = "PID file not locked."; goto quit; } if (lock_fd == -1) { reason = "No PID file."; goto quit; } if (pid) { fprintf(stdout, "booth_lockpid=%d ", pid); fflush(stdout); } rv = read(lock_fd, lockfile_data, sizeof(lockfile_data) - 1); if (rv < 4) { reason = "Cannot read lockfile data."; ret = PCMK_LSB_UNKNOWN_ERROR; goto quit; } lockfile_data[rv] = 0; if (lock_fd != -1) close(lock_fd); /* Make sure it's only a single line */ cp = strchr(lockfile_data, '\r'); if (cp) *cp = 0; cp = strchr(lockfile_data, '\n'); if (cp) *cp = 0; rv = setup_tcp_listener(1); if (rv == 0) { reason = "TCP port not in use."; goto quit; } fprintf(stdout, "booth_lockfile='%s' %s\n", cl.lockfile, lockfile_data); if (daemonize) fprintf(stderr, "Booth at %s port %d seems to be running.\n", local->addr_string, booth_conf->port); return 0; quit: log_debug("not running: %s", reason); /* Ie. "DEBUG" */ if (daemonize) fprintf(stderr, "not running: %s\n", reason); return ret; } static int limit_this_process(void) { int rv; if (!is_root()) return 0; if (setregid(booth_conf->gid, booth_conf->gid) < 0) { rv = errno; log_error("setregid() didn't work: %s", strerror(rv)); return rv; } if (setreuid(booth_conf->uid, booth_conf->uid) < 0) { rv = errno; log_error("setreuid() didn't work: %s", strerror(rv)); return rv; } /* TODO: ulimits? But that would restrict crm_ticket and handler * scripts, too! */ return 0; } static int lock_fd = -1; static void server_exit(void) { int rv; if (lock_fd >= 0) { /* We might not be able to delete it, but at least * make it empty. */ rv = ftruncate(lock_fd, 0); (void)rv; unlink_lockfile(lock_fd); } log_info("exiting"); } static void sig_exit_handler(int sig) { log_info("caught signal %d", sig); exit(0); } static int do_server(int type) { int rv = -1; static char log_ent[128] = DAEMON_NAME "-"; rv = setup_config(type); if (rv < 0) return rv; - if (!local) { log_error("Cannot find myself in the configuration."); exit(EXIT_FAILURE); } if (!daemonize) { if (daemon(0, 0) < 0) { perror("daemon error"); exit(EXIT_FAILURE); } } /* The lockfile must be written to _after_ the call to daemon(), so * that the lockfile contains the pid of the daemon, not the parent. */ lock_fd = create_lockfile(); if (lock_fd < 0) return lock_fd; atexit(server_exit); strcat(log_ent, type_to_string(local->type)); cl_log_set_entity(log_ent); cl_log_enable_stderr(enable_stderr ? TRUE : FALSE); cl_log_set_facility(HA_LOG_FACILITY); cl_inherit_logging_environment(0); log_info("BOOTH %s %s daemon is starting", type_to_string(local->type), RELEASE_STR); signal(SIGUSR1, (__sighandler_t)tickets_log_info); signal(SIGTERM, (__sighandler_t)sig_exit_handler); signal(SIGINT, (__sighandler_t)sig_exit_handler); set_scheduler(); /* we don't want to be killed by the OOM-killer */ if (set_procfs_val("/proc/self/oom_score_adj", "-999")) (void)set_procfs_val("/proc/self/oom_adj", "-16"); set_proc_title("%s %s %s for [%s]:%d", DAEMON_NAME, cl.configfile, type_to_string(local->type), local->addr_string, booth_conf->port); rv = limit_this_process(); if (rv) return rv; if (cl_enable_coredumps(TRUE) < 0){ cl_log(LOG_ERR, "enabling core dump failed"); } cl_cdtocoredir(); prctl(PR_SET_DUMPABLE, (unsigned long)TRUE, 0UL, 0UL, 0UL); rv = loop(lock_fd); return rv; } static int do_client(void) { int rv = -1; rv = setup_config(CLIENT); if (rv < 0) { log_error("cannot read config"); goto out; } switch (cl.op) { case CMD_LIST: rv = query_get_string_answer(CMD_LIST); break; case CMD_GRANT: rv = do_grant(); break; case CMD_REVOKE: rv = do_revoke(); break; } out: return rv; } int main(int argc, char *argv[], char *envp[]) { int rv; init_set_proc_title(argc, argv, envp); get_time(&start_time); memset(&cl, 0, sizeof(cl)); strncpy(cl.configfile, BOOTH_DEFAULT_CONF, BOOTH_PATH_LEN - 1); cl.lockfile[0] = 0; debug_level = 0; cl_log_set_entity("booth"); cl_log_enable_stderr(TRUE); cl_log_set_facility(0); rv = read_arguments(argc, argv); if (rv < 0) goto out; switch (cl.type) { case STATUS: rv = do_status(cl.type); break; case ARBITRATOR: case DAEMON: case SITE: rv = do_server(cl.type); break; case CLIENT: rv = do_client(); break; } out: /* Normalize values. 0x100 would be seen as "OK" by waitpid(). */ return (rv >= 0 && rv < 0x70) ? rv : 1; } diff --git a/src/raft.c b/src/raft.c index 742195d..e064922 100644 --- a/src/raft.c +++ b/src/raft.c @@ -1,992 +1,992 @@ /* * Copyright (C) 2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include #include #include #include #include "booth.h" #include "timer.h" #include "transport.h" #include "inline-fn.h" #include "config.h" #include "raft.h" #include "ticket.h" #include "request.h" #include "log.h" inline static void clear_election(struct ticket_config *tk) { int i; struct booth_site *site; tk_log_debug("clear election"); tk->votes_received = 0; foreach_node(i, site) tk->votes_for[site->index] = NULL; } inline static void record_vote(struct ticket_config *tk, struct booth_site *who, struct booth_site *vote) { tk_log_debug("site %s votes for %s", site_string(who), site_string(vote)); if (!tk->votes_for[who->index]) { tk->votes_for[who->index] = vote; tk->votes_received |= who->bitmask; } else { if (tk->votes_for[who->index] != vote) tk_log_warn("%s voted previously " "for %s and now wants to vote for %s (ignored)", site_string(who), site_string(tk->votes_for[who->index]), site_string(vote)); } } static void update_term_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { uint32_t i; i = ntohl(msg->ticket.term); /* if we failed to start the election, then accept the term * from the leader * */ if (tk->state == ST_CANDIDATE) { tk->current_term = i; } else { tk->current_term = max(i, tk->current_term); } } static void set_ticket_expiry(struct ticket_config *tk, int duration) { set_future_time(&tk->term_expires, duration); } static void update_ticket_from_msg(struct ticket_config *tk, struct booth_site *sender, struct boothc_ticket_msg *msg) { int duration; tk_log_debug("updating from %s (%d/%d)", site_string(sender), ntohl(msg->ticket.term), msg_term_time(msg)); duration = min(tk->term_duration, msg_term_time(msg)); set_ticket_expiry(tk, duration); update_term_from_msg(tk, msg); } static void copy_ticket_from_msg(struct ticket_config *tk, struct boothc_ticket_msg *msg) { set_ticket_expiry(tk, msg_term_time(msg)); tk->current_term = ntohl(msg->ticket.term); } static void become_follower(struct ticket_config *tk, struct boothc_ticket_msg *msg) { copy_ticket_from_msg(tk, msg); set_state(tk, ST_FOLLOWER); time_reset(&tk->delay_commit); tk->in_election = 0; /* if we're following and the ticket was granted here * then commit to CIB right away (we're probably restarting) */ if (tk->is_granted) { disown_ticket(tk); ticket_write(tk); } } static void won_elections(struct ticket_config *tk) { set_leader(tk, local); set_state(tk, ST_LEADER); set_ticket_expiry(tk, tk->term_duration); time_reset(&tk->election_end); tk->voted_for = NULL; if (is_time_set(&tk->delay_commit) && all_sites_replied(tk)) { time_reset(&tk->delay_commit); tk_log_debug("reset delay commit as all sites replied"); } save_committed_tkt(tk); ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0); tk->ticket_updated = 0; } /* if more than one member got the same (and maximum within that * election) number of votes, then that is a tie */ static int is_tie(struct ticket_config *tk) { int i; struct booth_site *v; int count[MAX_NODES] = { 0, }; int max_votes = 0, max_cnt = 0; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v) continue; count[v->index]++; max_votes = max(max_votes, count[v->index]); } for(i=0; isite_count; i++) { if (count[i] == max_votes) max_cnt++; } return max_cnt > 1; } static struct booth_site *majority_votes(struct ticket_config *tk) { int i, n; struct booth_site *v; int count[MAX_NODES] = { 0, }; for(i=0; isite_count; i++) { v = tk->votes_for[i]; if (!v || v == no_leader) continue; n = v->index; count[n]++; tk_log_debug("Majority: %d %s wants %d %s => %d", i, site_string(&booth_conf->site[i]), n, site_string(v), count[n]); if (count[n]*2 <= booth_conf->site_count) continue; tk_log_debug("Majority reached: %d of %d for %s", count[n], booth_conf->site_count, site_string(v)); return v; } return NULL; } void elections_end(struct ticket_config *tk) { struct booth_site *new_leader; if (is_past(&tk->election_end)) { /* This is previous election timed out */ tk_log_info("elections finished"); } tk->in_election = 0; new_leader = majority_votes(tk); if (new_leader == local) { tk_log_info("granted successfully here"); won_elections(tk); } else if (new_leader) { tk_log_info("ticket granted at %s", site_string(new_leader)); } else { tk_log_info("nobody won elections, new elections"); tk->outcome = RLT_MORE; foreach_tkt_req(tk, notify_client); if (!new_election(tk, NULL, is_tie(tk) ? 2 : 0, OR_AGAIN)) { ticket_activate_timeout(tk); } } } static int newer_term(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg, int in_election) { uint32_t term; /* it may happen that we hear about our newer term */ if (leader == local) return 0; term = ntohl(msg->ticket.term); /* §5.1 */ if (term > tk->current_term) { set_state(tk, ST_FOLLOWER); if (!in_election) { set_leader(tk, leader); tk_log_info("from %s: higher term %d vs. %d, following %s", site_string(sender), term, tk->current_term, ticket_leader_string(tk)); } else { tk_log_debug("from %s: higher term %d vs. %d (election)", site_string(sender), term, tk->current_term); } tk->current_term = term; return 1; } return 0; } static int msg_term_invalid(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (is_term_invalid(tk, term)) { tk_log_info("got invalid term from %s " "(%d vs. %d), ignoring", site_string(sender), term, tk->last_valid_tk->current_term); return 1; } return 0; } static int term_too_low(struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg) { uint32_t term; term = ntohl(msg->ticket.term); /* §5.1 */ if (term < tk->current_term) { tk_log_info("sending reject to %s, its term too low " "(%d vs. %d)", site_string(sender), term, tk->current_term ); send_reject(sender, tk, RLT_TERM_OUTDATED, msg); return 1; } return 0; } /* For follower. */ static int answer_HEARTBEAT ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; term = ntohl(msg->ticket.term); tk_log_debug("heartbeat from leader: %s, have %s; term %d vs %d", site_string(leader), ticket_leader_string(tk), term, tk->current_term); if (term < tk->current_term) { if (sender == tk->leader) { tk_log_info("trusting leader %s with a lower term (%d vs %d)", site_string(leader), term, tk->current_term); } else if (is_owned(tk)) { tk_log_warn("different leader %s with a lower term " "(%d vs %d), sending reject", site_string(leader), term, tk->current_term); return send_reject(sender, tk, RLT_TERM_OUTDATED, msg); } } /* got heartbeat, no rejects expected anymore */ tk->expect_more_rejects = 0; /* Needed? */ newer_term(tk, sender, leader, msg, 0); become_follower(tk, msg); /* Racy??? */ assert(sender == leader || !leader); set_leader(tk, leader); /* Ack the heartbeat (we comply). */ return send_msg(OP_ACK, tk, sender, msg); } static int process_UPDATE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (is_owned(tk) && sender != tk->leader) { tk_log_warn("different leader %s wants to update " "our ticket, sending reject", site_string(leader)); return send_reject(sender, tk, RLT_TERM_OUTDATED, msg); } tk_log_debug("leader %s wants to update our ticket", site_string(leader)); become_follower(tk, msg); set_leader(tk, leader); ticket_write(tk); /* run ticket_cron if the ticket expires */ set_ticket_wakeup(tk); return send_msg(OP_ACK, tk, sender, msg); } static int process_REVOKE ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int rv; if (tk->state == ST_INIT && tk->leader == no_leader) { /* assume that our ack got lost */ rv = send_msg(OP_ACK, tk, sender, msg); } else if (tk->leader != sender) { tk_log_error("%s wants to revoke ticket, " "but it is not granted there (ignoring)", site_string(sender)); return 1; } else if (tk->state != ST_FOLLOWER) { tk_log_error("unexpected ticket revoke from %s " "(in state %s) (ignoring)", site_string(sender), state_to_string(tk->state)); return 1; } else { tk_log_info("%s revokes ticket", site_string(tk->leader)); save_committed_tkt(tk); reset_ticket(tk); set_leader(tk, no_leader); ticket_write(tk); rv = send_msg(OP_ACK, tk, sender, msg); } return rv; } /* For leader. */ static int process_ACK( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t term; int req; term = ntohl(msg->ticket.term); if (newer_term(tk, sender, leader, msg, 0)) { /* unexpected higher term */ tk_log_warn("got higher term from %s (%d vs. %d)", site_string(sender), term, tk->current_term); return 0; } /* Don't send a reject. */ if (term < tk->current_term) { /* Doesn't know what he's talking about - perhaps * doesn't receive our packets? */ tk_log_warn("unexpected term " "from %s (%d vs. %d) (ignoring)", site_string(sender), term, tk->current_term); return 0; } /* if the ticket is to be revoked, further processing is not * interesting (and dangerous) */ if (tk->next_state == ST_INIT || tk->state == ST_INIT) return 0; req = ntohl(msg->header.request); if ((req == OP_UPDATE || req == OP_HEARTBEAT) && term == tk->current_term && leader == tk->leader) { if (majority_of_bits(tk, tk->acks_received)) { /* OK, at least half of the nodes are reachable; * Update the ticket and send update messages out */ return leader_update_ticket(tk); } } return 0; } static int process_VOTE_FOR( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { if (leader == no_leader) { /* leader wants to step down? */ if (sender == tk->leader && (tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) { tk_log_info("%s wants to give the ticket away (ticket release)", site_string(tk->leader)); save_committed_tkt(tk); reset_ticket(tk); set_state(tk, ST_FOLLOWER); if (local->type == SITE) { ticket_write(tk); schedule_election(tk, OR_STEPDOWN); } } else { tk_log_info("%s votes for none, ignoring (duplicate ticket release?)", site_string(sender)); } return 0; } if (tk->state != ST_CANDIDATE) { /* lost candidate status, somebody rejected our proposal */ tk_log_debug("candidate status lost, ignoring vote_for from %s", site_string(sender)); return 0; } if (term_too_low(tk, sender, leader, msg)) return 0; if (newer_term(tk, sender, leader, msg, 0)) { clear_election(tk); } record_vote(tk, sender, leader); /* only if all voted can we take the ticket now, otherwise * wait for timeout in ticket_cron */ if (!tk->acks_expected) { /* §5.2 */ elections_end(tk); } return 0; } static int process_REJECTED( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t rv; rv = ntohl(msg->header.result); if (tk->state == ST_CANDIDATE && leader == local) { /* the sender has us as the leader (!) * the elections will time out, then we can try again */ tk_log_warn("ticket was granted to us " "(and we didn't know)"); tk->expect_more_rejects = 1; return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_OUTDATED) { tk_log_warn("ticket outdated (term %d), granted to %s", ntohl(msg->ticket.term), site_string(leader) ); set_leader(tk, leader); tk->expect_more_rejects = 1; become_follower(tk, msg); return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_TERM_STILL_VALID) { if (tk->lost_leader == leader) { if (tk->election_reason == OR_TKT_LOST) { tk_log_warn("%s still has the ticket valid, " "we'll backup a bit", site_string(sender)); } else { tk_log_warn("%s unexpectedly rejects elections", site_string(sender)); } } else { tk_log_warn("ticket was granted to %s " "(and we didn't know)", site_string(leader)); } set_leader(tk, leader); become_follower(tk, msg); tk->expect_more_rejects = 1; return 0; } if (tk->state == ST_CANDIDATE && rv == RLT_YOU_OUTDATED) { set_leader(tk, leader); tk->expect_more_rejects = 1; if (leader && leader != no_leader) { tk_log_warn("our ticket is outdated, granted to %s", site_string(leader)); become_follower(tk, msg); } else { tk_log_warn("our ticket is outdated and revoked"); update_ticket_from_msg(tk, sender, msg); set_state(tk, ST_INIT); } return 0; } if (!tk->expect_more_rejects) { tk_log_warn("from %s: in state %s, got %s (unexpected reject)", site_string(sender), state_to_string(tk->state), state_to_string(rv)); } return 0; } static int ticket_seems_ok(struct ticket_config *tk) { int left; left = term_time_left(tk); if (!left) return 0; /* quite sure */ if (tk->state == ST_CANDIDATE) return 0; /* in state of flux */ if (tk->state == ST_LEADER) return 1; /* quite sure */ if (tk->state == ST_FOLLOWER && left >= tk->term_duration/3) return 1; /* almost quite sure */ return 0; } static int test_reason( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int reason; reason = ntohl(msg->header.reason); if (reason == OR_TKT_LOST) { if (tk->state == ST_INIT && tk->leader == no_leader) { tk_log_warn("%s claims that the ticket is lost, " "but it's in %s state (reject sent)", site_string(sender), state_to_string(tk->state) ); return RLT_YOU_OUTDATED; } if (ticket_seems_ok(tk)) { tk_log_warn("%s claims that the ticket is lost, " "but it is ok here (reject sent)", site_string(sender)); return RLT_TERM_STILL_VALID; } } return 0; } /* §5.2 */ static int answer_REQ_VOTE( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int valid; struct boothc_ticket_msg omsg; cmd_result_t inappr_reason; int reason; inappr_reason = test_reason(tk, sender, leader, msg); if (inappr_reason) return send_reject(sender, tk, inappr_reason, msg); valid = term_time_left(tk); reason = ntohl(msg->header.reason); /* valid tickets are not allowed only if the sender thinks * the ticket got lost */ if (sender != tk->leader && valid && reason != OR_STEPDOWN) { tk_log_warn("election from %s with reason %s rejected " "(we have %s as ticket owner), ticket still valid for %ds", site_string(sender), state_to_string(reason), site_string(tk->leader), valid); return send_reject(sender, tk, RLT_TERM_STILL_VALID, msg); } if (term_too_low(tk, sender, leader, msg)) return 0; /* set this, so that we know not to send status for the * ticket */ tk->in_election = 1; /* reset ticket's leader on not valid tickets */ if (!valid) set_leader(tk, NULL); /* if it's a newer term or ... */ if (newer_term(tk, sender, leader, msg, 1)) { clear_election(tk); goto vote_for_sender; } /* ... we didn't vote yet, then vote for the sender */ /* §5.2, §5.4 */ if (!tk->voted_for) { vote_for_sender: tk->voted_for = sender; record_vote(tk, sender, leader); } init_ticket_msg(&omsg, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, 0, tk); omsg.ticket.leader = htonl(get_node_id(tk->voted_for)); - return booth_udp_send(sender, &omsg, sizeof(omsg)); + return booth_udp_send_auth(sender, &omsg, sendmsglen(&omsg)); } int new_election(struct ticket_config *tk, struct booth_site *preference, int update_term, cmd_reason_t reason) { struct booth_site *new_leader; if (local->type != SITE) return 0; /* elections were already started, but not yet finished/timed out */ if (is_time_set(&tk->election_end) && !is_past(&tk->election_end)) return 1; if (ANYDEBUG) { int tdiff; if (is_time_set(&tk->election_end)) { tdiff = -time_left(&tk->election_end); tk_log_debug("starting elections, previous finished since " intfmt(tdiff)); } else { tk_log_debug("starting elections"); } } /* §5.2 */ /* If there was _no_ answer, don't keep incrementing the term number * indefinitely. If there was no peer, there'll probably be no one * listening now either. However, we don't know if we were * invoked due to a timeout (caller does). */ /* increment the term only if either the current term was * valid or if there was a tie (in that case update_term > 1) */ if ((update_term > 1) || (update_term && tk->last_valid_tk->current_term && tk->last_valid_tk->current_term >= tk->current_term)) { /* save the previous term, we may need to send out the * MY_INDEX message */ if (tk->state != ST_CANDIDATE) { save_committed_tkt(tk); } tk->current_term++; } set_future_time(&tk->election_end, tk->timeout); tk->in_election = 1; tk_log_info("starting new election (term=%d)", tk->current_term); clear_election(tk); if(preference) new_leader = preference; else new_leader = (local->type == SITE) ? local : NULL; record_vote(tk, local, new_leader); tk->voted_for = new_leader; set_state(tk, ST_CANDIDATE); /* some callers may want just to repeat on timeout */ if (reason == OR_AGAIN) { reason = tk->election_reason; } else { tk->election_reason = reason; } ticket_broadcast(tk, OP_REQ_VOTE, OP_VOTE_FOR, RLT_SUCCESS, reason); add_random_delay(tk); return 0; } /* we were a leader and somebody says that they have a more up * to date ticket * there was probably connectivity loss * tricky */ static int leader_handle_newer_ticket( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { update_term_from_msg(tk, msg); if (leader != no_leader && leader && leader != local) { /* eek, two leaders, split brain */ /* normally shouldn't happen; run election */ tk_log_error("from %s: ticket granted to %s! (revoking locally)", site_string(sender), site_string(leader) ); } else if (term_time_left(tk)) { /* eek, two leaders, split brain */ /* normally shouldn't happen; run election */ tk_log_error("from %s: ticket granted to %s! (revoking locally)", site_string(sender), site_string(leader) ); } tk->next_state = ST_LEADER; return 0; } /* reply to STATUS */ static int process_MY_INDEX ( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int i; int expired; expired = !msg_term_time(msg); /* test against the last valid(!) ticket we have */ i = my_last_term(tk) - ntohl(msg->ticket.term); if (i > 0) { /* let them know about our newer ticket */ send_msg(OP_MY_INDEX, tk, sender, msg); if (tk->state == ST_LEADER) { tk_log_info("sending ticket update to %s", site_string(sender)); return send_msg(OP_UPDATE, tk, sender, msg); } } /* we have a newer or equal ticket and theirs is expired, * nothing more to do here */ if (i >= 0 && expired) { return 0; } if (tk->state == ST_LEADER) { /* we're the leader, thread carefully */ if (expired) { /* if their ticket is expired, * nothing more to do */ return 0; } if (i < 0) { /* they have a newer ticket, trouble if we're already leader * for it */ tk_log_warn("from %s: more up to date ticket at %s", site_string(sender), site_string(leader) ); return leader_handle_newer_ticket(tk, sender, leader, msg); } else { /* we have the ticket and we don't care */ return 0; } } else if (tk->state == ST_CANDIDATE) { if (leader == local) { /* a belated MY_INDEX, we're already trying to get the * ticket */ return 0; } } /* their ticket is either newer or not expired, don't * ignore it */ update_ticket_from_msg(tk, sender, msg); set_leader(tk, leader); update_ticket_state(tk, sender); save_committed_tkt(tk); set_ticket_wakeup(tk); return 0; } int raft_answer( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { int cmd, req; int rv; rv = 0; cmd = ntohl(msg->header.cmd); req = ntohl(msg->header.request); if (req) tk_log_debug("got %s (req %s) from %s", state_to_string(cmd), state_to_string(req), site_string(sender)); else tk_log_debug("got %s from %s", state_to_string(cmd), site_string(sender)); /* don't process tickets with invalid term */ if (cmd != OP_STATUS && msg_term_invalid(tk, sender, leader, msg)) return 0; switch (cmd) { case OP_REQ_VOTE: rv = answer_REQ_VOTE(tk, sender, leader, msg); break; case OP_VOTE_FOR: rv = process_VOTE_FOR(tk, sender, leader, msg); break; case OP_ACK: if (tk->leader == local && tk->state == ST_LEADER) rv = process_ACK(tk, sender, leader, msg); break; case OP_HEARTBEAT: if ((tk->leader != local || !term_time_left(tk)) && (tk->state == ST_INIT || tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) rv = answer_HEARTBEAT(tk, sender, leader, msg); else { tk_log_warn("unexpected message %s, from %s", state_to_string(cmd), site_string(sender)); if (ticket_seems_ok(tk)) send_reject(sender, tk, RLT_TERM_STILL_VALID, msg); rv = -EINVAL; } break; case OP_UPDATE: if (((tk->leader != local && tk->leader == leader) || !is_owned(tk)) && (tk->state == ST_INIT || tk->state == ST_FOLLOWER || tk->state == ST_CANDIDATE)) { rv = process_UPDATE(tk, sender, leader, msg); } else { tk_log_warn("unexpected message %s, from %s", state_to_string(cmd), site_string(sender)); if (ticket_seems_ok(tk)) send_reject(sender, tk, RLT_TERM_STILL_VALID, msg); rv = -EINVAL; } break; case OP_REJECTED: rv = process_REJECTED(tk, sender, leader, msg); break; case OP_REVOKE: rv = process_REVOKE(tk, sender, leader, msg); break; case OP_MY_INDEX: rv = process_MY_INDEX(tk, sender, leader, msg); break; case OP_STATUS: if (!tk->in_election) rv = send_msg(OP_MY_INDEX, tk, sender, msg); break; default: tk_log_error("unknown message %s, from %s", state_to_string(cmd), site_string(sender)); rv = -EINVAL; } return rv; } diff --git a/src/ticket.c b/src/ticket.c index 66abc4b..d094fa5 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,1152 +1,1157 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include #include #include #include #include #include #include #include +#include "b_config.h" #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "raft.h" #include "handler.h" #include "request.h" #define TK_LINE 256 extern int TIME_RES; /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node, 0)) { *is_local = node->local; return 1; } return 0; } /* is it safe to commit the grant? * if we didn't hear from all sites on the initial grant, we may * need to delay the commit * * TODO: investigate possibility to devise from history whether a * missing site could be holding a ticket or not */ static int ticket_dangerous(struct ticket_config *tk) { int tdiff; /* we may be invoked often, don't spam the log unnecessarily */ static int no_log_delay_msg; if (!is_time_set(&tk->delay_commit)) return 0; if (is_past(&tk->delay_commit) || all_sites_replied(tk)) { if (tk->leader == local) { tk_log_info("%s, committing to CIB", is_past(&tk->delay_commit) ? "ticket delay expired" : "all sites replied"); } time_reset(&tk->delay_commit); no_log_delay_msg = 0; return 0; } tdiff = time_left(&tk->delay_commit); tk_log_debug("delay ticket commit for another " intfmt(tdiff)); if (!no_log_delay_msg) { tk_log_info("delaying ticket commit to CIB for " intfmt(tdiff)); tk_log_info("(or all sites are reached)"); no_log_delay_msg = 1; } return 1; } int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; if (ticket_dangerous(tk)) return 1; if (tk->leader == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } tk->update_cib = 0; return 0; } /* Ask an external program whether getting the ticket * makes sense. * Eg. if the services have a failcount of INFINITY, * we can't serve here anyway. */ int test_external_prog(struct ticket_config *tk, int start_election) { int rv; rv = run_handler(tk, tk->ext_verifier, 1); if (rv) { tk_log_warn("we are not allowed to acquire ticket"); /* Give it to somebody else. * Just send a VOTE_FOR message, so the * others can start elections. */ if (leader_and_valid(tk)) { save_committed_tkt(tk); reset_ticket(tk); ticket_write(tk); if (start_election) { ticket_broadcast(tk, OP_VOTE_FOR, OP_REQ_VOTE, RLT_SUCCESS, OR_LOCAL_FAIL); } } } return rv; } /* Try to acquire a ticket * Could be manual grant or after ticket loss */ int acquire_ticket(struct ticket_config *tk, cmd_reason_t reason) { int rc; if (test_external_prog(tk, 0)) return RLT_EXT_FAILED; rc = new_election(tk, local, 1, reason); return rc ? RLT_SYNC_FAIL : 0; } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk, int options) { int rv; tk_log_info("granting ticket"); if (tk->leader == local) return RLT_SUCCESS; if (is_owned(tk)) return RLT_OVERGRANT; set_future_time(&tk->delay_commit, tk->term_duration + tk->acquire_after); if (options & OPT_IMMEDIATE) { tk_log_warn("granting ticket immediately! If there are " "unreachable sites, _hope_ you are sure that they don't " "have the ticket!"); time_reset(&tk->delay_commit); } rv = acquire_ticket(tk, OR_ADMIN); if (rv) { time_reset(&tk->delay_commit); return rv; } else { return RLT_MORE; } } static void start_revoke_ticket(struct ticket_config *tk) { tk_log_info("revoking ticket"); save_committed_tkt(tk); reset_ticket(tk); set_leader(tk, no_leader); ticket_write(tk); ticket_broadcast(tk, OP_REVOKE, OP_ACK, RLT_SUCCESS, OR_ADMIN); } /** Ticket revoke. * Only to be started from the leader. */ int do_revoke_ticket(struct ticket_config *tk) { if (tk->acks_expected) { tk_log_info("delay ticket revoke until the current operation finishes"); tk->next_state = ST_INIT; return RLT_MORE; } else { start_revoke_ticket(tk); return RLT_SUCCESS; } } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char pending_str[64]; char *data, *cp; int i, alloc; time_t ts; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (is_time_set(&tk->term_expires)) { ts = wall_ts(&tk->term_expires); strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&ts)); } else strcpy(timeout_str, "INF"); if (tk->leader == local && is_time_set(&tk->delay_commit) && !is_past(&tk->delay_commit)) { ts = wall_ts(&tk->delay_commit); strcpy(pending_str, " (commit pending until "); strftime(pending_str + strlen(" (commit pending until "), sizeof(pending_str) - strlen(" (commit pending until ") - 1, "%F %T", localtime(&ts)); strcat(pending_str, ")"); } else *pending_str = '\0'; cp += snprintf(cp, alloc - (cp - data), "ticket: %s, leader: %s", tk->name, ticket_leader_string(tk)); if (is_owned(tk)) { cp += snprintf(cp, alloc - (cp - data), ", expires: %s%s\n", timeout_str, pending_str); } else { cp += snprintf(cp, alloc - (cp - data), "\n"); } if (alloc - (cp - data) <= 0) return -ENOMEM; } *pdata = data; *len = cp - data; return 0; } void disown_ticket(struct ticket_config *tk) { set_leader(tk, NULL); tk->is_granted = 0; get_time(&tk->term_expires); } int disown_if_expired(struct ticket_config *tk) { if (is_past(&tk->term_expires) || !tk->leader) { disown_ticket(tk); return 1; } return 0; } void reset_ticket(struct ticket_config *tk) { disown_ticket(tk); no_resends(tk); set_state(tk, ST_INIT); tk->voted_for = NULL; } static void reacquire_ticket(struct ticket_config *tk) { int valid; const char *where_granted = "\0"; char buff[64]; valid = is_time_set(&tk->term_expires) && !is_past(&tk->term_expires); if (tk->leader == local) { where_granted = "granted here"; } else { snprintf(buff, sizeof(buff), "granted to %s", site_string(tk->leader)); where_granted = buff; } if (!valid) { tk_log_warn("%s, but not valid " "anymore (will try to reacquire)", where_granted); } if (tk->is_granted && tk->leader != local) { if (tk->leader && tk->leader != no_leader) { tk_log_error("granted here, but also %s, " "that's really too bad (will try to reacquire)", where_granted); } else { tk_log_warn("granted here, but we're " "not recorded as the grantee (will try to reacquire)"); } } /* try to acquire the * ticket through new elections */ acquire_ticket(tk, OR_REACQUIRE); } void update_ticket_state(struct ticket_config *tk, struct booth_site *sender) { if (tk->state == ST_CANDIDATE) { tk_log_info("learned from %s about " "newer ticket, stopping elections", site_string(sender)); /* there could be rejects coming from others; don't log * warnings unnecessarily */ tk->expect_more_rejects = 1; } if (tk->leader == local || tk->is_granted) { /* message from a live leader with valid ticket? */ if (sender == tk->leader && term_time_left(tk)) { if (tk->is_granted) { tk_log_warn("ticket was granted here, " "but it's live at %s (revoking here)", site_string(sender)); } else { tk_log_info("ticket live at %s", site_string(sender)); } disown_ticket(tk); ticket_write(tk); set_state(tk, ST_FOLLOWER); tk->next_state = ST_FOLLOWER; } else { if (tk->state == ST_CANDIDATE) { set_state(tk, ST_FOLLOWER); } tk->next_state = ST_LEADER; } } else { if (!tk->leader || tk->leader == no_leader) { if (sender) tk_log_info("ticket is not granted"); else tk_log_info("ticket is not granted (from CIB)"); set_state(tk, ST_INIT); } else { if (sender) tk_log_info("ticket granted to %s (says %s)", site_string(tk->leader), site_string(sender)); else tk_log_info("ticket granted to %s (from CIB)", site_string(tk->leader)); set_state(tk, ST_FOLLOWER); /* just make sure that we check the ticket soon */ tk->next_state = ST_FOLLOWER; } } } int setup_ticket(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { reset_ticket(tk); if (local->type == SITE) { if (!pcmk_handler.load_ticket(tk)) { update_ticket_state(tk, NULL); } tk->update_cib = 1; } tk_log_info("broadcasting state query"); /* wait until all send their status (or the first * timeout) */ tk->start_postpone = 1; ticket_broadcast(tk, OP_STATUS, OP_MY_INDEX, RLT_SUCCESS, 0); } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; - struct boothc_header hdr; + struct boothc_hdr_msg hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; - init_header(&hdr, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen); + init_header(&hdr.header, CL_LIST, 0, 0, RLT_SUCCESS, 0, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int process_client_request(struct client *req_client, struct boothc_ticket_msg *msg) { int rv, rc = 1; struct ticket_config *tk; int cmd; struct boothc_ticket_msg omsg; cmd = ntohl(msg->header.cmd); if (!check_ticket(msg->ticket.id, &tk)) { log_warn("client referenced unknown ticket %s", msg->ticket.id); rv = RLT_INVALID_ARG; goto reply_now; } if ((cmd == CMD_GRANT) && is_owned(tk)) { log_warn("client wants to grant an (already granted!) ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply_now; } if ((cmd == CMD_REVOKE) && !is_owned(tk)) { log_info("client wants to revoke a free ticket %s", msg->ticket.id); rv = RLT_TICKET_IDLE; goto reply_now; } if ((cmd == CMD_REVOKE) && tk->leader != local) { log_info("the ticket %s is not granted here, " "redirect to %s", msg->ticket.id, ticket_leader_string(tk)); rv = RLT_REDIRECT; goto reply_now; } if (cmd == CMD_REVOKE) rv = do_revoke_ticket(tk); else rv = do_grant_ticket(tk, ntohl(msg->header.options)); if (rv == RLT_MORE) { /* client may receive further notifications, save the * request for further processing */ add_req(tk, req_client, msg); tk_log_debug("queue request %s for client %d", state_to_string(cmd), req_client->fd); rc = 0; /* we're not yet done with the message */ } reply_now: init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk); - send_ticket_msg(req_client->fd, &omsg); + send_client_msg(req_client->fd, &omsg); return rc; } int notify_client(struct ticket_config *tk, struct client *req_client, struct boothc_ticket_msg *msg) { struct boothc_ticket_msg omsg; void (*deadfn) (int ci); int rv, rc, ci; int cmd, options; cmd = ntohl(msg->header.cmd); options = ntohl(msg->header.options); rv = tk->outcome; tk_log_debug("notifying client %d (request %s)", req_client->fd, state_to_string(cmd)); init_ticket_msg(&omsg, CL_RESULT, 0, rv, 0, tk); - rc = send_ticket_msg(req_client->fd, &omsg); + rc = send_client_msg(req_client->fd, &omsg); if (rc == 0 && ((rv == RLT_MORE) || (rv == RLT_CIB_PENDING && (options & OPT_WAIT_COMMIT)))) { /* more to do here, keep the request */ return 1; } else { /* we sent a definite answer or there was a write error, drop * the client */ if (!rc) { tk_log_debug("failed to notify client %d (request %s)", req_client->fd, state_to_string(cmd)); } deadfn = req_client->deadfn; if(deadfn) { ci = find_client_by_fd(req_client->fd); if (ci >= 0) deadfn(ci); } free(msg); return 0; /* we're done with this request */ } } int ticket_broadcast(struct ticket_config *tk, cmd_request_t cmd, cmd_request_t expected_reply, cmd_result_t res, cmd_reason_t reason) { struct boothc_ticket_msg msg; init_ticket_msg(&msg, cmd, 0, res, reason, tk); tk_log_debug("broadcasting '%s' (term=%d, valid=%d)", state_to_string(cmd), ntohl(msg.ticket.term), msg_term_time(&msg)); tk->last_request = cmd; if (expected_reply) { expect_replies(tk, expected_reply); } ticket_activate_timeout(tk); - return transport()->broadcast(&msg, sizeof(msg)); + return transport()->broadcast_auth(&msg, sendmsglen(&msg)); } /* update the ticket on the leader, write it to the CIB, and send out the update message to others with the new expiry time */ int leader_update_ticket(struct ticket_config *tk) { int rv = 0, rv2; timetype now; if (tk->ticket_updated >= 2) return 0; if (tk->ticket_updated < 1) { tk->ticket_updated = 1; get_time(&now); copy_time(&now, &tk->last_renewal); set_future_time(&tk->term_expires, tk->term_duration); rv = ticket_broadcast(tk, OP_UPDATE, OP_ACK, RLT_SUCCESS, 0); } if (tk->ticket_updated < 2) { rv2 = ticket_write(tk); switch(rv2) { case 0: tk->ticket_updated = 2; tk->outcome = RLT_SUCCESS; foreach_tkt_req(tk, notify_client); break; case 1: if (tk->outcome != RLT_CIB_PENDING) { tk->outcome = RLT_CIB_PENDING; foreach_tkt_req(tk, notify_client); } break; default: break; } } return rv; } static void log_lost_servers(struct ticket_config *tk) { struct booth_site *n; int i; if (tk->retry_number > 1) /* log those that we couldn't reach, but do * that only on the first retry */ return; for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (!(tk->acks_received & n->bitmask)) { tk_log_warn("%s %s didn't acknowledge our %s, " "will retry %d times", (n->type == ARBITRATOR ? "arbitrator" : "site"), site_string(n), state_to_string(tk->last_request), tk->retries); } } } static void resend_msg(struct ticket_config *tk) { struct booth_site *n; int i; if (!(tk->acks_received ^ local->bitmask)) { ticket_broadcast(tk, tk->last_request, 0, RLT_SUCCESS, 0); } else { for (i = 0; i < booth_conf->site_count; i++) { n = booth_conf->site + i; if (!(tk->acks_received & n->bitmask)) { tk_log_debug("resending %s to %s", state_to_string(tk->last_request), site_string(n) ); send_msg(tk->last_request, tk, n, NULL); } } ticket_activate_timeout(tk); } } static void handle_resends(struct ticket_config *tk) { int ack_cnt; if (++tk->retry_number > tk->retries) { tk_log_debug("giving up on sending retries"); no_resends(tk); set_ticket_wakeup(tk); return; } /* try to reach some sites again if we just stepped down */ if (tk->last_request == OP_VOTE_FOR) { tk_log_warn("no answers to our request (try #%d), " "we are alone", tk->retry_number); goto just_resend; } if (!majority_of_bits(tk, tk->acks_received)) { ack_cnt = count_bits(tk->acks_received) - 1; if (!ack_cnt) { tk_log_warn("no answers to our request (try #%d), " "we are alone", tk->retry_number); } else { tk_log_warn("not enough answers to our request (try #%d): " "only got %d answers", tk->retry_number, ack_cnt); } } else { log_lost_servers(tk); } just_resend: resend_msg(tk); } int postpone_ticket_processing(struct ticket_config *tk) { extern timetype start_time; return tk->start_postpone && (-time_left(&start_time) < tk->timeout); } static void process_next_state(struct ticket_config *tk) { switch(tk->next_state) { case ST_LEADER: reacquire_ticket(tk); break; case ST_INIT: no_resends(tk); start_revoke_ticket(tk); tk->outcome = RLT_SUCCESS; foreach_tkt_req(tk, notify_client); break; /* wanting to be follower is not much of an ambition; no * processing, just return; don't reset start_postpone until * we got some replies to status */ case ST_FOLLOWER: return; default: break; } tk->start_postpone = 0; } static void ticket_lost(struct ticket_config *tk) { if (tk->leader != local) { tk_log_warn("lost at %s", site_string(tk->leader)); } else { tk_log_warn("lost majority (revoking locally)"); } tk->lost_leader = tk->leader; save_committed_tkt(tk); reset_ticket(tk); set_state(tk, ST_FOLLOWER); if (local->type == SITE) { ticket_write(tk); schedule_election(tk, OR_TKT_LOST); } } static void next_action(struct ticket_config *tk) { switch(tk->state) { case ST_INIT: /* init state, handle resends for ticket revoke */ /* and rebroadcast if stepping down */ if (tk->acks_expected) { handle_resends(tk); } break; case ST_FOLLOWER: /* leader/ticket lost? and we didn't vote yet */ tk_log_debug("leader: %s, voted_for: %s", site_string(tk->leader), site_string(tk->voted_for)); if (!tk->leader) { if (!tk->voted_for || !tk->in_election) { disown_ticket(tk); if (!new_election(tk, NULL, 1, OR_AGAIN)) { ticket_activate_timeout(tk); } } else { /* we should restart elections in case nothing * happens in the meantime */ tk->in_election = 0; ticket_activate_timeout(tk); } } break; case ST_CANDIDATE: /* elections timed out? */ elections_end(tk); break; case ST_LEADER: /* timeout or ticket renewal? */ if (tk->acks_expected) { handle_resends(tk); if (majority_of_bits(tk, tk->acks_received)) { leader_update_ticket(tk); } } else { /* this is ticket renewal, run local test */ if (!test_external_prog(tk, 1)) { ticket_broadcast(tk, OP_HEARTBEAT, OP_ACK, RLT_SUCCESS, 0); tk->ticket_updated = 0; } } break; default: break; } } static void ticket_cron(struct ticket_config *tk) { /* don't process the tickets too early after start */ if (postpone_ticket_processing(tk)) { tk_log_debug("ticket processing postponed (start_postpone=%d)", tk->start_postpone); /* but run again soon */ ticket_activate_timeout(tk); return; } /* no need for status resends, we hope we got at least one * my_index back */ if (tk->acks_expected == OP_MY_INDEX) { no_resends(tk); } /* after startup, we need to decide what to do based on the * current ticket state; tk->next_state has a hint * also used for revokes which had to be delayed */ if (tk->next_state) { process_next_state(tk); goto out; } /* Has an owner, has an expiry date, and expiry date in the past? * Losing the ticket must happen in _every_ state. */ if (is_owned(tk) && is_time_set(&tk->term_expires) && is_past(&tk->term_expires)) { ticket_lost(tk); goto out; } next_action(tk); out: tk->next_state = 0; if (!tk->in_election && tk->update_cib) ticket_write(tk); } void process_tickets(void) { struct ticket_config *tk; int i; timetype last_cron; foreach_ticket(i, tk) { if (is_time_set(&tk->next_cron) && !is_past(&tk->next_cron)) continue; tk_log_debug("ticket cron"); copy_time(&tk->next_cron, &last_cron); ticket_cron(tk); if (time_cmp(&last_cron, &tk->next_cron, ==)) { tk_log_debug("nobody set ticket wakeup"); set_ticket_wakeup(tk); } } } void tickets_log_info(void) { struct ticket_config *tk; int i; time_t ts; foreach_ticket(i, tk) { ts = wall_ts(&tk->term_expires); tk_log_info("state '%s' " "term %d " "leader %s " "expires %-24.24s", state_to_string(tk->state), tk->current_term, ticket_leader_string(tk), ctime(&ts)); } } static void update_acks( struct ticket_config *tk, struct booth_site *sender, struct booth_site *leader, struct boothc_ticket_msg *msg ) { uint32_t cmd; uint32_t req; cmd = ntohl(msg->header.cmd); req = ntohl(msg->header.request); if (req != tk->last_request || (tk->acks_expected != cmd && tk->acks_expected != OP_REJECTED)) return; /* got an ack! */ tk->acks_received |= sender->bitmask; if (all_replied(tk) || /* we just stepped down, need only one site to start * elections */ (cmd == OP_REQ_VOTE && tk->last_request == OP_VOTE_FOR)) { no_resends(tk); tk->start_postpone = 0; set_ticket_wakeup(tk); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { uint32_t from; struct booth_site *source; struct ticket_config *tk; struct booth_site *leader; uint32_t leader_u; - if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || - msglen != sizeof(*msg)) { - log_error("message receive error"); - return -1; - } - from = ntohl(msg->header.from); if (!find_site_by_id(from, &source) || !source) { log_error("unknown sender: %08x", from); return -1; } + if (check_boothc_header(&msg->header, msglen) < 0) { + log_error("message from %s receive error", site_string(source)); + return -1; + } + + if (check_auth(source, msg, msglen)) { + log_error("%s failed to authenticate", site_string(source)); + return -1; + } + if (!check_ticket(msg->ticket.id, &tk)) { log_warn("got invalid ticket name %s from %s", msg->ticket.id, site_string(source)); return -EINVAL; } leader_u = ntohl(msg->ticket.leader); if (!find_site_by_id(leader_u, &leader)) { tk_log_error("message with unknown leader %u received", leader_u); return -EINVAL; } update_acks(tk, source, leader, msg); return raft_answer(tk, source, leader, msg); } static void log_next_wakeup(struct ticket_config *tk) { int left; left = time_left(&tk->next_cron); tk_log_debug("set ticket wakeup in " intfmt(left)); } /* New vote round; §5.2 */ /* delay the next election start for some random time * (up to 1 second) */ void add_random_delay(struct ticket_config *tk) { timetype tv; interval_add(&tk->next_cron, rand_time(min(1000, tk->timeout)), &tv); ticket_next_cron_at(tk, &tv); if (ANYDEBUG) { log_next_wakeup(tk); } } void set_ticket_wakeup(struct ticket_config *tk) { timetype near_future, tv, next_vote; /* At least every hour, perhaps sooner (default) */ ticket_next_cron_in(tk, 3600*TIME_RES); set_future_time(&near_future, 10); switch (tk->state) { case ST_LEADER: assert(tk->leader == local); get_next_election_time(tk, &next_vote); /* If timestamp is in the past, wakeup in * near future */ if (!is_time_set(&next_vote)) { tk_log_debug("next ts unset, wakeup soon"); ticket_next_cron_at(tk, &near_future); } else if (is_past(&next_vote)) { int tdiff = time_left(&next_vote); tk_log_debug("next ts in the past " intfmt(tdiff)); ticket_next_cron_at(tk, &near_future); } else { ticket_next_cron_at(tk, &next_vote); } break; case ST_CANDIDATE: assert(is_time_set(&tk->election_end)); ticket_next_cron_at(tk, &tk->election_end); break; case ST_INIT: case ST_FOLLOWER: /* If there is (or should be) some owner, check on it later on. * If no one is interested - don't care. */ if (is_owned(tk) && (local->type == SITE)) { interval_add(&tk->term_expires, tk->acquire_after, &tv); ticket_next_cron_at(tk, &tv); } break; default: tk_log_error("unknown ticket state: %d", tk->state); } if (tk->next_state) { /* we need to do something soon here */ if (!tk->acks_expected) { ticket_next_cron_at(tk, &near_future); } else { ticket_activate_timeout(tk); } } if (ANYDEBUG) { log_next_wakeup(tk); } } void schedule_election(struct ticket_config *tk, cmd_reason_t reason) { if (local->type != SITE) return; tk->election_reason = reason; get_time(&tk->next_cron); /* introduce a short delay before starting election */ add_random_delay(tk); } /* Given a state (in host byte order), return a human-readable (char*). * An array is used so that multiple states can be printed in a single printf(). */ char *state_to_string(uint32_t state_ho) { union mu { cmd_request_t s; char c[5]; }; static union mu cache[6] = { { 0 } }, *cur; static int current = 0; current ++; if (current >= sizeof(cache)/sizeof(cache[0])) current = 0; cur = cache + current; cur->s = htonl(state_ho); /* Shouldn't be necessary, union array is initialized with zeroes, and * these bytes never get written. */ cur->c[4] = 0; return cur->c; } int send_reject(struct booth_site *dest, struct ticket_config *tk, cmd_result_t code, struct boothc_ticket_msg *in_msg) { int req = ntohl(in_msg->header.cmd); struct boothc_ticket_msg msg; tk_log_debug("sending reject to %s", site_string(dest)); init_ticket_msg(&msg, OP_REJECTED, req, code, 0, tk); - return booth_udp_send(dest, &msg, sizeof(msg)); + return booth_udp_send_auth(dest, &msg, sendmsglen(&msg)); } int send_msg ( int cmd, struct ticket_config *current_tk, struct booth_site *dest, struct boothc_ticket_msg *in_msg ) { int req = 0; struct ticket_config *tk = current_tk; struct boothc_ticket_msg msg; if (cmd == OP_MY_INDEX) { if (current_tk->state == ST_CANDIDATE && current_tk->last_valid_tk->current_term) { tk = current_tk->last_valid_tk; } tk_log_info("sending status to %s", site_string(dest)); } if (in_msg) req = ntohl(in_msg->header.cmd); init_ticket_msg(&msg, cmd, req, RLT_SUCCESS, 0, tk); - return booth_udp_send(dest, &msg, sizeof(msg)); + return booth_udp_send_auth(dest, &msg, sendmsglen(&msg)); } diff --git a/src/transport.c b/src/transport.c index ebaf556..51caecd 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,742 +1,826 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013-2014 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include #include #include #include #include #include #include #include #include #include #include #include +#include "b_config.h" #include "booth.h" #include "inline-fn.h" #include "log.h" #include "config.h" #include "ticket.h" #include "transport.h" +#include "auth.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 struct booth_site *local = NULL; static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } enum match_type { NO_MATCH = 0, FUZZY_MATCH, EXACT_MATCH, }; static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_site **me, int *address_bits_matched) { int i; struct booth_site *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; int matched; enum match_type did_match = NO_MATCH; bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); for (i = 0; i < booth_conf->site_count; i++) { node = booth_conf->site + i; if (family != node->family) continue; n_a = node_to_addr_pointer(node); for(matched = 0; matched < node->addrlen; matched++) if (ipaddr[matched] != n_a[matched]) break; if (matched == node->addrlen) { /* Full match. */ *address_bits_matched = matched * 8; found: *me = node; did_match = EXACT_MATCH; continue; } if (!fuzzy_allowed) continue; /* Check prefix, whole bytes */ if (matched < bytes) continue; if (matched * 8 < *address_bits_matched) continue; if (!bits_left) goto found; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; if (((node_bits ^ ip_bits) & mask) == 0) { /* _At_least_ prefixlen bits matched. */ *address_bits_matched = prefixlen; if (did_match < EXACT_MATCH) { *me = node; did_match = FUZZY_MATCH; } } } return did_match; } int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed); int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_site *me; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; int address_bits_matched; if (local) goto found; me = NULL; address_bits_matched = 0; if (mep) *mep = NULL; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); /* prefer IFA_LOCAL if it exists, for p-t-p * interfaces, otherwise use IFA_ADDRESS */ if (tb[IFA_LOCAL]) { memcpy(ipaddr, RTA_DATA(tb[IFA_LOCAL]), BOOTH_IPADDR_LEN); } else { memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); } /* First try with exact addresses, then optionally with subnet matching. */ if (ifa->ifa_prefixlen > address_bits_matched) find_address(ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me, &address_bits_matched); } h = NLMSG_NEXT(h, status); } } close(fd); if (!me) return 0; me->local = 1; local = me; found: if (mep) *mep = local; return 1; } int find_myself(struct booth_site **mep, int fuzzy_allowed) { return _find_myself(AF_INET6, mep, fuzzy_allowed) || _find_myself(AF_INET, mep, fuzzy_allowed); } /** Checks the header fields for validity. * cf. init_header(). * For @len_incl_data < 0 the length is not checked. * Return <0 if error, else bytes read. */ int check_boothc_header(struct boothc_header *h, int len_incl_data) { int l; if (h->magic != htonl(BOOTHC_MAGIC)) { log_error("magic error %x", ntohl(h->magic)); return -EINVAL; } if (h->version != htonl(BOOTHC_VERSION)) { log_error("version error %x", ntohl(h->version)); return -EINVAL; } l = ntohl(h->length); if (l < sizeof(*h)) { log_error("length %d out of range", l); return -EINVAL; } if (len_incl_data < 0) return 0; if (l != len_incl_data) { log_error("length error - got %d, wanted %d", - l, len_incl_data); + len_incl_data, l); return -EINVAL; } return len_incl_data; } static void process_tcp_listener(int ci) { int fd, i, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; fd = accept(clients[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } int setup_tcp_listener(int test_only) { int s, rv; int one = 1; s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { log_error("failed to set the SO_REUSEADDR option"); return rv; } rv = bind(s, &local->sa6, local->saddrlen); if (test_only) { rv = (rv == -1) ? errno : 0; close(s); return rv; } if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (get_local_id() < 0) return -1; rv = setup_tcp_listener(0); if (rv < 0) return rv; client_add(rv, booth_transport + TCP, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } int booth_tcp_open(struct booth_site *to) { int s, rv; if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) log_error("connect to %s got a timeout", site_string(to)); else log_error("connect to %s got an error: %s", site_string(to), strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } int booth_tcp_send(struct booth_site *to, void *buf, int len) { - return do_write(to->tcp_fd, buf, len); + int rv; + + rv = add_hmac(buf, len); + if (!rv) + rv = do_write(to->tcp_fd, buf, len); + + return rv; } static int booth_tcp_recv(struct booth_site *from, void *buf, int len) { int got; /* Needs timeouts! */ got = do_read(from->tcp_fd, buf, len); if (got < 0) { log_error("read failed (%d): %s", errno, strerror(errno)); return got; } return len; } +static int booth_tcp_recv_auth(struct booth_site *from, void *buf, int len) +{ + int got, total; + int payload_len; + /* Needs timeouts! */ + + payload_len = len - sizeof(struct hmac); + got = booth_tcp_recv(from, buf, payload_len); + if (got < 0) { + return got; + } + total = got; + if (is_auth_req()) { + got = booth_tcp_recv(from, (unsigned char *)buf+payload_len, sizeof(struct hmac)); + if (got != sizeof(struct hmac) || check_auth(from, buf, len)) { + return -1; + } + total += got; + } + return total; +} + static int booth_tcp_close(struct booth_site *to) { if (to) { if (to->tcp_fd > STDERR_FILENO) close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } static int setup_udp_server(void) { int rv, fd; int one = 1; unsigned int recvbuf_size; fd = socket(local->family, SOCK_DGRAM, 0); if (fd == -1) { log_error("failed to create UDP socket %s", strerror(errno)); goto ex; } rv = fcntl(fd, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on UDP socket: %s", strerror(errno)); goto ex; } rv = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); if (rv == -1) { log_error("failed to set the SO_REUSEADDR option"); goto ex; } rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind UDP socket to [%s]:%d: %s", site_string(local), booth_conf->port, strerror(errno)); goto ex; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); goto ex; } local->udp_fd = fd; return 0; ex: if (fd >= 0) close(fd); return -1; } /* Receive/process callback for UDP */ static void process_recv(int ci) { struct sockaddr_storage sa; int rv; socklen_t sa_len; char buffer[256]; /* Used for unit tests */ struct boothc_ticket_msg *msg; sa_len = sizeof(sa); msg = (void*)buffer; rv = recvfrom(clients[ci].fd, buffer, sizeof(buffer), MSG_NOSIGNAL | MSG_DONTWAIT, (struct sockaddr *)&sa, &sa_len); if (rv == -1) return; deliver_fn(msg, rv); } static int booth_udp_init(void *f) { int rv; rv = setup_udp_server(); if (rv < 0) return rv; deliver_fn = f; client_add(local->udp_fd, booth_transport + UDP, process_recv, NULL); return 0; } int booth_udp_send(struct booth_site *to, void *buf, int len) { int rv; rv = sendto(local->udp_fd, buf, len, MSG_NOSIGNAL, (struct sockaddr *)&to->sa6, to->saddrlen); if (rv == len) { rv = 0; } else if (rv < 0) { log_error("Cannot send to %s: %d %s", site_string(to), errno, strerror(errno)); } else { rv = -1; log_error("Packet sent to %s got truncated", site_string(to)); } return rv; } -static int booth_udp_broadcast(void *buf, int len) +int booth_udp_send_auth(struct booth_site *to, void *buf, int len) +{ + int rv; + + rv = add_hmac(buf, len); + if (rv < 0) + return rv; + return booth_udp_send(to, buf, len); +} + +static int booth_udp_broadcast_auth(void *buf, int len) { int i, rv, rvs; struct booth_site *site; if (!booth_conf || !booth_conf->site_count) return -1; + rv = add_hmac(buf, len); + if (rv < 0) + return rv; + rvs = 0; foreach_node(i, site) { if (site != local) { rv = booth_udp_send(site, buf, len); if (!rvs) rvs = rv; } } return rvs; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_site * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int return_0_booth_site(struct booth_site *v __attribute((unused))) { return 0; } static int return_0(void) { return 0; } const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, + .recv_auth = booth_tcp_recv_auth, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .open = return_0_booth_site, .send = booth_udp_send, + .send_auth = booth_udp_send_auth, .close = return_0_booth_site, - .broadcast = booth_udp_broadcast, + .broadcast_auth = booth_udp_broadcast_auth, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .open = return_0_booth_site, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = return_0, } }; const struct booth_transport *local_transport = booth_transport+TCP; -int send_header_only(int fd, struct boothc_header *hdr) +/* data + (datalen-sizeof(struct hmac)) points to struct hmac + * i.e. struct hmac is always tacked on the payload + */ +int add_hmac(void *data, int len) { - int rv; + int rv = 0; +#if HAVE_LIBMHASH + int payload_len; + struct hmac *hp; - rv = do_write(fd, hdr, sizeof(*hdr)); + if (!is_auth_req()) + return 0; + + rv = update_authkey(); + if (rv < 0) { + return rv; + } + payload_len = len - sizeof(struct hmac); + hp = (struct hmac *)((unsigned char *)data + payload_len); + hp->hid = htonl(BOOTH_HASH); + memset(hp->hash, 0, BOOTH_MAC_SIZE); + rv = calc_hmac(data, payload_len, BOOTH_HASH, hp->hash, + booth_conf->authkey, booth_conf->authkey_len); + if (rv < 0) { + log_error("internal error: cannot calculate mac"); + } +#endif return rv; } - -int send_ticket_msg(int fd, struct boothc_ticket_msg *msg) +int check_auth(struct booth_site *from, void *buf, int len) { - int rv; + int rv = 0; +#if HAVE_LIBMHASH + int payload_len; + struct hmac *hp; + + if (!is_auth_req()) + return 0; - rv = do_write(fd, msg, sizeof(*msg)); + rv = update_authkey(); + if (rv < 0) { + return rv; + } + payload_len = len - sizeof(struct hmac); + hp = (struct hmac *)((unsigned char *)buf + payload_len); + rv = verify_hmac(buf, payload_len, ntohl(hp->hid), hp->hash, + booth_conf->authkey, booth_conf->authkey_len); + if (rv < 0 && from) { + log_error("%s failed to authenticate", site_string(from)); + } +#endif return rv; } +int send_data(int fd, void *data, int datalen) +{ + int rv = 0; + + rv = add_hmac(data, datalen); + if (!rv) + rv = do_write(fd, data, datalen); + + return rv; +} -int send_header_plus(int fd, struct boothc_header *hdr, void *data, int len) +int send_header_plus(int fd, struct boothc_hdr_msg *msg, void *data, int len) { int rv; - int l; - if (data == hdr->data) { - l = sizeof(*hdr) + len; - assert(l == ntohl(hdr->length)); + rv = send_data(fd, msg, sendmsglen(msg)-len); - /* One struct */ - rv = do_write(fd, hdr, l); - } else { - /* Header and data in two locations */ - rv = send_header_only(fd, hdr); - - if (rv >= 0 && len) - rv = do_write(fd, data, len); - } + if (rv >= 0 && len) + rv = do_write(fd, data, len); return rv; } diff --git a/src/transport.h b/src/transport.h index 9ac7094..b519145 100644 --- a/src/transport.h +++ b/src/transport.h @@ -1,77 +1,84 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef _TRANSPORT_H #define _TRANSPORT_H +#include "b_config.h" #include "booth.h" typedef enum { TCP = 1, UDP, SCTP, TRANSPORT_ENTRIES, } transport_layer_t; typedef enum { ARBITRATOR = 0x50, SITE, CLIENT, DAEMON, STATUS, } action_t; struct booth_transport { const char *name; int (*init) (void *); int (*open) (struct booth_site *); int (*send) (struct booth_site *, void *, int); + int (*send_auth) (struct booth_site *, void *, int); int (*recv) (struct booth_site *, void *, int); + int (*recv_auth) (struct booth_site *, void *, int); int (*broadcast) (void *, int); + int (*broadcast_auth) (void *, int); int (*close) (struct booth_site *); int (*exit) (void); }; extern const struct booth_transport booth_transport[TRANSPORT_ENTRIES]; int find_myself(struct booth_site **me, int fuzzy_allowed); int check_boothc_header(struct boothc_header *data, int len_incl_data); int setup_tcp_listener(int test_only); int booth_udp_send(struct booth_site *to, void *buf, int len); +int booth_udp_send_auth(struct booth_site *to, void *buf, int len); int booth_tcp_open(struct booth_site *to); int booth_tcp_send(struct booth_site *to, void *buf, int len); inline static void * node_to_addr_pointer(struct booth_site *node) { switch (node->family) { case AF_INET: return &node->sa4.sin_addr; case AF_INET6: return &node->sa6.sin6_addr; } return NULL; } extern const struct booth_transport *local_transport; -int send_header_only(int fd, struct boothc_header *hdr); -int send_header_plus(int fd, struct boothc_header *hdr, void *data, int len); -int send_ticket_msg(int fd, struct boothc_ticket_msg *msg); +int send_data(int fd, void *data, int datalen); +int send_header_plus(int fd, struct boothc_hdr_msg *hdr, void *data, int len); +#define send_client_msg(fd, msg) send_data(fd, msg, sendmsglen(msg)) +int add_hmac(void *data, int len); +int check_auth(struct booth_site *from, void *buf, int len); #endif /* _TRANSPORT_H */