diff --git a/cts/patterns.py b/cts/patterns.py
index f4c5366a28..267ab61fda 100644
--- a/cts/patterns.py
+++ b/cts/patterns.py
@@ -1,530 +1,530 @@
import sys, os
from cts.CTSvars import *
patternvariants = {}
class BasePatterns:
    """Repository of log patterns and shell command templates shared by
    every cluster-manager variant.

    Creating an instance registers it in the module-level
    'patternvariants' dict under its name; that registry is how
    PatternSelector.get_variant() finds variants later.
    """

    def __init__(self, name):
        self.name = name
        patternvariants[name] = self  # self-register for PatternSelector
        self.ignore = []       # BadNews matches that should be disregarded
        self.BadNews = []      # log patterns that indicate trouble
        self.components = {}   # component name -> list of expected patterns
        self.commands = {
            "StatusCmd"      : "crmadmin -t 60000 -S %s 2>/dev/null",
            "CibQuery"       : "cibadmin -Ql",
            "CibAddXml"      : "cibadmin --modify -c --xml-text %s",
            "CibDelXpath"    : "cibadmin --delete --xpath %s",
            # 300,000 == 5 minutes
            "RscRunning"     : CTSvars.CRM_DAEMON_DIR + "/lrmd_test -R -r %s",
            "CIBfile"        : "%s:" + CTSvars.CRM_CONFIG_DIR + "/cib.xml",
            "TmpDir"         : "/tmp",

            "BreakCommCmd"   : "iptables -A INPUT -s %s -j DROP >/dev/null 2>&1",
            "FixCommCmd"     : "iptables -D INPUT -s %s -j DROP >/dev/null 2>&1",

            # tc qdisc add dev lo root handle 1: cbq avpkt 1000 bandwidth 1000mbit
            # tc class add dev lo parent 1: classid 1:1 cbq rate "$RATE"kbps allot 17000 prio 5 bounded isolated
            # tc filter add dev lo parent 1: protocol ip prio 16 u32 match ip dst 127.0.0.1 match ip sport $PORT 0xFFFF flowid 1:1
            # tc qdisc add dev lo parent 1: netem delay "$LATENCY"msec "$(($LATENCY/4))"msec 10% 2> /dev/null > /dev/null
            "ReduceCommCmd"  : "",
            "RestoreCommCmd" : "tc qdisc del dev lo root",

            "UUIDQueryCmd"       : "crmadmin -N",
            "MaintenanceModeOn"  : "cibadmin --modify -c --xml-text ''",
            "MaintenanceModeOff" : "cibadmin --delete --xpath \"//nvpair[@name='maintenance-mode']\"",

            "StandbyCmd"      : "crm_attribute -VQ -U %s -n standby -l forever -v %s 2>/dev/null",
            "StandbyQueryCmd" : "crm_attribute -QG -U %s -n standby -l forever -d off 2>/dev/null",
        }
        self.search = {
            "Pat:DC_IDLE"       : "crmd.*State transition.*-> S_IDLE",

            # This won't work if we have multiple partitions
            "Pat:Local_started" : "%s\W.*The local CRM is operational",
            "Pat:Slave_started" : "%s\W.*State transition.*-> S_NOT_DC",
            "Pat:Master_started": "%s\W.*State transition.*-> S_IDLE",
            "Pat:We_stopped"    : "heartbeat.*%s.*Heartbeat shutdown complete",
            "Pat:Logd_stopped"  : "%s\W.*logd:.*Exiting write process",
            "Pat:They_stopped"  : "%s\W.*LOST:.* %s ",
            "Pat:They_dead"     : "node %s.*: is dead",
            "Pat:TransitionComplete" : "Transition status: Complete: complete",

            "Pat:Fencing_start"   : "Initiating remote operation .* for %s",
            "Pat:Fencing_ok"      : r"stonith.*:\s*Operation .* of %s by .* for .*@.*: OK",
            "Pat:Fencing_recover" : r"pengine.*: Recover %s",

            "Pat:RscOpOK"       : r"crmd.*:\s*Operation %s_%s.*:\s*ok \(.*confirmed=\S+\)",
            "Pat:RscRemoteOpOK" : r"crmd.*:\s*Operation %s_%s.*:\s*ok \(node=%s,.*,\s*confirmed=true\)",
            "Pat:NodeFenced"    : r"crmd.*:\s*Peer\s+%s\s+was\s+terminated\s+\(.*\)\s+by\s+.*\s+for\s+.*:\s+OK",
            "Pat:FenceOpOK"     : "Operation .* for host '%s' with device .* returned: 0",
        }

    def get_component(self, key):
        """Return the pattern list registered for a component, or [] if the
        component is unknown (a diagnostic is printed in that case)."""
        # has_key() was removed in Python 3; 'in' works on both 2 and 3.
        if key in self.components:
            return self.components[key]

        print("Unknown component '%s' for %s" % (key, self.name))
        return []

    def get_patterns(self, key):
        """Return one of the pattern collections by category name.

        Known keys: BadNews, BadNewsIgnore, Commands, Search, Components.
        Returns None (implicitly) for any other key.
        """
        if key == "BadNews":
            return self.BadNews
        elif key == "BadNewsIgnore":
            return self.ignore
        elif key == "Commands":
            return self.commands
        elif key == "Search":
            return self.search
        elif key == "Components":
            return self.components

    def __getitem__(self, key):
        """Template lookup: 'Name', then commands, then search patterns.

        Returns None (after printing a diagnostic) for unknown keys.
        """
        if key == "Name":
            return self.name
        elif key in self.commands:
            return self.commands[key]
        elif key in self.search:
            return self.search[key]
        else:
            print("Unknown template '%s' for %s" % (key, self.name))
            return None
