diff --git a/cts/remote.py b/cts/remote.py index 4358ad528c..4ce671019b 100644 --- a/cts/remote.py +++ b/cts/remote.py @@ -1,219 +1,229 @@ ''' Classes related to running command remotely ''' __copyright__=''' Copyright (C) 2014 Andrew Beekhof Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. import types, string, select, sys, time, re, os, struct, signal import time, syslog, random, traceback, base64, pickle, binascii, fcntl from cts.logging import LogFactory from socket import gethostbyname_ex from UserDict import UserDict from subprocess import Popen,PIPE pdir=os.path.dirname(sys.path[0]) sys.path.insert(0, pdir) # So that things work from the source directory from cts.CTSvars import * from cts.logging import * from threading import Thread trace_rsh=None trace_lw=None class AsyncWaitProc(Thread): - def __init__(self, proc, node, command): + def __init__(self, proc, node, command, completionDelegate=None): self.proc = proc self.node = node self.command = command self.logger = LogFactory() + self.delegate = completionDelegate; Thread.__init__(self) def run(self): + outLines = None + errLines = None self.logger.debug("cmd: async: target=%s, pid=%d: %s" % (self.node, self.proc.pid, self.command)) self.proc.wait() self.logger.debug("cmd: pid %d returned %d" % (self.proc.pid, self.proc.returncode)) if self.proc.stderr: - lines = self.proc.stderr.readlines() + errLines = self.proc.stderr.readlines() self.proc.stderr.close() - for line in lines: + for line in errLines: self.logger.debug("cmd: stderr[%d]: %s" % (self.proc.pid, line)) if self.proc.stdout: - lines = self.proc.stdout.readlines() + outLines = self.proc.stdout.readlines() self.proc.stdout.close() - for line in lines: + for line in outLines: self.logger.debug("cmd: stdout[%d]: %s" % (self.proc.pid, line)) + if self.delegate: + self.delegate.async_complete(self.proc.pid, self.proc.returncode, outLines, errLines) + class RemotePrimitives: def __init__(self, Command=None, CpCommand=None): if CpCommand: self.CpCommand = CpCommand else: # -B: batch mode, -q: no stats (quiet) self.CpCommand = "scp -B -q" if Command: self.Command = Command else: # -n: no stdin, -x: no X11, # -o ServerAliveInterval=5 disconnect after 3*5s if the server stops responding self.Command = "ssh -l root -n -x -o ServerAliveInterval=5 -o ConnectTimeout=10 -o TCPKeepAlive=yes -o ServerAliveCountMax=3 " class RemoteExec: '''This is an abstract remote execution class. It runs a command on another machine - somehow. The somehow is up to us. This particular class uses ssh. Most of the work is done by fork/exec of ssh or scp. ''' def __init__(self, rsh, silent=False): print repr(self) self.async = [] self.rsh = rsh self.silent = silent self.logger = LogFactory() if trace_rsh: self.silent = False self.OurNode=string.lower(os.uname()[1]) def _fixcmd(self, cmd): return re.sub("\'", "'\\''", cmd) def _cmd(self, *args): '''Compute the string that will run the given command on the given remote system''' args= args[0] sysname = args[0] command = args[1] #print "sysname: %s, us: %s" % (sysname, self.OurNode) if sysname == None or string.lower(sysname) == self.OurNode or sysname == "localhost": ret = command else: ret = self.rsh.Command + " " + sysname + " '" + self._fixcmd(command) + "'" #print ("About to run %s\n" % ret) return ret def log(self, args): if not self.silent: self.logger.log(args) def debug(self, args): if not self.silent: self.logger.debug(args) - def __call__(self, node, command, stdout=0, synchronous=1, silent=False, blocking=True): + def __call__(self, node, command, stdout=0, synchronous=1, silent=False, blocking=True, completionDelegate=None): '''Run the given command on the given remote system If you call this class like a function, this is the function that gets called. It just runs it roughly as though it were a system() call on the remote machine. The first argument is name of the machine to run it on. ''' if trace_rsh: silent = False rc = 0 result = None proc = Popen(self._cmd([node, command]), stdout = PIPE, stderr = PIPE, close_fds = True, shell = True) + #if completionDelegate: print "Waiting for %d on %s: %s" % (proc.pid, node, command) if not synchronous and proc.pid > 0 and not self.silent: - aproc = AsyncWaitProc(proc, node, command) + aproc = AsyncWaitProc(proc, node, command, completionDelegate=completionDelegate) aproc.start() return 0 #if not blocking: # fcntl.fcntl(proc.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) if proc.stdout: if stdout == 1: result = proc.stdout.readline() else: result = proc.stdout.readlines() proc.stdout.close() else: self.log("No stdout stream") rc = proc.wait() if not silent: self.debug("cmd: target=%s, rc=%d: %s" % (node, rc, command)) - if stdout == 1: return result if proc.stderr: errors = proc.stderr.readlines() proc.stderr.close() - if not silent: - for err in errors: - if stdout == 3: - result.append("error: "+err) - else: - self.debug("cmd: stderr: %s" % err) + + if completionDelegate: + completionDelegate.async_complete(proc.pid, proc.returncode, result, errors) + + if not silent: + for err in errors: + if stdout == 3: + result.append("error: "+err) + else: + self.debug("cmd: stderr: %s" % err) if stdout == 0: if not silent and result: for line in result: self.debug("cmd: stdout: %s" % line) return rc return (rc, result) def cp(self, source, target, silent=False): '''Perform a remote copy''' cpstring = self.rsh.CpCommand + " \'" + source + "\'" + " \'" + target + "\'" rc = os.system(cpstring) if trace_rsh: silent = False if not silent: self.debug("cmd: rc=%d: %s" % (rc, cpstring)) return rc class RemoteFactory: # Class variables rsh = RemotePrimitives() instance = None def getInstance(self): if not RemoteFactory.instance: RemoteFactory.instance = RemoteExec(RemoteFactory.rsh, False) return RemoteFactory.instance def new(self, silent=False): return RemoteExec(RemoteFactory.rsh, silent) def enable_qarsh(self): # http://nstraz.wordpress.com/2008/12/03/introducing-qarsh/ print "Using QARSH for connections to cluster nodes" rsh.Command = "qarsh -t 300 -l root" rsh.CpCommand = "qacp -q" diff --git a/cts/watcher.py b/cts/watcher.py index 7567cee875..41b448180e 100644 --- a/cts/watcher.py +++ b/cts/watcher.py @@ -1,425 +1,478 @@ ''' Classes related to searching logs ''' __copyright__=''' Copyright (C) 2014 Andrew Beekhof Licensed under the GNU GPL. ''' # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. import types, string, select, sys, time, re, os, struct, signal import time, syslog, random, traceback, base64, pickle, binascii, fcntl from cts.remote import * from cts.logging import * has_log_watcher = {} log_watcher_bin = CTSvars.CRM_DAEMON_DIR + "/cts_log_watcher.py" log_watcher = """ import sys, os, fcntl ''' Remote logfile reader for CTS Reads a specified number of lines from the supplied offset Returns the current offset Contains logic for handling truncation ''' limit = 0 offset = 0 prefix = '' filename = '/var/log/messages' skipthis=None args=sys.argv[1:] for i in range(0, len(args)): if skipthis: skipthis=None continue elif args[i] == '-l' or args[i] == '--limit': skipthis=1 limit = int(args[i+1]) elif args[i] == '-f' or args[i] == '--filename': skipthis=1 filename = args[i+1] elif args[i] == '-o' or args[i] == '--offset': skipthis=1 offset = args[i+1] elif args[i] == '-p' or args[i] == '--prefix': skipthis=1 prefix = args[i+1] elif args[i] == '-t' or args[i] == '--tag': skipthis=1 if not os.access(filename, os.R_OK): print prefix + 'Last read: %d, limit=%d, count=%d - unreadable' % (0, limit, 0) sys.exit(1) logfile=open(filename, 'r') logfile.seek(0, os.SEEK_END) newsize=logfile.tell() if offset != 'EOF': offset = int(offset) if newsize >= offset: logfile.seek(offset) else: print prefix + ('File truncated from %d to %d' % (offset, newsize)) if (newsize*1.05) < offset: logfile.seek(0) # else: we probably just lost a few logs after a fencing op # continue from the new end # TODO: accept a timestamp and discard all messages older than it # Don't block when we reach EOF fcntl.fcntl(logfile.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) count = 0 while True: if logfile.tell() >= newsize: break elif limit and count >= limit: break line = logfile.readline() if not line: break print line.strip() count += 1 print prefix + 'Last read: %d, limit=%d, count=%d' % (logfile.tell(), limit, count) logfile.close() """ class SearchObj: def __init__(self, filename, host=None, name=None): self.logger = LogFactory() self.host = host self.name = name self.filename = filename self.rsh = RemoteFactory().getInstance() self.offset = "EOF" if host == None: host = "localhost" def __str__(self): if self.host: return "%s:%s" % (self.host, self.filename) return self.filename def log(self, args): message = "lw: %s: %s" % (self, args) self.logger.log(message) def debug(self, args): message = "lw: %s: %s" % (self, args) self.logger.debug(message) def next(self): self.log("Not implemented") class FileObj(SearchObj): def __init__(self, filename, host=None, name=None): global has_log_watcher SearchObj.__init__(self, filename, host, name) if not has_log_watcher.has_key(host): global log_watcher global log_watcher_bin self.debug("Installing %s on %s" % (log_watcher_bin, host)) self.rsh(host, '''echo "%s" > %s''' % (log_watcher, log_watcher_bin), silent=True) has_log_watcher[host] = 1 self.next() - def next(self): - cache = [] - - global log_watcher_bin - (rc, lines) = self.rsh( - self.host, - "python %s -t %s -p CTSwatcher: -f %s -o %s" % (log_watcher_bin, self.name, self.filename, self.offset), - stdout=None, silent=True, blocking=False) - - for line in lines: + def async_complete(self, pid, returncode, outLines, errLines): + for line in outLines: match = re.search("^CTSwatcher:Last read: (\d+)", line) if match: last_offset = self.offset self.offset = match.group(1) #if last_offset == "EOF": self.debug("Got %d lines, new offset: %s" % (len(lines), self.offset)) elif re.search("^CTSwatcher:.*truncated", line): self.log(line) elif re.search("^CTSwatcher:", line): self.debug("Got control line: "+ line) else: - cache.append(line) + self.cache.append(line) + + self.in_progress = False + if self.delegate: + self.delegate.async_complete(pid, returncode, self.cache, errLines) + + def next(self, delegate=None): + self.cache = [] + self.in_progress = True + + dosync = True + if delegate: + dosync = False + self.delegate = delegate + else: + delegate = self + self.delegate = None + + global log_watcher_bin + self.rsh(self.host, + "python %s -t %s -p CTSwatcher: -f %s -o %s" % (log_watcher_bin, self.name, self.filename, self.offset), + stdout=None, silent=True, synchronous=dosync, completionDelegate=self) - return cache + if delegate: + return [] + + while self.in_progress: + time.sleep(1) + + return self.cache class JournalObj(SearchObj): def __init__(self, host=None, name=None): SearchObj.__init__(self, name, host, name) self.next() - def next(self): - cache = [] - command = "journalctl -q --after-cursor='%s' --show-cursor" % (self.offset) - if self.offset == "EOF": - command = "journalctl -q -n 0 --show-cursor" - - (rc, lines) = self.rsh(self.host, command, stdout=None, silent=True, blocking=False) - - for line in lines: + def async_complete(self, pid, returncode, outLines, errLines): + #print "%d returned on %s" % (pid, self.host) + for line in outLines: match = re.search("^-- cursor: ([^.]+)", line) if match: last_offset = self.offset - self.offset = match.group(1) - self.debug("Got %d lines, new cursor: %s" % (len(lines), self.offset)) + self.offset = match.group(1).strip() + self.debug("Got %d lines, new cursor: %s" % (len(outLines), self.offset)) else: - cache.append(line) + self.cache.append(line) + + self.in_progress = False + if self.delegate: + self.delegate.async_complete(pid, returncode, self.cache, errLines) + + def next(self, delegate=None): + self.cache = [] + self.in_progress = True + + dosync = True + if delegate: + dosync = False + self.delegate = delegate + else: + delegate = self + self.delegate = None - return cache + # Use --lines to prevent journalctl from overflowing the Popen input buffer + command = "journalctl -q --after-cursor='%s' --lines=500 --show-cursor" % (self.offset) + if self.offset == "EOF": + command = "journalctl -q -n 0 --show-cursor" + + self.rsh(self.host, command, stdout=None, silent=True, synchronous=dosync, completionDelegate=self) + if delegate: + return [] + + while self.in_progress: + time.sleep(1) + + return self.cache class LogWatcher(RemoteExec): '''This class watches logs for messages that fit certain regular expressions. Watching logs for events isn't the ideal way to do business, but it's better than nothing :-) On the other hand, this class is really pretty cool ;-) The way you use this class is as follows: Construct a LogWatcher object Call setwatch() when you want to start watching the log Call look() to scan the log looking for the patterns ''' def __init__(self, log, regexes, name="Anon", timeout=10, debug_level=None, silent=False, hosts=None, kind=None): '''This is the constructor for the LogWatcher class. It takes a log name to watch, and a list of regular expressions to watch for." ''' self.logger = LogFactory() # Validate our arguments. Better sooner than later ;-) for regex in regexes: assert re.compile(regex) if kind: self.kind = kind else: raise self.kind = self.Env["LogWatcher"] if log: self.filename = log else: raise self.filename = self.Env["LogFileName"] self.name = name self.regexes = regexes self.debug_level = debug_level self.whichmatch = -1 self.unmatched = None self.file_list = [] self.line_cache = [] if hosts: self.hosts = hosts else: self.hosts = self.Env["nodes"] if trace_lw: self.debug_level = 3 silent = False if not silent: for regex in self.regexes: self.debug("Looking for regex: "+regex) self.Timeout = int(timeout) self.returnonlymatch = None def debug(self, args): message = "lw: %s: %s" % (self.name, args) self.logger.debug(message) def setwatch(self): '''Mark the place to start watching the log from. ''' if self.kind == "remote": for node in self.hosts: self.file_list.append(FileObj(self.filename, node, self.name)) elif self.kind == "journal": for node in self.hosts: self.file_list.append(JournalObj(node, self.name)) else: self.file_list.append(FileObj(self.filename)) def __del__(self): if self.debug_level > 1: self.debug("Destroy") def ReturnOnlyMatch(self, onlymatch=1): '''Specify one or more subgroups of the match to return rather than the whole string http://www.python.org/doc/2.5.2/lib/match-objects.html ''' self.returnonlymatch = onlymatch + def async_complete(self, pid, returncode, outLines, errLines): + self.pending = self.pending - 1 + if len(outLines): + self.line_cache.extend(outLines) + #print "Got %d lines from %d" % (len(outLines), pid) + def __get_lines(self): if not len(self.file_list): raise ValueError("No sources to read from") + self.pending = len(self.file_list) + #print "%s waiting for %d operations" % (self.name, self.pending) for f in self.file_list: - lines = f.next() - if len(lines): - self.line_cache.extend(lines) + lines = f.next(delegate=self) + + while self.pending > 0: + #print "waiting for %d more" % self.pending + time.sleep(1) + + #print "Got %d lines" % len(self.line_cache) def look(self, timeout=None, silent=False): '''Examine the log looking for the given patterns. It starts looking from the place marked by setwatch(). This function looks in the file in the fashion of tail -f. It properly recovers from log file truncation, but not from removing and recreating the log. It would be nice if it recovered from this as well :-) We return the first line which matches any of our patterns. ''' if timeout == None: timeout = self.Timeout if trace_lw: silent = False lines=0 needlines=True begin=time.time() end=begin+timeout+1 if self.debug_level > 2: self.debug("starting single search: timeout=%d, begin=%d, end=%d" % (timeout, begin, end)) if not self.regexes: self.debug("Nothing to look for") return None while True: if len(self.line_cache): lines += 1 line = self.line_cache[0] self.line_cache.remove(line) which=-1 if re.search("CTS:", line): continue if self.debug_level > 2: self.debug("Processing: "+ line) for regex in self.regexes: which=which+1 if self.debug_level > 3: self.debug("Comparing line to: "+ regex) #matchobj = re.search(string.lower(regex), string.lower(line)) matchobj = re.search(regex, line) if matchobj: self.whichmatch=which if self.returnonlymatch: return matchobj.group(self.returnonlymatch) else: self.debug("Matched: "+line) if self.debug_level > 1: self.debug("With: "+ regex) return line elif timeout > 0 and end > time.time(): if self.debug_level > 1: self.debug("lines during timeout") time.sleep(1) self.__get_lines() elif needlines: # Grab any relevant messages that might have arrived since # the last time the buffer was populated if self.debug_level > 1: self.debug("lines without timeout") self.__get_lines() # Don't come back here again needlines = False else: self.debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines)) return None self.debug("How did we get here") return None def lookforall(self, timeout=None, allow_multiple_matches=None, silent=False): '''Examine the log looking for ALL of the given patterns. It starts looking from the place marked by setwatch(). We return when the timeout is reached, or when we have found ALL of the regexes that were part of the watch ''' if timeout == None: timeout = self.Timeout save_regexes = self.regexes returnresult = [] if trace_lw: silent = False if not silent: self.debug("starting search: timeout=%d" % timeout) for regex in self.regexes: if self.debug_level > 2: self.debug("Looking for regex: "+regex) while (len(self.regexes) > 0): oneresult = self.look(timeout) if not oneresult: self.unmatched = self.regexes self.matched = returnresult self.regexes = save_regexes return None returnresult.append(oneresult) if not allow_multiple_matches: del self.regexes[self.whichmatch] else: # Allow multiple regexes to match a single line tmp_regexes = self.regexes self.regexes = [] which = 0 for regex in tmp_regexes: matchobj = re.search(regex, oneresult) if not matchobj: self.regexes.append(regex) self.unmatched = None self.matched = returnresult self.regexes = save_regexes return returnresult