diff --git a/script/unit-test.py b/script/unit-test.py index 6871930..399528e 100755 --- a/script/unit-test.py +++ b/script/unit-test.py @@ -1,626 +1,637 @@ #!/usr/bin/python # vim: fileencoding=utf-8 # see http://stackoverflow.com/questions/728891/correct-way-to-define-python-source-code-encoding +# NOTE: setting the encoding is needed as non-ASCII characters are contained +# FIXME: apparently, pexpect.EOF is not being excepted properly import os, sys, time, signal, tempfile, socket, posix, time import re, shutil, pexpect, logging, pprint import random, copy, glob, traceback # Don't make that much sense - function/line is write(). # Would have to use traceback.extract_stack() manually. # %(funcName)10.10s:%(lineno)3d %(levelname)8s # The second ":" is to get correct syntax highlightning, # eg. messages with ERROR etc. are red in vim. default_log_format = '%(asctime)s: : %(message)s' default_log_datefmt = '%b %d %H:%M:%S' +# Compatibility with dictionary methods not present in Python 3; +# https://www.python.org/dev/peps/pep-0469/#migrating-to-the-common-subset-of-python-2-and-3 +try: + dict.iteritems +except AttributeError: # Python 3 + iter_items = lambda d: iter(d.items()) +else: # Python 2 + iter_items = lambda d: d.iteritems() + + # {{{ pexpect-logging glue # needed for use as pexpect.logfile, to relay into existing logfiles class expect_logging(): prefix = "" test = None def __init__(self, pre, inst): self.prefix = pre self.test = inst def flush(self, *arg): pass + def write(self, stg): if self.test.dont_log_expect == 0: # TODO: split by input/output, give program + if sys.version_info[0] >= 3: + stg = str(stg, 'UTF-8') for line in re.split(r"[\r\n]+", stg): if line == self.test.prompt: continue if line == "": continue logging.debug(" " + self.prefix + " " + line) # }}} # {{{ dictionary plus second hash class dict_plus(dict): def __init__(self): self.aux = dict() # def aux(self): # return self.aux # }}} class UT(): # {{{ Members binary = None test_base = None lockfile = None defaults = None this_port = None this_site = "127.0.0.1" this_site_id = None running = False gdb = None booth = None prompt = "CUSTOM-GDB-PROMPT-%d-%d" % (os.getpid(), time.time()) dont_log_expect = 0 current_nr = None udp_sock = None # http://stackoverflow.com/questions/384076/how-can-i-color-python-logging-output BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) # }}} # {{{ setup functions @classmethod def _filename(cls, desc): return "/tmp/booth-unittest.%d.%s" % (os.getpid(), desc) def __init__(self, bin, dir): self.binary = os.path.realpath(bin) self.test_base = os.path.realpath(dir) + "/" self.defaults = self.read_test_input(self.test_base + "_defaults.txt", state="ticket") self.lockfile = UT._filename("lock") self.udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def read_test_input(self, file, state=None, m = dict()): fo = open(file, "r") state = None line_nr = 0 for line in fo.readlines(): line_nr += 1 # comment? if re.match(r"^\s*#", line): continue # empty line if re.match(r"^\s*$", line): continue # message resp. ticket # We allow a comment to have something to write out to screen res = re.match(r"^\s*(\w+)\s*:(?:\s*(#.*?\S))?\s*$", line) if res: state = res.group(1) - if not m.has_key(state): + if state not in m: m[state] = dict_plus() if res.group(2): m[state].aux["comment"] = res.group(2) m[state].aux["line"] = line_nr continue assert(state) res = re.match(r"^\s*(\S+)\s*(.*)\s*$", line) if res: m[state][ res.group(1) ] = res.group(2) return m def setup_log(self, **args): global default_log_format global default_log_datefmt this_test_log = logging.FileHandler( mode = "w", **args ) this_test_log.setFormatter( logging.Formatter(fmt = default_log_format, datefmt = default_log_datefmt) ) this_test_log.emit( logging.makeLogRecord( { "msg": "## vim: set ft=messages : ##", "lineno": 0, "levelname": "None", "level": None,} ) ) # in the specific files we want ALL information this_test_log.setLevel(logging.DEBUG) logging.getLogger('').addHandler(this_test_log) return this_test_log def running_on_console(self): return sys.stdout.isatty() def colored_string(self, stg, color): if self.running_on_console(): return "\033[%dm%s\033[0m" % (30+color, stg) return stg # We want shorthand in descriptions, ie. "state" # instead of "booth_conf->ticket[0].state". def translate_shorthand(self, name, context): if context == 'ticket': return "booth_conf->ticket[0]." + name if context == 'message': return "msg->" + name if context == 'inject': return "ntohl(((struct boothc_ticket_msg *)buf)->" + name + ")" assert(False) def stop_processes(self): if os.access(self.lockfile, os.F_OK): os.unlink(self.lockfile) # In case the boothd process is already dead, isalive() would still return True # (because GDB still has it), but terminate() does fail. # So we just quit GDB, and that might take the boothd with it - # if not, we terminate it ourselves. if self.gdb: self.gdb.close( force=True ); self.drain_booth_log() if self.booth: self.booth.close( force=self.booth.isalive() ) def start_a_process(self, bin, env_add=[], **args): name = re.sub(r".*/", "", bin) # How to get stderr, too? expct = pexpect.spawn(bin, - env = dict( os.environ.items() + - [('PATH', - self.test_base + "/bin/:" + - os.getenv('PATH')), - ('UNIT_TEST_PATH', self.test_base), - ('LC_ALL', 'C'), - ('LANG', 'C')] + - env_add ), - timeout = 30, - maxread = 32768, - **args) + env=dict(os.environ, **dict({ + 'PATH': ':'.join((self.test_base + "/bin/", + os.getenv('PATH'))), + 'UNIT_TEST_PATH': self.test_base, + 'LC_ALL': 'C', + 'LANG': 'C'}, **dict(env_add))), + timeout=30, + maxread=32768, + **args) expct.setecho(False) expct.logfile_read = expect_logging("<- %s" % name, self) expct.logfile_send = expect_logging(" -> %s" % name, self) return expct def start_processes(self, test): self.booth = self.start_a_process(self.binary, args = [ "daemon", "-D", "-c", self.test_base + "/booth.conf", "-s", "127.0.0.1", "-l", self.lockfile, ], env_add=[ ('UNIT_TEST', test), ('UNIT_TEST_FILE', os.path.realpath(test)), # provide some space, so that strcpy(getenv()) works ('UNIT_TEST_AUX', "".zfill(1024)), ]); logging.info("started booth with PID %d, lockfile %s" % (self.booth.pid, self.lockfile)) self.booth.expect("BOOTH site \S+ \(build \S+\) daemon is starting", timeout=2) #print self.booth.before; exit self.gdb = self.start_a_process("gdb", args=["-quiet", "-p", str(self.booth.pid), # Don't use .gdbinit "-nx", "-nh", # Run until the defined point. # This is necessary so that ticket state setting doesn't # happen _before_ the call to pcmk_load_ticket() # (which would overwrite our data) "-ex", "break ticket_cron", "-ex", "continue", ]) logging.info("started GDB with PID %d" % self.gdb.pid) self.gdb.expect("(gdb)") self.gdb.sendline("set pagination off\n") self.gdb.sendline("set interactive-mode off\n") self.gdb.sendline("set verbose off\n") ## sadly to late for the initial "symbol not found" messages self.gdb.sendline("set prompt " + self.prompt + "\\n\n"); self.sync(2000) # Only stop for this recipient, so that broadcasts are not seen multiple times self.send_cmd("break booth_udp_send if to == &(booth_conf->site[1])") self.send_cmd("break recvfrom") # ticket_cron is still a breakpoint # Now we're set up. self.this_site_id = self.query_value("local->site_id") self.this_port = int(self.query_value("booth_conf->port")) # do a self-test assert(self.check_value("local->site_id", self.this_site_id)) self.running = False # }}} # {{{ GDB communication def sync(self, timeout=-1): self.gdb.expect(self.prompt, timeout) answer = self.gdb.before self.dont_log_expect += 1 # be careful not to use RE characters like +*.[] etc. r = str(random.randint(2**19, 2**20)) self.gdb.sendline("print " + r) self.gdb.expect(r, timeout) self.gdb.expect(self.prompt, timeout) self.dont_log_expect -= 1 return answer # send a command to GDB, returning the GDB answer as string. def drain_booth_log(self): try: self.booth.read_nonblocking(64*1024, 0) except pexpect.EOF: pass except pexpect.TIMEOUT: pass finally: pass def send_cmd(self, stg, timeout=-1): # give booth a chance to get its messages out self.drain_booth_log() self.gdb.sendline(stg) return self.sync(timeout=timeout) def _query_value(self, which): val = self.send_cmd("print " + which) cleaned = re.search(r"^\$\d+ = (.*\S)\s*$", val, re.MULTILINE) if not cleaned: self.user_debug("query failed") return cleaned.group(1) def query_value(self, which): res = self._query_value(which) logging.debug("query_value: «%s» evaluates to «%s»" % (which, res)) return res def check_value(self, which, value): val = self._query_value("(" + which + ") == (" + value + ")") logging.debug("check_value: «%s» is «%s»: %s" % (which, value, val)) if val == "1": return True # for easier (test) debugging we'll show the _real_ value, too. want = self._query_value(value) # Order is important, so that next query works!! has = self._query_value(which) # for informational purposes self._query_value('state_to_string($$)') logging.error("«%s»: got «%s», expected «%s». ERROR." % (which, has, want)) return False # Send data to GDB, to inject them into the binary. # Handles different data types def set_val(self, name, value, numeric_conv=None): logging.debug("setting value «%s» to «%s» (num_conv %s)" %(name, value, numeric_conv)) res = None # string value? if re.match(r'^"', value): res = self.send_cmd("print strcpy(" + name + ", " + value + ")") elif re.match(r"^'", value): # single-quoted; GDB only understands double quotes. v1 = re.sub(r"^'", '', value) v2 = re.sub(r"'$", '', v1) # TODO: replace \\\\" etc. v3 = re.sub(r'"', '\\"', v2) res = self.send_cmd("print strcpy(" + name + ', "' + v3 + '")') # numeric elif numeric_conv: res = self.send_cmd("set variable " + name + " = " + numeric_conv + "(" + value + ")") else: res = self.send_cmd("set variable " + name + " = " + value) for r in [r"There is no member named", r"Structure has no component named", r"No symbol .* in current context", ]: assert(not re.search(r, res, re.MULTILINE)) logging.debug("set_val %s done" % name) # }}} GDB communication # there has to be some event waiting, so that boothd stops again. def continue_debuggee(self, timeout=30): res = None if not self.running: res = self.send_cmd("continue", timeout) self.drain_booth_log() return res # {{{ High-level functions. # Generally, GDB is attached to BOOTHD, and has it stopped. def set_state(self, kv): if not kv: return self.current_nr = kv.aux.get("line") #os.system("strace -f -tt -s 2000 -e write -p" + str(self.gdb.pid) + " &") - for n, v in kv.iteritems(): + for n, v in iter_items(kv): self.set_val( self.translate_shorthand(n, "ticket"), v) logging.info("set state") def user_debug(self, txt): logging.error("Problem detected: %s", txt) logging.info(self.gdb.buffer) if not sys.stdin.isatty(): logging.error("Not a terminal, stopping.") else: - print "\n\nEntering interactive mode.\n\n" + print("\n\nEntering interactive mode.\n\n") self.gdb.sendline("set prompt GDB> \n") self.gdb.setecho(True) # can't use send_cmd, doesn't reply with expected prompt anymore. self.gdb.interact() #while True: # sys.stdout.write("GDB> ") # sys.stdout.flush() # x = sys.stdin.readline() # if not x: # break # self.send_cmd(x) self.stop_processes() sys.exit(1) def wait_for_function(self, fn, timeout=20): until = time.time() + timeout while True: stopped_at = self.continue_debuggee(timeout=3) if not stopped_at: self.user_debug("Not stopped at any breakpoint?") if re.search(r"^Program received signal SIGABRT,", stopped_at, re.MULTILINE): self.user_debug("assert() failed") if re.search(r"^Program received signal SIGSEGV,", stopped_at, re.MULTILINE): self.user_debug("Segfault") if re.search(r"^Breakpoint \d+, (0x\w+ in )?%s " % fn, stopped_at, re.MULTILINE): break if time.time() > until: self.user_debug("Didn't stop in function %s" % fn) logging.info("Now in %s" % fn) # We break, change the data, and return the correct size. def send_message(self, msg): self.udp_sock.sendto('a', (socket.gethostbyname(self.this_site), self.this_port)) self.wait_for_function("recvfrom") # drain input, but stop afterwards for changing data self.send_cmd("finish") # step over length assignment self.send_cmd("next") # push message. - for (n, v) in msg.iteritems(): + for (n, v) in iter_items(msg): self.set_val( self.translate_shorthand(n, "message"), v, "htonl") # set "received" length self.set_val("rv", "msg->header.length", "ntohl") # the next thing should run continue via wait_for_function def wait_outgoing(self, msg): self.wait_for_function("booth_udp_send") ok = True - for (n, v) in msg.iteritems(): + for (n, v) in iter_items(msg): if re.search(r"\.", n): ok = self.check_value( self.translate_shorthand(n, "inject"), v) and ok else: ok = self.check_value( self.translate_shorthand(n, "ticket"), v) and ok if not ok: sys.exit(1) logging.info("out gone") #stopped_at = self.sync() def merge_dicts(self, base, overlay): - return dict(base.items() + overlay.items()) + return dict(base, **overlay) def loop(self, fn, data): - matches = map(lambda k: re.match(r"^(outgoing|message)(\d+)$", k), data.iterkeys()) - valid_matches = filter(None, matches) - nums = map(lambda m: int(m.group(2)), valid_matches) - loop_max = max(nums) + matches = (re.match(r"^(outgoing|message)(\d+)$", k) for k in data) + loop_max = max(int(m.group(2)) for m in matches if m) for counter in range(0, loop_max+1): # incl. last message kmsg = 'message%d' % counter msg = data.get(kmsg) ktkt = 'ticket%d' % counter tkt = data.get(ktkt) kout = 'outgoing%d' % counter out = data.get(kout) kgdb = 'gdb%d' % counter gdb = data.get(kgdb) if not any([msg, out, tkt]): continue logging.info("Part %d" % counter) if tkt: self.current_nr = tkt.aux.get("line") comment = tkt.aux.get("comment", "") logging.info("ticket change %s (%s:%d) %s" % (ktkt, fn, self.current_nr, comment)) self.set_state(tkt) if gdb: - for (k, v) in gdb.iteritems(): + for (k, v) in iter_items(gdb): self.send_cmd(k + " " + v.replace("§", "\n")) if msg: self.current_nr = msg.aux.get("line") comment = msg.aux.get("comment", "") logging.info("sending %s (%s:%d) %s" % (kmsg, fn, self.current_nr, comment)) self.send_message(self.merge_dicts(data["message"], msg)) - if data.has_key(kgdb) and len(gdb) == 0: + if kgdb in data and len(gdb) == 0: self.user_debug("manual override") if out: self.current_nr = out.aux.get("line") comment = out.aux.get("comment", "") logging.info("waiting for %s (%s:%d) %s" % (kout, fn, self.current_nr, comment)) self.wait_outgoing(out) logging.info("loop ends") def let_booth_go_a_bit(self): self.drain_booth_log() logging.debug("running: %d" % self.running) if not self.running: self.gdb.sendline("continue") time.sleep(1) self.drain_booth_log() # stop it - via GDB! self.gdb.sendintr() # If we sent the signal to booth, the next # "print state_to_string()" or "continue" # might catch the signal - and fail to do # what we want/need. # # This additional signal seems to be unnecessary. #posix.kill(self.gdb.pid, signal.SIGINT) # In case it's really needed we should drain booth's signals queue, # eg. by sending "print getpid()" twice, before the sync() call. self.running = False self.sync(2000) def do_finally(self, data): if not data: return self.current_nr = data.aux.get("line") # Allow debuggee to reach a stable state self.let_booth_go_a_bit() ok = True - for (n, v) in data.iteritems(): + for (n, v) in iter_items(data): ok = self.check_value( self.translate_shorthand(n, "ticket"), v) and ok if not ok: sys.exit(1) def run(self, start_from="000", end_with="999"): os.chdir(self.test_base) # TODO: sorted, random order - tests = filter( (lambda f: re.match(r"^\d\d\d_.*\.txt$", f)), glob.glob("*")) - tests.sort() + tests = sorted(f for f in glob.glob("*") + if re.match(r"^\d\d\d_.*\.txt$", f)) failed = 0 for f in tests: if f[0:3] < start_from: continue if f[0:3] > end_with: continue log = None logfn = UT._filename(f) if self.running_on_console(): sys.stdout.write("\n") self.current_nr = "setup" try: log = self.setup_log(filename = logfn) log.setLevel(logging.DEBUG) logging.error(self.colored_string("Starting test '%s'" % f, self.BLUE) + ", logfile " + logfn) self.start_processes(f) test = self.read_test_input(f, m=copy.deepcopy(self.defaults)) logging.debug("data: %s" % pprint.pformat(test, width = 200)) self.set_state(test.get("ticket")) self.loop(f, test) self.do_finally(test.get("finally")) self.current_nr = "teardown" logging.warn(self.colored_string("Finished test '%s' - OK" % f, self.GREEN)) except: failed += 1 logging.error(self.colored_string("Broke in %s:%s %s" % (f, self.current_nr, sys.exc_info()), self.RED)) - for frame in traceback.format_tb(sys.exc_traceback): + for frame in traceback.format_tb(sys.exc_info()[2]): logging.info(" - %s " % frame.rstrip()) finally: self.stop_processes() if log: log.close() logging.getLogger("").removeHandler(log) if self.running_on_console(): sys.stdout.write("\n") return failed # }}} #def traceit(frame, event, arg): # if event == "line": # lineno = frame.f_lineno # print frame.f_code.co_filename, ":", "line", lineno # return traceit # {{{ main if __name__ == '__main__': if os.geteuid() == 0: sys.stderr.write("Must be run non-root; aborting.\n") sys.exit(1) ut = UT(sys.argv[1], sys.argv[2] + "/") # "master" log object needs max level logging.basicConfig(level = logging.DEBUG, filename = "/dev/null", filemode = "a", format = default_log_format, datefmt = default_log_datefmt) # make sure no old processes are active anymore os.system("killall boothd > /dev/null 2> /dev/null") overview_log = ut.setup_log( filename = UT._filename('seq') ) overview_log.setLevel(logging.WARN) # http://stackoverflow.com/questions/9321741/printing-to-screen-and-writing-to-a-file-at-the-same-time console = logging.StreamHandler() console.setFormatter(logging.Formatter(' # %(message)s')) console.setLevel(logging.WARN) logging.getLogger('').addHandler(console) logging.info("Starting boothd unit tests.") #sys.settrace(traceit) starting = "0" if len(sys.argv) > 3: starting = sys.argv[3] ending = "999" if len(sys.argv) > 4: ending = sys.argv[4] ret = ut.run(starting, ending) sys.exit(ret) # }}} diff --git a/test/assertions.py b/test/assertions.py index 0b7f995..34333ca 100644 --- a/test/assertions.py +++ b/test/assertions.py @@ -1,43 +1,43 @@ import re class BoothAssertions: def configFileMissingMyIP(self, config_file=None, lock_file=None): (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_file=config_file, lock_file=lock_file, expected_exitcode=1, expected_daemon=False) expected_error = "(ERROR|error): Cannot find myself in the configuration" self.assertRegexpMatches(stderr, expected_error) def assertLockFileError(self, config_file=None, config_text=None, lock_file=True, args=[]): (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=config_text, config_file=config_file, lock_file=lock_file, args=args, expected_exitcode=1) expected_error = 'lockfile open error %s: Permission denied' % runner.lock_file_used() self.assertRegexpMatches(self.read_log(), expected_error) ###################################################################### # backported from 2.7 just in case we're running on an older Python def assertRegexpMatches(self, text, expected_regexp, msg=None): """Fail the test unless the text matches the regular expression.""" - if isinstance(expected_regexp, basestring): + if isinstance(expected_regexp, str): expected_regexp = re.compile(expected_regexp) if not expected_regexp.search(text, MULTILINE): msg = msg or "Regexp didn't match" msg = '%s: %r not found in %r' % (msg, expected_regexp.pattern, text) raise self.failureException(msg) def assertNotRegexpMatches(self, text, unexpected_regexp, msg=None): """Fail the test if the text matches the regular expression.""" - if isinstance(unexpected_regexp, basestring): + if isinstance(unexpected_regexp, str): unexpected_regexp = re.compile(unexpected_regexp) match = unexpected_regexp.search(text) if match: msg = msg or "Regexp matched" msg = '%s: %r matches %r in %r' % (msg, text[match.start():match.end()], unexpected_regexp.pattern, text) raise self.failureException(msg) ###################################################################### diff --git a/test/boothrunner.py b/test/boothrunner.py index d981183..347912b 100644 --- a/test/boothrunner.py +++ b/test/boothrunner.py @@ -1,109 +1,113 @@ import os +import sys import subprocess import time import unittest class BoothRunner: default_config_file = '/etc/booth/booth.conf' default_lock_file = '/var/run/booth.pid' def __init__(self, boothd_path, mode, args): self.boothd_path = boothd_path self.args = [ mode ] self.final_args = args # will be appended to self.args self.mode = mode self.config_file = None self.lock_file = None def set_config_file_arg(self): self.args += [ '-c', self.config_file ] def set_config_file(self, config_file): self.config_file = config_file self.set_config_file_arg() def set_lock_file(self, lock_file): self.lock_file = lock_file self.args += [ '-l', self.lock_file ] def set_debug(self): self.args += [ '-D' ] def set_foreground(self): self.args += [ '-S' ] def all_args(self): return [ self.boothd_path ] + self.args + self.final_args def show_output(self, stdout, stderr): if stdout: - print "STDOUT:" - print "------" - print stdout, + print("STDOUT:") + print("------") + print(stdout.rstrip('\n')) if stderr: - print "STDERR: (N.B. crm_ticket failures indicate daemon started correctly)" - print "------" - print stderr, - print "-" * 70 + print("STDERR: (N.B. crm_ticket failures indicate daemon started correctly)") + print("------") + print(stderr.rstrip('\n')) + print("-" * 70) def subproc_completed_within(self, p, timeout): start = time.time() wait = 0.1 while True: if p.poll() is not None: return True elapsed = time.time() - start if elapsed + wait > timeout: wait = timeout - elapsed - print "Waiting on %d for %.1fs ..." % (p.pid, wait) + print("Waiting on %d for %.1fs ..." % (p.pid, wait)) time.sleep(wait) elapsed = time.time() - start if elapsed >= timeout: return False wait *= 2 def lock_file_used(self): return self.lock_file or self.default_lock_file def config_file_used(self): return self.config_file or self.default_config_file def config_text_used(self): config_file = self.config_file_used() try: c = open(config_file) except: return None text = "".join(c.readlines()) c.close() text = text.replace('\t', '') text = text.replace('\n', '|\n') return text def show_args(self): - print "\n" - print "-" * 70 - print "Running", ' '.join(self.all_args()) + print("\n") + print("-" * 70) + print("Running", ' '.join(self.all_args())) msg = "with config from %s" % self.config_file_used() config_text = self.config_text_used() if config_text is not None: msg += ": [%s]" % config_text - print msg + print(msg) def run(self): p = subprocess.Popen(self.all_args(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) if not p: - raise RuntimeError, "failed to start subprocess" + raise RuntimeError("failed to start subprocess") - print "Started subprocess pid %d" % p.pid + print("Started subprocess pid %d" % p.pid) completed = self.subproc_completed_within(p, 2) if completed: (stdout, stderr) = p.communicate() + if sys.version_info[0] >= 3: + # only expect ASCII/UTF-8 encodings for the obtained input bytes + stdout, stderr = str(stdout, 'UTF-8'), str(stderr, 'UTF-8') self.show_output(stdout, stderr) return (p.pid, p.returncode, stdout, stderr) return (p.pid, None, None, None) diff --git a/test/boothtestenv.py b/test/boothtestenv.py index fcd0c4d..59e25c3 100644 --- a/test/boothtestenv.py +++ b/test/boothtestenv.py @@ -1,69 +1,69 @@ import os import subprocess import time import tempfile import unittest from assertions import BoothAssertions from boothrunner import BoothRunner class BoothTestEnvironment(unittest.TestCase, BoothAssertions): test_src_path = os.path.abspath(os.path.dirname(__file__)) dist_path = os.path.join(test_src_path, '..' ) src_path = os.path.join(dist_path, 'src' ) boothd_path = os.path.join(src_path, 'boothd') conf_path = os.path.join(dist_path, 'conf' ) example_config_path = os.path.join(conf_path, 'booth.conf.example') def setUp(self): if not self._testMethodName.startswith('test_'): - raise RuntimeError, "unexpected test method name: " + self._testMethodName + raise RuntimeError("unexpected test method name: " + self._testMethodName) self.test_name = self._testMethodName[5:] self.test_path = os.path.join(self.test_run_path, self.test_name) os.makedirs(self.test_path) self.ensure_boothd_not_running() def ensure_boothd_not_running(self): # Need to redirect STDERR in case we're not root, in which # case netstat's -p option causes a warning. However we only # want to kill boothd processes which we own; -p will list the # pid for those and only those, which is exactly what we want # here. subprocess.call("netstat -tpln 2>&1 | perl -lne 'm,LISTEN\s+(\d+)/boothd, and kill 15, $1'", shell=True) def get_tempfile(self, identity): tf = tempfile.NamedTemporaryFile( prefix='%s.%d.' % (identity, time.time()), dir=self.test_path, delete=False ) return tf.name def init_log(self): self.log_file = self.get_tempfile('log') os.putenv('HA_debugfile', self.log_file) # See cluster-glue/lib/clplumbing/cl_log.c def read_log(self): if not os.path.exists(self.log_file): return '' l = open(self.log_file) msgs = ''.join(l.readlines()) l.close() return msgs def check_return_code(self, pid, return_code, expected_exitcode): if return_code is None: - print "pid %d still running" % pid + print("pid %d still running" % pid) if expected_exitcode is not None: self.fail("expected exit code %d, not long-running process" % expected_exitcode) else: - print "pid %d exited with code %d" % (pid, return_code) + print("pid %d exited with code %d" % (pid, return_code)) if expected_exitcode is None: msg = "should not exit" else: msg = "should exit with code %s" % expected_exitcode msg += "\nLog follows (see %s)" % self.log_file msg += "\nN.B. expect mlockall/setscheduler errors when running tests non-root" msg += "\n-----------\n%s" % self.read_log() self.assertEqual(return_code, expected_exitcode, msg) diff --git a/test/clienttests.py b/test/clienttests.py index c4b9d8a..512620e 100644 --- a/test/clienttests.py +++ b/test/clienttests.py @@ -1,20 +1,20 @@ import string from clientenv import ClientTestEnvironment class ClientConfigTests(ClientTestEnvironment): mode = 'client' def test_site_buffer_overflow(self): # https://bugzilla.novell.com/show_bug.cgi?id=750256 - longfile = (string.lowercase * 3)[:63] + longfile = (string.ascii_lowercase * 3)[:63] expected_error = "'%s' exceeds maximum site name length" % longfile args = [ 'grant', '-s', longfile, '-t', 'ticket' ] self._test_buffer_overflow(expected_error, args=args) def test_ticket_buffer_overflow(self): # https://bugzilla.novell.com/show_bug.cgi?id=750256 - longfile = (string.lowercase * 3)[:63] + longfile = (string.ascii_lowercase * 3)[:63] expected_error = "'%s' exceeds maximum ticket name length" % longfile args = [ 'grant', '-s', 'site', '-t', longfile ] self._test_buffer_overflow(expected_error, args=args) diff --git a/test/runtests.py b/test/runtests.py index 0532c01..833b1a7 100755 --- a/test/runtests.py +++ b/test/runtests.py @@ -1,57 +1,57 @@ #!/usr/bin/python import os import re import shutil import sys import tempfile import time import unittest from clienttests import ClientConfigTests from sitetests import SiteConfigTests from arbtests import ArbitratorConfigTests if __name__ == '__main__': if os.geteuid() == 0: sys.stderr.write("Must be run non-root; aborting.\n") sys.exit(1) tmp_path = '/tmp/booth-tests' if not os.path.exists(tmp_path): os.makedirs(tmp_path) test_run_path = tempfile.mkdtemp(prefix='%d.' % time.time(), dir=tmp_path) suite = unittest.TestSuite() testclasses = [ SiteConfigTests, #ArbitratorConfigTests, ClientConfigTests, ] for testclass in testclasses: testclass.test_run_path = test_run_path suite.addTests(unittest.TestLoader().loadTestsFromTestCase(testclass)) runner_args = { #'verbosity' : 2, } major, minor, micro, releaselevel, serial = sys.version_info if major > 2 or (major == 2 and minor >= 7): # New in 2.7 runner_args['buffer'] = True runner_args['failfast'] = True pass # not root anymore, so safe # needed because old instances might still use the UDP port. os.system("killall boothd") runner = unittest.TextTestRunner(**runner_args) result = runner.run(suite) if result.wasSuccessful(): shutil.rmtree(test_run_path) sys.exit(0) else: - print "Left %s for debugging" % test_run_path + print("Left %s for debugging" % test_run_path) sys.exit(1) diff --git a/test/serverenv.py b/test/serverenv.py index c6d4e30..5d6c6c4 100644 --- a/test/serverenv.py +++ b/test/serverenv.py @@ -1,206 +1,204 @@ import os import re import time import unittest from assertions import BoothAssertions from boothrunner import BoothRunner from boothtestenv import BoothTestEnvironment from utils import get_IP class ServerTestEnvironment(BoothTestEnvironment): ''' boothd site/arbitrator will hang in setup phase while attempting to connect to an unreachable peer during ticket_catchup(). In a test environment we don't have any reachable peers. Fortunately, we can still successfully launch a daemon by only listing our own IP in the config file. ''' typical_config = """\ # This is like the config in the manual transport="UDP" port="9929" # Here's another comment #arbitrator="147.2.207.14" site="147.4.215.19" #site="147.18.2.1" ticket="ticketA" ticket="ticketB" """ site_re = re.compile('^site=".+"', re.MULTILINE) working_config = re.sub(site_re, 'site="%s"' % get_IP(), typical_config, 1) def run_booth(self, expected_exitcode, expected_daemon, config_text=None, config_file=None, lock_file=True, args=[], debug=False, foreground=False): ''' Runs boothd. Defaults to using a temporary lock file and the standard config file path. There are four possible types of outcome: - boothd exits non-zero without launching a daemon (setup phase failed, e.g. due to invalid configuration file) - boothd exits zero after launching a daemon (successful operation) - boothd does not exit (running in foreground mode) - boothd does not exit (setup phase hangs, e.g. while attempting to connect to peer during ticket_catchup()) Arguments: config_text a string containing the contents of a configuration file to use config_file path to a configuration file to use lock_file False: don't pass a lockfile parameter to booth via -l True: pass a temporary lockfile parameter to booth via -l string: pass the given lockfile path to booth via -l args array of extra args to pass to booth expected_exitcode an integer, or False if booth is not expected to terminate within the timeout expected_daemon True iff a daemon is expected to be launched (this means running the server in foreground mode via -S; even though in this case the server's not technically not a daemon, we still want to treat it like one by checking the lockfile before and after we kill it) debug True means pass the -D parameter foreground True means pass the -S parameter Returns a (pid, return_code, stdout, stderr, runner) tuple, where return_code/stdout/stderr are None iff pid is still running. ''' if expected_daemon and expected_exitcode is not None and expected_exitcode != 0: - raise RuntimeError, \ - "Shouldn't ever expect daemon to start and then failure" + raise RuntimeError("Shouldn't ever expect daemon to start and then failure") if not expected_daemon and expected_exitcode == 0: - raise RuntimeError, \ - "Shouldn't ever expect success without starting daemon" + raise RuntimeError("Shouldn't ever expect success without starting daemon") self.init_log() runner = BoothRunner(self.boothd_path, self.mode, args) if config_text: config_file = self.write_config_file(config_text) if config_file: runner.set_config_file(config_file) if lock_file is True: lock_file = os.path.join(self.test_path, 'boothd-lock.pid') if lock_file: runner.set_lock_file(lock_file) if debug: runner.set_debug() if foreground: runner.set_foreground() runner.show_args() (pid, return_code, stdout, stderr) = runner.run() self.check_return_code(pid, return_code, expected_exitcode) if expected_daemon: self.check_daemon_handling(runner, expected_daemon) elif return_code is None: # This isn't strictly necessary because we ensure no # daemon is running from within test setUp(), but it's # probably a good idea to tidy up after ourselves anyway. self.kill_pid(pid) return (pid, return_code, stdout, stderr, runner) def write_config_file(self, config_text): config_file = self.get_tempfile('config') c = open(config_file, 'w') c.write(config_text) c.close() return config_file def kill_pid(self, pid): - print "killing %d ..." % pid + print("killing %d ..." % pid) os.kill(pid, 15) - print "killed" + print("killed") def check_daemon_handling(self, runner, expected_daemon): ''' Check that the lock file contains a pid referring to a running daemon. Then kill the daemon, and ensure that the lock file vanishes (bnc#749763). ''' daemon_pid = self.get_daemon_pid_from_lock_file(runner.lock_file) err = "lock file should contain pid" if not expected_daemon: err += ", even though we didn't expect a daemon" self.assertTrue(daemon_pid is not None, err) daemon_running = self.is_pid_running_daemon(daemon_pid) err = "pid in lock file should refer to a running daemon" self.assertTrue(daemon_running, err) if daemon_running: self.kill_pid(int(daemon_pid)) time.sleep(1) daemon_pid = self.get_daemon_pid_from_lock_file(runner.lock_file) self.assertTrue(daemon_pid is None, 'bnc#749763: lock file should vanish after daemon is killed') def get_daemon_pid_from_lock_file(self, lock_file): ''' Returns the pid contained in lock_file, or None if it doesn't exist. ''' if not os.path.exists(lock_file): - print "%s does not exist" % lock_file + print("%s does not exist" % lock_file) return None l = open(lock_file) lines = l.readlines() l.close() self.assertEqual(len(lines), 1, "Lock file should contain one line") pid = re.search('\\bbooth_pid="?(\\d+)"?', lines[0]).group(1) - print "lockfile contains: <%s>" % pid + print("lockfile contains: <%s>" % pid) return pid def is_pid_running_daemon(self, pid): ''' Returns true iff the given pid refers to a running boothd process. ''' path = "/proc/%s" % pid pid_running = os.path.isdir(path) # print "======" # import subprocess # print subprocess.check_output(['lsof', '-p', pid]) # print subprocess.check_output(['ls', path]) # print subprocess.check_output(['cat', "/proc/%s/cmdline" % pid]) # print "======" if not pid_running: return False c = open("/proc/%s/cmdline" % pid) cmdline = "".join(c.readlines()) - print cmdline + print(cmdline) c.close() if cmdline.find('boothd') == -1: - print 'no boothd in cmdline:', cmdline + print('no boothd in cmdline:', cmdline) return False # self.assertRegexpMatches( # cmdline, # 'boothd', # "lock file should refer to pid of running boothd" # ) return True def _test_buffer_overflow(self, expected_error, **args): (pid, ret, stdout, stderr, runner) = \ self.run_booth(expected_exitcode=1, expected_daemon=False, **args) self.assertRegexpMatches(stderr, expected_error) diff --git a/test/servertests.py b/test/servertests.py index 71e808e..288d19f 100644 --- a/test/servertests.py +++ b/test/servertests.py @@ -1,150 +1,150 @@ import copy from pprint import pprint, pformat import re import string from serverenv import ServerTestEnvironment class ServerTests(ServerTestEnvironment): # We don't know enough about the build/test system to rely on the # existence, permissions, contents of the default config file. So # we can't predict (and hence test) how booth will behave when -c # is not specified. # # def test_no_args(self): # # If do_server() called lockfile() first then this would be # # the appropriate test: # #self.assertLockFileError(lock_file=False) # # # If do_server() called setup() first, and the default # # config file was readable non-root, then this would be the # # appropriate test: # self.configFileMissingMyIP(lock_file=False) # # def test_custom_lock_file(self): # (pid, ret, stdout, stderr, runner) = \ # self.run_booth(expected_exitcode=1, expected_daemon=False) # self.assertRegexpMatches( # stderr, # 'failed to open %s: ' % runner.config_file_used(), # 'should fail to read default config file' # ) def test_example_config(self): self.configFileMissingMyIP(config_file=self.example_config_path) def test_config_file_buffer_overflow(self): # https://bugzilla.novell.com/show_bug.cgi?id=750256 - longfile = (string.lowercase * 5)[:127] + longfile = (string.ascii_lowercase * 5)[:127] expected_error = "'%s' exceeds maximum config name length" % longfile self._test_buffer_overflow(expected_error, config_file=longfile) def test_lock_file_buffer_overflow(self): # https://bugzilla.novell.com/show_bug.cgi?id=750256 - longfile = (string.lowercase * 5)[:127] + longfile = (string.ascii_lowercase * 5)[:127] expected_error = "'%s' exceeds maximum lock file length" % longfile self._test_buffer_overflow(expected_error, lock_file=longfile) def test_working_config(self): (pid, ret, stdout, stderr, runner) = \ self.run_booth(expected_exitcode=0, expected_daemon=True, config_text=self.working_config) def test_missing_quotes(self): # quotes no longer required return True orig_lines = self.working_config.split("\n") - for i in xrange(len(orig_lines)): + for (i, line) in enumerate(orig_lines): new_lines = copy.copy(orig_lines) - new_lines[i] = new_lines[i].replace('"', '') + new_lines[i] = line.replace('"', '') new_config = "\n".join(new_lines) - line_contains_IP = re.search('^\s*(site|arbitrator)=.*[0-9]\.', orig_lines[i]) + line_contains_IP = re.search('^\s*(site|arbitrator)=.*[0-9]\.', line) if line_contains_IP: # IP addresses need to be surrounded by quotes, # so stripping them should cause it to fail expected_exitcode = 1 expected_daemon = False else: expected_exitcode = 0 expected_daemon = True (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=new_config, expected_exitcode=expected_exitcode, expected_daemon=expected_daemon) if line_contains_IP: self.assertRegexpMatches( self.read_log(), "(ERROR|error): invalid config file format: unquoted '.'", 'IP addresses need to be quoted' ) def test_debug_mode(self): (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=self.working_config, debug=True, expected_exitcode=0, expected_daemon=True) def test_foreground_mode(self): (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=self.working_config, foreground=True, expected_exitcode=None, expected_daemon=True) def test_debug_and_foreground_mode(self): (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=self.working_config, debug=True, foreground=True, expected_exitcode=None, expected_daemon=True) def test_missing_transport(self): # UDP is default -- TODO? return True config = re.sub('transport=.+\n', '', self.typical_config) (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=config, expected_exitcode=1, expected_daemon=False) self.assertRegexpMatches( self.read_log(), 'config file was missing transport line' ) def test_invalid_transport_protocol(self): config = re.sub('transport=.+', 'transport=SNEAKERNET', self.typical_config) (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=config, expected_exitcode=1, expected_daemon=False) self.assertRegexpMatches(stderr, 'invalid transport protocol') def test_missing_final_newline(self): config = re.sub('\n$', '', self.working_config) (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=config, expected_exitcode=0, expected_daemon=True) def test_a_few_trailing_whitespaces(self): for ws in (' ', ' '): new_config = self.working_config.replace("\n", ws + "\n", 3) (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=new_config, expected_exitcode=0, expected_daemon=True) def test_trailing_space_everywhere(self): for ws in (' ', ' '): new_config = self.working_config.replace("\n", ws + "\n") (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=new_config, expected_exitcode=0, expected_daemon=True) def test_unquoted_space(self): for ticket in ('unquoted space', 'unquoted space man'): new_config = re.sub('ticket=.+', 'ticket=' + ticket, self.working_config, 1) (pid, ret, stdout, stderr, runner) = \ self.run_booth(config_text=new_config, expected_exitcode=1, expected_daemon=False) self.assertRegexpMatches(stderr, 'ticket name "' + ticket + '" invalid') def test_unreachable_peer(self): # what should this test do? daemon not expected, but no exitcode either? # booth would now just run, and try to reach that peer... # TCP reachability is not required during startup anymore. return True config = re.sub('#(.+147.+)', lambda m: m.group(1), self.working_config) self.run_booth(config_text=config, expected_exitcode=None, expected_daemon=False) diff --git a/test/utils.py b/test/utils.py index 5b70cfc..aca3592 100644 --- a/test/utils.py +++ b/test/utils.py @@ -1,16 +1,20 @@ import subprocess import re +import sys def run_cmd(cmd): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = p.communicate() return (stdout, stderr, p.returncode) def get_IP(): (stdout, stderr, returncode) = run_cmd(['hostname', '-i']) if returncode != 0: - raise RuntimeError, "Failed to run hostname -i:\n" + stderr + raise RuntimeError("Failed to run hostname -i:\n" + stderr) # in case multiple IP addresses are returned, use only the first - # and also strip '%' part possibly present with IPv6 address - ret = re.sub(r'\s.*', '', stdout) + # and also strip '%' part possibly present with IPv6 address; + # in Python 3 context, only expect ASCII/UTF-8 encodings for the + # obtained input bytes + ret = re.sub(r'\s.*', '', + stdout if sys.version_info[0] < 3 else str(stdout, 'UTF-8')) return "::1" if '%' in ret else ret