class crm_lha(BasePatterns):
    """Patterns/commands for Pacemaker running on top of Heartbeat (LHA)."""

    def __init__(self, name):
        BasePatterns.__init__(self, name)

        self.commands.update({
            "StartCmd"     : "service heartbeat start > /dev/null 2>&1",
            "StopCmd"      : "service heartbeat stop > /dev/null 2>&1",
            "EpochCmd"     : "crm_node -H -e",
            "QuorumCmd"    : "crm_node -H -q",
            "PartitionCmd" : "crm_node -H -p",
        })

        self.search.update({
            # Patterns to look for in the log files for various occasions...
            "Pat:ChildKilled"  : "%s\W.*heartbeat.*%s.*killed by signal 9",
            "Pat:ChildRespawn" : "%s\W.*heartbeat.*Respawning client.*%s",
            "Pat:ChildExit"    : "(ERROR|error): Client .* exited with return code",
        })

        self.BadNews = [
            r"error:",
            r"crit:",
            r"ERROR:",
            r"CRIT:",
            r"Shutting down...NOW",
            r"Timer I_TERMINATE just popped",
            r"input=I_ERROR",
            r"input=I_FAIL",
            r"input=I_INTEGRATED cause=C_TIMER_POPPED",
            r"input=I_FINALIZED cause=C_TIMER_POPPED",
            r"input=I_ERROR",  # NOTE(review): duplicate of the entry above; harmless but redundant
            r", exiting\.",
            r"WARN.*Ignoring HA message.*vote.*not in our membership list",
            r"pengine.*Attempting recovery of resource",
            r"is taking more than 2x its timeout",
            r"Confirm not received from",
            r"Welcome reply not received from",
            r"Attempting to schedule .* after a stop",
            r"Resource .* was active at shutdown",
            r"duplicate entries for call_id",
            r"Search terminated:",
            r"No need to invoke the TE",
            r"global_timer_callback:",
            r"Faking parameter digest creation",
            r"Parameters to .* action changed:",
            r"Parameters to .* changed",
        ]

        self.ignore = [
            # BUG FIX: the comma after the first entry was missing, so Python
            # silently concatenated the first two strings into a single bogus
            # pattern and neither was ever matched as intended.
            r"(ERROR|error):.*\s+assert\s+at\s+crm_glib_handler:",
            "(ERROR|error): Message hist queue is filling up",
            "stonithd.*CRIT: external_hostlist:.*'vmware gethosts' returned an empty hostlist",
            "stonithd.*(ERROR|error): Could not list nodes for stonith RA external/vmware.",
            "pengine.*Preventing .* from re-starting",
        ]
