diff --git a/heartbeat/azure-events.in b/heartbeat/azure-events.in
index cd34a3b0b..1182e09a1 100644
--- a/heartbeat/azure-events.in
+++ b/heartbeat/azure-events.in
@@ -1,887 +1,820 @@
 #!@PYTHON@ -tt
 #
 #   Resource agent for monitoring Azure Scheduled Events
 #
 #   License:  GNU General Public License (GPL)
 #   (c) 2018  Tobias Niekamp, Microsoft Corp.
 #             and Linux-HA contributors
 
-import os, sys, time, subprocess
+import os
+import sys
+import time
+import subprocess
 import json
-import urllib, urllib2, socket
-import logging, syslog
-from enum import Enum
+try:
+    import urllib2
+except ImportError:
+    import urllib.request as urllib2
+import socket
 from collections import defaultdict
 
+OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT"))
+sys.path.append(OCF_FUNCTIONS_DIR)
+import ocf
+
 ##############################################################################
 
 VERSION = "0.10"
-OCF_SUCCESS = 0
-OCF_ERR_GENERIC = 1
-OCF_ERR_UNIMPLEMENTED = 3
-OCF_ERR_CONFIGURED = 6
-OCF_NOT_RUNNING = 7
-
 attr_globalPullState = "azure-events_globalPullState"
 attr_lastDocVersion = "azure-events_lastDocVersion"
 attr_curNodeState = "azure-events_curNodeState"
 attr_pendingEventIDs = "azure-events_pendingEventIDs"
 
-default_loglevel = logging.INFO
+default_loglevel = ocf.logging.INFO
 default_relevantEventTypes = set(["Reboot", "Redeploy"])
 
 global_pullMaxAttempts = 3
 global_pullDelaySecs = 1
 
 ##############################################################################
 
-class SyslogLibHandler(logging.StreamHandler):
-    """
-    A handler class that correctly push messages into syslog
-    """
-    def emit(self, record):
-        syslog_level = {
-            logging.CRITICAL: syslog.LOG_CRIT,
-            logging.ERROR:    syslog.LOG_ERR,
-            logging.WARNING:  syslog.LOG_WARNING,
-            logging.INFO:     syslog.LOG_INFO,
-            logging.DEBUG:    syslog.LOG_DEBUG,
-            logging.NOTSET:   syslog.LOG_DEBUG,
-        }[record.levelno]
-        msg = self.format(record)
-        # take care of \x00 character
-        syslog.syslog(syslog_level, msg.replace("\x00", "\n"))
-        return
-
-##############################################################################
-
 class attrDict(defaultdict):
     """
     A wrapper for accessing dict keys like an attribute
     """
     def __init__(self, data):
         super(attrDict, self).__init__(attrDict)
         for d in data.keys():
             self.__setattr__(d, data[d])
 
     def __getattr__(self, key):
         try:
             return self[key]
         except KeyError:
             raise AttributeError(key)
 
     def __setattr__(self, key, value):
         self[key] = value
 
 ##############################################################################
 
 class azHelper:
     """
     Helper class for Azure's metadata API (including Scheduled Events)
     """
     metadata_host = "http://169.254.169.254/metadata"
     instance_api = "instance"
     events_api = "scheduledevents"
     api_version = "2017-08-01"
 
     @staticmethod
     def _sendMetadataRequest(endpoint, postData=None):
         """
         Send a request to the Azure Metadata Service API
         """
         url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version)
-        logging.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData))
-        logging.debug("_sendMetadataRequest: url = %s" % url)
+        ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData))
+        ocf.logger.debug("_sendMetadataRequest: url = %s" % url)
 
         req = urllib2.Request(url, postData)
         req.add_header("Metadata", "true")
         resp = urllib2.urlopen(req)
         data = resp.read()
-        logging.debug("_sendMetadataRequest: response = %s" % data)
-        if len(data) > 0:
+        ocf.logger.debug("_sendMetadataRequest: response = %s" % data)
+        if data:
             data = json.loads(data)
-        logging.debug("_sendMetadataRequest: finished")
+        ocf.logger.debug("_sendMetadataRequest: finished")
         return data
 
     @staticmethod
     def getInstanceInfo():
         """
         Fetch details about the current VM from the Azure Metadata Service API
         """
-        logging.debug("getInstanceInfo: begin")
+        ocf.logger.debug("getInstanceInfo: begin")
 
-        json = azHelper._sendMetadataRequest(azHelper.instance_api)
-        logging.info("getInstanceInfo: json = %s" % json)
-        logging.debug("getInstanceInfo: finished")
+        jsondata = azHelper._sendMetadataRequest(azHelper.instance_api)
+        ocf.logger.info("getInstanceInfo: json = %s" % jsondata)
 
-        return attrDict(json["compute"])
+        ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"]))
+        return attrDict(jsondata["compute"])
 
     @staticmethod
     def pullScheduledEvents():
         """
         Retrieve all currently scheduled events via Azure Metadata Service API
         """
-        logging.debug("pullScheduledEvents: begin")
+        ocf.logger.debug("pullScheduledEvents: begin")
 
-        json = azHelper._sendMetadataRequest(azHelper.events_api)
-        logging.info("pullScheduledEvents: json = %s" % json)
+        jsondata = azHelper._sendMetadataRequest(azHelper.events_api)
+        ocf.logger.info("pullScheduledEvents: json = %s" % jsondata)
 
-        logging.debug("pullScheduledEvents: finished")
-        return attrDict(json)
+        ocf.logger.debug("pullScheduledEvents: finished")
+        return attrDict(jsondata)
 
     @staticmethod
     def forceEvents(eventIDs):
         """
         Force a set of events to start immediately
         """
-        logging.debug("forceEvents: begin")
+        ocf.logger.debug("forceEvents: begin")
 
         events = []
         for e in eventIDs:
             events.append({
                 "EventId": e,
             })
         postData = {
             "StartRequests": events
         }
-        logging.info("forceEvents: postData = %s" % postData)
+        ocf.logger.info("forceEvents: postData = %s" % postData)
         resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData))
 
-        logging.debug("forceEvents: finished")
+        ocf.logger.debug("forceEvents: finished")
         return
 
 ##############################################################################
 
-class pcsHelper:
+class clusterHelper:
     """
     Helper functions for Pacemaker control via crm
     """
     @staticmethod
     def _getLocation(node):
         """
         Helper function to retrieve local/global attributes
         """
         if node:
             return ["--node", node]
         else:
             return ["--type", "crm_config"]
 
     @staticmethod
-    def _exec(command, args):
+    def _exec(command, *args):
         """
         Helper function to execute a UNIX command
         """
-        logging.debug("_exec: begin; command = %s, args = %s" % (command, str(args)))
+        args = list(args)
+        ocf.logger.debug("_exec: begin; command = %s, args = %s" % (command, str(args)))
 
-        flatten = lambda *n: (str(e) for a in n
-            for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),)))
+        def flatten(*n):
+            return (str(e) for a in n
+                    for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),)))
         command = list(flatten([command] + args))
-        logging.debug("_exec: cmd = %s" % " ".join(command))
+        ocf.logger.debug("_exec: cmd = %s" % " ".join(command))
         try:
             ret = subprocess.check_output(command)
-            logging.debug("_exec: return = %s" % ret)
+            ocf.logger.debug("_exec: return = %s" % ret)
             return ret.rstrip()
-        except Exception:
-            logging.warning("_exec: %s" % sys.exc_info()[0])
+        except Exception as err:
+            ocf.logger.exception(err)
             return None
 
     @staticmethod
     def setAttr(key, value, node=None):
         """
         Set the value of a specific global/local attribute in the Pacemaker cluster
         """
-        logging.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node))
+        ocf.logger.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node))
 
         if value:
-            ret = pcsHelper._exec(
-                "crm_attribute",
-                ["--name", key,
-                "--update", value,
-                pcsHelper._getLocation(node)])
+            ret = clusterHelper._exec("crm_attribute",
+                                      "--name", key,
+                                      "--update", value,
+                                      clusterHelper._getLocation(node))
         else:
-            ret = pcsHelper._exec(
-                "crm_attribute",
-                ["--name", key,
-                "--delete",
-                pcsHelper._getLocation(node)])
+            ret = clusterHelper._exec("crm_attribute",
+                                      "--name", key,
+                                      "--delete",
+                                      clusterHelper._getLocation(node))
 
-        logging.debug("setAttr: finished")
+        ocf.logger.debug("setAttr: finished")
         return len(ret) == 0
 
     @staticmethod
     def getAttr(key, node=None):
         """
         Retrieve a global/local attribute from the Pacemaker cluster
         """
-        logging.debug("getAttr: begin; key = %s, node = %s" % (key, node))
+        ocf.logger.debug("getAttr: begin; key = %s, node = %s" % (key, node))
 
-        val = pcsHelper._exec(
-            "crm_attribute",
-            ["--name", key,
-            "--query", "--quiet",
-            pcsHelper._getLocation(node)])
+        val = clusterHelper._exec("crm_attribute",
+                                  "--name", key,
+                                  "--query", "--quiet",
+                                  clusterHelper._getLocation(node))
+        ocf.logger.debug("getAttr: finished")
         if not val:
-            ret = None
-        else:
-            ret = val if not val.isdigit() else int(val)
-
-        logging.debug("getAttr: finished")
-        return ret
+            return None
+        return val if not val.isdigit() else int(val)
 
     @staticmethod
     def getAllNodes():
         """
         Get a list of hostnames for all nodes in the Pacemaker cluster
         """
-        logging.debug("getAllNodes: begin")
+        ocf.logger.debug("getAllNodes: begin")
 
         nodes = []
-        nodeList = pcsHelper._exec(
-            "crm_node",
-            ["--list"])
+        nodeList = clusterHelper._exec("crm_node", "--list")
         for n in nodeList.split("\n"):
             nodes.append(n.split()[1])
-        logging.debug("getAllNodes: finished; return %s" % str(nodes))
+        ocf.logger.debug("getAllNodes: finished; return %s" % str(nodes))
 
         return nodes
 
     @staticmethod
     def getHostNameFromAzName(azName):
         """
         Helper function to get the actual host name from an Azure node name
         """
-        return pcsHelper.getAttr("hostName_%s" % azName)
+        return clusterHelper.getAttr("hostName_%s" % azName)
 
     @staticmethod
     def removeHoldFromNodes():
         """
         Remove the ON_HOLD state from all nodes in the Pacemaker cluster
         """
-        logging.debug("removeHoldFromNodes: begin")
+        ocf.logger.debug("removeHoldFromNodes: begin")
 
-        for n in pcsHelper.getAllNodes():
-            if pcsHelper.getAttr(attr_curNodeState, node=n) == pcsNodeState.ON_HOLD.name:
-                pcsHelper.setAttr(attr_curNodeState, pcsNodeState.AVAILABLE.name, node=n)
-                logging.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n)
+        for n in clusterHelper.getAllNodes():
+            if clusterHelper.getAttr(attr_curNodeState, node=n) == "ON_HOLD":
+                clusterHelper.setAttr(attr_curNodeState, "AVAILABLE", node=n)
+                ocf.logger.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n)
 
-        logging.debug("removeHoldFromNodes: finished")
+        ocf.logger.debug("removeHoldFromNodes: finished")
         return False
 
     @staticmethod
     def otherNodesAvailable(exceptNode):
         """
         Check if there are any nodes (except a given node) in the Pacemaker cluster that have state AVAILABLE
         """
-        logging.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)
+        ocf.logger.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)
 
-        for n in pcsHelper.getAllNodes():
-            state = pcsHelper.getAttr(attr_curNodeState, node=n)
-            if state:
-                state = pcsNodeState[state]
-            else:
-                state = pcsNodeState.AVAILABLE
-            if state == pcsNodeState.AVAILABLE and n != exceptNode.hostName:
-                logging.info("otherNodesAvailable: at least %s is available" % n)
-                logging.debug("otherNodesAvailable: finished")
+        for n in clusterHelper.getAllNodes():
+            state = clusterHelper.getAttr(attr_curNodeState, node=n)
+            state = stringToNodeState(state) if state else AVAILABLE
+            if state == AVAILABLE and n != exceptNode.hostName:
+                ocf.logger.info("otherNodesAvailable: at least %s is available" % n)
+                ocf.logger.debug("otherNodesAvailable: finished")
                 return True
-        logging.info("otherNodesAvailable: no other nodes are available")
-        logging.debug("otherNodesAvailable: finished")
+        ocf.logger.info("otherNodesAvailable: no other nodes are available")
+        ocf.logger.debug("otherNodesAvailable: finished")
         return False
 
     @staticmethod
     def transitionSummary():
         """
         Get the current Pacemaker transition summary (used to check if all resources are stopped when putting a node standby)
         """
         # Is a global crm_simulate "too much"? Or would it be sufficient if there are no planned transitions for a particular node?
         #
         # crm_simulate -Ls
         #     Transition Summary:
         #      * Promote rsc_SAPHana_HN1_HDB03:0      (Slave -> Master hsr3-db1)
         #      * Stop    rsc_SAPHana_HN1_HDB03:1      (hsr3-db0)
         #      * Move    rsc_ip_HN1_HDB03     (Started hsr3-db0 -> hsr3-db1)
         #      * Start   rsc_nc_HN1_HDB03     (hsr3-db1)
         #
         # Expected result when there are no pending actions:
         #     Transition Summary:
-        logging.debug("transitionSummary: begin")
+        ocf.logger.debug("transitionSummary: begin")
 
-        summary = pcsHelper._exec(
-            "crm_simulate",
-            ["-Ls"]
-            )
+        summary = clusterHelper._exec("crm_simulate", "-Ls")
         if not summary:
-            logging.warning("transitionSummary: could not load transition summary")
+            ocf.logger.warning("transitionSummary: could not load transition summary")
             return False
         if summary.find("Transition Summary:") < 0:
-            logging.warning("transitionSummary: received unexpected transition summary: %s" % summary)
+            ocf.logger.warning("transitionSummary: received unexpected transition summary: %s" % summary)
             return False
         summary = summary.split("Transition Summary:")[1]
         ret = summary.split("\n").pop(0)
 
-        logging.debug("transitionSummary: finished; return = %s" % str(ret))
+        ocf.logger.debug("transitionSummary: finished; return = %s" % str(ret))
         return ret
 
     @staticmethod
     def listOperationsOnNode(node):
         """
         Get a list of all current operations for a given node (used to check if any resources are pending)
         """
         # hsr3-db1:/home/tniek
         # crm_resource --list-operations -N hsr3-db0
         # rsc_azure-events    (ocf::heartbeat:azure-events): Started: rsc_azure-events_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=115ms): complete
         # rsc_azure-events    (ocf::heartbeat:azure-events): Started: rsc_azure-events_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun  8 22:37:47 2018, exec=197ms): complete
         # rsc_SAPHana_HN1_HDB03    (ocf::suse:SAPHana): Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun  8 22:37:46 2018, exec=0ms): pending
         # rsc_SAPHanaTopology_HN1_HDB03    (ocf::suse:SAPHanaTopology): Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=3214ms): complete
-        logging.debug("listOperationsOnNode: begin; node = %s" % node)
+        ocf.logger.debug("listOperationsOnNode: begin; node = %s" % node)
 
-        resources = pcsHelper._exec(
-            "crm_resource",
-            ["--list-operations",
-            "-N", node]
-            )
+        resources = clusterHelper._exec("crm_resource", "--list-operations", "-N", node)
         if len(resources) == 0:
             ret = []
         else:
             ret = resources.split("\n")
 
-        logging.debug("listOperationsOnNode: finished; return = %s" % str(ret))
+        ocf.logger.debug("listOperationsOnNode: finished; return = %s" % str(ret))
         return ret
 
     @staticmethod
     def noPendingResourcesOnNode(node):
         """
         Check that there are no pending resources on a given node
         """
-        logging.debug("noPendingResourcesOnNode: begin; node = %s" % node)
+        ocf.logger.debug("noPendingResourcesOnNode: begin; node = %s" % node)
 
-        for r in pcsHelper.listOperationsOnNode(node):
-            logging.debug("noPendingResourcesOnNode: * %s" % r)
+        for r in clusterHelper.listOperationsOnNode(node):
+            ocf.logger.debug("noPendingResourcesOnNode: * %s" % r)
             resource = r.split()[-1]
             if resource == "pending":
-                logging.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource)
-                logging.debug("noPendingResourcesOnNode: finished; return = False")
+                ocf.logger.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource)
+                ocf.logger.debug("noPendingResourcesOnNode: finished; return = False")
                 return False
-        logging.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
-        logging.debug("noPendingResourcesOnNode: finished; return = True")
+        ocf.logger.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
+        ocf.logger.debug("noPendingResourcesOnNode: finished; return = True")
         return True
 
     @staticmethod
     def allResourcesStoppedOnNode(node):
         """
         Check that all resources on a given node are stopped
         """
-        logging.debug("allResourcesStoppedOnNode: begin; node = %s" % node)
+        ocf.logger.debug("allResourcesStoppedOnNode: begin; node = %s" % node)
 
-        if pcsHelper.noPendingResourcesOnNode(node):
-            if len(pcsHelper.transitionSummary()) == 0:
-                logging.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
-                logging.debug("allResourcesStoppedOnNode: finished; return = True")
+        if clusterHelper.noPendingResourcesOnNode(node):
+            if len(clusterHelper.transitionSummary()) == 0:
+                ocf.logger.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
+                ocf.logger.debug("allResourcesStoppedOnNode: finished; return = True")
                 return True
-            else:
-                logging.info("allResourcesStoppedOnNode: transition summary is not empty")
-                logging.debug("allResourcesStoppedOnNode: finished; return = False")
-                return False
+            ocf.logger.info("allResourcesStoppedOnNode: transition summary is not empty")
+            ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
+            return False
 
-        logging.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
-        logging.debug("allResourcesStoppedOnNode: finished; return = False")
-        return False
+        ocf.logger.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
+        ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
+        return False
 
 ##############################################################################
 
-class pcsNodeState(Enum):
-    AVAILABLE = 0    # Node is online and ready to handle events
-    STOPPING = 1     # Standby has been triggered, but some resources are still running
-    IN_EVENT = 2     # All resources are stopped, and event has been initiated via Azure Metadata Service
-    ON_HOLD = 3      # Node has a pending event that cannot be started there are no other nodes available
+AVAILABLE = 0    # Node is online and ready to handle events
+STOPPING = 1     # Standby has been triggered, but some resources are still running
+IN_EVENT = 2     # All resources are stopped, and event has been initiated via Azure Metadata Service
+ON_HOLD = 3      # Node has a pending event that cannot be started because there are no other nodes available
+
+def stringToNodeState(name):
+    if type(name) == int: return name
+    if name == "STOPPING": return STOPPING
+    if name == "IN_EVENT": return IN_EVENT
+    if name == "ON_HOLD": return ON_HOLD
+    return AVAILABLE
+
+def nodeStateToString(state):
+    if state == STOPPING: return "STOPPING"
+    if state == IN_EVENT: return "IN_EVENT"
+    if state == ON_HOLD: return "ON_HOLD"
+    return "AVAILABLE"
 
 ##############################################################################
 
-class pcsNode:
+class Node:
     """
     Core class implementing logic for a cluster node
     """
     def __init__(self, ra):
-        self.raOwner  = ra
+        self.raOwner = ra
         self.azInfo = azHelper.getInstanceInfo()
         self.azName = self.azInfo.name
         self.hostName = socket.gethostname()
         self.setAttr("azName", self.azName)
-        pcsHelper.setAttr("hostName_%s" % self.azName, self.hostName)
+        clusterHelper.setAttr("hostName_%s" % self.azName, self.hostName)
 
     def getAttr(self, key):
         """
         Get a local attribute
         """
-        return pcsHelper.getAttr(key, node=self.hostName)
+        return clusterHelper.getAttr(key, node=self.hostName)
 
     def setAttr(self, key, value):
        """
        Set a local attribute
        """
-        return pcsHelper.setAttr(key, value, node=self.hostName)
+        return clusterHelper.setAttr(key, value, node=self.hostName)
 
     def selfOrOtherNode(self, node):
         """
         Helper function to distinguish self/other node
         """
-        if not node:
-            return self.hostName
-        else:
-            return node
+        return node if node else self.hostName
 
     def setState(self, state, node=None):
         """
         Set the state for a given node (or self)
         """
         node = self.selfOrOtherNode(node)
-        logging.debug("setState: begin; node = %s, state = %s" % (node, state.name))
+        ocf.logger.debug("setState: begin; node = %s, state = %s" % (node, nodeStateToString(state)))
 
-        pcsHelper.setAttr(attr_curNodeState, state.name, node=node)
+        clusterHelper.setAttr(attr_curNodeState, nodeStateToString(state), node=node)
 
-        logging.debug("setState: finished")
-        return
+        ocf.logger.debug("setState: finished")
 
     def getState(self, node=None):
         """
         Get the state for a given node (or self)
         """
         node = self.selfOrOtherNode(node)
-        logging.debug("getState: begin; node = %s" % node)
+        ocf.logger.debug("getState: begin; node = %s" % node)
 
-        state = pcsHelper.getAttr(attr_curNodeState, node=node)
-        logging.debug("getState: state = %s" % state)
-        logging.debug("getState: finished")
+        state = clusterHelper.getAttr(attr_curNodeState, node=node)
+        ocf.logger.debug("getState: state = %s" % state)
+        ocf.logger.debug("getState: finished")
         if not state:
-            return pcsNodeState(pcsNodeState.AVAILABLE)
-        else:
-            return pcsNodeState[state]
+            return AVAILABLE
+        return stringToNodeState(state)
 
     def setEventIDs(self, eventIDs, node=None):
         """
         Set pending EventIDs for a given node (or self)
         """
         node = self.selfOrOtherNode(node)
-        logging.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))
+        ocf.logger.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))
 
         if eventIDs:
             eventIDStr = ",".join(eventIDs)
         else:
             eventIDStr = None
-        pcsHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)
+        clusterHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)
 
-        logging.debug("setEventIDs: finished")
+        ocf.logger.debug("setEventIDs: finished")
         return
 
     def getEventIDs(self, node=None):
         """
         Get pending EventIDs for a given node (or self)
         """
         node = self.selfOrOtherNode(node)
-        logging.debug("getEventIDs: begin; node = %s" % node)
+        ocf.logger.debug("getEventIDs: begin; node = %s" % node)
 
-        eventIDStr = pcsHelper.getAttr(attr_pendingEventIDs, node=node)
+        eventIDStr = clusterHelper.getAttr(attr_pendingEventIDs, node=node)
         if eventIDStr:
             eventIDs = eventIDStr.split(",")
         else:
             eventIDs = None
 
-        logging.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
+        ocf.logger.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
         return eventIDs
 
     def updateNodeStateAndEvents(self, state, eventIDs, node=None):
         """
         Set the state and pending EventIDs for a given node (or self)
         """
-        logging.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, state.name, str(eventIDs)))
+        ocf.logger.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, nodeStateToString(state), str(eventIDs)))
 
         self.setState(state, node=node)
         self.setEventIDs(eventIDs, node=node)
 
-        logging.debug("updateNodeStateAndEvents: finished")
+        ocf.logger.debug("updateNodeStateAndEvents: finished")
         return state
 
     def putNodeStandby(self, node=None):
         """
         Put self to standby
         """
         node = self.selfOrOtherNode(node)
-        logging.debug("putNodeStandby: begin; node = %s" % node)
+        ocf.logger.debug("putNodeStandby: begin; node = %s" % node)
 
-        # crm_attribute -t nodes -N <node> -n standby -v on --lifetime=forever
-        pcsHelper._exec("crm_attribute",
-            ["-t", "nodes", "-N", node, "-n", "standby", "-v", "on", "--lifetime=forever"])
+        clusterHelper._exec("crm_attribute",
+                            "-t", "nodes",
+                            "-N", node,
+                            "-n", "standby",
+                            "-v", "on",
+                            "--lifetime=forever")
 
-        logging.debug("putNodeStandby: finished")
-        return
+        ocf.logger.debug("putNodeStandby: finished")
 
     def putNodeOnline(self, node=None):
         """
         Put self back online
         """
         node = self.selfOrOtherNode(node)
-        logging.debug("putNodeOnline: begin; node = %s" % node)
+        ocf.logger.debug("putNodeOnline: begin; node = %s" % node)
 
-        # crm_attribute -t nodes -N <node> -n standby -v off --lifetime=forever
-        pcsHelper._exec("crm_attribute",
-            ["-t", "nodes", "-N", node, "-n", "standby", "-v", "off", "--lifetime=forever"])
+        clusterHelper._exec("crm_attribute",
+                            "-t", "nodes",
+                            "-N", node,
+                            "-n", "standby",
+                            "-v", "off",
+                            "--lifetime=forever")
 
-        logging.debug("putNodeOnline: finished")
-        return
+        ocf.logger.debug("putNodeOnline: finished")
 
     def separateEvents(self, events):
         """
         Split own/other nodes' events
         """
-        logging.debug("separateEvents: begin; events = %s" % str(events))
+        ocf.logger.debug("separateEvents: begin; events = %s" % str(events))
 
         localEvents = []
         remoteEvents = []
         for e in events:
             e = attrDict(e)
-            if e.EventType not in self.raOwner.config.relevantEventTypes:
+            if e.EventType not in self.raOwner.relevantEventTypes:
                 continue
             if self.azName in e.Resources:
                 localEvents.append(e)
             else:
                 remoteEvents.append(e)
-        logging.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents)))
+        ocf.logger.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents)))
         return (localEvents, remoteEvents)
 
     def removeOrphanedEvents(self, azEvents):
         """
         Remove remote events that are already finished
         """
-        logging.debug("removeOrphanedEvents: begin; azEvents = %s" % str(azEvents))
+        ocf.logger.debug("removeOrphanedEvents: begin; azEvents = %s" % str(azEvents))
 
         azEventIDs = set()
         for e in azEvents:
             azEventIDs.add(e.EventId)
         # for all nodes except self ...
-        for n in pcsHelper.getAllNodes():
+        for n in clusterHelper.getAllNodes():
             if n == self.hostName:
                 continue
             curState = self.getState(node=n)
             # ... that are still in an event or shutting down resources ...
-            if curState in (pcsNodeState.STOPPING, pcsNodeState.IN_EVENT):
-                logging.info("removeOrphanedEvents: node %s has state %s" % (n, curState))
-                pcsEventIDs = self.getEventIDs(node=n)
+            if curState in (STOPPING, IN_EVENT):
+                ocf.logger.info("removeOrphanedEvents: node %s has state %s" % (n, curState))
+                clusterEventIDs = self.getEventIDs(node=n)
                 stillActive = False
                 # ... but don't have any more events running according to Azure, ...
-                for p in pcsEventIDs:
+                for p in clusterEventIDs:
                     if p in azEventIDs:
-                        logging.info("removeOrphanedEvents: (at least) event %s on node %s has not yet finished" % (str(p), n))
+                        ocf.logger.info("removeOrphanedEvents: (at least) event %s on node %s has not yet finished" % (str(p), n))
                         stillActive = True
                         break
                 if not stillActive:
                     # ... put them back online.
-                    logging.info("removeOrphanedEvents: pcsEvents %s on node %s are not in azEvents %s -> bring node back online" % (str(pcsEventIDs), n, str(azEventIDs)))
+                    ocf.logger.info("removeOrphanedEvents: clusterEvents %s on node %s are not in azEvents %s -> bring node back online" % (str(clusterEventIDs), n, str(azEventIDs)))
                     self.putNodeOnline(node=n)
 
-        logging.debug("removeOrphanedEvents: finished")
-        return
+        ocf.logger.debug("removeOrphanedEvents: finished")
 
     def handleRemoteEvents(self, azEvents):
         """
         Handle a list of events (as provided by Azure Metadata Service) for other nodes
         """
-        logging.debug("handleRemoteEvents: begin; hostName = %s, events = %s" % (self.hostName, str(azEvents)))
+        ocf.logger.debug("handleRemoteEvents: begin; hostName = %s, events = %s" % (self.hostName, str(azEvents)))
 
         if len(azEvents) == 0:
-            logging.info("handleRemoteEvents: no remote events to handle")
-            logging.debug("handleRemoteEvents: finished")
+            ocf.logger.info("handleRemoteEvents: no remote events to handle")
+            ocf.logger.debug("handleRemoteEvents: finished")
             return
         eventIDsForNode = {}
 
         # iterate through all current events as per Azure
         for e in azEvents:
-            logging.info("handleRemoteEvents: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources)))
+            ocf.logger.info("handleRemoteEvents: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources)))
            # before we can force an event to start, we need to ensure all nodes involved have stopped their resources
            if e.EventStatus == "Scheduled":
                allNodesStopped = True
                for azName in e.Resources:
-                    hostName = pcsHelper.getHostNameFromAzName(azName)
+                    hostName = clusterHelper.getHostNameFromAzName(azName)
                     state = self.getState(node=hostName)
-                    if state == pcsNodeState.STOPPING:
+                    if state == STOPPING:
                         # the only way we can continue is when node state is STOPPING, but all resources have been stopped
-                        if not pcsHelper.allResourcesStoppedOnNode(hostName):
-                            logging.info("handleRemoteEvents: (at least) node %s has still resources running -> wait" % hostName)
+                        if not clusterHelper.allResourcesStoppedOnNode(hostName):
+                            ocf.logger.info("handleRemoteEvents: (at least) node %s has still resources running -> wait" % hostName)
                             allNodesStopped = False
                             break
-                    elif state in (pcsNodeState.AVAILABLE, pcsNodeState.IN_EVENT, pcsNodeState.ON_HOLD):
-                        logging.info("handleRemoteEvents: node %s is still %s -> remote event needs to be picked up locally" % (hostName, state.name))
+                    elif state in (AVAILABLE, IN_EVENT, ON_HOLD):
+                        ocf.logger.info("handleRemoteEvents: node %s is still %s -> remote event needs to be picked up locally" % (hostName, nodeStateToString(state)))
                         allNodesStopped = False
                         break
                if allNodesStopped:
-                    logging.info("handleRemoteEvents: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId))
+                    ocf.logger.info("handleRemoteEvents: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId))
                     for n in e.Resources:
-                        hostName = pcsHelper.getHostNameFromAzName(n)
-                        if eventIDsForNode.has_key(hostName):
+                        hostName = clusterHelper.getHostNameFromAzName(n)
+                        if hostName in eventIDsForNode:
                             eventIDsForNode[hostName].append(e.EventId)
                         else:
                             eventIDsForNode[hostName] = [e.EventId]
             elif e.EventStatus == "Started":
-                logging.info("handleRemoteEvents: remote event already started")
+                ocf.logger.info("handleRemoteEvents: remote event already started")
 
-        # force the start of all events whose nodes are ready (i.e. have no more resources running)
+        # force the start of all events whose nodes are ready (i.e. have no more resources running)
         if len(eventIDsForNode.keys()) > 0:
             eventIDsToForce = set([item for sublist in eventIDsForNode.values() for item in sublist])
-            logging.info("handleRemoteEvents: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce)))
-            for n in eventIDsForNode.keys():
-                self.updateNodeStateAndEvents(pcsNodeState.IN_EVENT, eventIDsForNode[n], node=n)
+            ocf.logger.info("handleRemoteEvents: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce)))
+            for node, eventId in eventIDsForNode.items():
+                self.updateNodeStateAndEvents(IN_EVENT, eventId, node=node)
             azHelper.forceEvents(eventIDsToForce)
 
-        logging.debug("handleRemoteEvents: finished")
-        return
+        ocf.logger.debug("handleRemoteEvents: finished")
 
     def handleLocalEvents(self, azEvents):
         """
         Handle a list of own events (as provided by Azure Metadata Service)
         """
-        logging.debug("handleLocalEvents: begin; hostName = %s, azEvents = %s" % (self.hostName, str(azEvents)))
+        ocf.logger.debug("handleLocalEvents: begin; hostName = %s, azEvents = %s" % (self.hostName, str(azEvents)))
 
         azEventIDs = set()
         for e in azEvents:
             azEventIDs.add(e.EventId)
 
         curState = self.getState()
-        pcsEventIDs = self.getEventIDs()
+        clusterEventIDs = self.getEventIDs()
         mayUpdateDocVersion = False
-        logging.info("handleLocalEvents: current state = %s; pending local pcsEvents = %s" % (curState.name, str(pcsEventIDs)))
-
+        ocf.logger.info("handleLocalEvents: current state = %s; pending local clusterEvents = %s" % (nodeStateToString(curState), str(clusterEventIDs)))
+
         # check if there are currently/still events set for the node
-        if pcsEventIDs:
+        if clusterEventIDs:
             # there are pending events set, so our state must be STOPPING or IN_EVENT
             i = 0; touchedEventIDs = False
-            while i < len(pcsEventIDs):
-                # clean up pending events that are already finished according to AZ
-                if pcsEventIDs[i] not in azEventIDs:
-                    logging.info("handleLocalEvents: remove finished local pcsEvent %s" % (pcsEventIDs[i]))
-                    pcsEventIDs.pop(i)
+            while i < len(clusterEventIDs):
+                # clean up pending events that are already finished according to AZ
+                if clusterEventIDs[i] not in azEventIDs:
+                    ocf.logger.info("handleLocalEvents: remove finished local clusterEvent %s" % (clusterEventIDs[i]))
+                    clusterEventIDs.pop(i)
                     touchedEventIDs = True
                 else:
                     i += 1
-            if len(pcsEventIDs) > 0:
+            if len(clusterEventIDs) > 0:
                 # there are still pending events (either because we're still stopping, or because the event is still in place)
                 # either way, we need to wait
                 if touchedEventIDs:
-                    logging.info("handleLocalEvents: added new local pcsEvent %s" % str(pcsEventIDs))
-                    self.setEventIDs(pcsEventIDs)
+                    ocf.logger.info("handleLocalEvents: added new local clusterEvent %s" % str(clusterEventIDs))
+                    self.setEventIDs(clusterEventIDs)
                 else:
-                    logging.info("handleLocalEvents: no local pcsEvents were updated")
+                    ocf.logger.info("handleLocalEvents: no local clusterEvents were updated")
             else:
                 # there are no more pending events left after cleanup
-                if pcsHelper.noPendingResourcesOnNode(self.hostName):
+                if clusterHelper.noPendingResourcesOnNode(self.hostName):
                     # and no pending resources on the node -> set it back online
-                    logging.info("handleLocalEvents: all local events finished -> clean up, put node online and AVAILABLE")
-                    curState = self.updateNodeStateAndEvents(pcsNodeState.AVAILABLE, None)
+                    ocf.logger.info("handleLocalEvents: all local events finished -> clean up, put node online and AVAILABLE")
+                    curState = self.updateNodeStateAndEvents(AVAILABLE, None)
                     self.putNodeOnline()
-                    pcsHelper.removeHoldFromNodes()
-                    # repeat handleLocalEvents() since we changed status to AVAILABLE
+                    clusterHelper.removeHoldFromNodes()
+                    # repeat handleLocalEvents() since we changed status to AVAILABLE
                 else:
-                    logging.info("handleLocalEvents: all local events finished, but some resources have not completed startup yet -> wait")
+                    ocf.logger.info("handleLocalEvents: all local events finished, but some resources have not completed startup yet -> wait")
         else:
             # there are no pending events set for us (yet)
-            if curState == pcsNodeState.AVAILABLE:
+            if curState == AVAILABLE:
                 if len(azEventIDs) > 0:
-                    if pcsHelper.otherNodesAvailable(self):
-                        logging.info("handleLocalEvents: can handle local events %s -> set state STOPPING" % (str(azEventIDs)))
+                    if clusterHelper.otherNodesAvailable(self):
+                        ocf.logger.info("handleLocalEvents: can handle local events %s -> set state STOPPING" % (str(azEventIDs)))
                         # this will also set mayUpdateDocVersion = True
-                        curState = self.updateNodeStateAndEvents(pcsNodeState.STOPPING, azEventIDs)
+                        curState = self.updateNodeStateAndEvents(STOPPING, azEventIDs)
                     else:
-                        logging.info("handleLocalEvents: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(azEventIDs))
-                        self.setState(pcsNodeState.ON_HOLD)
+                        ocf.logger.info("handleLocalEvents: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(azEventIDs))
+                        self.setState(ON_HOLD)
                 else:
-                    logging.info("handleLocalEvents: no local azEvents to handle")
-            if curState == pcsNodeState.STOPPING:
-                if pcsHelper.noPendingResourcesOnNode(self.hostName):
-                    logging.info("handleLocalEvents: all local resources are started properly -> put node standby")
+                    ocf.logger.info("handleLocalEvents: no local azEvents to handle")
+            if curState == STOPPING:
+                if clusterHelper.noPendingResourcesOnNode(self.hostName):
+                    ocf.logger.info("handleLocalEvents: all local resources are started properly -> put node standby")
                     self.putNodeStandby()
                     mayUpdateDocVersion = True
                 else:
-                    logging.info("handleLocalEvents: some local resources are not clean yet -> wait")
+                    ocf.logger.info("handleLocalEvents: some local resources are not clean yet -> wait")
 
-        logging.debug("handleLocalEvents: finished; mayUpdateDocVersion = %s" % str(mayUpdateDocVersion))
+        ocf.logger.debug("handleLocalEvents: finished; mayUpdateDocVersion = %s" % str(mayUpdateDocVersion))
         return mayUpdateDocVersion
 
 ##############################################################################
 
-class raGlobalPullState(Enum):
-    """
-    Pull state to avoid two azure-events resource agents pulling from Azure Metadata Service API concurrently
-    """
-    IDLE = 0
-    PULLING = 1
-
-##############################################################################
-
-class raConfig:
-    verbose = None
-    relevantEventTypes = default_relevantEventTypes
-
-##############################################################################
-
 class raAzEvents:
     """
     Main class for resource agent
-    """
-    def __init__(self, config):
-        self.node = pcsNode(self)
-        self.config = config
+    """
+    def __init__(self, relevantEventTypes):
+        self.node = Node(self)
+        self.relevantEventTypes = relevantEventTypes
 
     def monitor(self):
-        logging.debug("monitor: begin")
+        ocf.logger.debug("monitor: begin")
 
         pullFailedAttempts = 0
         while True:
             # check if another node is pulling at the same time;
             # this should only be a concern for the first pull, as setting up Scheduled Events may take up to 2 minutes.
-            if pcsHelper.getAttr(attr_globalPullState) == raGlobalPullState.PULLING.name:
+            if clusterHelper.getAttr(attr_globalPullState) == "PULLING":
                 pullFailedAttempts += 1
                 if pullFailedAttempts == global_pullMaxAttempts:
-                    logging.warning("monitor: exceeded maximum number of attempts (%d) to pull events" % global_pullMaxAttempts)
-                    logging.debug("monitor: finished")
-                    return OCF_SUCCESS
+                    ocf.logger.warning("monitor: exceeded maximum number of attempts (%d) to pull events" % global_pullMaxAttempts)
+                    ocf.logger.debug("monitor: finished")
+                    return ocf.OCF_SUCCESS
                 else:
-                    logging.info("monitor: another node is pulling; retry in %d seconds" % global_pullDelaySecs)
+                    ocf.logger.info("monitor: another node is pulling; retry in %d seconds" % global_pullDelaySecs)
                     time.sleep(global_pullDelaySecs)
                     continue
 
             # we can pull safely from Azure Metadata Service
-            pcsHelper.setAttr(attr_globalPullState, raGlobalPullState.PULLING.name)
+            clusterHelper.setAttr(attr_globalPullState, "PULLING")
             events = azHelper.pullScheduledEvents()
-            pcsHelper.setAttr(attr_globalPullState, raGlobalPullState.IDLE.name)
+            clusterHelper.setAttr(attr_globalPullState, "IDLE")
 
             # get current document version
             curDocVersion = events.DocumentIncarnation
             lastDocVersion = self.node.getAttr(attr_lastDocVersion)
-            logging.info("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))
+            ocf.logger.info("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))
 
             # split events local/remote
             (localEvents, remoteEvents) = self.node.separateEvents(events.Events)
 
             # ensure local events are only executing once
             if curDocVersion != lastDocVersion:
-                logging.info("monitor: curDocVersion has not been handled yet")
+                ocf.logger.info("monitor: curDocVersion has not been handled yet")
                 # handleLocalEvents() returns True if mayUpdateDocVersion is True;
                 # this is only the case if we can ensure there are no pending events
                 if self.node.handleLocalEvents(localEvents):
-                    logging.info("monitor: handleLocalEvents completed successfully -> update curDocVersion")
+                    ocf.logger.info("monitor: handleLocalEvents completed successfully -> update curDocVersion")
                     self.node.setAttr(attr_lastDocVersion, curDocVersion)
                 else:
-                    logging.info("monitor: handleLocalEvents still waiting -> keep curDocVersion")
+                    ocf.logger.info("monitor: handleLocalEvents still waiting -> keep curDocVersion")
             else:
-                logging.info("monitor: already handled curDocVersion, skip")
+                ocf.logger.info("monitor: already handled curDocVersion, skip")
 
             # remove orphaned remote events and then handle the remaining remote events
             self.node.removeOrphanedEvents(remoteEvents)
             self.node.handleRemoteEvents(remoteEvents)
             break
 
-        logging.debug("monitor: finished")
-        return OCF_SUCCESS
+        ocf.logger.debug("monitor: finished")
+        return ocf.OCF_SUCCESS
 
 ##############################################################################
 
-##############################################################################
-def help():
-    print("""This resource agent implements a monitor for scheduled
+def setLoglevel(verbose):
+    # set up writing into syslog
+    loglevel = default_loglevel
+    if verbose:
+        opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1))
+        urllib2.install_opener(opener)
+        loglevel = ocf.logging.DEBUG
+    ocf.log.setLevel(loglevel)
+
+description = (
+    "Microsoft Azure Scheduled Events monitoring agent",
+    """This resource agent implements a monitor for scheduled
 (maintenance) events for a Microsoft Azure VM.
 
 If any relevant events are found, it moves all Pacemaker resources
 away from the affected node to allow for a graceful shutdown.
 
     Usage:
-        azure-events [eventTypes=<val>] [verbose=<val>]
+        [OCF_RESKEY_eventTypes=VAL] [OCF_RESKEY_verbose=VAL] azure-events ACTION
 
         action (required): Supported values: monitor, help, meta-data
         eventTypes (optional): List of event types to be considered
                 relevant by the resource agent (comma-separated).
                 Supported values: Freeze,Reboot,Redeploy
                 Default = Reboot,Redeploy
-        verbose (optional): If set to true, displays debug info.
+        verbose (optional): If set to true, displays debug info.
                 Default = false
 
     Deployment:
         crm configure primitive rsc_azure-events ocf:heartbeat:azure-events \
             op monitor interval=10s
         crm configure clone cln_azure-events rsc_azure-events
 
 For further information on Microsoft Azure Scheduled Events, please
 refer to the following documentation:
-https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events""")
-
-def metadata():
-    print("""<?xml version="1.0"?>
-<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
-<resource-agent name="azure-events" version="%s">
-<version>1.0</version>
-<shortdesc lang="en">Resource agent to handle Microsoft Azure Scheduled Events</shortdesc>
-<longdesc lang="en">
-The azure-events resource agent is to be used nodes inside a Pacemaker cluster that run Microsoft Azure. It periodically checks if maintenance events (for example, reboots or redploys) are scheduled and takes preemptive action by moving all resources away from the affected node.
-</longdesc>
-<parameters>
-<parameter name="eventTypes" unique="0" required="0">
-<longdesc lang="en">A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy; Default = Reboot,Redeploy)</longdesc>
-<shortdesc lang="en">List of resources to be considered</shortdesc>
-<content type="string"/>
-</parameter>
-</parameters>
-<actions>
-<action name="start" timeout="10" />
-<action name="stop" timeout="10" />
-<action name="monitor" timeout="240" interval="10" />
-<action name="meta-data" timeout="5" />
-</actions>
-</resource-agent>
-""" % VERSION)
-
-def getConfig():
-    # get resource agent config via env variables
-    config = raConfig()
-    verbose = os.environ.get("OCF_RESKEY_verbose")
-    if verbose and verbose.lower() == "true":
-        config.verbose = True
-    relevantEventTypes = os.environ.get("OCF_RESKEY_eventTypes")
-    if relevantEventTypes:
-        config.relevantEventTypes = set(relevantEventTypes.split(","))
-    return config
-
-def setLoglevel(verbose):
-    # set up writing into syslog
-    if verbose:
-        opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel = 1))
-        urllib2.install_opener(opener)
-        loglevel = logging.DEBUG
-    else:
-        loglevel = default_loglevel
-    logging.getLogger().setLevel(loglevel)
-    logging.getLogger().addHandler(SyslogLibHandler())
-    logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
-    return
+https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
+""")
+
+def monitor_action(eventTypes):
+    relevantEventTypes = set(eventTypes.split(",") if eventTypes else [])
+    ra = raAzEvents(relevantEventTypes)
+    return ra.monitor()
+
+def validate_action(eventTypes):
+    if eventTypes:
+        for event in eventTypes.split(","):
+            if event not in ("Freeze", "Reboot", "Redeploy"):
+                ocf.ocf_exit_reason("Event type not one of Freeze, Reboot, Redeploy: " + eventTypes)
+                return ocf.OCF_ERR_CONFIGURED
+    return ocf.OCF_SUCCESS
 
 def main():
-    config = getConfig()
-    setLoglevel(config.verbose)
-
-    result = OCF_ERR_UNIMPLEMENTED
-    action = sys.argv[1].lower() if len(sys.argv) > 1 else None
-    logging.debug("main: begin; action = %s" % action)
-    if action == "meta-data":
-        result = metadata()
-    elif action == "help":
-        help()
-    elif action:
-        ra = raAzEvents(config)
-        if action == "monitor":
-            result = ra.monitor()
-        elif action in ("start", "stop"):
-            result = OCF_SUCCESS
-        else:
-            logging.error("main: Unsupported action %s" % action)
-
-    logging.debug("main: finished; result = %s" % result)
-    sys.exit(result)
+    agent = ocf.Agent("azure-events", shortdesc=description[0], longdesc=description[1])
+    agent.add_parameter(
+        "eventTypes",
+        shortdesc="List of event types to be considered",
+        longdesc="A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy)",
+        content_type="string",
+        default="Reboot,Redeploy")
+    agent.add_parameter(
+        "verbose",
+        shortdesc="Enable verbose agent logging",
+        longdesc="Set to true to enable verbose logging",
+        content_type="boolean",
+        default="false")
+    agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
+    agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
+    agent.add_action("validate-all", timeout=20, handler=validate_action)
+    agent.add_action("monitor", timeout=240, interval=10, handler=monitor_action)
+    setLoglevel(ocf.is_true(ocf.get_parameter("verbose", "false")))
+    agent.run()
 
 if __name__ == '__main__':
     main()
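
A standalone sketch (not part of the patch) of the urllib2 compatibility shim the agent now relies on: the try/except import binds the name urllib2 to urllib.request on Python 3, so Request, add_header, and urlopen keep working unchanged under either interpreter. The endpoint, api-version, and Metadata header below are the ones azHelper uses; the query only succeeds when run on an Azure VM, elsewhere the link-local address simply times out:

    import json

    try:
        import urllib2                        # Python 2
    except ImportError:
        import urllib.request as urllib2      # Python 3: same call surface for what we use

    url = "http://169.254.169.254/metadata/instance?api-version=2017-08-01"
    req = urllib2.Request(url)
    req.add_header("Metadata", "true")        # header required by the Azure Metadata Service
    data = urllib2.urlopen(req, timeout=5).read()
    print(json.loads(data)["compute"]["name"])  # e.g. the VM name the agent stores as azName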