diff --git a/heartbeat/azure-events-az.in b/heartbeat/azure-events-az.in
index 6d31e5aba..0ed001037 100644
--- a/heartbeat/azure-events-az.in
+++ b/heartbeat/azure-events-az.in
@@ -1,819 +1,819 @@
#!@PYTHON@ -tt
#
#	Resource agent for monitoring Azure Scheduled Events
#
# 	License:	GNU General Public License (GPL)
#	(c) 2018 	Tobias Niekamp, Microsoft Corp.
#				and Linux-HA contributors

import os
import sys
import time
import subprocess
import json
try:
		import urllib2
		from urllib2 import URLError
except ImportError:
		import urllib.request as urllib2
		from urllib.error import URLError
import socket
from collections import defaultdict

OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT"))
sys.path.append(OCF_FUNCTIONS_DIR)
import ocf

##############################################################################

VERSION = "0.20"
USER_AGENT = "Pacemaker-ResourceAgent/%s %s" % (VERSION, ocf.distro())

attr_globalPullState = "azure-events-az_globalPullState"
attr_lastDocVersion  = "azure-events-az_lastDocVersion"
attr_curNodeState = "azure-events-az_curNodeState"
attr_pendingEventIDs = "azure-events-az_pendingEventIDs"
attr_healthstate = "#health-azure"

default_loglevel = ocf.logging.INFO
default_relevantEventTypes = set(["Reboot", "Redeploy"])

##############################################################################

class attrDict(defaultdict):
    """
    A wrapper for accessing dict keys like an attribute
    """
    def __init__(self, data):
        super(attrDict, self).__init__(attrDict)
        for d in data.keys():
            self.__setattr__(d, data[d])

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        self[key] = value

##############################################################################

class azHelper:
    """
    Helper class for Azure's metadata API (including Scheduled Events)
    """
    metadata_host = "http://169.254.169.254/metadata"
    instance_api  = "instance"
    events_api    = "scheduledevents"
    events_api_version = "2020-07-01"
    instance_api_version = "2021-12-13"

    @staticmethod
    def _sendMetadataRequest(endpoint, postData=None, api_version="2019-08-01"):
        """
        Send a request to the Azure Metadata Service API
        """
        retryCount = int(ocf.get_parameter("retry_count",3))
        retryWaitTime = int(ocf.get_parameter("retry_wait",20))
        requestTimeout = int(ocf.get_parameter("request_timeout",15))

        url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, api_version)
        data = ""
        ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s, retry_count = %s, retry_wait = %s, request_timeout = %s" % (endpoint, postData, retryCount, retryWaitTime, requestTimeout))
        ocf.logger.debug("_sendMetadataRequest: url = %s" % url)

        if postData and type(postData) != bytes:
            postData = postData.encode()

        req = urllib2.Request(url, postData)
        req.add_header("Metadata", "true")
        req.add_header("User-Agent", USER_AGENT)

        if retryCount > 0:
            ocf.logger.debug("_sendMetadataRequest: retry enabled")

        successful = None
        for retry in range(retryCount+1):
            try:
                resp = urllib2.urlopen(req, timeout=requestTimeout)
            except Exception as e:
                excType = e.__class__.__name__
                if excType == TimeoutError.__name__:
                    ocf.logger.warning("Request timed out after %s seconds. Error: %s" % (requestTimeout, e))
                if excType == URLError.__name__:
                    if hasattr(e, 'reason'):
                        ocf.logger.warning("Failed to reach the server: %s" % e.reason)
                    elif hasattr(e, 'code'):
                        ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code)

                if retryCount > 1 and retry != retryCount:
                    ocf.logger.warning("Request failed, retry (%s/%s); waiting %s seconds before next attempt" % (retry + 1,retryCount,retryWaitTime))
                    time.sleep(retryWaitTime)
            else:
                data = resp.read()
                ocf.logger.debug("_sendMetadataRequest: response = %s" % data)
                successful = 1
                break

        # If no attempt succeeded, even with retries enabled, set the global pull state back to idle
        if successful is None:
            clusterHelper.setAttr(attr_globalPullState, "IDLE")

        if data:
            data = json.loads(data)
        ocf.logger.debug("_sendMetadataRequest: finished")
        return data
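    # Illustrative only: the same metadata endpoints can be queried manually from
    # the VM for debugging. IMDS requires the "Metadata: true" header, e.g.:
    #
    #   curl -H "Metadata: true" \
    #        "http://169.254.169.254/metadata/scheduledevents?api-version=2020-07-01"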
    @staticmethod
    def getInstanceInfo():
        """
        Fetch details about the current VM from the Azure Metadata Service API
        """
        ocf.logger.debug("getInstanceInfo: begin")

        jsondata = azHelper._sendMetadataRequest(azHelper.instance_api, None, azHelper.instance_api_version)
        ocf.logger.debug("getInstanceInfo: json = %s" % jsondata)

        if jsondata:
            ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"]))
            return attrDict(jsondata["compute"])
        else:
            apiCall = "%s/%s?api-version=%s" % (azHelper.metadata_host, azHelper.instance_api, azHelper.instance_api_version)
            ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info - call: %s" % apiCall)
            sys.exit(ocf.OCF_ERR_GENERIC)

    @staticmethod
    def pullScheduledEvents():
        """
        Retrieve all currently scheduled events via Azure Metadata Service API
        """
        ocf.logger.debug("pullScheduledEvents: begin")

        jsondata = azHelper._sendMetadataRequest(azHelper.events_api, None, azHelper.events_api_version)
        ocf.logger.debug("pullScheduledEvents: json = %s" % jsondata)

        if jsondata:
            ocf.logger.debug("pullScheduledEvents: finished")
            return attrDict(jsondata)
        else:
            apiCall = "%s/%s?api-version=%s" % (azHelper.metadata_host, azHelper.events_api, azHelper.events_api_version)
            ocf.ocf_exit_reason("pullScheduledEvents: Unable to get scheduledevents info - call: %s" % apiCall)
            sys.exit(ocf.OCF_ERR_GENERIC)

    @staticmethod
    def forceEvents(eventIDs):
        """
        Force a set of events to start immediately
        """
        ocf.logger.debug("forceEvents: begin")

        events = []
        for e in eventIDs:
            events.append({
                "EventId": e,
            })
        postData = {
            "StartRequests" : events
        }
        ocf.logger.info("forceEvents: postData = %s" % postData)
        resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData))

        ocf.logger.debug("forceEvents: finished")
        return
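    # Illustrative only (fields abbreviated; see the Azure Scheduled Events
    # documentation): a Scheduled Events response document roughly looks like
    #
    #   {
    #       "DocumentIncarnation": 2,
    #       "Events": [{
    #           "EventId": "602d9444-d2cd-49c7-8624-8643e7171297",
    #           "EventStatus": "Scheduled",
    #           "EventType": "Reboot",
    #           "Resources": ["hsr3-db0"],
    #           "NotBefore": "Mon, 19 Sep 2016 18:29:47 GMT"
    #       }]
    #   }
    #
    # and the payload POSTed by forceEvents() to start an event early is:
    #
    #   {"StartRequests": [{"EventId": "602d9444-d2cd-49c7-8624-8643e7171297"}]}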

##############################################################################

class clusterHelper:
    """
    Helper functions for Pacemaker control via crm
    """
    @staticmethod
    def _getLocation(node):
        """
        Helper function to retrieve local/global attributes
        """
        if node:
            return ["--node", node]
        else:
            return ["--type", "crm_config"]

    @staticmethod
    def _exec(command, *args):
        """
        Helper function to execute a UNIX command
        """
        args = list(args)
        ocf.logger.debug("_exec: begin; command = %s, args = %s" % (command, str(args)))

        def flatten(*n):
            return (str(e) for a in n
                for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),)))
        command = list(flatten([command] + args))
        ocf.logger.debug("_exec: cmd = %s" % " ".join(command))
        try:
            ret = subprocess.check_output(command)
            if type(ret) != str:
                ret = ret.decode()
            ocf.logger.debug("_exec: return = %s" % ret)
            return ret.rstrip()
        except Exception as err:
            ocf.logger.exception(err)
            return None

    @staticmethod
    def setAttr(key, value, node=None):
        """
        Set the value of a specific global/local attribute in the Pacemaker cluster
        """
        ocf.logger.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node))
        if value:
            ret = clusterHelper._exec("crm_attribute", "--name", key, "--update", value, clusterHelper._getLocation(node))
        else:
            ret = clusterHelper._exec("crm_attribute", "--name", key, "--delete", clusterHelper._getLocation(node))
        ocf.logger.debug("setAttr: finished")
        return len(ret) == 0

    @staticmethod
    def getAttr(key, node=None):
        """
        Retrieve a global/local attribute from the Pacemaker cluster
        """
        ocf.logger.debug("getAttr: begin; key = %s, node = %s" % (key, node))

        val = clusterHelper._exec("crm_attribute", "--name", key, "--query", "--quiet", "--default", "", clusterHelper._getLocation(node))
        ocf.logger.debug("getAttr: finished")
        if not val:
            return None
        return val if not val.isdigit() else int(val)
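    # Illustrative only: the crm_attribute invocations issued above look like
    #
    #   crm_attribute --name azure-events-az_curNodeState --update STOPPING --node hsr3-db0
    #   crm_attribute --name azure-events-az_curNodeState --query --quiet --default "" --node hsr3-db0
    #
    # Global attributes (node=None) are stored in the crm_config section
    # instead ("--type crm_config").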
    @staticmethod
    def getAllNodes():
        """
        Get a list of hostnames for all nodes in the Pacemaker cluster
        """
        ocf.logger.debug("getAllNodes: begin")

        nodes = []
        nodeList = clusterHelper._exec("crm_node", "--list")
        for n in nodeList.split("\n"):
            nodes.append(n.split()[1])
        ocf.logger.debug("getAllNodes: finished; return %s" % str(nodes))

        return nodes

    @staticmethod
    def getHostNameFromAzName(azName):
        """
        Helper function to get the actual host name from an Azure node name
        """
        return clusterHelper.getAttr("hostName_%s" % azName)

    @staticmethod
    def removeHoldFromNodes():
        """
        Remove the ON_HOLD state from all nodes in the Pacemaker cluster
        """
        ocf.logger.debug("removeHoldFromNodes: begin")

        for n in clusterHelper.getAllNodes():
            if clusterHelper.getAttr(attr_curNodeState, node=n) == "ON_HOLD":
                clusterHelper.setAttr(attr_curNodeState, "AVAILABLE", node=n)
                ocf.logger.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n)

        ocf.logger.debug("removeHoldFromNodes: finished")
        return False

    @staticmethod
    def otherNodesAvailable(exceptNode):
        """
        Check if there are any nodes (except a given node) in the Pacemaker cluster that have state AVAILABLE
        """
        ocf.logger.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)

        for n in clusterHelper.getAllNodes():
            state = clusterHelper.getAttr(attr_curNodeState, node=n)
            state = stringToNodeState(state) if state else AVAILABLE
            if state == AVAILABLE and n != exceptNode.hostName:
                ocf.logger.info("otherNodesAvailable: at least %s is available" % n)
                ocf.logger.debug("otherNodesAvailable: finished")
                return True
        ocf.logger.info("otherNodesAvailable: no other nodes are available")
        ocf.logger.debug("otherNodesAvailable: finished")

        return False

    @staticmethod
    def transitionSummary():
        """
        Get the current Pacemaker transition summary (used to check if all resources are stopped when putting a node standby)
        """
        # Is a global crm_simulate "too much"? Or would it be sufficient if there are no planned transitions for a particular node?
        #
        # crm_simulate -LS
        #     Transition Summary:
        #      * Promote rsc_SAPHana_HN1_HDB03:0      (Slave -> Master hsr3-db1)
        #      * Stop    rsc_SAPHana_HN1_HDB03:1      (hsr3-db0)
        #      * Move    rsc_ip_HN1_HDB03     (Started hsr3-db0 -> hsr3-db1)
        #      * Start   rsc_nc_HN1_HDB03     (hsr3-db1)
        #
        # Expected result when there are no pending actions:
        #     Transition Summary:
        ocf.logger.debug("transitionSummary: begin")

        summary = clusterHelper._exec("crm_simulate", "-LS")
        if not summary:
            ocf.logger.warning("transitionSummary: could not load transition summary")
            return ""
        if summary.find("Transition Summary:") < 0:
            ocf.logger.debug("transitionSummary: no transitions: %s" % summary)
            return ""
        j = summary.find('Transition Summary:') + len('Transition Summary:')
        l = summary.lower().find('executing cluster transition:')
        ret = list(filter(str.strip, summary[j:l].split("\n")))

        ocf.logger.debug("transitionSummary: finished; return = %s" % str(ret))
        return ret

    @staticmethod
    def listOperationsOnNode(node):
        """
        Get a list of all current operations for a given node (used to check if any resources are pending)
        """
        # hsr3-db1:/home/tniek # crm_resource --list-operations -N hsr3-db0
        # rsc_azure-events-az    (ocf::heartbeat:azure-events-az): Started: rsc_azure-events-az_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=115ms): complete
        # rsc_azure-events-az    (ocf::heartbeat:azure-events-az): Started: rsc_azure-events-az_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun  8 22:37:47 2018, exec=197ms): complete
        # rsc_SAPHana_HN1_HDB03  (ocf::suse:SAPHana): Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun  8 22:37:46 2018, exec=0ms): pending
        # rsc_SAPHanaTopology_HN1_HDB03  (ocf::suse:SAPHanaTopology): Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=3214ms): complete
        ocf.logger.debug("listOperationsOnNode: begin; node = %s" % node)

        resources = clusterHelper._exec("crm_resource", "--list-operations", "-N", node)
        if len(resources) == 0:
            ret = []
        else:
            ret = resources.split("\n")

        ocf.logger.debug("listOperationsOnNode: finished; return = %s" % str(ret))
        return ret

    @staticmethod
    def noPendingResourcesOnNode(node):
        """
        Check that there are no pending resources on a given node
        """
        ocf.logger.debug("noPendingResourcesOnNode: begin; node = %s" % node)

        for r in clusterHelper.listOperationsOnNode(node):
            ocf.logger.debug("noPendingResourcesOnNode: * %s" % r)
            resource = r.split()[-1]
            if resource == "pending":
                ocf.logger.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource)
                ocf.logger.debug("noPendingResourcesOnNode: finished; return = False")
                return False
        ocf.logger.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
        ocf.logger.debug("noPendingResourcesOnNode: finished; return = True")

        return True

    @staticmethod
    def allResourcesStoppedOnNode(node):
        """
        Check that all resources on a given node are stopped
        """
        ocf.logger.debug("allResourcesStoppedOnNode: begin; node = %s" % node)

        if clusterHelper.noPendingResourcesOnNode(node):
            if len(clusterHelper.transitionSummary()) == 0:
                ocf.logger.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
                ocf.logger.debug("allResourcesStoppedOnNode: finished; return = True")
                return True
            ocf.logger.info("allResourcesStoppedOnNode: transition summary is not empty")
            ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
            return False

        ocf.logger.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
        ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
        return False
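    # Editor's note: "all resources stopped" is inferred above from two signals:
    # no operations in "pending" state on the node (crm_resource) and an empty
    # cluster transition summary (crm_simulate -LS). Only when both hold is it
    # considered safe to let a scheduled event proceed.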

##############################################################################

AVAILABLE = 0   # Node is online and ready to handle events
STOPPING  = 1   # Standby has been triggered, but some resources are still running
IN_EVENT  = 2   # All resources are stopped, and event has been initiated via Azure Metadata Service
ON_HOLD   = 3   # Node has a pending event that cannot be started because there are no other nodes available

def stringToNodeState(name):
    if type(name) == int:
        return name
    if name == "STOPPING":
        return STOPPING
    if name == "IN_EVENT":
        return IN_EVENT
    if name == "ON_HOLD":
        return ON_HOLD
    return AVAILABLE

def nodeStateToString(state):
    if state == STOPPING:
        return "STOPPING"
    if state == IN_EVENT:
        return "IN_EVENT"
    if state == ON_HOLD:
        return "ON_HOLD"
    return "AVAILABLE"
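# Illustrative summary of the per-node state machine, as derived from the
# monitor logic below:
#
#   AVAILABLE --(relevant local event, another node AVAILABLE)--> STOPPING
#   AVAILABLE --(relevant local event, no other node available)--> ON_HOLD
#   STOPPING  --(all resources stopped, event forced via IMDS)--> IN_EVENT
#   IN_EVENT  --(event finished according to IMDS)--> AVAILABLE
#   ON_HOLD   --(another node becomes AVAILABLE again)--> AVAILABLE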

##############################################################################

class Node:
    """
    Core class implementing logic for a cluster node
    """
    def __init__(self, ra):
        self.raOwner  = ra
        self.azInfo   = azHelper.getInstanceInfo()
        self.azName   = self.azInfo.name
-        self.hostName = socket.gethostname()
+        self.hostName = clusterHelper._exec("crm_node", "-n")
        self.setAttr("azName", self.azName)
        clusterHelper.setAttr("hostName_%s" % self.azName, self.hostName)
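    # Note on the change above: socket.gethostname() returns the system
    # hostname, which is not necessarily the node name Pacemaker uses (e.g.
    # when the cluster node name differs from the short hostname/FQDN).
    # "crm_node -n" asks Pacemaker itself for the local node name, so node
    # attributes are set on the correct node.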
    def getAttr(self, key):
        """
        Get a local attribute
        """
        return clusterHelper.getAttr(key, node=self.hostName)

    def setAttr(self, key, value):
        """
        Set a local attribute
        """
        return clusterHelper.setAttr(key, value, node=self.hostName)

    def selfOrOtherNode(self, node):
        """
        Helper function to distinguish self/other node
        """
        return node if node else self.hostName

    def setState(self, state, node=None):
        """
        Set the state for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setState: begin; node = %s, state = %s" % (node, nodeStateToString(state)))

        clusterHelper.setAttr(attr_curNodeState, nodeStateToString(state), node=node)

        ocf.logger.debug("setState: finished")

    def getState(self, node=None):
        """
        Get the state for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getState: begin; node = %s" % node)

        state = clusterHelper.getAttr(attr_curNodeState, node=node)
        ocf.logger.debug("getState: state = %s" % state)
        ocf.logger.debug("getState: finished")
        if not state:
            return AVAILABLE
        return stringToNodeState(state)

    def setEventIDs(self, eventIDs, node=None):
        """
        Set pending EventIDs for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))

        if eventIDs:
            eventIDStr = ",".join(eventIDs)
        else:
            eventIDStr = None
        clusterHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)

        ocf.logger.debug("setEventIDs: finished")
        return

    def getEventIDs(self, node=None):
        """
        Get pending EventIDs for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getEventIDs: begin; node = %s" % node)

        eventIDStr = clusterHelper.getAttr(attr_pendingEventIDs, node=node)
        if eventIDStr:
            eventIDs = eventIDStr.split(",")
        else:
            eventIDs = None

        ocf.logger.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
        return eventIDs

    def updateNodeStateAndEvents(self, state, eventIDs, node=None):
        """
        Set the state and pending EventIDs for a given node (or self)
        """
        ocf.logger.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, nodeStateToString(state), str(eventIDs)))

        self.setState(state, node=node)
        self.setEventIDs(eventIDs, node=node)

        ocf.logger.debug("updateNodeStateAndEvents: finished")
        return state

    def putNodeStandby(self, node=None):
        """
        Put self to standby
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeStandby: begin; node = %s" % node)

        clusterHelper._exec("crm_attribute", "--node", node, "--name", attr_healthstate, "--update", "-1000000", "--lifetime=forever")

        ocf.logger.debug("putNodeStandby: finished")

    def isNodeInStandby(self, node=None):
        """
        Check if node is in standby
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("isNodeInStandby: begin; node = %s" % node)
        isInStandby = False

        healthAttributeStr = clusterHelper.getAttr(attr_healthstate, node)
        if healthAttributeStr is not None:
            try:
                healthAttribute = int(healthAttributeStr)
                isInStandby = healthAttribute < 0
            except ValueError:
                # health attribute is not an integer; treat the node as not in standby
                ocf.logger.warn("Health attribute %s on node %s cannot be converted to an integer value" % (healthAttributeStr, node))

        ocf.logger.debug("isNodeInStandby: finished - result %s" % isInStandby)
        return isInStandby

    def putNodeOnline(self, node=None):
        """
        Put self back online
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeOnline: begin; node = %s" % node)

        clusterHelper._exec("crm_attribute", "--node", node, "--name", "#health-azure", "--update", "0", "--lifetime=forever")

        ocf.logger.debug("putNodeOnline: finished")
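    # Note: putNodeStandby()/putNodeOnline() do not toggle the Pacemaker
    # "standby" attribute; they set the node health attribute "#health-azure"
    # to -1000000 or 0. For this to move resources away from the node, the
    # cluster must be configured with a node-health-strategy that honors
    # health attributes (e.g. node-health-strategy=custom), as described in
    # the Pacemaker documentation.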
    def separateEvents(self, events):
        """
        Split own/other nodes' events
        """
        ocf.logger.debug("separateEvents: begin; events = %s" % str(events))

        localEvents = []
        remoteEvents = []
        for e in events:
            e = attrDict(e)
            if e.EventType not in self.raOwner.relevantEventTypes:
                continue
            if self.azName in e.Resources:
                localEvents.append(e)
            else:
                remoteEvents.append(e)
        ocf.logger.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents)))
        return (localEvents, remoteEvents)

##############################################################################

class raAzEvents:
    """
    Main class for resource agent
    """
    def __init__(self, relevantEventTypes):
        self.node = Node(self)
        self.relevantEventTypes = relevantEventTypes

    def monitor(self):
        ocf.logger.debug("monitor: begin")

        events = azHelper.pullScheduledEvents()

        # get current document version
        curDocVersion = events.DocumentIncarnation
        lastDocVersion = self.node.getAttr(attr_lastDocVersion)
        ocf.logger.debug("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))

        # split events local/remote
        (localEvents, remoteEvents) = self.node.separateEvents(events.Events)

        # ensure local events are only executed once
        if curDocVersion == lastDocVersion:
            ocf.logger.info("monitor: already handled curDocVersion, skip")
            return ocf.OCF_SUCCESS

        localAzEventIds = dict()
        for e in localEvents:
            localAzEventIds[e.EventId] = json.dumps(e)

        curState = self.node.getState()
        clusterEventIDs = self.node.getEventIDs()

        ocf.logger.debug("monitor: curDocVersion has not been handled yet")

        if clusterEventIDs:
            # there are pending events set, so our state must be STOPPING or IN_EVENT
            i = 0; touchedEventIDs = False
            while i < len(clusterEventIDs):
                # clean up pending events that are already finished according to AZ
                if clusterEventIDs[i] not in localAzEventIds.keys():
                    ocf.logger.info("monitor: remove finished local clusterEvent %s" % (clusterEventIDs[i]))
                    clusterEventIDs.pop(i)
                    touchedEventIDs = True
                else:
                    i += 1
            if len(clusterEventIDs) > 0:
                # there are still pending events (either because we're still stopping, or because the event is still in place)
                # either way, we need to wait
                if touchedEventIDs:
                    ocf.logger.info("monitor: updated local clusterEvents %s" % str(clusterEventIDs))
                    self.node.setEventIDs(clusterEventIDs)
                else:
                    ocf.logger.info("monitor: no local clusterEvents were updated")
            else:
                # there are no more pending events left after cleanup
                if clusterHelper.noPendingResourcesOnNode(self.node.hostName):
                    # and no pending resources on the node -> set it back online
                    ocf.logger.info("monitor: all local events finished -> clean up, put node online and AVAILABLE")
                    curState = self.node.updateNodeStateAndEvents(AVAILABLE, None)
                    self.node.putNodeOnline()
                    clusterHelper.removeHoldFromNodes()
                    # If Azure Scheduled Events is not used for 24 hours (e.g. because the cluster was asleep), it will be disabled for a VM.
                    # When the cluster wakes up and starts using it again, the DocumentIncarnation is reset.
                    # We need to remove the stored version during cleanup, otherwise azure-events-az will not process the event after wakeup.
                    self.node.setAttr(attr_lastDocVersion, None)
                else:
                    ocf.logger.info("monitor: all local events finished, but some resources have not completed startup yet -> wait")
        else:
            if curState == AVAILABLE:
                if len(localAzEventIds) > 0:
                    if clusterHelper.otherNodesAvailable(self.node):
                        ocf.logger.info("monitor: can handle local events %s -> set state STOPPING - %s" % (str(list(localAzEventIds.keys())), str(list(localAzEventIds.values()))))
                        curState = self.node.updateNodeStateAndEvents(STOPPING, localAzEventIds.keys())
                    else:
                        ocf.logger.info("monitor: cannot handle azEvents %s (only node available) -> set state ON_HOLD - %s" % (str(list(localAzEventIds.keys())), str(list(localAzEventIds.values()))))
                        self.node.setState(ON_HOLD)
                else:
                    ocf.logger.debug("monitor: no local azEvents to handle")

        if curState == STOPPING:
            eventIDsForNode = {}
            if clusterHelper.noPendingResourcesOnNode(self.node.hostName):
                if not self.node.isNodeInStandby():
                    ocf.logger.info("monitor: all local resources are started properly -> put node standby and exit")
                    self.node.putNodeStandby()
                    return ocf.OCF_SUCCESS

                for e in localEvents:
                    ocf.logger.info("monitor: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources)))
                    # before we can force an event to start, we need to ensure all nodes involved have stopped their resources
                    if e.EventStatus == "Scheduled":
                        allNodesStopped = True
                        for azName in e.Resources:
                            hostName = clusterHelper.getHostNameFromAzName(azName)
                            state = self.node.getState(node=hostName)
                            if state == STOPPING:
                                # the only way we can continue is when node state is STOPPING, but all resources have been stopped
                                if not clusterHelper.allResourcesStoppedOnNode(hostName):
                                    ocf.logger.info("monitor: (at least) node %s still has resources running -> wait" % hostName)
                                    allNodesStopped = False
                                    break
                            elif state in (AVAILABLE, IN_EVENT, ON_HOLD):
                                ocf.logger.info("monitor: node %s is still %s -> remote event needs to be picked up locally" % (hostName, nodeStateToString(state)))
                                allNodesStopped = False
                                break
                        if allNodesStopped:
                            ocf.logger.info("monitor: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId))
                            for n in e.Resources:
                                hostName = clusterHelper.getHostNameFromAzName(n)
                                if hostName in eventIDsForNode:
                                    eventIDsForNode[hostName].append(e.EventId)
                                else:
                                    eventIDsForNode[hostName] = [e.EventId]
                    elif e.EventStatus == "Started":
                        ocf.logger.info("monitor: remote event already started")

                # force the start of all events whose nodes are ready (i.e. have no more resources running)
                if len(eventIDsForNode.keys()) > 0:
                    eventIDsToForce = set([item for sublist in eventIDsForNode.values() for item in sublist])
                    ocf.logger.info("monitor: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce)))
                    for node, eventId in eventIDsForNode.items():
                        self.node.updateNodeStateAndEvents(IN_EVENT, eventId, node=node)
                    azHelper.forceEvents(eventIDsToForce)
                    self.node.setAttr(attr_lastDocVersion, curDocVersion)
            else:
                ocf.logger.info("monitor: some local resources are not clean yet -> wait")

        ocf.logger.debug("monitor: finished")
        return ocf.OCF_SUCCESS

##############################################################################

def setLoglevel(verbose):
    # set up writing into syslog
    loglevel = default_loglevel
    if verbose:
        opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1))
        urllib2.install_opener(opener)
        loglevel = ocf.logging.DEBUG
    ocf.log.setLevel(loglevel)

description = (
    "Microsoft Azure Scheduled Events monitoring agent",
    """This resource agent implements a monitor for scheduled
(maintenance) events for a Microsoft Azure VM.

If any relevant events are found, it moves all Pacemaker resources
away from the affected node to allow for a graceful shutdown.

    Deployment:
    crm configure primitive rsc_azure-events-az ocf:heartbeat:azure-events-az \
        op monitor interval=10s
    crm configure clone cln_azure-events-az rsc_azure-events-az

For further information on Microsoft Azure Scheduled Events, please
refer to the following documentation:
https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
""")

def monitor_action(eventTypes):
    relevantEventTypes = set(eventTypes.split(",") if eventTypes else [])
    ra = raAzEvents(relevantEventTypes)
    return ra.monitor()

def validate_action(eventTypes):
    if eventTypes:
        for event in eventTypes.split(","):
            if event not in ("Freeze", "Reboot", "Redeploy"):
                ocf.ocf_exit_reason("Event type not one of Freeze, Reboot, Redeploy: " + eventTypes)
                return ocf.OCF_ERR_CONFIGURED
    return ocf.OCF_SUCCESS

def main():
    agent = ocf.Agent("azure-events-az", shortdesc=description[0], longdesc=description[1])
    agent.add_parameter(
        "eventTypes",
        shortdesc="List of event types to be considered",
        longdesc="A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy)",
        content_type="string",
        default="Reboot,Redeploy")
    agent.add_parameter(
        "verbose",
        shortdesc="Enable verbose agent logging",
        longdesc="Set to true to enable verbose logging",
        content_type="boolean",
        default="false")
    agent.add_parameter(
        "retry_count",
        shortdesc="Azure IMDS webservice retry count",
        longdesc="Set to any number bigger than zero to enable retry count",
        content_type="integer",
        default="3")
    agent.add_parameter(
        "retry_wait",
        shortdesc="Configure a retry wait time",
        longdesc="Set retry wait time in seconds",
        content_type="integer",
        default="20")
    agent.add_parameter(
        "request_timeout",
        shortdesc="Configure a request timeout",
        longdesc="Set request timeout in seconds",
        content_type="integer",
        default="15")
    agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("validate-all", timeout=20, handler=validate_action)
    agent.add_action("monitor", timeout=240, interval=10, handler=monitor_action)
    setLoglevel(ocf.is_true(ocf.get_parameter("verbose", "false")))
    agent.run()

if __name__ == '__main__':
    main()
diff --git a/heartbeat/azure-events.in b/heartbeat/azure-events.in
index 90acaba62..32f71ee26 100644
--- a/heartbeat/azure-events.in
+++ b/heartbeat/azure-events.in
@@ -1,847 +1,847 @@
#!@PYTHON@ -tt
#
#	Resource agent for monitoring Azure Scheduled Events
#
# 	License:	GNU General Public License (GPL)
#	(c) 2018 	Tobias Niekamp, Microsoft Corp.
#				and Linux-HA contributors

import os
import sys
import time
import subprocess
import json
try:
		import urllib2
		from urllib2 import URLError
except ImportError:
		import urllib.request as urllib2
		from urllib.error import URLError
import socket
from collections import defaultdict

OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT"))
sys.path.append(OCF_FUNCTIONS_DIR)
import ocf

##############################################################################

VERSION = "0.10"
USER_AGENT = "Pacemaker-ResourceAgent/%s %s" % (VERSION, ocf.distro())

attr_globalPullState = "azure-events_globalPullState"
attr_lastDocVersion  = "azure-events_lastDocVersion"
attr_curNodeState = "azure-events_curNodeState"
attr_pendingEventIDs = "azure-events_pendingEventIDs"

default_loglevel = ocf.logging.INFO
default_relevantEventTypes = set(["Reboot", "Redeploy"])

global_pullMaxAttempts = 3
global_pullDelaySecs = 1

##############################################################################

class attrDict(defaultdict):
    """
    A wrapper for accessing dict keys like an attribute
    """
    def __init__(self, data):
        super(attrDict, self).__init__(attrDict)
        for d in data.keys():
            self.__setattr__(d, data[d])

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        self[key] = value

##############################################################################

class azHelper:
    """
    Helper class for Azure's metadata API (including Scheduled Events)
    """
    metadata_host = "http://169.254.169.254/metadata"
    instance_api  = "instance"
    events_api    = "scheduledevents"
    api_version   = "2019-08-01"

    @staticmethod
    def _sendMetadataRequest(endpoint, postData=None):
        """
        Send a request to the Azure Metadata Service API
        """
        url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version)
        data = ""
        ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData))
        ocf.logger.debug("_sendMetadataRequest: url = %s" % url)

        if postData and type(postData) != bytes:
            postData = postData.encode()

        req = urllib2.Request(url, postData)
        req.add_header("Metadata", "true")
        req.add_header("User-Agent", USER_AGENT)
        try:
            resp = urllib2.urlopen(req)
        except URLError as e:
            if hasattr(e, 'reason'):
                ocf.logger.warning("Failed to reach the server: %s" % e.reason)
                clusterHelper.setAttr(attr_globalPullState, "IDLE")
            elif hasattr(e, 'code'):
                ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code)
                clusterHelper.setAttr(attr_globalPullState, "IDLE")
        else:
            data = resp.read()
            ocf.logger.debug("_sendMetadataRequest: response = %s" % data)

        if data:
            data = json.loads(data)
        ocf.logger.debug("_sendMetadataRequest: finished")
        return data
ocf.logger.debug("_sendMetadataRequest: url = %s" % url) if postData and type(postData) != bytes: postData = postData.encode() req = urllib2.Request(url, postData) req.add_header("Metadata", "true") req.add_header("User-Agent", USER_AGENT) try: resp = urllib2.urlopen(req) except URLError as e: if hasattr(e, 'reason'): ocf.logger.warning("Failed to reach the server: %s" % e.reason) clusterHelper.setAttr(attr_globalPullState, "IDLE") elif hasattr(e, 'code'): ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code) clusterHelper.setAttr(attr_globalPullState, "IDLE") else: data = resp.read() ocf.logger.debug("_sendMetadataRequest: response = %s" % data) if data: data = json.loads(data) ocf.logger.debug("_sendMetadataRequest: finished") return data @staticmethod def getInstanceInfo(): """ Fetch details about the current VM from Azure's Azure Metadata Service API """ ocf.logger.debug("getInstanceInfo: begin") jsondata = azHelper._sendMetadataRequest(azHelper.instance_api) ocf.logger.debug("getInstanceInfo: json = %s" % jsondata) if jsondata: ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"])) return attrDict(jsondata["compute"]) else: ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info") sys.exit(ocf.OCF_ERR_GENERIC) @staticmethod def pullScheduledEvents(): """ Retrieve all currently scheduled events via Azure Metadata Service API """ ocf.logger.debug("pullScheduledEvents: begin") jsondata = azHelper._sendMetadataRequest(azHelper.events_api) ocf.logger.debug("pullScheduledEvents: json = %s" % jsondata) ocf.logger.debug("pullScheduledEvents: finished") return attrDict(jsondata) @staticmethod def forceEvents(eventIDs): """ Force a set of events to start immediately """ ocf.logger.debug("forceEvents: begin") events = [] for e in eventIDs: events.append({ "EventId": e, }) postData = { "StartRequests" : events } ocf.logger.info("forceEvents: postData = %s" % postData) resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData)) ocf.logger.debug("forceEvents: finished") return ############################################################################## class clusterHelper: """ Helper functions for Pacemaker control via crm """ @staticmethod def _getLocation(node): """ Helper function to retrieve local/global attributes """ if node: return ["--node", node] else: return ["--type", "crm_config"] @staticmethod def _exec(command, *args): """ Helper function to execute a UNIX command """ args = list(args) ocf.logger.debug("_exec: begin; command = %s, args = %s" % (command, str(args))) def flatten(*n): return (str(e) for a in n for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),))) command = list(flatten([command] + args)) ocf.logger.debug("_exec: cmd = %s" % " ".join(command)) try: ret = subprocess.check_output(command) if type(ret) != str: ret = ret.decode() ocf.logger.debug("_exec: return = %s" % ret) return ret.rstrip() except Exception as err: ocf.logger.exception(err) return None @staticmethod def setAttr(key, value, node=None): """ Set the value of a specific global/local attribute in the Pacemaker cluster """ ocf.logger.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node)) if value: ret = clusterHelper._exec("crm_attribute", "--name", key, "--update", value, clusterHelper._getLocation(node)) else: ret = clusterHelper._exec("crm_attribute", "--name", key, "--delete", clusterHelper._getLocation(node)) ocf.logger.debug("setAttr: finished") return len(ret) 
    @staticmethod
    def getAllNodes():
        """
        Get a list of hostnames for all nodes in the Pacemaker cluster
        """
        ocf.logger.debug("getAllNodes: begin")

        nodes = []
        nodeList = clusterHelper._exec("crm_node", "--list")
        for n in nodeList.split("\n"):
            nodes.append(n.split()[1])
        ocf.logger.debug("getAllNodes: finished; return %s" % str(nodes))

        return nodes

    @staticmethod
    def getHostNameFromAzName(azName):
        """
        Helper function to get the actual host name from an Azure node name
        """
        return clusterHelper.getAttr("hostName_%s" % azName)

    @staticmethod
    def removeHoldFromNodes():
        """
        Remove the ON_HOLD state from all nodes in the Pacemaker cluster
        """
        ocf.logger.debug("removeHoldFromNodes: begin")

        for n in clusterHelper.getAllNodes():
            if clusterHelper.getAttr(attr_curNodeState, node=n) == "ON_HOLD":
                clusterHelper.setAttr(attr_curNodeState, "AVAILABLE", node=n)
                ocf.logger.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n)

        ocf.logger.debug("removeHoldFromNodes: finished")
        return False

    @staticmethod
    def otherNodesAvailable(exceptNode):
        """
        Check if there are any nodes (except a given node) in the Pacemaker cluster that have state AVAILABLE
        """
        ocf.logger.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)

        for n in clusterHelper.getAllNodes():
            state = clusterHelper.getAttr(attr_curNodeState, node=n)
            state = stringToNodeState(state) if state else AVAILABLE
            if state == AVAILABLE and n != exceptNode.hostName:
                ocf.logger.info("otherNodesAvailable: at least %s is available" % n)
                ocf.logger.debug("otherNodesAvailable: finished")
                return True
        ocf.logger.info("otherNodesAvailable: no other nodes are available")
        ocf.logger.debug("otherNodesAvailable: finished")

        return False
    @staticmethod
    def transitionSummary():
        """
        Get the current Pacemaker transition summary (used to check if all resources are stopped when putting a node standby)
        """
        # Is a global crm_simulate "too much"? Or would it be sufficient if there are no planned transitions for a particular node?
        #
        # crm_simulate -LS
        #     Transition Summary:
        #      * Promote rsc_SAPHana_HN1_HDB03:0      (Slave -> Master hsr3-db1)
        #      * Stop    rsc_SAPHana_HN1_HDB03:1      (hsr3-db0)
        #      * Move    rsc_ip_HN1_HDB03     (Started hsr3-db0 -> hsr3-db1)
        #      * Start   rsc_nc_HN1_HDB03     (hsr3-db1)
        #
        # Expected result when there are no pending actions:
        #     Transition Summary:
        ocf.logger.debug("transitionSummary: begin")

        summary = clusterHelper._exec("crm_simulate", "-LS")
        if not summary:
            ocf.logger.warning("transitionSummary: could not load transition summary")
            return ""
        if summary.find("Transition Summary:") < 0:
            ocf.logger.debug("transitionSummary: no transitions: %s" % summary)
            return ""
        j = summary.find('Transition Summary:') + len('Transition Summary:')
        l = summary.lower().find('executing cluster transition:')
        ret = list(filter(str.strip, summary[j:l].split("\n")))

        ocf.logger.debug("transitionSummary: finished; return = %s" % str(ret))
        return ret

    @staticmethod
    def listOperationsOnNode(node):
        """
        Get a list of all current operations for a given node (used to check if any resources are pending)
        """
        # hsr3-db1:/home/tniek # crm_resource --list-operations -N hsr3-db0
        # rsc_azure-events    (ocf::heartbeat:azure-events): Started: rsc_azure-events_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=115ms): complete
        # rsc_azure-events    (ocf::heartbeat:azure-events): Started: rsc_azure-events_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun  8 22:37:47 2018, exec=197ms): complete
        # rsc_SAPHana_HN1_HDB03  (ocf::suse:SAPHana): Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun  8 22:37:46 2018, exec=0ms): pending
        # rsc_SAPHanaTopology_HN1_HDB03  (ocf::suse:SAPHanaTopology): Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=3214ms): complete
        ocf.logger.debug("listOperationsOnNode: begin; node = %s" % node)

        resources = clusterHelper._exec("crm_resource", "--list-operations", "-N", node)
        if len(resources) == 0:
            ret = []
        else:
            ret = resources.split("\n")

        ocf.logger.debug("listOperationsOnNode: finished; return = %s" % str(ret))
        return ret

    @staticmethod
    def noPendingResourcesOnNode(node):
        """
        Check that there are no pending resources on a given node
        """
        ocf.logger.debug("noPendingResourcesOnNode: begin; node = %s" % node)

        for r in clusterHelper.listOperationsOnNode(node):
            ocf.logger.debug("noPendingResourcesOnNode: * %s" % r)
            resource = r.split()[-1]
            if resource == "pending":
                ocf.logger.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource)
                ocf.logger.debug("noPendingResourcesOnNode: finished; return = False")
                return False
        ocf.logger.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
        ocf.logger.debug("noPendingResourcesOnNode: finished; return = True")

        return True

    @staticmethod
    def allResourcesStoppedOnNode(node):
        """
        Check that all resources on a given node are stopped
        """
        ocf.logger.debug("allResourcesStoppedOnNode: begin; node = %s" % node)

        if clusterHelper.noPendingResourcesOnNode(node):
            if len(clusterHelper.transitionSummary()) == 0:
                ocf.logger.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
                ocf.logger.debug("allResourcesStoppedOnNode: finished; return = True")
                return True
            ocf.logger.info("allResourcesStoppedOnNode: transition summary is not empty")
            ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
            return False

        ocf.logger.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
        ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
        return False

##############################################################################

AVAILABLE = 0   # Node is online and ready to handle events
STOPPING  = 1   # Standby has been triggered, but some resources are still running
IN_EVENT  = 2   # All resources are stopped, and event has been initiated via Azure Metadata Service
ON_HOLD   = 3   # Node has a pending event that cannot be started because there are no other nodes available

def stringToNodeState(name):
    if type(name) == int:
        return name
    if name == "STOPPING":
        return STOPPING
    if name == "IN_EVENT":
        return IN_EVENT
    if name == "ON_HOLD":
        return ON_HOLD
    return AVAILABLE

def nodeStateToString(state):
    if state == STOPPING:
        return "STOPPING"
    if state == IN_EVENT:
        return "IN_EVENT"
    if state == ON_HOLD:
        return "ON_HOLD"
    return "AVAILABLE"

##############################################################################

class Node:
    """
    Core class implementing logic for a cluster node
    """
    def __init__(self, ra):
        self.raOwner  = ra
        self.azInfo   = azHelper.getInstanceInfo()
        self.azName   = self.azInfo.name
-        self.hostName = socket.gethostname()
+        self.hostName = clusterHelper._exec("crm_node", "-n")
        self.setAttr("azName", self.azName)
        clusterHelper.setAttr("hostName_%s" % self.azName, self.hostName)
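    # As in azure-events-az above: "crm_node -n" is used instead of
    # socket.gethostname() so the Pacemaker node name is used even when it
    # differs from the system hostname.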
    def getAttr(self, key):
        """
        Get a local attribute
        """
        return clusterHelper.getAttr(key, node=self.hostName)

    def setAttr(self, key, value):
        """
        Set a local attribute
        """
        return clusterHelper.setAttr(key, value, node=self.hostName)

    def selfOrOtherNode(self, node):
        """
        Helper function to distinguish self/other node
        """
        return node if node else self.hostName

    def setState(self, state, node=None):
        """
        Set the state for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setState: begin; node = %s, state = %s" % (node, nodeStateToString(state)))

        clusterHelper.setAttr(attr_curNodeState, nodeStateToString(state), node=node)

        ocf.logger.debug("setState: finished")

    def getState(self, node=None):
        """
        Get the state for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getState: begin; node = %s" % node)

        state = clusterHelper.getAttr(attr_curNodeState, node=node)
        ocf.logger.debug("getState: state = %s" % state)
        ocf.logger.debug("getState: finished")
        if not state:
            return AVAILABLE
        return stringToNodeState(state)

    def setEventIDs(self, eventIDs, node=None):
        """
        Set pending EventIDs for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))

        if eventIDs:
            eventIDStr = ",".join(eventIDs)
        else:
            eventIDStr = None
        clusterHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)

        ocf.logger.debug("setEventIDs: finished")
        return

    def getEventIDs(self, node=None):
        """
        Get pending EventIDs for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getEventIDs: begin; node = %s" % node)

        eventIDStr = clusterHelper.getAttr(attr_pendingEventIDs, node=node)
        if eventIDStr:
            eventIDs = eventIDStr.split(",")
        else:
            eventIDs = None

        ocf.logger.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
        return eventIDs

    def updateNodeStateAndEvents(self, state, eventIDs, node=None):
        """
        Set the state and pending EventIDs for a given node (or self)
        """
        ocf.logger.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, nodeStateToString(state), str(eventIDs)))

        self.setState(state, node=node)
        self.setEventIDs(eventIDs, node=node)

        ocf.logger.debug("updateNodeStateAndEvents: finished")
        return state

    def putNodeStandby(self, node=None):
        """
        Put self to standby
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeStandby: begin; node = %s" % node)

        clusterHelper._exec("crm_attribute", "-t", "nodes", "-N", node, "-n", "standby", "-v", "on", "--lifetime=forever")

        ocf.logger.debug("putNodeStandby: finished")

    def putNodeOnline(self, node=None):
        """
        Put self back online
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeOnline: begin; node = %s" % node)

        clusterHelper._exec("crm_attribute", "-t", "nodes", "-N", node, "-n", "standby", "-v", "off", "--lifetime=forever")

        ocf.logger.debug("putNodeOnline: finished")
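    # Note: unlike azure-events-az (which sets the "#health-azure" node health
    # attribute), this agent toggles the permanent "standby" node attribute
    # directly, which makes Pacemaker move all resources off the node.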
ocf.logger.info("removeOrphanedEvents: clusterEvents %s on node %s are not in azEvents %s -> bring node back online" % (str(clusterEventIDs), n, str(azEventIDs))) self.putNodeOnline(node=n) ocf.logger.debug("removeOrphanedEvents: finished") def handleRemoteEvents(self, azEvents): """ Handle a list of events (as provided by Azure Metadata Service) for other nodes """ ocf.logger.debug("handleRemoteEvents: begin; hostName = %s, events = %s" % (self.hostName, str(azEvents))) if len(azEvents) == 0: ocf.logger.debug("handleRemoteEvents: no remote events to handle") ocf.logger.debug("handleRemoteEvents: finished") return eventIDsForNode = {} # iterate through all current events as per Azure for e in azEvents: ocf.logger.info("handleRemoteEvents: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources))) # before we can force an event to start, we need to ensure all nodes involved have stopped their resources if e.EventStatus == "Scheduled": allNodesStopped = True for azName in e.Resources: hostName = clusterHelper.getHostNameFromAzName(azName) state = self.getState(node=hostName) if state == STOPPING: # the only way we can continue is when node state is STOPPING, but all resources have been stopped if not clusterHelper.allResourcesStoppedOnNode(hostName): ocf.logger.info("handleRemoteEvents: (at least) node %s has still resources running -> wait" % hostName) allNodesStopped = False break elif state in (AVAILABLE, IN_EVENT, ON_HOLD): ocf.logger.info("handleRemoteEvents: node %s is still %s -> remote event needs to be picked up locally" % (hostName, nodeStateToString(state))) allNodesStopped = False break if allNodesStopped: ocf.logger.info("handleRemoteEvents: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId)) for n in e.Resources: hostName = clusterHelper.getHostNameFromAzName(n) if hostName in eventIDsForNode: eventIDsForNode[hostName].append(e.EventId) else: eventIDsForNode[hostName] = [e.EventId] elif e.EventStatus == "Started": ocf.logger.info("handleRemoteEvents: remote event already started") # force the start of all events whose nodes are ready (i.e. 
    def handleLocalEvents(self, azEvents):
        """
        Handle a list of own events (as provided by Azure Metadata Service)
        """
        ocf.logger.debug("handleLocalEvents: begin; hostName = %s, azEvents = %s" % (self.hostName, str(azEvents)))

        azEventIDs = set()
        for e in azEvents:
            azEventIDs.add(e.EventId)

        curState = self.getState()
        clusterEventIDs = self.getEventIDs()
        mayUpdateDocVersion = False
        ocf.logger.info("handleLocalEvents: current state = %s; pending local clusterEvents = %s" % (nodeStateToString(curState), str(clusterEventIDs)))

        # check if there are currently/still events set for the node
        if clusterEventIDs:
            # there are pending events set, so our state must be STOPPING or IN_EVENT
            i = 0; touchedEventIDs = False
            while i < len(clusterEventIDs):
                # clean up pending events that are already finished according to AZ
                if clusterEventIDs[i] not in azEventIDs:
                    ocf.logger.info("handleLocalEvents: remove finished local clusterEvent %s" % (clusterEventIDs[i]))
                    clusterEventIDs.pop(i)
                    touchedEventIDs = True
                else:
                    i += 1
            if len(clusterEventIDs) > 0:
                # there are still pending events (either because we're still stopping, or because the event is still in place)
                # either way, we need to wait
                if touchedEventIDs:
                    ocf.logger.info("handleLocalEvents: updated local clusterEvents %s" % str(clusterEventIDs))
                    self.setEventIDs(clusterEventIDs)
                else:
                    ocf.logger.info("handleLocalEvents: no local clusterEvents were updated")
            else:
                # there are no more pending events left after cleanup
                if clusterHelper.noPendingResourcesOnNode(self.hostName):
                    # and no pending resources on the node -> set it back online
                    ocf.logger.info("handleLocalEvents: all local events finished -> clean up, put node online and AVAILABLE")
                    curState = self.updateNodeStateAndEvents(AVAILABLE, None)
                    self.putNodeOnline()
                    clusterHelper.removeHoldFromNodes()
                    # repeat handleLocalEvents() since we changed status to AVAILABLE
                else:
                    ocf.logger.info("handleLocalEvents: all local events finished, but some resources have not completed startup yet -> wait")
        else:
            # there are no pending events set for us (yet)
            if curState == AVAILABLE:
                if len(azEventIDs) > 0:
                    if clusterHelper.otherNodesAvailable(self):
                        ocf.logger.info("handleLocalEvents: can handle local events %s -> set state STOPPING" % (str(azEventIDs)))
                        # this will also set mayUpdateDocVersion = True
                        curState = self.updateNodeStateAndEvents(STOPPING, azEventIDs)
                    else:
                        ocf.logger.info("handleLocalEvents: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(azEventIDs))
                        self.setState(ON_HOLD)
                else:
                    ocf.logger.debug("handleLocalEvents: no local azEvents to handle")
        if curState == STOPPING:
            if clusterHelper.noPendingResourcesOnNode(self.hostName):
                ocf.logger.info("handleLocalEvents: all local resources are started properly -> put node standby")
                self.putNodeStandby()
                mayUpdateDocVersion = True
            else:
                ocf.logger.info("handleLocalEvents: some local resources are not clean yet -> wait")

        ocf.logger.debug("handleLocalEvents: finished; mayUpdateDocVersion = %s" % str(mayUpdateDocVersion))
        return mayUpdateDocVersion

##############################################################################
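# Illustrative note on the monitor() flow below: a cluster-wide attribute
# (azure-events_globalPullState) serves as a simple lock so only one node
# polls IMDS at a time (PULLING vs. IDLE), and the DocumentIncarnation of the
# last handled Scheduled Events document is cached per node so the same
# document is not processed twice.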
class raAzEvents:
    """
    Main class for resource agent
    """
    def __init__(self, relevantEventTypes):
        self.node = Node(self)
        self.relevantEventTypes = relevantEventTypes

    def monitor(self):
        ocf.logger.debug("monitor: begin")

        pullFailedAttempts = 0
        while True:
            # check if another node is pulling at the same time;
            # this should only be a concern for the first pull, as setting up Scheduled Events may take up to 2 minutes.
            if clusterHelper.getAttr(attr_globalPullState) == "PULLING":
                pullFailedAttempts += 1
                if pullFailedAttempts == global_pullMaxAttempts:
                    ocf.logger.warning("monitor: exceeded maximum number of attempts (%d) to pull events" % global_pullMaxAttempts)
                    ocf.logger.debug("monitor: finished")
                    return ocf.OCF_SUCCESS
                else:
                    ocf.logger.info("monitor: another node is pulling; retry in %d seconds" % global_pullDelaySecs)
                    time.sleep(global_pullDelaySecs)
                    continue

            # we can pull safely from Azure Metadata Service
            clusterHelper.setAttr(attr_globalPullState, "PULLING")
            events = azHelper.pullScheduledEvents()
            clusterHelper.setAttr(attr_globalPullState, "IDLE")

            # get current document version
            curDocVersion = events.DocumentIncarnation
            lastDocVersion = self.node.getAttr(attr_lastDocVersion)
            ocf.logger.debug("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))

            # split events local/remote
            (localEvents, remoteEvents) = self.node.separateEvents(events.Events)

            # ensure local events are only executed once
            if curDocVersion != lastDocVersion:
                ocf.logger.debug("monitor: curDocVersion has not been handled yet")
                # handleLocalEvents() returns True if mayUpdateDocVersion is True;
                # this is only the case if we can ensure there are no pending events
                if self.node.handleLocalEvents(localEvents):
                    ocf.logger.info("monitor: handleLocalEvents completed successfully -> update curDocVersion")
                    self.node.setAttr(attr_lastDocVersion, curDocVersion)
                else:
                    ocf.logger.debug("monitor: handleLocalEvents still waiting -> keep curDocVersion")
            else:
                ocf.logger.info("monitor: already handled curDocVersion, skip")

            # remove orphaned remote events and then handle the remaining remote events
            self.node.removeOrphanedEvents(remoteEvents)
            self.node.handleRemoteEvents(remoteEvents)
            break

        ocf.logger.debug("monitor: finished")
        return ocf.OCF_SUCCESS

##############################################################################

def setLoglevel(verbose):
    # set up writing into syslog
    loglevel = default_loglevel
    if verbose:
        opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1))
        urllib2.install_opener(opener)
        loglevel = ocf.logging.DEBUG
    ocf.log.setLevel(loglevel)
description = (
    "Microsoft Azure Scheduled Events monitoring agent",
    """This resource agent implements a monitor for scheduled
(maintenance) events for a Microsoft Azure VM.

If any relevant events are found, it moves all Pacemaker resources
away from the affected node to allow for a graceful shutdown.

Usage:
  [OCF_RESKEY_eventTypes=VAL] [OCF_RESKEY_verbose=VAL] azure-events ACTION

  action (required): Supported values: monitor, help, meta-data
  eventTypes (optional): List of event types to be considered
          relevant by the resource agent (comma-separated).
          Supported values: Freeze,Reboot,Redeploy
          Default = Reboot,Redeploy
  verbose (optional): If set to true, displays debug info.
          Default = false

Deployment:
  crm configure primitive rsc_azure-events ocf:heartbeat:azure-events \
      op monitor interval=10s
  crm configure clone cln_azure-events rsc_azure-events

For further information on Microsoft Azure Scheduled Events, please
refer to the following documentation:
https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
""")

def monitor_action(eventTypes):
    relevantEventTypes = set(eventTypes.split(",") if eventTypes else [])
    ra = raAzEvents(relevantEventTypes)
    return ra.monitor()

def validate_action(eventTypes):
    if eventTypes:
        for event in eventTypes.split(","):
            if event not in ("Freeze", "Reboot", "Redeploy"):
                ocf.ocf_exit_reason("Event type not one of Freeze, Reboot, Redeploy: " + eventTypes)
                return ocf.OCF_ERR_CONFIGURED
    return ocf.OCF_SUCCESS

def main():
    agent = ocf.Agent("azure-events", shortdesc=description[0], longdesc=description[1])
    agent.add_parameter(
        "eventTypes",
        shortdesc="List of event types to be considered",
        longdesc="A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy)",
        content_type="string",
        default="Reboot,Redeploy")
    agent.add_parameter(
        "verbose",
        shortdesc="Enable verbose agent logging",
        longdesc="Set to true to enable verbose logging",
        content_type="boolean",
        default="false")
    agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("validate-all", timeout=20, handler=validate_action)
    agent.add_action("monitor", timeout=240, interval=10, handler=monitor_action)
    setLoglevel(ocf.is_true(ocf.get_parameter("verbose", "false")))
    agent.run()

if __name__ == '__main__':
    main()