diff --git a/README-testing b/README-testing new file mode 100644 index 0000000..3c50858 --- /dev/null +++ b/README-testing @@ -0,0 +1,22 @@ + +=== Simple tests (commandline, config file) + + +Run + + # make check-TESTS + +to run the tests written in python. + + + + +=== Unit tests + +These use gdb and pexpect to set boothd state to some configured value, +injecting some input and looking at the output. + + # python script/unit-test.py src/boothd unit-tests/ + + + diff --git a/script/unit-test.py b/script/unit-test.py new file mode 100644 index 0000000..0363234 --- /dev/null +++ b/script/unit-test.py @@ -0,0 +1,470 @@ +#!/usr/bin/python +# vim: fileencoding=utf-8 +# see http://stackoverflow.com/questions/728891/correct-way-to-define-python-source-code-encoding + +import os, sys, time, signal, tempfile, socket, posix, time +import re, shutil, pexpect, logging +import random, copy, glob, traceback + + +# Don't make that much sense - function/line is write(). +# Would have to use traceback.extract_stack() manually. +# %(funcName)10.10s:%(lineno)3d %(levelname)8s +default_log_format = '%(asctime)s: %(message)s' +default_log_datefmt = '%b %d %H:%M:%S' + + +# {{{ pexpect-logging glue +# needed for use as pexpect.logfile, to relay into existing logfiles +class expect_logging(): + prefix = "" + test = None + + def __init__(self, pre, inst): + self.prefix = pre + self.test = inst + + def flush(self, *arg): + pass + def write(self, stg): + if self.test.dont_log_expect == 0: + # TODO: split by input/output, give program + for line in re.split(r"[\r\n]+", stg): + if line == self.test.prompt: + continue + if line == "": + continue + logging.debug(" " + self.prefix + " " + line) +# }}} + + +class UT(): +# {{{ Members + binary = None + test_base = None + lockfile = None + + defaults = None + + this_port = None + this_site = "127.0.0.1" + this_site_id = None + + gdb = None + booth = None + prompt = "CUSTOM-GDB-PROMPT-%d-%d" % (os.getpid(), time.time()) + + dont_log_expect = 0 + + udp_sock = None +# }}} + + +# {{{ setup functions + @classmethod + def _filename(cls, desc): + return "/tmp/booth-unittest.%s" % desc + return "/tmp/booth-unittest.%d.%s" % (os.getpid(), desc) + + + def __init__(self, bin, dir): + self.binary = os.path.realpath(bin) + self.test_base = os.path.realpath(dir) + "/" + self.defaults = self.read_test_input(self.test_base + "_defaults.txt", state="ticket") + self.lockfile = UT._filename("lock") + self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + + def read_test_input(self, file, state=None, m={ "ticket": {}, "message": {} } ): + fo = open(file, "r") + for line in fo.readlines(): + # comment? + if re.match(r"^\s*#", line): + continue + # empty line + if re.match(r"^\s*$", line): + continue + + # message resp. ticket + res = re.match(r"^\s*(\w+)\s*:\s*$", line) + if res: + state = res.group(1) + if not m.has_key(state): + m[state] = {} + continue + + assert(state) + + res = re.match(r"^\s*(\S+)\s*(.*)\s*$", line) + if res: + assert(state) + if not m[state]: + m[state] = {} + m[state][ res.group(1) ] = res.group(2) + return m + + + def setup_log(self, **args): + global default_log_format + global default_log_datefmt + + this_test_log = logging.FileHandler( mode = "w", **args ) + this_test_log.setFormatter( + logging.Formatter(fmt = default_log_format, + datefmt = default_log_datefmt) ) + + this_test_log.emit( + logging.makeLogRecord( { + "msg": "## vim: set ft=messages : ##", + "lineno": 0, + "levelname": "None", + "level": None,} ) ) + + # in the specific files we want ALL information + this_test_log.setLevel(logging.DEBUG) + + logging.getLogger('').addHandler(this_test_log) + return this_test_log + + + # We want shorthand in descriptions, ie. "state" + # instead of "booth_conf->ticket[0].state". + def translate_shorthand(self, name, context): + if context == 'ticket': + return "booth_conf->ticket[0]." + name + if context == 'message': + return "msg->" + name + assert(False) + + + + + def stop_processes(self): + if os.access(self.lockfile, os.F_OK): + os.unlink(self.lockfile) + # In case the boothd process is already dead, isalive() would still return True + # (because GDB still has it), but terminate() does fail. + # So we just quit GDB, and that might take the boothd with it - + # if not, we terminate it ourselves. + if self.gdb: + self.gdb.close( force=True ); + if self.booth: + self.booth.close( force=self.booth.isalive() ) + + + def start_a_process(self, bin, **args): + name = re.sub(r".*/", "", bin) + # How to get stderr, too? + expct = pexpect.spawn(bin, + env = dict( os.environ.items() + + [('PATH', + self.test_base + "/bin/:" + + os.getenv('PATH')), + ('LC_ALL', 'C'), + ('LANG', 'C')] ), + timeout = 30, + maxread = 32768, + **args) + expct.setecho(False) + expct.logfile_read = expect_logging("<- %s" % name, self) + expct.logfile_send = expect_logging(" -> %s" % name, self) + return expct + + + def start_processes(self): + self.booth = self.start_a_process(self.binary, + args = [ "daemon", "-D", + "-c", self.test_base + "/booth.conf", + "-s", "127.0.0.1", + "-l", self.lockfile, + ]) + logging.info("started booth with PID %d, lockfile %s" % (self.booth.pid, self.lockfile)) + self.booth.expect("BOOTH site daemon is starting", timeout=2) + #print self.booth.before; exit + + self.gdb = self.start_a_process("gdb", + args=["-quiet", + "-p", str(self.booth.pid), + "-nx", "-nh", # don't use .gdbinit + ]) + logging.info("started GDB with PID %d" % self.gdb.pid) + self.gdb.expect("(gdb)") + self.gdb.sendline("set pagination off\n") + self.gdb.sendline("set interactive-mode off\n") + self.gdb.sendline("set verbose off\n") ## sadly to late for the initial "symbol not found" messages + self.gdb.sendline("set prompt " + self.prompt + "\\n\n"); + self.sync(2000) + #os.system("strace -o /tmp/sfdgs -f -tt -s 2000 -p %d &" % self.gdb.pid) + + self.this_site_id = self.query_value("local->site_id") + self.this_port = int(self.query_value("booth_conf->port")) + + # do a self-test + self.check_value("local->site_id", self.this_site_id); + + # Now we're set up. + self.send_cmd("break ticket_cron") + self.send_cmd("break booth_udp_send") + self.send_cmd("break booth_udp_broadcast") + self.send_cmd("break recvfrom") +# }}} + + +# {{{ GDB communication + def sync(self, timeout=-1): + self.gdb.expect(self.prompt, timeout) + + answer = self.gdb.before + + self.dont_log_expect += 1 + # be careful not to use RE characters like +*.[] etc. + r = str(random.randint(2**19, 2**20)) + self.gdb.sendline("print " + r) + self.gdb.expect(r, timeout) + self.gdb.expect(self.prompt, timeout) + self.dont_log_expect -= 1 + return answer # send a command to GDB, returning the GDB answer as string. + + def send_cmd(self, stg, timeout=-1): + # give booth a chance to get its messages out + try: + self.booth.read_nonblocking(64*1024, 0) + except pexpect.TIMEOUT: + pass + finally: + pass + + self.gdb.sendline(stg) + return self.sync(timeout=timeout) + + def _query_value(self, which): + val = self.send_cmd("print " + which) + cleaned = re.search(r"^\$\d+ = (.*\S)\s*$", val, re.MULTILINE) + if not cleaned: + self.user_debug("query failed") + return cleaned.group(1) + + def query_value(self, which): + res = self._query_value(which) + logging.debug("query_value: «%s» evaluates to «%s»" % (which, res)) + return res + + def check_value(self, which, value): + val = self._query_value("(" + which + ") == (" + value + ")") + logging.debug("check_value: «%s» is «%s»: %s" % (which, value, val)) + if val == "1": + return True + # for easier (test) debugging we'll show the _real_ value, too. + has = self._query_value(which) + logging.error("«%s»: expected «%s», got «%s»." % (which, value, has)) + sys.exit(1) + + # Send data to GDB, to inject them into the binary. + # Handles different data types + def set_val(self, name, value, numeric_conv=None): + logging.debug("setting value «%s» to «%s» (num_conv %s)" %(name, value, numeric_conv)) + # string value? + if re.match(r'^"', value): + self.send_cmd("print strcpy(" + name + ", " + value + ")") + # numeric + elif numeric_conv: + self.send_cmd("set variable " + name + " = " + numeric_conv + "(" + value + ")") + else: + self.send_cmd("set variable " + name + " = " + value) + logging.debug("set_val %s done" % name) +# }}} GDB communication + + + # there has to be some event waiting, so that boothd stops again. + def continue_debuggee(self, timeout=30): + return self.send_cmd("continue", timeout) + + +# {{{ High-level functions. +# Generally, GDB is attached to BOOTHD, and has it stopped. + def set_state(self, kv): + #os.system("strace -f -tt -s 2000 -e write -p" + str(self.gdb.pid) + " &") + for n, v in kv.iteritems(): + self.set_val( self.translate_shorthand(n, "ticket"), v) + logging.info("set state") + + + def user_debug(self, txt): + print self.gdb.buffer + print "\n\nProblem detected (%s), entering interactive mode.\n\n" % txt + # can't use send_cmd, doesn't reply with expected prompt anymore. + self.gdb.interact() + #while True: + # sys.stdout.write("GDB> ") + # sys.stdout.flush() + # x = sys.stdin.readline() + # if not x: + # break + # self.send_cmd(x) + self.gdb.sendline("set prompt GDB> \n") + self.gdb.setecho(True) + self.stop_processes() + sys.exit(0) + + + def wait_for_function(self, fn): + while True: + stopped_at = self.continue_debuggee(timeout=3) + if not stopped_at: + self.user_debug("Not stopped at any breakpoint?") + if re.search(r"^Program received signal SIGSEGV,", stopped_at, re.MULTILINE): + self.user_debug("Segfault") + if re.search(r"^Breakpoint \d+, (0x\w+ in )?%s " % fn, stopped_at, re.MULTILINE): + break + logging.info("Now in %s" % fn) + + # We break, change the data, and return the correct size. + def send_message(self, msg): + self.udp_sock.sendto('a', (socket.gethostbyname(self.this_site), self.this_port)) + + self.wait_for_function("recvfrom") + # drain input, but stop afterwards for changing data + self.send_cmd("finish") + # step over length assignment + self.send_cmd("next") + + # push message. + for (n, v) in msg.iteritems(): + self.set_val( "msg->" + n, v, "htonl") + + # set "received" length + self.set_val("rv", "msg->header.length", "ntohl") + + # the next thing should run continue via wait_for_function + + def wait_outgoing(self, msg): + self.wait_for_function("booth_udp_send") + for (n, v) in msg.iteritems(): + self.check_value( "ntohl(((struct boothc_ticket_msg *)buf)->" + n + ")", v) + logging.info("out gone") + #stopped_at = self.sync() + + def merge_dicts(self, base, overlay): + return dict(base.items() + overlay.items()) + + + def loop(self, data): + matches = map(lambda k: re.match(r"^(outgoing|message)(\d+)$", k), data.iterkeys()) + valid_matches = filter(None, matches) + nums = map(lambda m: int(m.group(2)), valid_matches) + loop_max = max(nums) + for counter in range(0, loop_max+1): # incl. last message + logging.info("Part " + str(counter)) + kmsg = 'message%d' % counter + msg = data.get(kmsg) + if msg: + logging.info("sending " + kmsg) + self.send_message(self.merge_dicts(data["message"], msg)) + kout = 'outgoing%d' % counter + out = data.get(kout) + if out: + logging.info("waiting for " + kout) + self.wait_outgoing(out) + logging.info("loop ends") + + def do_finally(self, data): + if not data: + return + + # Allow debuggee to reach a stable state + time.sleep(1) + # stop it + posix.kill(self.booth.pid, signal.SIGINT) + # sync with GDB + self.query_value("42") + + for (n, v) in data.iteritems(): + self.check_value( "booth_conf->ticket[0]." + n, v) + + + def run(self): + os.chdir(self.test_base) + # TODO: sorted, random order + for f in filter( (lambda f: re.match(r"^\d\d\d_.*\.txt$", f)), glob.glob("*")): + log = None + try: + log = self.setup_log(filename = UT._filename(f)) + + log.setLevel(logging.DEBUG) + logging.warn("running test %s" % f) + self.start_processes() + + test = self.read_test_input(f, m=copy.deepcopy(self.defaults)) + self.set_state(test["ticket"]) + self.loop(test) + self.do_finally(test.get("finally")) + logging.warn("test %s ends" % f) + except: + logging.error("Broke in %s: %s" % (f, sys.exc_info())) + for frame in traceback.format_tb(sys.exc_traceback): + logging.info(" - %s " % frame.rstrip()) + finally: + self.stop_processes() + if log: + log.close() + return +# }}} + + +## +##class Message(UT): +## def set_break(): +## "message_recv" +## +## # set data, with automatic htonl() for network messages. +## def send_vals(self, data): +## for n, v in data.iteritems(): +## self.set_val("msg->" + n, v, "htonl") +## +##class Ticket(UT): +## # set ticket data - +## def send_vals(self, data): +## for (n, v) in data: +## self.set_val(n, v) + +#def traceit(frame, event, arg): +# if event == "line": +# lineno = frame.f_lineno +# print frame.f_code.co_filename, ":", "line", lineno +# return traceit + + +# {{{ main +if __name__ == '__main__': + if os.geteuid() == 0: + sys.stderr.write("Must be run non-root; aborting.\n") + sys.exit(1) + + + ut = UT(sys.argv[1], sys.argv[2] + "/") + + # "master" log object needs max level + logging.basicConfig(level = logging.DEBUG, + filename = "/dev/null", + filemode = "a", + format = default_log_format, + datefmt = default_log_datefmt) + + + overview_log = ut.setup_log( filename = UT._filename('seq') ) + overview_log.setLevel(logging.WARN) + + # http://stackoverflow.com/questions/9321741/printing-to-screen-and-writing-to-a-file-at-the-same-time + console = logging.StreamHandler() + console.setFormatter(logging.Formatter(' # %(message)s')) + console.setLevel(logging.WARN) + logging.getLogger('').addHandler(console) + + + logging.info("Starting boothd unit tests.") + + #sys.settrace(traceit) + + ret = ut.run() + sys.exit(ret) +# }}} diff --git a/src/ticket.c b/src/ticket.c index d1313ff..e0e243b 100644 --- a/src/ticket.c +++ b/src/ticket.c @@ -1,608 +1,612 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include "ticket.h" #include "config.h" #include "pacemaker.h" #include "inline-fn.h" #include "log.h" #include "booth.h" #include "paxos.h" #define TK_LINE 256 /* Untrusted input, must fit (incl. \0) in a buffer of max chars. */ int check_max_len_valid(const char *s, int max) { int i; for(i=0; iticket_count; i++) { if (!strcmp(booth_conf->ticket[i].name, ticket)) { if (found) *found = booth_conf->ticket + i; return 1; } } return 0; } int check_ticket(char *ticket, struct ticket_config **found) { if (found) *found = NULL; if (!booth_conf) return 0; if (!check_max_len_valid(ticket, sizeof(booth_conf->ticket[0].name))) return 0; return find_ticket_by_name(ticket, found); } int check_site(char *site, int *is_local) { struct booth_site *node; if (!check_max_len_valid(site, sizeof(node->addr_string))) return 0; if (find_site_by_name(site, &node)) { *is_local = node->local; return 1; } return 0; } /** Find out what others think about this ticket. * * If we're a SITE, we can ask (and have to tell) Pacemaker. * An ARBITRATOR can only ask others. */ static int ticket_send_catchup(struct ticket_config *tk) { int i, rv = 0; struct booth_site *site; struct boothc_ticket_msg msg; foreach_node(i, site) { if (!site->local) { init_ticket_msg(&msg, CMD_CATCHUP, RLT_SUCCESS, tk); log_debug("attempting catchup from %s", site->addr_string); rv = booth_udp_send(site, &msg, sizeof(msg)); } } ticket_activate_timeout(tk); return rv; } int ticket_write(struct ticket_config *tk) { if (local->type != SITE) return -EINVAL; disown_if_expired(tk); pcmk_handler.store_ticket(tk); if (tk->owner == local) { pcmk_handler.grant_ticket(tk); } else { pcmk_handler.revoke_ticket(tk); } return 0; } /** Try to get the ticket for the local site. * */ int do_grant_ticket(struct ticket_config *tk) { int rv; if (tk->owner == local) return RLT_SUCCESS; if (tk->owner) return RLT_OVERGRANT; rv = paxos_start_round(tk, local); return rv; } /** Start a PAXOS round for revoking. * That can be started from any site. */ int do_revoke_ticket(struct ticket_config *tk) { int rv; if (!tk->owner) return RLT_SUCCESS; rv = paxos_start_round(tk, NULL); return rv; } int list_ticket(char **pdata, unsigned int *len) { struct ticket_config *tk; char timeout_str[64]; char *data, *cp; int i, alloc; *pdata = NULL; *len = 0; alloc = 256 + booth_conf->ticket_count * (BOOTH_NAME_LEN * 2 + 128); data = malloc(alloc); if (!data) return -ENOMEM; cp = data; foreach_ticket(i, tk) { if (tk->expires != 0) strftime(timeout_str, sizeof(timeout_str), "%F %T", localtime(&tk->expires)); else strcpy(timeout_str, "INF"); cp += sprintf(cp, "ticket: %s, owner: %s, expires: %s\n", tk->name, tk->owner ? tk->owner->addr_string : "None", timeout_str); *len = cp - data; assert(*len < alloc); } *pdata = data; return 0; } int setup_ticket(void) { struct ticket_config *tk; int i; /* TODO */ foreach_ticket(i, tk) { tk->owner = NULL; tk->expires = 0; abort_proposal(tk); if (local->role & PROPOSER) { pcmk_handler.load_ticket(tk); } } return 0; } int ticket_answer_list(int fd, struct boothc_ticket_msg *msg) { char *data; int olen, rv; struct boothc_header hdr; rv = list_ticket(&data, &olen); if (rv < 0) return rv; init_header(&hdr, CMR_LIST, RLT_SUCCESS, sizeof(hdr) + olen); return send_header_plus(fd, &hdr, data, olen); } int ticket_answer_grant(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (tk->owner) { log_error("client wants to get an granted ticket %s", msg->ticket.id); rv = RLT_OVERGRANT; goto reply; } rv = do_grant_ticket(tk); reply: init_header(&msg->header, CMR_GRANT, rv ?: RLT_ASYNC, sizeof(*msg)); return send_ticket_msg(fd, msg); } int ticket_answer_revoke(int fd, struct boothc_ticket_msg *msg) { int rv; struct ticket_config *tk; if (!check_ticket(msg->ticket.id, &tk)) { log_error("Client asked to grant unknown ticket"); rv = RLT_INVALID_ARG; goto reply; } if (!tk->owner) { log_info("client wants to revoke a free ticket \"%s\"", msg->ticket.id); rv = RLT_SUCCESS; goto reply; } rv = do_revoke_ticket(tk); reply: init_ticket_msg(msg, CMR_REVOKE, rv ?: RLT_ASYNC, tk); return send_ticket_msg(fd, msg); } /** Got a CMD_CATCHUP query. * In this file because it's mostly used during startup. */ static int ticket_answer_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; log_debug("got CATCHUP query for \"%s\" from %s", msg->ticket.id, from->addr_string); /* We do _always_ answer. * In case all booth daemons are restarted at the same time, nobody * would answer any questions, leading to timeouts and delays. * Just admit we don't know. */ rv = (tk->state == ST_INIT) ? RLT_PROBABLY_SUCCESS : RLT_SUCCESS; init_ticket_msg(msg, CMR_CATCHUP, rv, tk); return booth_udp_send(from, msg, sizeof(*msg)); } /** Got a CMR_CATCHUP message. * Gets handled here because it's not PAXOS per se, * but only needed during startup. */ static int ticket_process_catchup( struct ticket_config *tk, struct booth_site *from, struct boothc_ticket_msg *msg, uint32_t ballot, struct booth_site *new_owner) { int rv; log_info("got CATCHUP answer for \"%s\" from %s; says owner %s with ballot %d", tk->name, from->addr_string, ticket_owner_string(new_owner), ballot); rv = ntohl(msg->header.result); + if (rv != RLT_SUCCESS && + rv != RLT_PROBABLY_SUCCESS) { + log_error("dropped because of wrong rv: 0x%x", rv); + return -EINVAL; + } + if (ballot == tk->new_ballot && ballot == tk->last_ack_ballot && - new_owner == tk->owner && - (rv == RLT_SUCCESS || - rv == RLT_PROBABLY_SUCCESS)) { + new_owner == tk->owner) { /* Peer says the same thing we're believing. */ tk->proposal_acknowledges |= from->bitmask; tk->expires = ntohl(msg->ticket.expiry) + time(NULL); if (promote_ticket_state(tk)) { if (tk->state == ST_INIT) tk->state = ST_STABLE; } disown_if_expired(tk); goto ex; } if (ballot >= tk->new_ballot && ballot >= tk->last_ack_ballot && rv == RLT_SUCCESS) { /* Peers seems to know better, but as yet we only have _her_ * word for that. */ accept: tk->expires = ntohl(msg->ticket.expiry) + time(NULL); tk->new_ballot = ballot; tk->last_ack_ballot = ballot; tk->owner = new_owner; tk->proposal_acknowledges = from->bitmask; /* We stay in ST_INIT and wait for confirmation. */ goto ex; } if (ballot >= tk->last_ack_ballot && rv == RLT_PROBABLY_SUCCESS && tk->state == ST_INIT && tk->retry_number > 3) { /* Peer seems to know better than us, and there's no * convincing other report. Just take it. */ tk->state = ST_STABLE; goto accept; } if (ballot < tk->new_ballot || ballot < tk->last_ack_ballot) { /* Peer seems outdated ... tell it to reload? */ #if 0 init_ticket_msg(&msg, CMD_DO_CATCHUP, RLT_SUCCESS, tk, &tk->current_state); #endif } ex: ticket_write(tk); return 0; } /** Send new state request to all sites. * Perhaps this should take a flag for ACCEPTOR etc.? * No need currently, as all nodes are more or less identical. */ int ticket_broadcast_proposed_state(struct ticket_config *tk, cmd_request_t state) { struct boothc_ticket_msg msg; tk->proposal_acknowledges = local->bitmask; tk->state = state; init_ticket_msg(&msg, state, RLT_SUCCESS, tk); msg.ticket.owner = htonl(get_node_id(tk->proposed_owner)); log_debug("broadcasting %s for ticket \"%s\"", STATE_STRING(state), tk->name); return transport()->broadcast(&msg, sizeof(msg)); } static void ticket_cron(struct ticket_config *tk) { time_t now; now = time(NULL); switch(tk->state) { case ST_INIT: /* Unknown state, ask others. */ ticket_send_catchup(tk); return; case OP_COMMITTED: case ST_STABLE: /* Has an owner, has an expiry date, and expiry date in the past? */ if (tk->expires && tk->owner && now > tk->expires) { log_info("LOST ticket: \"%s\" no longer at %s", tk->name, ticket_owner_string(tk->owner)); /* Couldn't renew in time - ticket lost. */ tk->owner = NULL; abort_proposal(tk); ticket_write(tk); if (tk->acquire_after) ticket_next_cron_in(tk, tk->acquire_after); } /* Do we need to refresh? */ if (tk->owner == local && ticket_valid_for(tk) < tk->expiry/2) { log_info("RENEW ticket \"%s\"", tk->name); paxos_start_round(tk, local); /* TODO: remember when we started, and restart afresh after some retries */ } break; case OP_PREPARING: case OP_PROPOSING: tk->retry_number ++; if (tk->retry_number > RETRIES) { log_info("ABORT %s for ticket \"%s\" - " "not enough answers after %d retries", tk->state == OP_PREPARING ? "prepare" : "propose", tk->name, tk->retry_number); abort_proposal(tk); } else { /* We ask others for a change; retry to get consensus. */ ticket_broadcast_proposed_state(tk, tk->state); ticket_activate_timeout(tk); } break; case OP_PROMISING: case OP_ACCEPTING: case OP_RECOVERY: case OP_REJECTED: break; default: break; } } void process_tickets(void) { struct ticket_config *tk; int i; time_t now; time(&now); foreach_ticket(i, tk) { if (0) log_debug("ticket %s next cron %" PRIx64 ", now %" PRIx64 ", in %" PRIi64, tk->name, (uint64_t)tk->next_cron, (uint64_t)now, (int64_t)tk->next_cron - now); if (tk->next_cron > now) continue; log_debug("ticket cron: doing %s", tk->name); /* Set next value, handler may override. */ tk->next_cron = INT_MAX; ticket_cron(tk); } } void tickets_log_info(void) { struct ticket_config *tk; int i; foreach_ticket(i, tk) { log_info("Ticket %s: state %s " "mask %" PRIx64 "/%" PRIx64 " " "ballot %d (current %d) " "expires %-24.24s", tk->name, STATE_STRING(tk->state), tk->proposal_acknowledges, booth_conf->site_bits, tk->last_ack_ballot, tk->new_ballot, ctime(&tk->expires)); } } /* UDP message receiver. */ int message_recv(struct boothc_ticket_msg *msg, int msglen) { int cmd, rv; uint32_t from; struct booth_site *dest; struct ticket_config *tk; struct booth_site *new_owner_p; uint32_t ballot, new_owner; if (check_boothc_header(&msg->header, sizeof(*msg)) < 0 || msglen != sizeof(*msg)) { log_error("message receive error"); return -1; } from = ntohl(msg->header.from); - if (!find_site_by_id(from, &dest)) { + if (!find_site_by_id(from, &dest) || !dest) { log_error("unknown sender: %08x", from); return -1; } if (!check_ticket(msg->ticket.id, &tk)) { log_error("got invalid ticket name \"%s\" from %s", msg->ticket.id, dest->addr_string); return -EINVAL; } cmd = ntohl(msg->header.cmd); ballot = ntohl(msg->ticket.ballot); new_owner = ntohl(msg->ticket.owner); if (!find_site_by_id(new_owner, &new_owner_p)) { log_error("Message with unknown owner %x received", new_owner); return -EINVAL; } switch (cmd) { case CMD_CATCHUP: return ticket_answer_catchup(tk, dest, msg, ballot, new_owner_p); case CMR_CATCHUP: return ticket_process_catchup(tk, dest, msg, ballot, new_owner_p); default: /* only used in catchup, and not even really there ?? */ assert(ntohl(msg->header.result) == 0); rv = paxos_answer(tk, dest, msg, ballot, new_owner_p); assert((tk->proposal_acknowledges & ~booth_conf->site_bits) == 0); return rv; } return 0; } diff --git a/src/transport.c b/src/transport.c index 9551eeb..fbc9fad 100644 --- a/src/transport.c +++ b/src/transport.c @@ -1,670 +1,673 @@ /* * Copyright (C) 2011 Jiaju Zhang * Copyright (C) 2013 Philipp Marek * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "booth.h" #include "inline-fn.h" #include "log.h" #include "config.h" #include "ticket.h" #include "transport.h" #define BOOTH_IPADDR_LEN (sizeof(struct in6_addr)) #define NETLINK_BUFSIZE 16384 #define SOCKET_BUFFER_SIZE 160000 #define FRAME_SIZE_MAX 10000 struct booth_site *local = NULL; static int (*deliver_fn) (void *msg, int msglen); static void parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) { while (RTA_OK(rta, len)) { if (rta->rta_type <= max) tb[rta->rta_type] = rta; rta = RTA_NEXT(rta,len); } } static int find_address(unsigned char ipaddr[BOOTH_IPADDR_LEN], int family, int prefixlen, int fuzzy_allowed, struct booth_site **me) { int i; struct booth_site *node; int bytes, bits_left, mask; unsigned char node_bits, ip_bits; uint8_t *n_a; bytes = prefixlen / 8; bits_left = prefixlen % 8; /* One bit left to check means ignore 7 lowest bits. */ mask = ~( (1 << (8 - bits_left)) -1); for (i = 0; i < booth_conf->site_count; i++) { node = booth_conf->site + i; if (family != node->family) continue; n_a = node_to_addr_pointer(node); if (memcmp(ipaddr, n_a, node->addrlen) == 0) { found: *me = node; return 1; } if (!fuzzy_allowed) continue; /* Check prefix, whole bytes */ if (memcmp(ipaddr, n_a, bytes) != 0) continue; if (!bits_left) goto found; node_bits = n_a[bytes]; ip_bits = ipaddr[bytes]; if (((node_bits ^ ip_bits) & mask) == 0) goto found; } return 0; } int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed); int _find_myself(int family, struct booth_site **mep, int fuzzy_allowed) { int fd; struct sockaddr_nl nladdr; struct booth_site *me; unsigned char ipaddr[BOOTH_IPADDR_LEN]; static char rcvbuf[NETLINK_BUFSIZE]; struct { struct nlmsghdr nlh; struct rtgenmsg g; } req; if (local) goto found; me = NULL; if (mep) *mep = NULL; fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE); if (fd < 0) { log_error("failed to create netlink socket"); return 0; } setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)); memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; memset(&req, 0, sizeof(req)); req.nlh.nlmsg_len = sizeof(req); req.nlh.nlmsg_type = RTM_GETADDR; req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; req.nlh.nlmsg_pid = 0; req.nlh.nlmsg_seq = 1; req.g.rtgen_family = family; if (sendto(fd, (void *)&req, sizeof(req), 0, (struct sockaddr*)&nladdr, sizeof(nladdr)) < 0) { close(fd); log_error("failed to send data to netlink socket"); return 0; } while (1) { int status; struct nlmsghdr *h; struct iovec iov = { rcvbuf, sizeof(rcvbuf) }; struct msghdr msg = { (void *)&nladdr, sizeof(nladdr), &iov, 1, NULL, 0, 0 }; status = recvmsg(fd, &msg, 0); if (!status) { close(fd); log_error("failed to recvmsg from netlink socket"); return 0; } h = (struct nlmsghdr *)rcvbuf; if (h->nlmsg_type == NLMSG_DONE) break; if (h->nlmsg_type == NLMSG_ERROR) { close(fd); log_error("netlink socket recvmsg error"); return 0; } while (NLMSG_OK(h, status)) { if (h->nlmsg_type == RTM_NEWADDR) { struct ifaddrmsg *ifa = NLMSG_DATA(h); struct rtattr *tb[IFA_MAX+1]; int len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa)); memset(tb, 0, sizeof(tb)); parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len); memset(ipaddr, 0, BOOTH_IPADDR_LEN); memcpy(ipaddr, RTA_DATA(tb[IFA_ADDRESS]), BOOTH_IPADDR_LEN); if (find_address(ipaddr, ifa->ifa_family, ifa->ifa_prefixlen, fuzzy_allowed, &me)) goto out; } h = NLMSG_NEXT(h, status); } } out: close(fd); if (!me) return 0; me->local = 1; local = me; found: if (mep) *mep = local; return 1; } int find_myself(struct booth_site **mep, int fuzzy_allowed) { return _find_myself(AF_INET6, mep, fuzzy_allowed) || _find_myself(AF_INET, mep, fuzzy_allowed); } /** Checks the header fields for validity. * cf. init_header(). * For @len_incl_data < 0 the length is not checked. * Return <0 if error, else bytes read. */ int check_boothc_header(struct boothc_header *h, int len_incl_data) { int l; if (h->magic != htonl(BOOTHC_MAGIC)) { log_error("magic error %x", ntohl(h->magic)); return -EINVAL; } if (h->version != htonl(BOOTHC_VERSION)) { log_error("version error %x", ntohl(h->version)); return -EINVAL; } l = ntohl(h->length); if (l < sizeof(*h)) { log_error("length %d out of range", l); return -EINVAL; } if (len_incl_data < 0) return 0; if (l != len_incl_data) { log_error("length error - got %d, wanted %d", l, len_incl_data); return -EINVAL; } return len_incl_data; } static void process_tcp_listener(int ci) { int fd, i, one = 1; socklen_t addrlen = sizeof(struct sockaddr); struct sockaddr addr; fd = accept(clients[ci].fd, &addr, &addrlen); if (fd < 0) { log_error("process_tcp_listener: accept error %d %d", fd, errno); return; } setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); i = client_add(fd, clients[ci].transport, process_connection, NULL); log_debug("client connection %d fd %d", i, fd); } static int setup_tcp_listener(void) { int s, rv; s = socket(local->family, SOCK_STREAM, 0); if (s == -1) { log_error("failed to create tcp socket %s", strerror(errno)); return s; } rv = bind(s, &local->sa6, local->saddrlen); if (rv == -1) { log_error("failed to bind socket %s", strerror(errno)); return rv; } rv = listen(s, 5); if (rv == -1) { log_error("failed to listen on socket %s", strerror(errno)); return rv; } return s; } static int booth_tcp_init(void * unused __attribute__((unused))) { int rv; if (get_local_id() < 0) return -1; rv = setup_tcp_listener(); if (rv < 0) return rv; client_add(rv, booth_transport + TCP, process_tcp_listener, NULL); return 0; } static int connect_nonb(int sockfd, const struct sockaddr *saptr, socklen_t salen, int sec) { int flags, n, error; socklen_t len; fd_set rset, wset; struct timeval tval; flags = fcntl(sockfd, F_GETFL, 0); fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); error = 0; if ( (n = connect(sockfd, saptr, salen)) < 0) if (errno != EINPROGRESS) return -1; if (n == 0) goto done; /* connect completed immediately */ FD_ZERO(&rset); FD_SET(sockfd, &rset); wset = rset; tval.tv_sec = sec; tval.tv_usec = 0; if ((n = select(sockfd + 1, &rset, &wset, NULL, sec ? &tval : NULL)) == 0) { /* leave outside function to close */ /* timeout */ /* close(sockfd); */ errno = ETIMEDOUT; return -1; } if (FD_ISSET(sockfd, &rset) || FD_ISSET(sockfd, &wset)) { len = sizeof(error); if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) return -1; /* Solaris pending error */ } else { log_error("select error: sockfd not set"); return -1; } done: fcntl(sockfd, F_SETFL, flags); /* restore file status flags */ if (error) { /* leave outside function to close */ /* close(sockfd); */ errno = error; return -1; } return 0; } int booth_tcp_open(struct booth_site *to) { int s, rv; if (to->tcp_fd >= STDERR_FILENO) goto found; s = socket(to->family, SOCK_STREAM, 0); if (s == -1) { log_error("cannot create socket of family %d", to->family); return -1; } rv = connect_nonb(s, (struct sockaddr *)&to->sa6, to->saddrlen, 10); if (rv == -1) { if( errno == ETIMEDOUT) log_error("connection to %s timeout", to->addr_string); else log_error("connection to %s error %s", to->addr_string, strerror(errno)); goto error; } to->tcp_fd = s; found: return 1; error: if (s >= 0) close(s); return -1; } int booth_tcp_send(struct booth_site *to, void *buf, int len) { return do_write(to->tcp_fd, buf, len); } static int booth_tcp_recv(struct booth_site *from, void *buf, int len) { int got; /* Needs timeouts! */ got = do_read(from->tcp_fd, buf, len); if (got < 0) return got; return len; } static int booth_tcp_close(struct booth_site *to) { if (to) { if (to->tcp_fd > STDERR_FILENO) close(to->tcp_fd); to->tcp_fd = -1; } return 0; } static int booth_tcp_exit(void) { return 0; } int setup_udp_server(int try_only) { int rv, fd; unsigned int recvbuf_size; fd = socket(local->family, SOCK_DGRAM, 0); if (fd == -1) { log_error("failed to create UDP socket %s", strerror(errno)); goto ex; } rv = fcntl(fd, F_SETFL, O_NONBLOCK); if (rv == -1) { log_error("failed to set non-blocking operation " "on UDP socket: %s", strerror(errno)); goto ex; } rv = bind(fd, (struct sockaddr *)&local->sa6, local->saddrlen); if (try_only) { rv = (rv == -1) ? errno : 0; close(fd); return rv; } if (rv == -1) { log_error("failed to bind UDP socket to [%s]:%d: %s", local->addr_string, booth_conf->port, strerror(errno)); goto ex; } recvbuf_size = SOCKET_BUFFER_SIZE; rv = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, sizeof(recvbuf_size)); if (rv == -1) { log_error("failed to set recvbuf size"); goto ex; } local->udp_fd = fd; return 0; ex: if (fd >= 0) close(fd); return -1; } /* Receive/process callback for UDP */ static void process_recv(int ci) { struct sockaddr_storage sa; int rv; socklen_t sa_len; - char buffer[2048]; + char buffer[256]; + /* Used for unit tests */ + struct boothc_ticket_msg *msg; sa_len = sizeof(sa); + msg = (void*)buffer; rv = recvfrom(clients[ci].fd, buffer, sizeof(buffer), MSG_NOSIGNAL | MSG_DONTWAIT, (struct sockaddr *)&sa, &sa_len); if (rv == -1) return; - deliver_fn(buffer, rv); + deliver_fn(msg, rv); } static int booth_udp_init(void *f) { int rv; rv = setup_udp_server(0); if (rv < 0) return rv; deliver_fn = f; client_add(local->udp_fd, booth_transport + UDP, process_recv, NULL); return 0; } int booth_udp_send(struct booth_site *to, void *buf, int len) { int rv; rv = sendto(local->udp_fd, buf, len, MSG_NOSIGNAL, (struct sockaddr *)&to->sa6, to->saddrlen); return rv; } static int booth_udp_broadcast(void *buf, int len) { int i; struct booth_site *site; if (!booth_conf || !booth_conf->site_count) return -1; foreach_node(i, site) { if (site != local) booth_udp_send(site, buf, len); } return 0; } static int booth_udp_exit(void) { return 0; } /* SCTP transport layer has not been developed yet */ static int booth_sctp_init(void *f __attribute__((unused))) { return 0; } static int booth_sctp_send(struct booth_site * to __attribute__((unused)), void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int booth_sctp_broadcast(void *buf __attribute__((unused)), int len __attribute__((unused))) { return 0; } static int return_0_booth_site(struct booth_site *v __attribute((unused))) { return 0; } static int return_0(void) { return 0; } const struct booth_transport booth_transport[TRANSPORT_ENTRIES] = { [TCP] = { .name = "TCP", .init = booth_tcp_init, .open = booth_tcp_open, .send = booth_tcp_send, .recv = booth_tcp_recv, .close = booth_tcp_close, .exit = booth_tcp_exit }, [UDP] = { .name = "UDP", .init = booth_udp_init, .open = return_0_booth_site, .send = booth_udp_send, .close = return_0_booth_site, .broadcast = booth_udp_broadcast, .exit = booth_udp_exit }, [SCTP] = { .name = "SCTP", .init = booth_sctp_init, .open = return_0_booth_site, .send = booth_sctp_send, .broadcast = booth_sctp_broadcast, .exit = return_0, } }; const struct booth_transport *local_transport = booth_transport+TCP; int send_header_only(int fd, struct boothc_header *hdr) { int rv; rv = do_write(fd, hdr, sizeof(*hdr)); return rv; } int send_ticket_msg(int fd, struct boothc_ticket_msg *msg) { int rv; rv = do_write(fd, msg, sizeof(*msg)); return rv; } int send_header_plus(int fd, struct boothc_header *hdr, void *data, int len) { int rv; int l; if (data == hdr->data) { l = sizeof(*hdr) + len; assert(l == ntohl(hdr->length)); /* One struct */ rv = do_write(fd, hdr, l); } else { /* Header and data in two locations */ rv = send_header_only(fd, hdr); if (rv >= 0 && len) rv = do_write(fd, data, len); } return rv; } diff --git a/unit-tests/_defaults.txt b/unit-tests/_defaults.txt new file mode 100644 index 0000000..b35bb36 --- /dev/null +++ b/unit-tests/_defaults.txt @@ -0,0 +1,52 @@ +# vim: ft=sh et : + + +# ticket defaults, mostly set via config file. +ticket: + + name "ticket" + + ## these would matter if testing via GDB had high latencies + #expiry 60 + #timeout 10 + #acquire_after 30 + + + + # defaults for all tests + state ST_INIT + next_cron 0 +# time(0)+1 + # local is site[0] per convention + + owner booth_conf->site+1 + expires time(0)+1 + last_ack_ballot 242 + + proposer 0 + proposed_owner 0 + new_ballot 0 + proposal_acknowledges 0 + retry_number 0 + + + +# defaults for input message. +# sender is a peer, and it wants something. +message: + + ticket.id "ticket" + # invalid by default + header.cmd -1 + # invalid by default + header.result 1 + # invalid by default + header.from -1 + header.version BOOTHC_VERSION + header.magic BOOTHC_MAGIC + header.length sizeof(struct boothc_ticket_msg) + ticket.owner -1 + ticket.ballot 0 + ticket.prev_ballot 0 + ticket.expiry 0 + diff --git a/unit-tests/bin/crm_ticket b/unit-tests/bin/crm_ticket new file mode 100755 index 0000000..3395065 --- /dev/null +++ b/unit-tests/bin/crm_ticket @@ -0,0 +1,3 @@ +#!/bin/bash + +true diff --git a/unit-tests/booth.conf b/unit-tests/booth.conf new file mode 100644 index 0000000..971138d --- /dev/null +++ b/unit-tests/booth.conf @@ -0,0 +1,15 @@ +# "local" +site=127.0.0.1 + +# these should not exist, although it shouldn't matter much +# because no packets should be sent anyway +arbitrator="127.0.0.243" +site="127.0.0.244" + +# The ticket name, which corresponds to a set of resources which can be +# fail-overed among different sites. +ticket="ticket" + expire = 60 + timeout = 1 + acquire-after = 30 + weights = 1,2,3 diff --git a/unit-tests/init-catchup.txt b/unit-tests/init-catchup.txt new file mode 100644 index 0000000..c681e89 --- /dev/null +++ b/unit-tests/init-catchup.txt @@ -0,0 +1,48 @@ +# vim: ft=sh et : + + +ticket: + state ST_INIT + last_ack_ballot 1 + new_ballot 2012 + +# No message0 +# expecting catchup query + +# outgoing packet: expect this data +outgoing0: + header.cmd CMD_CATCHUP + header.result RLT_SUCCESS + + +# ignore "bad" catchup data +message1: + header.from booth_conf->site[2].site_id + header.cmd CMR_CATCHUP + header.result 243521741 + +outgoing1: + header.cmd CMD_CATCHUP + header.result RLT_SUCCESS + + +# accept good CMR_CATCHUP +message2: + header.cmd CMR_CATCHUP + header.result RLT_SUCCESS + header.from booth_conf->site[2].site_id + ticket.ballot 2062 + ticket.prev_ballot 2052 + ticket.owner -1 + +# nothing goes out + + +# after a delay, check final state +finally: +# should be overwritten + last_ack_ballot 2052 +# should not be - a OP_PREPARING would fetch the new value + new_ballot 2012 +# too low-level +# proposal_acknowledges 5