diff --git a/python/pacemaker/_cts/environment.py b/python/pacemaker/_cts/environment.py
index 6f0a4da028..ea4e76251c 100644
--- a/python/pacemaker/_cts/environment.py
+++ b/python/pacemaker/_cts/environment.py
@@ -1,638 +1,641 @@
 """Test environment classes for Pacemaker's Cluster Test Suite (CTS)."""
 
 __all__ = ["EnvFactory"]
 __copyright__ = "Copyright 2014-2024 the Pacemaker project contributors"
 __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
 
 import argparse
 from contextlib import suppress
 import os
 import random
 import socket
 import sys
 import time
 
 from pacemaker.buildoptions import BuildOptions
 from pacemaker._cts.logging import LogFactory
 from pacemaker._cts.remote import RemoteFactory
 from pacemaker._cts.watcher import LogKind
 
 
 class Environment:
     """
     A class for managing the CTS environment.
 
     This consists largely of processing and storing command line parameters.
     """
 
     # pylint doesn't understand that self._rsh is callable (it stores the
     # singleton instance of RemoteExec, as returned by the getInstance method
     # of RemoteFactory).  It's possible we could fix this with type annotations,
     # but those were introduced with python 3.5 and we only support python 3.4.
     # I think we could also fix this by getting rid of the getInstance methods,
     # but that's a project for another day.  For now, just disable the warning.
     # pylint: disable=not-callable
 
     def __init__(self, args):
         """
         Create a new Environment instance.
 
         This class can be treated kind of like a dictionary due to the presence
         of typical dict functions like __contains__, __getitem__, and __setitem__.
         However, it is not a dictionary, so do not rely on standard dictionary
         behavior.
 
         Arguments:
         args -- A list of command line parameters, minus the program name.
                 If None, sys.argv will be used.
         """
         self.data = {}
         self._nodes = []
 
         # Set some defaults before processing command line arguments.  These are
         # either not set by any command line parameter, or they need a default
         # that can't be set in add_argument.
         self["DeadTime"] = 300
         self["StartTime"] = 300
         self["StableTime"] = 30
         self["tests"] = []
         self["IPagent"] = "IPaddr2"
         self["DoFencing"] = True
         self["ClobberCIB"] = False
         self["CIBfilename"] = None
         self["CIBResource"] = False
         self["log_kind"] = None
         self["node-limit"] = 0
         self["scenario"] = "random"
 
         self.random_gen = random.Random()
 
         self._logger = LogFactory()
         self._rsh = RemoteFactory().getInstance()
         self._target = "localhost"
 
         self._seed_random()
         self._parse_args(args)
 
         if not self["ListTests"]:
             self._validate()
             self._discover()
 
     def _seed_random(self, seed=None):
         """
         Initialize the random number generator.
 
         Arguments:
         seed -- Use this to seed the random number generator, or use the
                 current time if None.
         """
         if not seed:
             seed = int(time.time())
 
         self["RandSeed"] = seed
         self.random_gen.seed(str(seed))
 
     def dump(self):
         """Print the current environment."""
         for key in sorted(self.data.keys()):
             s = "Environment[%s]" % key
             self._logger.debug("{key:35}: {val}".format(key=s, val=str(self[key])))
 
     def keys(self):
         """Return a list of all environment keys stored in this instance."""
         return list(self.data.keys())
 
     def __contains__(self, key):
         """Return True if the given key exists in the environment."""
         if key == "nodes":
             return True
 
         return key in self.data
 
     def __getitem__(self, key):
         """Return the given environment key, or None if it does not exist."""
         if str(key) == "0":
             raise ValueError("Bad call to 'foo in X', should reference 'foo in X.keys()' instead")
 
         if key == "nodes":
             return self._nodes
 
         if key == "Name":
             return self._get_stack_short()
 
         return self.data.get(key)
 
     def __setitem__(self, key, value):
         """Set the given environment key to the given value, overriding any previous value."""
         if key == "Stack":
             self._set_stack(value)
 
         elif key == "node-limit":
             self.data[key] = value
             self._filter_nodes()
 
         elif key == "nodes":
             self._nodes = []
             for node in value:
                 # I don't think I need the IP address, etc. but this validates
                 # the node name against /etc/hosts and/or DNS, so it's a
                 # GoodThing(tm).
                 try:
                     n = node.strip()
                     socket.gethostbyname_ex(n)
                     self._nodes.append(n)
                 except (socket.herror, socket.gaierror):
                     self._logger.log("%s not found in DNS... aborting" % node)
                     raise
 
             self._filter_nodes()
 
         else:
             self.data[key] = value
 
     def random_node(self):
         """Choose a random node from the cluster."""
         return self.random_gen.choice(self["nodes"])
 
     def get(self, key, default=None):
         """Return the value for key if key is in the environment, else default."""
         if key == "nodes":
             return self._nodes
 
         return self.data.get(key, default)
 
     def _set_stack(self, name):
         """Normalize the given cluster stack name."""
         if name in ["corosync", "cs", "mcp"]:
             self.data["Stack"] = "corosync 2+"
 
         else:
             raise ValueError("Unknown stack: %s" % name)
 
     def _get_stack_short(self):
         """Return the short name for the currently set cluster stack."""
         if "Stack" not in self.data:
             return "unknown"
 
         if self.data["Stack"] == "corosync 2+":
             return "crm-corosync"
 
         LogFactory().log("Unknown stack: %s" % self["Stack"])
         raise ValueError("Unknown stack: %s" % self["Stack"])
 
     def _detect_systemd(self):
         """Detect whether systemd is in use on the target node."""
         if "have_systemd" not in self.data:
             (rc, _) = self._rsh(self._target, "systemctl list-units", verbose=0)
             self["have_systemd"] = rc == 0
 
     def _detect_syslog(self):
         """Detect the syslog variant in use on the target node (if any)."""
         if "syslogd" in self.data:
             return
 
         if self["have_systemd"]:
             # Systemd
             (_, lines) = self._rsh(self._target, r"systemctl list-units | grep syslog.*\.service.*active.*running | sed 's:.service.*::'", verbose=1)
         else:
             # SYS-V
             (_, lines) = self._rsh(self._target, "chkconfig --list | grep syslog.*on | awk '{print $1}' | head -n 1", verbose=1)
 
         with suppress(IndexError):
             self["syslogd"] = lines[0].strip()
 
     def disable_service(self, node, service):
         """Disable the given service on the given node."""
         if self["have_systemd"]:
             # Systemd
             (rc, _) = self._rsh(node, "systemctl disable %s" % service)
             return rc
 
         # SYS-V
         (rc, _) = self._rsh(node, "chkconfig %s off" % service)
         return rc
 
     def enable_service(self, node, service):
         """Enable the given service on the given node."""
         if self["have_systemd"]:
             # Systemd
             (rc, _) = self._rsh(node, "systemctl enable %s" % service)
             return rc
 
         # SYS-V
         (rc, _) = self._rsh(node, "chkconfig %s on" % service)
         return rc
 
     def service_is_enabled(self, node, service):
         """Return True if the given service is enabled on the given node."""
         if self["have_systemd"]:
             # Systemd
 
             # With "systemctl is-enabled", we should check if the service is
             # explicitly "enabled" instead of the return code. For example it returns
             # 0 if the service is "static" or "indirect", but they don't really count
             # as "enabled".
             (rc, _) = self._rsh(node, "systemctl is-enabled %s | grep enabled" % service)
             return rc == 0
 
         # SYS-V
         (rc, _) = self._rsh(node, "chkconfig --list | grep -e %s.*on" % service)
         return rc == 0
 
     def _detect_at_boot(self):
         """Detect if the cluster starts at boot."""
         if "at-boot" not in self.data:
             self["at-boot"] = self.service_is_enabled(self._target, "corosync") \
                 or self.service_is_enabled(self._target, "pacemaker")
 
     def _detect_ip_offset(self):
         """Detect the offset for IPaddr resources."""
         if self["CIBResource"] and "IPBase" not in self.data:
             (_, lines) = self._rsh(self._target, "ip addr | grep inet | grep -v -e link -e inet6 -e '/32' -e ' lo' | awk '{print $2}'", verbose=0)
             network = lines[0].strip()
 
             (_, lines) = self._rsh(self._target, "nmap -sn -n %s | grep 'scan report' | awk '{print $NF}' | sed 's:(::' | sed 's:)::' | sort -V | tail -n 1" % network, verbose=0)
 
             try:
                 self["IPBase"] = lines[0].strip()
             except (IndexError, TypeError):
                 self["IPBase"] = None
 
             if not self["IPBase"]:
                 self["IPBase"] = " fe80::1234:56:7890:1000"
                 self._logger.log("Could not determine an offset for IPaddr resources.  Perhaps nmap is not installed on the nodes.")
                 self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
                 return
 
             # pylint thinks self["IPBase"] is a list, not a string, which causes it
             # to error out because a list doesn't have split().
             # pylint: disable=no-member
             if int(self["IPBase"].split('.')[3]) >= 240:
                 self._logger.log("Could not determine an offset for IPaddr resources. Upper bound is too high: %s %s"
                                  % (self["IPBase"], self["IPBase"].split('.')[3]))
                 self["IPBase"] = " fe80::1234:56:7890:1000"
                 self._logger.log("Defaulting to '%s', use --test-ip-base to override" % self["IPBase"])
 
     def _filter_nodes(self):
         """
         Filter the list of cluster nodes.
 
         If --limit-nodes is given, keep that many nodes from the front of the
         list of cluster nodes and drop the rest.
         """
         if self["node-limit"] > 0:
             if len(self["nodes"]) > self["node-limit"]:
                 # pylint thinks self["node-limit"] is a list even though we initialize
                 # it as an int in __init__ and treat it as an int everywhere.
                 # pylint: disable=bad-string-format-type
                 self._logger.log("Limiting the number of nodes configured=%d (max=%d)"
                                  % (len(self["nodes"]), self["node-limit"]))
 
                 while len(self["nodes"]) > self["node-limit"]:
                     self["nodes"].pop(len(self["nodes"]) - 1)
 
     def _validate(self):
         """Check that we were given all required command line parameters."""
         if not self["nodes"]:
             raise ValueError("No nodes specified!")
 
     def _discover(self):
         """Probe cluster nodes to figure out how to log and manage services."""
         self._target = random.Random().choice(self["nodes"])
 
         exerciser = socket.gethostname()
 
         # Use the IP where possible to avoid name lookup failures
         for ip in socket.gethostbyname_ex(exerciser)[2]:
             if ip != "127.0.0.1":
                 exerciser = ip
                 break
 
         self["cts-exerciser"] = exerciser
 
         self._detect_systemd()
         self._detect_syslog()
         self._detect_at_boot()
         self._detect_ip_offset()
 
     def _parse_args(self, argv):
         """
         Parse and validate command line parameters.
 
         Set the appropriate values in the environment dictionary.  If argv is
         None, use sys.argv instead.
         """
         if not argv:
             argv = sys.argv[1:]
 
         parser = argparse.ArgumentParser(epilog="%s -g virt1 -r --stonith ssh --schema pacemaker-2.0 500" % sys.argv[0])
 
         grp1 = parser.add_argument_group("Common options")
         grp1.add_argument("-g", "--dsh-group", "--group",
                           metavar="GROUP", dest="group",
                           help="Use the nodes listed in the named DSH group (~/.dsh/groups/$name)")
         grp1.add_argument("-l", "--limit-nodes",
                           type=int, default=0,
                           metavar="MAX",
                           help="Only use the first MAX cluster nodes supplied with --nodes")
         grp1.add_argument("--benchmark",
                           action="store_true",
                           help="Add timing information")
         grp1.add_argument("--list", "--list-tests",
                           action="store_true", dest="list_tests",
                           help="List the valid tests")
         grp1.add_argument("--nodes",
                           metavar="NODES",
                           help="List of cluster nodes separated by whitespace")
         grp1.add_argument("--stack",
                           default="corosync",
                           metavar="STACK",
                           help="Which cluster stack is installed")
 
         grp2 = parser.add_argument_group("Options that CTS will usually auto-detect correctly")
         grp2.add_argument("-L", "--logfile",
                           metavar="PATH",
-                          help="Where to look for logs from cluster nodes")
+                          help="Where to look for logs from cluster nodes (or 'journal' for systemd journal)")
         grp2.add_argument("--at-boot", "--cluster-starts-at-boot",
                           choices=["1", "0", "yes", "no"],
                           help="Does the cluster software start at boot time?")
         grp2.add_argument("--facility", "--syslog-facility",
                           default="daemon",
                           metavar="NAME",
                           help="Which syslog facility to log to")
         grp2.add_argument("--ip", "--test-ip-base",
                           metavar="IP",
                           help="Offset for generated IP address resources")
 
         grp3 = parser.add_argument_group("Options for release testing")
         grp3.add_argument("-r", "--populate-resources",
                           action="store_true",
                           help="Generate a sample configuration")
         grp3.add_argument("--choose",
                           metavar="NAME",
                           help="Run only the named test")
         grp3.add_argument("--fencing", "--stonith",
                           choices=["1", "0", "yes", "no", "lha", "openstack", "rhcs", "rhevm", "scsi", "ssh", "virt", "xvm"],
                           default="1",
                           help="What fencing agent to use")
         grp3.add_argument("--once",
                           action="store_true",
                           help="Run all valid tests once")
 
         grp4 = parser.add_argument_group("Additional (less common) options")
         grp4.add_argument("-c", "--clobber-cib",
                           action="store_true",
                           help="Erase any existing configuration")
         grp4.add_argument("-y", "--yes",
                           action="store_true", dest="always_continue",
                           help="Continue to run whenever prompted")
         grp4.add_argument("--boot",
                           action="store_true",
                           help="")
         grp4.add_argument("--cib-filename",
                           metavar="PATH",
                           help="Install the given CIB file to the cluster")
         grp4.add_argument("--experimental-tests",
                           action="store_true",
                           help="Include experimental tests")
         grp4.add_argument("--loop-minutes",
                           type=int, default=60,
                           help="")
         grp4.add_argument("--no-loop-tests",
                           action="store_true",
                           help="Don't run looping/time-based tests")
         grp4.add_argument("--no-unsafe-tests",
                           action="store_true",
                           help="Don't run tests that are unsafe for use with ocfs2/drbd")
         grp4.add_argument("--notification-agent",
                           metavar="PATH",
                           default="/var/lib/pacemaker/notify.sh",
                           help="Script to configure for Pacemaker alerts")
         grp4.add_argument("--notification-recipient",
                           metavar="R",
                           default="/var/lib/pacemaker/notify.log",
                           help="Recipient to pass to alert script")
         grp4.add_argument("--oprofile",
                           metavar="NODES",
                           help="List of cluster nodes to run oprofile on")
         grp4.add_argument("--outputfile",
                           metavar="PATH",
                           help="Location to write logs to")
         grp4.add_argument("--qarsh",
                           action="store_true",
                           help="Use QARSH to access nodes instead of SSH")
         grp4.add_argument("--schema",
                           metavar="SCHEMA",
                           default="pacemaker-%s" % BuildOptions.CIB_SCHEMA_VERSION,
                           help="Create a CIB conforming to the given schema")
         grp4.add_argument("--seed",
                           metavar="SEED",
                           help="Use the given string as the random number seed")
         grp4.add_argument("--set",
                           action="append",
                           metavar="ARG",
                           default=[],
                           help="Set key=value pairs (can be specified multiple times)")
         grp4.add_argument("--stonith-args",
                           metavar="ARGS",
                           default="hostlist=all,livedangerously=yes",
                           help="")
         grp4.add_argument("--stonith-type",
                           metavar="TYPE",
                           default="external/ssh",
                           help="")
         grp4.add_argument("--trunc",
                           action="store_true", dest="truncate",
                           help="Truncate log file before starting")
         grp4.add_argument("--valgrind-procs",
                           metavar="PROCS",
                           default="pacemaker-attrd pacemaker-based pacemaker-controld pacemaker-execd pacemaker-fenced pacemaker-schedulerd",
                           help="Run valgrind against the given space-separated list of processes")
         grp4.add_argument("--valgrind-tests",
                           action="store_true",
                           help="Include tests using valgrind")
         grp4.add_argument("--warn-inactive",
                           action="store_true",
                           help="Warn if a resource is assigned to an inactive node")
 
         parser.add_argument("iterations",
                             nargs='?',
                             type=int, default=1,
                             help="Number of tests to run")
 
         args = parser.parse_args(args=argv)
 
         # Set values on this object based on what happened with command line
         # processing.  This has to be done in several blocks.
 
         # These values can always be set.  They get a default from the add_argument
         # calls, only do one thing, and they do not have any side effects.
         self["ClobberCIB"] = args.clobber_cib
         self["ListTests"] = args.list_tests
         self["Schema"] = args.schema
         self["Stack"] = args.stack
         self["SyslogFacility"] = args.facility
         self["TruncateLog"] = args.truncate
         self["at-boot"] = args.at_boot in ["1", "yes"]
         self["benchmark"] = args.benchmark
         self["continue"] = args.always_continue
         self["experimental-tests"] = args.experimental_tests
         self["iterations"] = args.iterations
         self["loop-minutes"] = args.loop_minutes
         self["loop-tests"] = not args.no_loop_tests
         self["notification-agent"] = args.notification_agent
         self["notification-recipient"] = args.notification_recipient
         self["node-limit"] = args.limit_nodes
         self["stonith-params"] = args.stonith_args
         self["stonith-type"] = args.stonith_type
         self["unsafe-tests"] = not args.no_unsafe_tests
         self["valgrind-procs"] = args.valgrind_procs
         self["valgrind-tests"] = args.valgrind_tests
         self["warn-inactive"] = args.warn_inactive
 
         # Nodes and groups are mutually exclusive, so their defaults cannot be
         # set in their add_argument calls.  Additionally, groups does more than
         # just set a value.  Here, set nodes first and then if a group is
         # specified, override the previous nodes value.
         if args.nodes:
             self["nodes"] = args.nodes.split(" ")
         else:
             self["nodes"] = []
 
         if args.group:
             self["OutputFile"] = "%s/cluster-%s.log" % (os.environ['HOME'], args.dsh_group)
             LogFactory().add_file(self["OutputFile"], "CTS")
 
             dsh_file = "%s/.dsh/group/%s" % (os.environ['HOME'], args.group)
 
             if os.path.isfile(dsh_file):
                 self["nodes"] = []
 
                 with open(dsh_file, "r", encoding="utf-8") as f:
                     for line in f:
                         stripped = line.strip()
 
                         if not stripped.startswith('#'):
                             self["nodes"].append(stripped)
             else:
                 print("Unknown DSH group: %s" % args.dsh_group)
 
         # Everything else either can't have a default set in an add_argument
         # call (likely because we don't want to always have a value set for it)
         # or it does something fancier than just set a single value.  However,
         # order does not matter for these as long as the user doesn't provide
         # conflicting arguments on the command line.  So just do everything
         # alphabetically.
         if args.boot:
             self["scenario"] = "boot"
 
         if args.cib_filename:
             self["CIBfilename"] = args.cib_filename
         else:
             self["CIBfilename"] = None
 
         if args.choose:
             self["scenario"] = "sequence"
             self["tests"].append(args.choose)
 
         if args.fencing:
             if args.fencing in ["0", "no"]:
                 self["DoFencing"] = False
             else:
                 self["DoFencing"] = True
 
                 if args.fencing in ["rhcs", "virt", "xvm"]:
                     self["stonith-type"] = "fence_xvm"
 
                 elif args.fencing == "scsi":
                     self["stonith-type"] = "fence_scsi"
 
                 elif args.fencing in ["lha", "ssh"]:
                     self["stonith-params"] = "hostlist=all,livedangerously=yes"
                     self["stonith-type"] = "external/ssh"
 
                 elif args.fencing == "openstack":
                     self["stonith-type"] = "fence_openstack"
 
                     print("Obtaining OpenStack credentials from the current environment")
                     self["stonith-params"] = "region=%s,tenant=%s,auth=%s,user=%s,password=%s" % (
                         os.environ['OS_REGION_NAME'],
                         os.environ['OS_TENANT_NAME'],
                         os.environ['OS_AUTH_URL'],
                         os.environ['OS_USERNAME'],
                         os.environ['OS_PASSWORD']
                     )
 
                 elif args.fencing == "rhevm":
                     self["stonith-type"] = "fence_rhevm"
 
                     print("Obtaining RHEV-M credentials from the current environment")
                     self["stonith-params"] = "login=%s,passwd=%s,ipaddr=%s,ipport=%s,ssl=1,shell_timeout=10" % (
                         os.environ['RHEVM_USERNAME'],
                         os.environ['RHEVM_PASSWORD'],
                         os.environ['RHEVM_SERVER'],
                         os.environ['RHEVM_PORT'],
                     )
 
         if args.ip:
             self["CIBResource"] = True
             self["ClobberCIB"] = True
             self["IPBase"] = args.ip
 
-        if args.logfile:
+        if args.logfile == "journal":
+            self["LogAuditDisabled"] = True
+            self["log_kind"] = LogKind.JOURNAL
+        elif args.logfile:
             self["LogAuditDisabled"] = True
             self["LogFileName"] = args.logfile
             self["log_kind"] = LogKind.REMOTE_FILE
         else:
             # We can't set this as the default on the parser.add_argument call
             # for this option because then args.logfile will be set, which means
             # one of the above branches would be taken and those other values
             # would also be set.
             self["LogFileName"] = "/var/log/messages"
 
         if args.once:
             self["scenario"] = "all-once"
 
         if args.oprofile:
             self["oprofile"] = args.oprofile.split(" ")
         else:
             self["oprofile"] = []
 
         if args.outputfile:
             self["OutputFile"] = args.outputfile
             LogFactory().add_file(self["OutputFile"])
 
         if args.populate_resources:
             self["CIBResource"] = True
             self["ClobberCIB"] = True
 
         if args.qarsh:
             self._rsh.enable_qarsh()
 
         for kv in args.set:
             (name, value) = kv.split("=", 1)
             self[name] = value
             print("Setting %s = %s" % (name, value))
 
 
 class EnvFactory:
     """A class for constructing a singleton instance of an Environment object."""
 
     instance = None
 
     # pylint: disable=invalid-name
     def getInstance(self, args=None):
         """
         Return the previously created instance of Environment.
 
         If no instance exists, create a new instance and return that.
         """
         if not EnvFactory.instance:
             EnvFactory.instance = Environment(args)
 
         return EnvFactory.instance
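
The environment.py change above teaches the -L/--logfile option to recognize the literal value "journal" and route it to LogKind.JOURNAL instead of treating it as a path to a remote log file. Below is a minimal sketch of the resulting behavior, assuming the pacemaker Python modules are installed; the argument list is hypothetical and uses --list-tests so that no cluster nodes are validated or probed:

    # Illustrative only: EnvFactory caches a singleton Environment, so in a
    # real CTS run the instance is built once from the actual command line.
    from pacemaker._cts.environment import EnvFactory
    from pacemaker._cts.watcher import LogKind

    # --list-tests skips node validation/discovery, so nothing is contacted
    # over SSH while parsing the arguments.
    env = EnvFactory().getInstance(["--list-tests", "-L", "journal"])

    # With this patch, "journal" selects the systemd journal backend rather
    # than being recorded as a log file name.
    assert env["log_kind"] == LogKind.JOURNAL
    assert env["LogFileName"] is None
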
diff --git a/python/pacemaker/_cts/watcher.py b/python/pacemaker/_cts/watcher.py
index d964a2fc13..ac83edfe1e 100644
--- a/python/pacemaker/_cts/watcher.py
+++ b/python/pacemaker/_cts/watcher.py
@@ -1,596 +1,596 @@
 """Log searching classes for Pacemaker's Cluster Test Suite (CTS)."""
 
 __all__ = ["LogKind", "LogWatcher"]
 __copyright__ = "Copyright 2014-2024 the Pacemaker project contributors"
 __license__ = "GNU General Public License version 2 or later (GPLv2+) WITHOUT ANY WARRANTY"
 
 from enum import Enum, auto, unique
 import re
 import time
 import threading
 
 from dateutil.parser import isoparser
 
 from pacemaker.buildoptions import BuildOptions
 from pacemaker._cts.errors import OutputNotFoundError
 from pacemaker._cts.logging import LogFactory
 from pacemaker._cts.remote import RemoteFactory
 
 CTS_SUPPORT_BIN = "%s/cts-support" % BuildOptions.DAEMON_DIR
 
 
 @unique
 class LogKind(Enum):
     """The various kinds of log files that can be watched."""
 
     LOCAL_FILE = auto()     # From a local aggregation file on the exerciser
     REMOTE_FILE = auto()    # From a file on each cluster node
     JOURNAL = auto()        # From the systemd journal on each cluster node
 
     def __str__(self):
         """Return a printable string for a LogKind value."""
         return self.name.lower().replace('_', ' ')
 
 
 class SearchObj:
     """
     The base class for various kinds of log watchers.
 
     Log-specific watchers need to be built on top of this one.
     """
 
     def __init__(self, filename, host=None, name=None):
         """
         Create a new SearchObj instance.
 
         Arguments:
         filename -- The log to watch
         host     -- The cluster node on which to watch the log
         name     -- A unique name to use when logging about this watch
         """
         self.filename = filename
         self.limit = None
         self.logger = LogFactory()
         self.name = name
         self.offset = "EOF"
         self.rsh = RemoteFactory().getInstance()
 
         if host:
             self.host = host
         else:
             self.host = "localhost"
 
         self._cache = []
         self._delegate = None
 
         async_task = self.harvest_async()
         async_task.join()
 
     def __str__(self):
         if self.host:
             return "%s:%s" % (self.host, self.filename)
 
         return self.filename
 
     def log(self, args):
         """Log a message."""
         message = "lw: %s: %s" % (self, args)
         self.logger.log(message)
 
     def debug(self, args):
         """Log a debug message."""
         message = "lw: %s: %s" % (self, args)
         self.logger.debug(message)
 
     def harvest_async(self, delegate=None):
         """
         Collect lines from a log asynchronously.
 
         Optionally, also call delegate when complete.  This method must be
         implemented by all subclasses.
         """
         raise NotImplementedError
 
     def harvest_cached(self):
         """Return cached logs from before the limit timestamp."""
         raise NotImplementedError
 
     def end(self):
         """
         Mark that a log is done being watched.
 
         This function also resets internal data structures to the beginning
         of the file.  Subsequent watches will therefore start from the
         beginning again.
         """
         self.debug("Clearing cache and unsetting limit")
         self._cache = []
         self.limit = None
 
 
 class FileObj(SearchObj):
     """A specialized SearchObj subclass for watching log files."""
 
     def __init__(self, filename, host=None, name=None):
         """
         Create a new FileObj instance.
 
         Arguments:
         filename -- The file to watch
         host     -- The cluster node on which to watch the file
         name     -- A unique name to use when logging about this watch
         """
         SearchObj.__init__(self, filename, host, name)
 
     def async_complete(self, pid, returncode, out, err):
         """
         Handle completion of an asynchronous log file read.
 
         This function saves the output from that read for look()/look_for_all()
         to process and records the current position in the journal.  Future
         reads will pick back up from that spot.
 
         Arguments:
         pid         -- The ID of the process that did the read
         returncode  -- The return code of the process that did the read
         out         -- stdout from the file read
         err         -- stderr from the file read
         """
         messages = []
         for line in out:
             match = re.search(r"^CTSwatcher:Last read: (\d+)", line)
 
             if match:
                 self.offset = match.group(1)
                 self.debug("Got %d lines, new offset: %s  %r" % (len(out), self.offset, self._delegate))
             elif re.search(r"^CTSwatcher:.*truncated", line):
                 self.log(line)
             elif re.search(r"^CTSwatcher:", line):
                 self.debug("Got control line: %s" % line)
             else:
                 messages.append(line)
 
         if self._delegate:
             self._delegate.async_complete(pid, returncode, messages, err)
 
     def harvest_async(self, delegate=None):
         """
         Collect lines from the log file on a single host asynchronously.
 
         Optionally, call delegate when complete.  This can be called
         repeatedly, reading a chunk each time or until the end of the
         log file is hit.
         """
         self._delegate = delegate
 
         if self.limit and (self.offset == "EOF" or int(self.offset) > self.limit):
             if self._delegate:
                 self._delegate.async_complete(-1, -1, [], [])
 
             return None
 
         cmd = ("%s watch -p CTSwatcher: -l 200 -f %s -o %s"
                % (CTS_SUPPORT_BIN, self.filename, self.offset))
 
         return self.rsh.call_async(self.host, cmd, delegate=self)
 
     def harvest_cached(self):
         """Return cached logs from before the limit timestamp."""
         # cts-log-watcher script renders caching unnecessary for FileObj.
         # @TODO Caching might be slightly more efficient, if not too complex.
         return []
 
     def set_end(self):
         """
         Internally record where we expect to find the end of a log file.
 
         Calls to harvest from the log file will not go any farther than
         what this function records.
         """
         if self.limit:
             return
 
         cmd = ("%s watch -p CTSwatcher: -l 2 -f %s -o EOF"
                % (CTS_SUPPORT_BIN, self.filename))
 
         # pylint: disable=not-callable
         (_, lines) = self.rsh(self.host, cmd, verbose=0)
 
         for line in lines:
             match = re.search(r"^CTSwatcher:Last read: (\d+)", line)
             if match:
                 self.limit = int(match.group(1))
                 self.debug("Set limit to: %d" % self.limit)
 
 
 class JournalObj(SearchObj):
     """A specialized SearchObj subclass for watching systemd journals."""
 
     def __init__(self, host=None, name=None):
         """
         Create a new JournalObj instance.
 
         Arguments:
         host     -- The cluster node on which to watch the journal
         name     -- A unique name to use when logging about this watch
         """
-        SearchObj.__init__(self, name, host, name)
+        SearchObj.__init__(self, "journal", host, name)
         self._parser = isoparser()
 
     def _msg_after_limit(self, msg):
         """
         Check whether a message was logged after the limit timestamp.
 
         Arguments:
         msg -- Message to check
 
         Returns `True` if `msg` was logged after `self.limit`, or `False`
         otherwise.
         """
         if not self.limit:
             return False
 
         match = re.search(r"^\S+", msg)
         if not match:
             return False
 
         msg_timestamp = match.group(0)
         msg_dt = self._parser.isoparse(msg_timestamp)
         return msg_dt > self.limit
 
     def _split_msgs_by_limit(self, msgs):
         """
         Split a sorted list of messages relative to the limit timestamp.
 
         Arguments:
         msgs -- List of messages to split
 
         Returns a tuple:
         (list of messages logged on or before limit timestamp,
          list of messages logged after limit timestamp).
         """
         # If the last message was logged at or before the limit, all of them were
         if msgs and self._msg_after_limit(msgs[-1]):
 
             # Otherwise, find the index of the first message logged after the limit
             for idx, msg in enumerate(msgs):
                 if self._msg_after_limit(msg):
                     self.debug("Got %d lines before passing limit timestamp"
                                % idx)
                     return msgs[:idx], msgs[idx:]
 
         self.debug("Got %s lines" % len(msgs))
         return msgs, []
 
     def async_complete(self, pid, returncode, out, err):
         """
         Handle completion of an asynchronous journal read.
 
         This function saves the output from that read for look()/look_for_all()
         to process and records the current position in the journal.  Future
         reads will pick back up from that spot.
 
         Arguments:
         pid         -- The ID of the process that did the journal read
         returncode  -- The return code of the process that did the journal read
         out         -- stdout from the journal read
         err         -- stderr from the journal read
         """
         if out:
             # Cursor should always be last line of journalctl output
             out, cursor_line = out[:-1], out[-1]
             match = re.search(r"^-- cursor: ([^.]+)", cursor_line)
             if not match:
                 raise OutputNotFoundError('Cursor not found at end of output:'
                                           + '\n%s' % out)
 
             self.offset = match.group(1).strip()
             self.debug("Got new cursor: %s" % self.offset)
 
         before, after = self._split_msgs_by_limit(out)
 
         # Save remaining messages after limit for later processing
         self._cache.extend(after)
 
         if self._delegate:
             self._delegate.async_complete(pid, returncode, before, err)
 
     def harvest_async(self, delegate=None):
         """
         Collect lines from the journal on a single host asynchronously.
 
         Optionally, call delegate when complete.  This can be called
         repeatedly, reading a chunk each time or until the end of the journal
         is hit.
         """
         self._delegate = delegate
 
         # Use --lines to prevent journalctl from overflowing the Popen input
         # buffer
         command = "journalctl --quiet --output=short-iso --show-cursor"
         if self.offset == "EOF":
             command += " --lines 0"
         else:
             command += " --after-cursor='%s' --lines=200" % self.offset
 
         return self.rsh.call_async(self.host, command, delegate=self)
 
     def harvest_cached(self):
         """Return cached logs from before the limit timestamp."""
         before, self._cache = self._split_msgs_by_limit(self._cache)
         return before
 
     def set_end(self):
         """
         Internally record where we expect to find the end of a host's journal.
 
         Calls to harvest from the journal will not go any farther than what
         this function records.
         """
         if self.limit:
             return
 
         # --iso-8601=seconds yields YYYY-MM-DDTHH:MM:SSZ, where Z is timezone
         # as offset from UTC
 
         # pylint: disable=not-callable
         (rc, lines) = self.rsh(self.host, "date --iso-8601=seconds", verbose=0)
 
         if rc == 0 and len(lines) == 1:
             self.limit = self._parser.isoparse(lines[0].strip())
             self.debug("Set limit to: %s" % self.limit)
         else:
             self.debug("Unable to set limit for %s because date returned %d lines with status %d"
                        % (self.host, len(lines), rc))
 
 
 class LogWatcher:
     """
     Watch a single log file or journal across multiple hosts.
 
     Instances of this class look for lines that match given regular
     expressions.
 
     The way you use this class is as follows:
         - Construct a LogWatcher object
         - Call set_watch() when you want to start watching the log
         - Call look() to scan the log looking for the patterns
     """
 
     def __init__(self, log, regexes, hosts, kind, name="Anon", timeout=10,
                  silent=False):
         """
         Create a new LogWatcher instance.
 
         Arguments:
         log     -- The log file to watch
         regexes -- A list of regular expressions to match against the log
         hosts   -- A list of cluster nodes on which to watch the log
         kind    -- What type of log is this object watching?
         name    -- A unique name to use when logging about this watch
         timeout -- Default number of seconds to watch a log file at a time;
                    this can be overridden by the timeout= parameter to
                    self.look on an as-needed basis
         silent  -- If False, log extra information
         """
         self.filename = log
         self.hosts = hosts
         self.kind = kind
         self.name = name
         self.regexes = regexes
         self.unmatched = None
         self.whichmatch = -1
 
         self._cache_lock = threading.Lock()
         self._file_list = []
         self._line_cache = []
         self._logger = LogFactory()
         self._timeout = int(timeout)
 
         #  Validate our arguments.  Better sooner than later ;-)
         for regex in regexes:
             re.compile(regex)
 
         if not self.hosts:
             raise ValueError("LogWatcher requires hosts argument")
 
-        if not self.filename:
-            raise ValueError("LogWatcher requires log argument")
+        if self.kind != LogKind.JOURNAL and not self.filename:
+            raise ValueError("LogWatcher requires log file name if not journal")
 
         if not silent:
             for regex in self.regexes:
                 self._debug("Looking for regex: %s" % regex)
 
     def _debug(self, args):
         """Log a debug message."""
         message = "lw: %s: %s" % (self.name, args)
         self._logger.debug(message)
 
     def set_watch(self):
         """Mark the place to start watching the log from."""
         if self.kind == LogKind.LOCAL_FILE:
             self._file_list.append(FileObj(self.filename))
 
         elif self.kind == LogKind.REMOTE_FILE:
             for node in self.hosts:
                 self._file_list.append(FileObj(self.filename, node, self.name))
 
         elif self.kind == LogKind.JOURNAL:
             for node in self.hosts:
                 self._file_list.append(JournalObj(node, self.name))
 
     def async_complete(self, pid, returncode, out, err):
         """
         Handle completion of an asynchronous log file read.
 
         This function saves the output from that read for look()/look_for_all()
         to process and records the current position.  Future reads will pick
         back up from that spot.
 
         Arguments:
         pid         -- The ID of the process that did the read
         returncode  -- The return code of the process that did the read
         out         -- stdout from the file read
         err         -- stderr from the file read
         """
         # Called as delegate through {File,Journal}Obj.async_complete()
         # pylint: disable=unused-argument
 
         # TODO: Probably need a lock for updating self._line_cache
         self._logger.debug("%s: Got %d lines from %d (total %d)" % (self.name, len(out), pid, len(self._line_cache)))
 
         if out:
             with self._cache_lock:
                 self._line_cache.extend(out)
 
     def __get_lines(self):
         """Iterate over all watched log files and collect new lines from each."""
         if not self._file_list:
             raise ValueError("No sources to read from")
 
         pending = []
 
         for f in self._file_list:
             cached = f.harvest_cached()
             if cached:
                 self._debug("Got %d lines from %s cache (total %d)"
                             % (len(cached), f.name, len(self._line_cache)))
                 with self._cache_lock:
                     self._line_cache.extend(cached)
             else:
                 t = f.harvest_async(self)
                 if t:
                     pending.append(t)
 
         for t in pending:
             t.join(60.0)
             if t.is_alive():
                 self._logger.log("%s: Aborting after 20s waiting for %r logging commands" % (self.name, t))
                 return
 
     def end(self):
         """
         Mark that a log is done being watched.
 
         This function also resets internal data structures to the beginning
         of the file.  Subsequent watches will therefore start from the
         beginning again.
         """
         for f in self._file_list:
             f.end()
 
     def look(self, timeout=None):
         """
         Examine the log looking for the regexes in this object.
 
         It starts looking from the place marked by set_watch(), continuing
         through the file in the fashion of `tail -f`.  It properly recovers
         from log file truncation but not from removing and recreating the log.
 
         Arguments:
         timeout -- Number of seconds to watch the log file; defaults to
                    seconds argument passed when this object was created
 
         Returns the first line which matches any regex
         """
         if not timeout:
             timeout = self._timeout
 
         lines = 0
         begin = time.time()
         end = begin + timeout + 1
 
         if not self.regexes:
             self._debug("Nothing to look for")
             return None
 
         if timeout == 0:
             for f in self._file_list:
                 f.set_end()
 
         while True:
             if self._line_cache:
                 lines += 1
 
                 with self._cache_lock:
                     line = self._line_cache[0]
                     self._line_cache.remove(line)
 
                 which = -1
 
                 if re.search("CTS:", line):
                     continue
 
                 for regex in self.regexes:
                     which += 1
 
                     matchobj = re.search(regex, line)
 
                     if matchobj:
                         self.whichmatch = which
                         self._debug("Matched: %s" % line)
                         return line
 
             elif timeout > 0 and end < time.time():
                 timeout = 0
                 for f in self._file_list:
                     f.set_end()
 
             else:
                 self.__get_lines()
 
                 if not self._line_cache and end < time.time():
                     self._debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines))
                     return None
 
                 self._debug("Waiting: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), len(self._line_cache)))
                 time.sleep(1)
 
     def look_for_all(self, allow_multiple_matches=False, silent=False):
         """
         Like look(), but looks for matches for multiple regexes.
 
         This function returns when the timeout is reached or all regexes were
         matched.  As a side effect, self.unmatched will contain regexes that
         were not matched.  This can be inspected by the caller.
 
         Arguments:
         allow_multiple_matches -- If True, allow each regex to match more than
                                   once.  If False (the default), once a regex
                                   matches a line, it will no longer be searched
                                   for.
         silent                 -- If False, log extra information
 
         Returns the matching lines if all regexes are matched, or None.
         """
         save_regexes = self.regexes
         result = []
 
         if not silent:
             self._debug("starting search: timeout=%d" % self._timeout)
 
         while self.regexes:
             one_result = self.look(self._timeout)
             if not one_result:
                 self.unmatched = self.regexes
                 self.regexes = save_regexes
                 self.end()
                 return None
 
             result.append(one_result)
             if not allow_multiple_matches:
                 del self.regexes[self.whichmatch]
 
             else:
                 # Allow multiple regexes to match a single line
                 tmp_regexes = self.regexes
                 self.regexes = []
 
                 for regex in tmp_regexes:
                     matchobj = re.search(regex, one_result)
                     if not matchobj:
                         self.regexes.append(regex)
 
         self.unmatched = None
         self.regexes = save_regexes
         return result
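
With the watcher.py fixes above (JournalObj no longer reuses the watch name as its "file name", and LogWatcher accepts a missing log argument for journal watches), a journal-based watch can be set up roughly as follows. This is a sketch only: the host names are placeholders, the regex is just an illustrative pacemaker-controld pattern, and set_watch()/look() need remote access (SSH or QARSH) to the nodes so journalctl can be run there:

    from pacemaker._cts.watcher import LogKind, LogWatcher

    # Placeholder node names; a real run needs remote access to these hosts.
    hosts = ["node1", "node2"]

    # log=None is now acceptable when kind is JOURNAL, since JournalObj never
    # reads a file and identifies itself simply as "journal".
    watcher = LogWatcher(log=None,
                         regexes=[r"pacemaker-controld.*State transition .* -> S_IDLE"],
                         hosts=hosts,
                         kind=LogKind.JOURNAL,
                         name="idle-watch",
                         timeout=60)

    watcher.set_watch()      # start watching each node's journal from its current end
    line = watcher.look()    # wait up to `timeout` seconds for a matching line
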