class crm_cs_v0(BasePatterns):
    """Patterns/commands for Pacemaker on the corosync plugin-era stack.

    Later variants (crm_mcp, crm_cman) inherit from this class and
    override individual commands/patterns; only __init__ is defined here —
    all behavior lives in the data tables it builds.
    """

    def __init__(self, name):
        BasePatterns.__init__(self, name)

        self.commands.update({
            "EpochCmd"     : "crm_node -e --openais",
            "QuorumCmd"    : "crm_node -q --openais",
            "PartitionCmd" : "crm_node -p --openais",
            "StartCmd"     : "service corosync start",
            "StopCmd"      : "service corosync stop",
        })

        self.search.update({
            # The next pattern is too early
            # "Pat:We_stopped"   : "%s.*Service engine unloaded: Pacemaker Cluster Manager",
            # The next pattern would be preferred, but it doesn't always come out
            # "Pat:We_stopped"   : "%s.*Corosync Cluster Engine exiting with status",
            "Pat:We_stopped"   : "%s\W.*Service engine unloaded: corosync cluster quorum service",
            "Pat:They_stopped" : "%s\W.*crmd.*Node %s\[.*state is now lost",
            "Pat:They_dead"    : "corosync:.*Node %s is now: lost",

            "Pat:ChildExit"    : "Child process .* exited",
            "Pat:ChildKilled"  : "%s\W.*corosync.*Child process %s terminated with signal 9",
            "Pat:ChildRespawn" : "%s\W.*corosync.*Respawning failed child process: %s",

            "Pat:InfraUp"      : "%s\W.*corosync.*Initializing transport",
            "Pat:PacemakerUp"  : "%s\W.*pacemakerd.*Starting Pacemaker",
        })

        # Messages that are noise here even though they would normally be
        # flagged by the BadNews scan below.
        self.ignore = [
            r"crm_mon:",
            r"crmadmin:",
            r"update_trace_data",
            r"async_notify:.*strange, client not found",
            r"Parse error: Ignoring unknown option .*nodename",
            r"error.*: Operation 'reboot' .* with device 'FencingFail' returned:",
            r"Child process .* terminated with signal 9",
            r"getinfo response error: 1$",
            "sbd.* error: inquisitor_child: DEBUG MODE IS ACTIVE",
            r"sbd.* pcmk:\s*error:.*Connection to cib_ro failed",
            r"sbd.* pcmk:\s*error:.*Connection to cib_ro.* closed .I/O condition=17",
        ]

        self.BadNews = [
            r"error:",
            r"crit:",
            r"ERROR:",
            r"CRIT:",
            r"Shutting down...NOW",
            r"Timer I_TERMINATE just popped",
            r"input=I_ERROR",
            r"input=I_FAIL",
            r"input=I_INTEGRATED cause=C_TIMER_POPPED",
            r"input=I_FINALIZED cause=C_TIMER_POPPED",
            # NOTE(review): duplicate of "input=I_ERROR" above; harmless.
            r"input=I_ERROR",
            r", exiting\.",
            r"(WARN|warn).*Ignoring HA message.*vote.*not in our membership list",
            r"pengine.*Attempting recovery of resource",
            r"is taking more than 2x its timeout",
            r"Confirm not received from",
            r"Welcome reply not received from",
            r"Attempting to schedule .* after a stop",
            r"Resource .* was active at shutdown",
            r"duplicate entries for call_id",
            r"Search terminated:",
            r":global_timer_callback",
            r"Faking parameter digest creation",
            r"Parameters to .* action changed:",
            r"Parameters to .* changed",
            r"The .* process .* terminated with signal",
            r"Child process .* terminated with signal",
            r"pengine:.*Recover .*\(.* -\> .*\)",
            r"rsyslogd.* imuxsock lost .* messages from pid .* due to rate-limiting",
            r"Peer is not part of our cluster",
            r"We appear to be in an election loop",
            r"Unknown node -> we will not deliver message",
            r"(Blackbox dump requested|Problem detected)",
            r"pacemakerd.*Could not connect to Cluster Configuration Database API",
            r"Receiving messages from a node we think is dead",
            r"share the same cluster nodeid",
            r"share the same name",

            #r"crm_ipc_send:.*Request .* failed",
            #r"crm_ipc_send:.*Sending to .* is disabled until pending reply is received",

            # Not inherently bad, but worth tracking
            #r"No need to invoke the TE",
            #r"ping.*: DEBUG: Updated connected = 0",
            #r"Digest mis-match:",
            r"crmd:.*Transition failed: terminated",
            r"Local CIB .* differs from .*:",
            r"warn.*:\s*Continuing but .* will NOT be used",
            r"warn.*:\s*Cluster configuration file .* is corrupt",
            #r"Executing .* fencing operation",
            #r"fence_pcmk.* Call to fence",
            #r"fence_pcmk",
            r"cman killed by node",
            r"Election storm",
            r"stalled the FSA with pending inputs",
        ]

        # Per-component pattern tables: "<component>" lists messages expected
        # when that component is killed; "<component>-ignore" lists messages
        # that should not count against the test in that scenario.
        self.components["common-ignore"] = [
            "Pending action:",
            "error: crm_log_message_adv:",
            "resources were active at shutdown",
            "pending LRM operations at shutdown",
            "Lost connection to the CIB service",
            "Connection to the CIB terminated...",
            "Sending message to CIB service FAILED",
            "apply_xml_diff:.*Diff application failed!",
            r"crmd.*:\s*Action A_RECOVER .* not supported",
            "unconfirmed_actions:.*Waiting on .* unconfirmed actions",
            "cib_native_msgready:.*Message pending on command channel",
            r"crmd.*:\s*Performing A_EXIT_1 - forcefully exiting the CRMd",
            "verify_stopped:.*Resource .* was active at shutdown.  You may ignore this error if it is unmanaged.",
            "error: attrd_connection_destroy:.*Lost connection to attrd",
            r".*:\s*Executing .* fencing operation \(.*\) on ",
            r"(Blackbox dump requested|Problem detected)",
        #        "error: native_create_actions: Resource .*stonith::.* is active on 2 nodes attempting recovery",
        #        "error: process_pe_message: Transition .* ERRORs found during PE processing",
        ]

        self.components["corosync-ignore"] = [
            r"error:.*Connection to the CPG API failed: Library error",
            r"The .* process .* exited",
            r"pacemakerd.*error:.*Child process .* exited",
            r"cib.*error:.*Corosync connection lost",
            r"stonith-ng.*error:.*Corosync connection terminated",
            r"The cib process .* exited: Invalid argument",
            r"The attrd process .* exited: Transport endpoint is not connected",
            r"The crmd process .* exited: Link has been severed",
            r"error:.*Child process cib .* exited: Invalid argument",
            r"error:.*Child process attrd .* exited: Transport endpoint is not connected",
            r"error:.*Child process crmd .* exited: Link has been severed",
            r"lrmd.*error:.*Connection to stonith-ng failed",
            r"lrmd.*error:.*Connection to stonith-ng.* closed",
            r"lrmd.*error:.*LRMD lost STONITH connection",
            r"crmd.*State transition .* S_RECOVERY",
            r"crmd.*error:.*FSA: Input I_ERROR",
            r"crmd.*error:.*FSA: Input I_TERMINATE",
            r"crmd.*error:.*Connection to cman failed",
            r"crmd.*error:.*Could not recover from internal error",
            r"error:.*Connection to cib_shm failed",
            r"error:.*Connection to cib_shm.* closed",
            r"error:.*STONITH connection failed",
        ]

        self.components["corosync"] = [
            r"pacemakerd.*error:.*Connection destroyed",
            r"attrd.*:\s*crit:.*Lost connection to Corosync service",
            r"stonith.*:\s*Corosync connection terminated",
            r"cib.*:\s*Corosync connection lost!\s+Exiting.",
            r"crmd.*:\s*connection terminated",
            r"pengine.*Scheduling Node .* for STONITH",
            r"crmd.*:\s*Peer %s was terminated \(.*\) by .* for .*:\s*OK",
        ]

        self.components["cib-ignore"] = [
            "lrmd.*Connection to stonith-ng failed",
            "lrmd.*Connection to stonith-ng.* closed",
            "lrmd.*LRMD lost STONITH connection",
            "lrmd.*STONITH connection failed, finalizing .* pending operations",
        ]

        self.components["cib"] = [
            "State transition .* S_RECOVERY",
            "Respawning .* crmd",
            "Respawning .* attrd",
            "Connection to cib_.* failed",
            "Connection to cib_.* closed",
            "Connection to the CIB terminated...",
            "(Child process|The) crmd .* exited: Generic Pacemaker error",
            "(Child process|The) attrd .* exited: (Connection reset by peer|Transport endpoint is not connected)",
            "Lost connection to CIB service",
            "crmd.*Input I_TERMINATE from do_recover",
            "crmd.*I_ERROR.*crmd_cib_connection_destroy",
            "crmd.*Could not recover from internal error",
        ]

        self.components["lrmd"] = [
            "State transition .* S_RECOVERY",
            "LRM Connection failed",
            "Respawning .* crmd",
            "Connection to lrmd failed",
            "Connection to lrmd.* closed",
            "crmd.*I_ERROR.*lrm_connection_destroy",
            "(Child process|The) crmd .* exited: Generic Pacemaker error",
            "crmd.*Input I_TERMINATE from do_recover",
            "crmd.*Could not recover from internal error",
        ]
        self.components["lrmd-ignore"] = []

        self.components["crmd"] = [
        #            "WARN: determine_online_status: Node .* is unclean",
        #            "Scheduling Node .* for STONITH",
        #            "Executing .* fencing operation",
        # Only if the node wasn't the DC:  "State transition S_IDLE",
            "State transition .* -> S_IDLE",
        ]
        self.components["crmd-ignore"] = []

        self.components["attrd"] = []
        self.components["attrd-ignore"] = []

        self.components["pengine"] = [
            "State transition .* S_RECOVERY",
            "Respawning .* crmd",
            "(The|Child process) crmd .* exited: Generic Pacemaker error",
            "Connection to pengine failed",
            "Connection to pengine.* closed",
            "Connection to the Policy Engine failed",
            "crmd.*I_ERROR.*save_cib_contents",
            "crmd.*Input I_TERMINATE from do_recover",
            "crmd.*Could not recover from internal error",
        ]
        self.components["pengine-ignore"] = []

        self.components["stonith"] = [
            "Connection to stonith-ng failed",
            "LRMD lost STONITH connection",
            "Connection to stonith-ng.* closed",
            "Fencing daemon connection failed",
            r"crmd.*:\s*warn.*:\s*Callback already present",
        ]
        self.components["stonith-ignore"] = [
            r"pengine.*: Recover Fencing",
            "Updating failcount for Fencing",
            r"error:.*Connection to stonith-ng failed",
            r"error:.*Connection to stonith-ng.*closed \(I/O condition=17\)",
            r"crit:.*Fencing daemon connection failed",
            r"error:.*Sign-in failed: triggered a retry",
            "STONITH connection failed, finalizing .* pending operations.",
            r"crmd.*:\s*Operation Fencing.* Error",
        ]
        # Everything ignorable in general is also ignorable for stonith kills.
        self.components["stonith-ignore"].extend(self.components["common-ignore"])
