Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/cts/watcher.py b/cts/watcher.py
index 41b448180e..4b0519dfe1 100644
--- a/cts/watcher.py
+++ b/cts/watcher.py
@@ -1,478 +1,478 @@
'''
Classes related to searching logs
'''
__copyright__='''
Copyright (C) 2014 Andrew Beekhof <andrew@beekhof.net>
Licensed under the GNU GPL.
'''
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
import types, string, select, sys, time, re, os, struct, signal
import time, syslog, random, traceback, base64, pickle, binascii, fcntl
from cts.remote import *
from cts.logging import *
has_log_watcher = {}
log_watcher_bin = CTSvars.CRM_DAEMON_DIR + "/cts_log_watcher.py"
log_watcher = """
import sys, os, fcntl
'''
Remote logfile reader for CTS
Reads a specified number of lines from the supplied offset
Returns the current offset
Contains logic for handling truncation
'''
limit = 0
offset = 0
prefix = ''
filename = '/var/log/messages'
skipthis=None
args=sys.argv[1:]
for i in range(0, len(args)):
if skipthis:
skipthis=None
continue
elif args[i] == '-l' or args[i] == '--limit':
skipthis=1
limit = int(args[i+1])
elif args[i] == '-f' or args[i] == '--filename':
skipthis=1
filename = args[i+1]
elif args[i] == '-o' or args[i] == '--offset':
skipthis=1
offset = args[i+1]
elif args[i] == '-p' or args[i] == '--prefix':
skipthis=1
prefix = args[i+1]
elif args[i] == '-t' or args[i] == '--tag':
skipthis=1
if not os.access(filename, os.R_OK):
print prefix + 'Last read: %d, limit=%d, count=%d - unreadable' % (0, limit, 0)
sys.exit(1)
logfile=open(filename, 'r')
logfile.seek(0, os.SEEK_END)
newsize=logfile.tell()
if offset != 'EOF':
offset = int(offset)
if newsize >= offset:
logfile.seek(offset)
else:
print prefix + ('File truncated from %d to %d' % (offset, newsize))
if (newsize*1.05) < offset:
logfile.seek(0)
# else: we probably just lost a few logs after a fencing op
# continue from the new end
# TODO: accept a timestamp and discard all messages older than it
# Don't block when we reach EOF
fcntl.fcntl(logfile.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
count = 0
while True:
if logfile.tell() >= newsize: break
elif limit and count >= limit: break
line = logfile.readline()
if not line: break
print line.strip()
count += 1
print prefix + 'Last read: %d, limit=%d, count=%d' % (logfile.tell(), limit, count)
logfile.close()
"""
class SearchObj:
def __init__(self, filename, host=None, name=None):
self.logger = LogFactory()
self.host = host
self.name = name
self.filename = filename
self.rsh = RemoteFactory().getInstance()
self.offset = "EOF"
if host == None:
host = "localhost"
def __str__(self):
if self.host:
return "%s:%s" % (self.host, self.filename)
return self.filename
def log(self, args):
message = "lw: %s: %s" % (self, args)
self.logger.log(message)
def debug(self, args):
message = "lw: %s: %s" % (self, args)
self.logger.debug(message)
def next(self):
self.log("Not implemented")
class FileObj(SearchObj):
def __init__(self, filename, host=None, name=None):
global has_log_watcher
SearchObj.__init__(self, filename, host, name)
if not has_log_watcher.has_key(host):
global log_watcher
global log_watcher_bin
self.debug("Installing %s on %s" % (log_watcher_bin, host))
self.rsh(host, '''echo "%s" > %s''' % (log_watcher, log_watcher_bin), silent=True)
has_log_watcher[host] = 1
self.next()
def async_complete(self, pid, returncode, outLines, errLines):
for line in outLines:
match = re.search("^CTSwatcher:Last read: (\d+)", line)
if match:
last_offset = self.offset
self.offset = match.group(1)
#if last_offset == "EOF": self.debug("Got %d lines, new offset: %s" % (len(lines), self.offset))
elif re.search("^CTSwatcher:.*truncated", line):
self.log(line)
elif re.search("^CTSwatcher:", line):
self.debug("Got control line: "+ line)
else:
self.cache.append(line)
self.in_progress = False
if self.delegate:
self.delegate.async_complete(pid, returncode, self.cache, errLines)
def next(self, delegate=None):
self.cache = []
self.in_progress = True
dosync = True
if delegate:
dosync = False
self.delegate = delegate
else:
delegate = self
self.delegate = None
global log_watcher_bin
self.rsh(self.host,
"python %s -t %s -p CTSwatcher: -f %s -o %s" % (log_watcher_bin, self.name, self.filename, self.offset),
stdout=None, silent=True, synchronous=dosync, completionDelegate=self)
if delegate:
return []
while self.in_progress:
time.sleep(1)
return self.cache
class JournalObj(SearchObj):
def __init__(self, host=None, name=None):
SearchObj.__init__(self, name, host, name)
self.next()
def async_complete(self, pid, returncode, outLines, errLines):
#print "%d returned on %s" % (pid, self.host)
for line in outLines:
match = re.search("^-- cursor: ([^.]+)", line)
if match:
last_offset = self.offset
self.offset = match.group(1).strip()
self.debug("Got %d lines, new cursor: %s" % (len(outLines), self.offset))
else:
self.cache.append(line)
self.in_progress = False
if self.delegate:
self.delegate.async_complete(pid, returncode, self.cache, errLines)
def next(self, delegate=None):
self.cache = []
self.in_progress = True
dosync = True
if delegate:
dosync = False
self.delegate = delegate
else:
delegate = self
self.delegate = None
# Use --lines to prevent journalctl from overflowing the Popen input buffer
- command = "journalctl -q --after-cursor='%s' --lines=500 --show-cursor" % (self.offset)
+ command = "journalctl -q --after-cursor='%s' --lines=200 --show-cursor" % (self.offset)
if self.offset == "EOF":
command = "journalctl -q -n 0 --show-cursor"
self.rsh(self.host, command, stdout=None, silent=True, synchronous=dosync, completionDelegate=self)
if delegate:
return []
while self.in_progress:
time.sleep(1)
return self.cache
class LogWatcher(RemoteExec):
'''This class watches logs for messages that fit certain regular
expressions. Watching logs for events isn't the ideal way
to do business, but it's better than nothing :-)
On the other hand, this class is really pretty cool ;-)
The way you use this class is as follows:
Construct a LogWatcher object
Call setwatch() when you want to start watching the log
Call look() to scan the log looking for the patterns
'''
def __init__(self, log, regexes, name="Anon", timeout=10, debug_level=None, silent=False, hosts=None, kind=None):
'''This is the constructor for the LogWatcher class. It takes a
log name to watch, and a list of regular expressions to watch for."
'''
self.logger = LogFactory()
# Validate our arguments. Better sooner than later ;-)
for regex in regexes:
assert re.compile(regex)
if kind:
self.kind = kind
else:
raise
self.kind = self.Env["LogWatcher"]
if log:
self.filename = log
else:
raise
self.filename = self.Env["LogFileName"]
self.name = name
self.regexes = regexes
self.debug_level = debug_level
self.whichmatch = -1
self.unmatched = None
self.file_list = []
self.line_cache = []
if hosts:
self.hosts = hosts
else:
self.hosts = self.Env["nodes"]
if trace_lw:
self.debug_level = 3
silent = False
if not silent:
for regex in self.regexes:
self.debug("Looking for regex: "+regex)
self.Timeout = int(timeout)
self.returnonlymatch = None
def debug(self, args):
message = "lw: %s: %s" % (self.name, args)
self.logger.debug(message)
def setwatch(self):
'''Mark the place to start watching the log from.
'''
if self.kind == "remote":
for node in self.hosts:
self.file_list.append(FileObj(self.filename, node, self.name))
elif self.kind == "journal":
for node in self.hosts:
self.file_list.append(JournalObj(node, self.name))
else:
self.file_list.append(FileObj(self.filename))
def __del__(self):
if self.debug_level > 1: self.debug("Destroy")
def ReturnOnlyMatch(self, onlymatch=1):
'''Specify one or more subgroups of the match to return rather than the whole string
http://www.python.org/doc/2.5.2/lib/match-objects.html
'''
self.returnonlymatch = onlymatch
def async_complete(self, pid, returncode, outLines, errLines):
self.pending = self.pending - 1
if len(outLines):
self.line_cache.extend(outLines)
#print "Got %d lines from %d" % (len(outLines), pid)
def __get_lines(self):
if not len(self.file_list):
raise ValueError("No sources to read from")
self.pending = len(self.file_list)
#print "%s waiting for %d operations" % (self.name, self.pending)
for f in self.file_list:
lines = f.next(delegate=self)
while self.pending > 0:
#print "waiting for %d more" % self.pending
time.sleep(1)
#print "Got %d lines" % len(self.line_cache)
def look(self, timeout=None, silent=False):
'''Examine the log looking for the given patterns.
It starts looking from the place marked by setwatch().
This function looks in the file in the fashion of tail -f.
It properly recovers from log file truncation, but not from
removing and recreating the log. It would be nice if it
recovered from this as well :-)
We return the first line which matches any of our patterns.
'''
if timeout == None: timeout = self.Timeout
if trace_lw:
silent = False
lines=0
needlines=True
begin=time.time()
end=begin+timeout+1
if self.debug_level > 2: self.debug("starting single search: timeout=%d, begin=%d, end=%d" % (timeout, begin, end))
if not self.regexes:
self.debug("Nothing to look for")
return None
while True:
if len(self.line_cache):
lines += 1
line = self.line_cache[0]
self.line_cache.remove(line)
which=-1
if re.search("CTS:", line):
continue
if self.debug_level > 2: self.debug("Processing: "+ line)
for regex in self.regexes:
which=which+1
if self.debug_level > 3: self.debug("Comparing line to: "+ regex)
#matchobj = re.search(string.lower(regex), string.lower(line))
matchobj = re.search(regex, line)
if matchobj:
self.whichmatch=which
if self.returnonlymatch:
return matchobj.group(self.returnonlymatch)
else:
self.debug("Matched: "+line)
if self.debug_level > 1: self.debug("With: "+ regex)
return line
elif timeout > 0 and end > time.time():
if self.debug_level > 1: self.debug("lines during timeout")
time.sleep(1)
self.__get_lines()
elif needlines:
# Grab any relevant messages that might have arrived since
# the last time the buffer was populated
if self.debug_level > 1: self.debug("lines without timeout")
self.__get_lines()
# Don't come back here again
needlines = False
else:
self.debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines))
return None
self.debug("How did we get here")
return None
def lookforall(self, timeout=None, allow_multiple_matches=None, silent=False):
'''Examine the log looking for ALL of the given patterns.
It starts looking from the place marked by setwatch().
We return when the timeout is reached, or when we have found
ALL of the regexes that were part of the watch
'''
if timeout == None: timeout = self.Timeout
save_regexes = self.regexes
returnresult = []
if trace_lw:
silent = False
if not silent:
self.debug("starting search: timeout=%d" % timeout)
for regex in self.regexes:
if self.debug_level > 2: self.debug("Looking for regex: "+regex)
while (len(self.regexes) > 0):
oneresult = self.look(timeout)
if not oneresult:
self.unmatched = self.regexes
self.matched = returnresult
self.regexes = save_regexes
return None
returnresult.append(oneresult)
if not allow_multiple_matches:
del self.regexes[self.whichmatch]
else:
# Allow multiple regexes to match a single line
tmp_regexes = self.regexes
self.regexes = []
which = 0
for regex in tmp_regexes:
matchobj = re.search(regex, oneresult)
if not matchobj:
self.regexes.append(regex)
self.unmatched = None
self.matched = returnresult
self.regexes = save_regexes
return returnresult

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jan 25, 12:20 PM (20 h, 2 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1322540
Default Alt Text
(15 KB)

Event Timeline