class crm_mcp(crm_cs_v0):
    '''
    The crm version 4 cluster manager class.
    It implements the things we need to talk to and manipulate
    crm clusters running on top of native corosync (no plugins)
    '''
    def __init__(self, name):
        crm_cs_v0.__init__(self, name)

        self.commands.update({
            "StartCmd" : "service corosync start && service pacemaker start",
            # Only attempt to stop pacemaker_remote when its daemon is
            # actually installed, so the stop sequence doesn't report a
            # failure on nodes that don't have it.
            "StopCmd"  : "service pacemaker stop; [ ! -e /usr/sbin/pacemaker_remoted ] || service pacemaker_remote stop; service corosync stop",

            "EpochCmd"     : "crm_node -e",
            "QuorumCmd"    : "crm_node -q",
            "PartitionCmd" : "crm_node -p",
        })

        self.search.update({
            # Close enough... "Corosync Cluster Engine exiting normally" isn't
            # printed reliably and there's little interest in doing anything about it
            "Pat:We_stopped"   : "%s\W.*Unloading all Corosync service engines",
            "Pat:They_stopped" : "%s\W.*crmd.*Node %s\[.*state is now lost",
            "Pat:They_dead"    : "crmd.*Node %s\[.*state is now lost",

            "Pat:ChildExit"    : "The .* process exited",
            "Pat:ChildKilled"  : "%s\W.*pacemakerd.*The %s process .* terminated with signal 9",
            "Pat:ChildRespawn" : "%s\W.*pacemakerd.*Respawning failed child process: %s",
        })

#        if self.Env["have_systemd"]:
#            self.update({
#                # When systemd is in use, we can look for this instead
#                "Pat:We_stopped" : "%s.*Stopped Corosync Cluster Engine",
#            })
class crm_mcp_docker(crm_mcp):
    '''
    The crm version 4 cluster manager class.
    It implements the things we need to talk to and manipulate
    crm clusters running on top of native corosync (no plugins)
    '''
    def __init__(self, name):
        crm_mcp.__init__(self, name)

        # Inside the docker harness the stack is driven by dedicated
        # wrapper scripts instead of the host's service manager.
        self.commands["StartCmd"] = "pcmk_start"
        self.commands["StopCmd"] = "pcmk_stop"
class crm_cman(crm_cs_v0):
    '''
    The crm version 3 cluster manager class.
    It implements the things we need to talk to and manipulate
    crm clusters running on top of openais
    '''
    def __init__(self, name):
        crm_cs_v0.__init__(self, name)

        self.commands.update({
            "StartCmd" : "service pacemaker start",
            # Only attempt to stop pacemaker_remote when its daemon is
            # actually installed (matches crm_mcp's StopCmd behavior).
            "StopCmd"  : "service pacemaker stop; [ ! -e /usr/sbin/pacemaker_remoted ] || service pacemaker_remote stop",

            "EpochCmd"     : "crm_node -e --cman",
            "QuorumCmd"    : "crm_node -q --cman",
            "PartitionCmd" : "crm_node -p --cman",
        })

        # CONSISTENCY FIX: these Pat:* entries were previously stuffed into
        # self.commands, unlike every other variant (e.g. crm_mcp) which
        # keeps them in self.search.  __getitem__ consults both dicts so
        # template lookup is unchanged, but get_patterns("Search") now sees
        # them as well.
        self.search.update({
            "Pat:We_stopped"   : "%s.*Unloading all Corosync service engines",
            "Pat:They_stopped" : "%s\W.*crmd.*Node %s\[.*state is now lost",
            "Pat:They_dead"    : "crmd.*Node %s\[.*state is now lost",

            "Pat:ChildKilled"  : "%s\W.*pacemakerd.*The %s process .* terminated with signal 9",
            "Pat:ChildRespawn" : "%s\W.*pacemakerd.*Respawning failed child process: %s",
        })
class PatternSelector:
    """Front-end for looking up templates/patterns/components by variant
    name, backed by the module-level 'patternvariants' registry that the
    variant classes populate on construction."""

    def __init__(self, name=None):
        self.name = name
        self.base = BasePatterns("crm-base")

        if not name:
            # No specific variant requested: register all of them so any
            # later get_variant() call can succeed.
            crm_cs_v0("crm-plugin-v0")
            crm_cman("crm-cman")
            crm_mcp("crm-mcp")
            crm_lha("crm-lha")
        elif name == "crm-lha":
            crm_lha(name)
        elif name == "crm-plugin-v0":
            crm_cs_v0(name)
        elif name == "crm-cman":
            crm_cman(name)
        elif name == "crm-mcp":
            crm_mcp(name)
        elif name == "crm-mcp-docker":
            crm_mcp_docker(name)

    def get_variant(self, variant):
        """Return the registered variant object, or self.base if unknown."""
        # has_key() was removed in Python 3; 'in' works on both 2 and 3.
        if variant in patternvariants:
            return patternvariants[variant]

        print("defaulting to crm-base for %s" % variant)
        return self.base

    def get_patterns(self, variant, kind):
        """Return the pattern collection 'kind' for the given variant."""
        return self.get_variant(variant).get_patterns(kind)

    def get_template(self, variant, key):
        """Return a single command/search template for the given variant."""
        v = self.get_variant(variant)
        return v[key]

    def get_component(self, variant, kind):
        """Return the component pattern list 'kind' for the given variant."""
        return self.get_variant(variant).get_component(kind)

    def __getitem__(self, key):
        return self.get_template(self.name, key)
# python cts/CTSpatt.py -k crm-mcp -t StartCmd
if __name__ == '__main__':

    pdir = os.path.dirname(sys.path[0])
    sys.path.insert(0, pdir)  # So that things work from the source directory

    kind = None
    template = None

    # Minimal hand-rolled option parsing: each recognized flag consumes the
    # following argument, which 'skipthis' causes the loop to skip over.
    skipthis = None
    args = sys.argv[1:]
    for i in range(0, len(args)):
        if skipthis:
            skipthis = None
            continue

        elif args[i] == "-k" or args[i] == "--kind":
            skipthis = 1
            kind = args[i+1]

        elif args[i] == "-t" or args[i] == "--template":
            skipthis = 1
            template = args[i+1]

        else:
            print("Illegal argument " + args[i])

    print(PatternSelector(kind)[template])
diff --git a/cts/watcher.py b/cts/watcher.py
index c4ea1b02fe..1182c8bb00 100644
--- a/cts/watcher.py
+++ b/cts/watcher.py
@@ -1,548 +1,550 @@
'''
Classes related to searching logs
'''
__copyright__='''
Copyright (C) 2014 Andrew Beekhof
Licensed under the GNU GPL.
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
import time, re, os, threading
from cts.remote import *
from cts.logging import *
has_log_watcher = {}
log_watcher_file = "cts_log_watcher.py"
log_watcher_bin = CTSvars.CRM_DAEMON_DIR + "/" + log_watcher_file
log_watcher = """
import sys, os, fcntl
'''
Remote logfile reader for CTS
Reads a specified number of lines from the supplied offset
Returns the current offset
Contains logic for handling truncation
'''
limit = 0
offset = 0
prefix = ''
filename = '/var/log/messages'
skipthis=None
args=sys.argv[1:]
for i in range(0, len(args)):
if skipthis:
skipthis=None
continue
elif args[i] == '-l' or args[i] == '--limit':
skipthis=1
limit = int(args[i+1])
elif args[i] == '-f' or args[i] == '--filename':
skipthis=1
filename = args[i+1]
elif args[i] == '-o' or args[i] == '--offset':
skipthis=1
offset = args[i+1]
elif args[i] == '-p' or args[i] == '--prefix':
skipthis=1
prefix = args[i+1]
elif args[i] == '-t' or args[i] == '--tag':
skipthis=1
if not os.access(filename, os.R_OK):
print prefix + 'Last read: %d, limit=%d, count=%d - unreadable' % (0, limit, 0)
sys.exit(1)
logfile=open(filename, 'r')
logfile.seek(0, os.SEEK_END)
newsize=logfile.tell()
if offset != 'EOF':
offset = int(offset)
if newsize >= offset:
logfile.seek(offset)
else:
print prefix + ('File truncated from %d to %d' % (offset, newsize))
if (newsize*1.05) < offset:
logfile.seek(0)
# else: we probably just lost a few logs after a fencing op
# continue from the new end
# TODO: accept a timestamp and discard all messages older than it
# Don't block when we reach EOF
fcntl.fcntl(logfile.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
count = 0
while True:
if logfile.tell() >= newsize: break
elif limit and count >= limit: break
line = logfile.readline()
if not line: break
print line.strip()
count += 1
print prefix + 'Last read: %d, limit=%d, count=%d' % (logfile.tell(), limit, count)
logfile.close()
"""
class SearchObj:
    """Base class for incrementally reading new lines from one log source
    (a file or the journal), possibly on a remote host.

    Subclasses must implement harvest_async(); harvested lines accumulate
    in self.cache and self.offset tracks the read position.
    """

    def __init__(self, filename, host=None, name=None):
        self.limit = None          # stop-reading boundary set by setend()
        self.cache = []            # lines harvested but not yet consumed
        self.logger = LogFactory()
        self.host = host
        self.name = name
        self.filename = filename

        self.rsh = RemoteFactory().getInstance()
        self.offset = "EOF"        # "EOF" means "start from the current end"

        if host is None:
            # NOTE(review): this assigns the *local* variable only and has no
            # effect on the instance; presumably `self.host = "localhost"`
            # was intended — confirm before changing, since __str__ and the
            # subclasses read self.host and currently see None here.
            host = "localhost"

    def __str__(self):
        if self.host:
            return "%s:%s" % (self.host, self.filename)
        return self.filename

    def log(self, args):
        message = "lw: %s: %s" % (self, args)
        self.logger.log(message)

    def debug(self, args):
        message = "lw: %s: %s" % (self, args)
        self.logger.debug(message)

    def harvest(self, delegate=None):
        """Synchronous harvest: kick off an async harvest and wait for it."""
        # Renamed from 'async', which became a reserved word in Python 3.7.
        worker = self.harvest_async(delegate)
        worker.join()

    def harvest_async(self, delegate=None):
        """Abstract: subclasses must override this.

        The original used a bare 'raise' with no active exception, which is
        itself an error; NotImplementedError states the intent explicitly.
        """
        self.log("Not implemented")
        raise NotImplementedError

    def end(self):
        self.debug("Unsetting the limit")
        # Unset the limit
        self.limit = None
class FileObj(SearchObj):
    """SearchObj that reads a plain log file, installing the
    cts_log_watcher helper on remote hosts and invoking it to fetch new
    lines past the recorded offset."""

    def __init__(self, filename, host=None, name=None):
        global has_log_watcher
        SearchObj.__init__(self, filename, host, name)

        if host is not None:
            # has_key() was removed in Python 3; 'in' works on both 2 and 3.
            if host not in has_log_watcher:

                global log_watcher
                global log_watcher_bin

                self.debug("Installing %s on %s" % (log_watcher_file, host))
                # NOTE(review): built via os.system/heredoc with trusted test
                # configuration values; not safe for untrusted input.
                os.system("cat << END >> %s\n%s\nEND" % (log_watcher_file, log_watcher))
                os.system("chmod 755 %s" % (log_watcher_file))

                self.rsh.cp(log_watcher_file, "root@%s:%s" % (host, log_watcher_bin))
                has_log_watcher[host] = 1

                os.system("rm -f %s" % (log_watcher_file))

        self.harvest()

    def async_complete(self, pid, returncode, outLines, errLines):
        """Completion callback: separate CTSwatcher control lines (offset
        updates, truncation notices) from actual log payload."""
        for line in outLines:
            match = re.search("^CTSwatcher:Last read: (\d+)", line)
            if match:
                last_offset = self.offset
                self.offset = match.group(1)
                #if last_offset == "EOF": self.debug("Got %d lines, new offset: %s" % (len(outLines), self.offset))
                self.debug("Got %d lines, new offset: %s %s" % (len(outLines), self.offset, repr(self.delegate)))
            elif re.search("^CTSwatcher:.*truncated", line):
                self.log(line)
            elif re.search("^CTSwatcher:", line):
                self.debug("Got control line: "+ line)
            else:
                self.cache.append(line)

        if self.delegate:
            self.delegate.async_complete(pid, returncode, self.cache, errLines)

    def harvest_async(self, delegate=None):
        """Start fetching up to 200 new lines; returns the async handle, or
        None when the limit set by setend() has already been passed."""
        self.delegate = delegate
        self.cache = []

        # NOTE(review): self.offset is a string ("EOF" or digits) while
        # self.limit is an int; under Python 2 this mixed-type comparison is
        # well-defined but surprising, and under Python 3 it would raise
        # TypeError — confirm intended semantics before porting.
        if self.limit is not None and self.offset > self.limit:
            if self.delegate:
                self.delegate.async_complete(-1, -1, [], [])
            return None

        global log_watcher_bin
        return self.rsh.call_async(self.host,
                "python %s -t %s -p CTSwatcher: -l 200 -f %s -o %s" % (log_watcher_bin, self.name, self.filename, self.offset),
                completionDelegate=self)

    def setend(self):
        """Record the file's current end as the limit for this watch."""
        if self.limit:
            return

        global log_watcher_bin
        (rc, lines) = self.rsh(self.host,
                "python %s -t %s -p CTSwatcher: -l 2 -f %s -o %s" % (log_watcher_bin, self.name, self.filename, "EOF"),
                None, silent=True)

        for line in lines:
            match = re.search("^CTSwatcher:Last read: (\d+)", line)
            if match:
                last_offset = self.offset
                self.limit = int(match.group(1))
                #if last_offset == "EOF": self.debug("Got %d lines, new offset: %s" % (len(lines), self.offset))
                self.debug("Set limit to: %d" % self.limit)
        return
class JournalObj(SearchObj):
    """SearchObj that reads the systemd journal via journalctl, using
    journal cursors as offsets and a timestamp as the optional limit."""

    def __init__(self, host=None, name=None):
        SearchObj.__init__(self, name, host, name)
        # BUG FIX: hitLimit was previously only assigned in setend() and
        # async_complete(), so harvest_async() could read it before any
        # assignment; initialize it here.
        self.hitLimit = False
        self.harvest()

    def async_complete(self, pid, returncode, outLines, errLines):
        """Completion callback: pull the new cursor out of the output and
        cache the remaining lines; re-query the cursor if it was missing."""
        #self.log( "%d returned on %s" % (pid, self.host))
        foundCursor = False
        for line in outLines:
            match = re.search("^-- cursor: ([^.]+)", line)
            if match:
                foundCursor = True
                last_offset = self.offset
                self.offset = match.group(1).strip()
                self.debug("Got %d lines, new cursor: %s" % (len(outLines), self.offset))
            else:
                self.cache.append(line)

        if self.limit and not foundCursor:
            self.hitLimit = True
            self.debug("Got %d lines but no cursor: %s" % (len(outLines), self.offset))

            # Get the current cursor
            (rc, outLines) = self.rsh(self.host, "journalctl -q -n 0 --show-cursor", stdout=None, silent=True, synchronous=True)
            for line in outLines:
                match = re.search("^-- cursor: ([^.]+)", line)
                if match:
                    last_offset = self.offset
                    self.offset = match.group(1).strip()
                    self.debug("Got %d lines, new cursor: %s" % (len(outLines), self.offset))
                else:
                    self.log("Not a new cursor: %s" % line)
                    self.cache.append(line)

        if self.delegate:
            self.delegate.async_complete(pid, returncode, self.cache, errLines)

    def harvest_async(self, delegate=None):
        """Start fetching up to 200 new journal lines after the cursor."""
        self.delegate = delegate
        self.cache = []

        # Use --lines to prevent journalctl from overflowing the Popen input buffer
        if self.limit and self.hitLimit:
            return None
        elif self.limit:
            command = "journalctl -q --after-cursor='%s' --until '%s' --lines=200 --show-cursor" % (self.offset, self.limit)
        else:
            command = "journalctl -q --after-cursor='%s' --lines=200 --show-cursor" % (self.offset)

        if self.offset == "EOF":
            command = "journalctl -q -n 0 --show-cursor"

        return self.rsh.call_async(self.host, command, completionDelegate=self)

    def setend(self):
        """Record the host's current time as the --until limit."""
        if self.limit:
            return

        self.hitLimit = False
        (rc, lines) = self.rsh(self.host, "date +'%Y-%m-%d %H:%M:%S'", stdout=None, silent=True)

        # Only trust the output when the command succeeded and produced
        # exactly one line; otherwise leave the limit unset and say why.
        if (rc == 0) and (len(lines) == 1):
            self.limit = lines[0].strip()
            self.debug("Set limit to: %s" % self.limit)
        else:
            self.debug("Unable to set limit for %s because date returned %d lines with status %d" % (self.host,
                       len(lines), rc))
        return
class LogWatcher(RemoteExec):

    '''This class watches logs for messages that fit certain regular
       expressions.  Watching logs for events isn't the ideal way
       to do business, but it's better than nothing :-)

       On the other hand, this class is really pretty cool ;-)

       The way you use this class is as follows:
          Construct a LogWatcher object
          Call setwatch() when you want to start watching the log
          Call look() to scan the log looking for the patterns
    '''

    def __init__(self, log, regexes, name="Anon", timeout=10, debug_level=None, silent=False, hosts=None, kind=None):
        '''Constructor: takes a log name to watch and a list of regular
        expressions to watch for.  'kind', 'log' and 'hosts' are required;
        a false value for any of them raises ValueError.
        '''
        self.logger = LogFactory()
        self.name = name
        self.regexes = regexes
        self.debug_level = debug_level
        self.whichmatch = -1
        self.unmatched = None

        self.cache_lock = threading.Lock()
        self.file_list = []
        self.line_cache = []

        # Validate our arguments.  Better sooner than later ;-)
        for regex in regexes:
            assert re.compile(regex)

        # BUG FIX: the original used a bare 'raise' (no active exception,
        # itself an error) followed by unreachable fallbacks to
        # self.Env[...], an attribute that does not exist on this class.
        if kind:
            self.kind = kind
        else:
            raise ValueError("LogWatcher requires a log kind")

        if log:
            self.filename = log
        else:
            raise ValueError("LogWatcher requires a log filename")

        if hosts:
            self.hosts = hosts
        else:
            raise ValueError("LogWatcher requires a list of hosts")

        # trace_lw is assumed to come from the star-imports at the top of
        # this file — TODO confirm (it is not defined in this module).
        if trace_lw:
            self.debug_level = 3
            silent = False

        if not silent:
            for regex in self.regexes:
                self.debug("Looking for regex: " + regex)

        self.Timeout = int(timeout)
        self.returnonlymatch = None

    def debug(self, args):
        message = "lw: %s: %s" % (self.name, args)
        self.logger.debug(message)

    def setwatch(self):
        '''Mark the place to start watching the log from.
        '''
        if self.kind == "remote":
            for node in self.hosts:
                self.file_list.append(FileObj(self.filename, node, self.name))
        elif self.kind == "journal":
            for node in self.hosts:
                self.file_list.append(JournalObj(node, self.name))
        else:
            self.file_list.append(FileObj(self.filename))

        # print "%s now has %d files" % (self.name, len(self.file_list))

    def __del__(self):
        # Guard against debug_level=None: comparing None with int raises on
        # Python 3 (and was always False on Python 2, which this preserves).
        if self.debug_level and self.debug_level > 1:
            self.debug("Destroy")

    def ReturnOnlyMatch(self, onlymatch=1):
        '''Specify one or more subgroups of the match to return rather than the whole string
           http://www.python.org/doc/2.5.2/lib/match-objects.html
        '''
        self.returnonlymatch = onlymatch

    def async_complete(self, pid, returncode, outLines, errLines):
        # TODO: Probably need a lock for updating self.line_cache
        self.logger.debug("%s: Got %d lines from %d (total %d)" % (self.name, len(outLines), pid, len(self.line_cache)))
        if len(outLines):
            self.cache_lock.acquire()
            self.line_cache.extend(outLines)
            self.cache_lock.release()

    def __get_lines(self, timeout):
        '''Kick off an async harvest on every source and wait (up to 60s
        each) for the results to land in self.line_cache.'''
        if not len(self.file_list):
            raise ValueError("No sources to read from")

        pending = []
        for f in self.file_list:
            t = f.harvest_async(self)
            if t:
                pending.append(t)

        for t in pending:
            t.join(60.0)
            # is_alive() exists on Python 2.6+ and 3.x (isAlive was removed
            # in 3.9); the message now matches the 60s join above (it used
            # to claim 20s).
            if t.is_alive():
                self.logger.log("%s: Aborting after 60s waiting for %s logging commands" % (self.name, repr(t)))
                return

        #print "Got %d lines" % len(self.line_cache)

    def end(self):
        for f in self.file_list:
            f.end()

    def look(self, timeout=None, silent=False):
        '''Examine the log looking for the given patterns.
        It starts looking from the place marked by setwatch().
        This function looks in the file in the fashion of tail -f.
        It properly recovers from log file truncation, but not from
        removing and recreating the log.  It would be nice if it
        recovered from this as well :-)

        We return the first line which matches any of our patterns.
        '''
        if timeout is None:
            timeout = self.Timeout

        if trace_lw:
            silent = False

        lines = 0
        begin = time.time()
        end = begin + timeout + 1
        if self.debug_level and self.debug_level > 2:
            self.debug("starting single search: timeout=%d, begin=%d, end=%d" % (timeout, begin, end))

        if not self.regexes:
            self.debug("Nothing to look for")
            return None

        if timeout == 0:
            for f in self.file_list:
                f.setend()

        while True:
            if len(self.line_cache):
                lines += 1
                self.cache_lock.acquire()
                line = self.line_cache.pop(0)  # same as reading [0] then remove()
                self.cache_lock.release()

                which = -1
                if re.search("CTS:", line):
                    continue

                if self.debug_level and self.debug_level > 2:
                    self.debug("Processing: " + line)

                for regex in self.regexes:
                    which = which + 1

                    if self.debug_level and self.debug_level > 3:
                        self.debug("Comparing line to: " + regex)

                    matchobj = re.search(regex, line)
                    if matchobj:
                        self.whichmatch = which
                        if self.returnonlymatch:
                            return matchobj.group(self.returnonlymatch)
                        else:
                            self.debug("Matched: " + line)
                            if self.debug_level and self.debug_level > 1:
                                self.debug("With: " + regex)
                            return line

            elif timeout > 0 and end < time.time():
                if self.debug_level and self.debug_level > 1:
                    self.debug("hit timeout: %d" % timeout)

                timeout = 0
                for f in self.file_list:
                    f.setend()

            else:
                self.__get_lines(timeout)

                if len(self.line_cache) == 0 and end < time.time():
                    self.debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines))
                    return None
                else:
                    self.debug("Waiting: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), len(self.line_cache)))
                    time.sleep(1)

        self.debug("How did we get here")
        return None

    def lookforall(self, timeout=None, allow_multiple_matches=None, silent=False):
        '''Examine the log looking for ALL of the given patterns.
        It starts looking from the place marked by setwatch().

        We return when the timeout is reached, or when we have found
        ALL of the regexes that were part of the watch
        '''
        if timeout is None:
            timeout = self.Timeout
        save_regexes = self.regexes
        returnresult = []

        if trace_lw:
            silent = False

        if not silent:
            self.debug("starting search: timeout=%d" % timeout)
            for regex in self.regexes:
                if self.debug_level and self.debug_level > 2:
                    self.debug("Looking for regex: " + regex)

        while (len(self.regexes) > 0):
            oneresult = self.look(timeout)
            if not oneresult:
                self.unmatched = self.regexes
                self.matched = returnresult
                self.regexes = save_regexes
                self.end()
                return None

            returnresult.append(oneresult)
            if not allow_multiple_matches:
                del self.regexes[self.whichmatch]

            else:
                # Allow multiple regexes to match a single line
                tmp_regexes = self.regexes
                self.regexes = []
                for regex in tmp_regexes:
                    matchobj = re.search(regex, oneresult)
                    if not matchobj:
                        self.regexes.append(regex)

        self.unmatched = None
        self.matched = returnresult
        self.regexes = save_regexes
        return returnresult