diff --git a/heartbeat/azure-events-az.in b/heartbeat/azure-events-az.in index 46d4d1f3d..6d31e5aba 100644 --- a/heartbeat/azure-events-az.in +++ b/heartbeat/azure-events-az.in @@ -1,772 +1,819 @@ #!@PYTHON@ -tt # # Resource agent for monitoring Azure Scheduled Events # # License: GNU General Public License (GPL) # (c) 2018 Tobias Niekamp, Microsoft Corp. # and Linux-HA contributors import os import sys import time import subprocess import json try: import urllib2 from urllib2 import URLError except ImportError: import urllib.request as urllib2 from urllib.error import URLError import socket from collections import defaultdict OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT")) sys.path.append(OCF_FUNCTIONS_DIR) import ocf ############################################################################## -VERSION = "0.10" +VERSION = "0.20" USER_AGENT = "Pacemaker-ResourceAgent/%s %s" % (VERSION, ocf.distro()) attr_globalPullState = "azure-events-az_globalPullState" attr_lastDocVersion = "azure-events-az_lastDocVersion" attr_curNodeState = "azure-events-az_curNodeState" attr_pendingEventIDs = "azure-events-az_pendingEventIDs" attr_healthstate = "#health-azure" default_loglevel = ocf.logging.INFO default_relevantEventTypes = set(["Reboot", "Redeploy"]) -global_pullMaxAttempts = 3 -global_pullDelaySecs = 1 - ############################################################################## class attrDict(defaultdict): """ A wrapper for accessing dict keys like an attribute """ def __init__(self, data): super(attrDict, self).__init__(attrDict) for d in data.keys(): self.__setattr__(d, data[d]) def __getattr__(self, key): try: return self[key] except KeyError: raise AttributeError(key) def __setattr__(self, key, value): self[key] = value ############################################################################## class azHelper: """ Helper class for Azure's metadata API (including Scheduled Events) """ metadata_host = "http://169.254.169.254/metadata" instance_api = "instance" events_api = "scheduledevents" - api_version = "2019-08-01" + events_api_version = "2020-07-01" + instance_api_version = "2021-12-13" @staticmethod - def _sendMetadataRequest(endpoint, postData=None): + def _sendMetadataRequest(endpoint, postData=None, api_version="2019-08-01"): """ Send a request to Azure's Azure Metadata Service API """ - url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version) + + retryCount = int(ocf.get_parameter("retry_count",3)) + retryWaitTime = int(ocf.get_parameter("retry_wait",20)) + requestTimeout = int(ocf.get_parameter("request_timeout",15)) + + url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, api_version) data = "" - ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData)) + ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s, retry_count = %s, retry_wait time = %s, request_timeout = %s" % (endpoint, postData, retryCount, retryWaitTime, requestTimeout)) ocf.logger.debug("_sendMetadataRequest: url = %s" % url) if postData and type(postData) != bytes: postData = postData.encode() req = urllib2.Request(url, postData) req.add_header("Metadata", "true") req.add_header("User-Agent", USER_AGENT) - try: - resp = urllib2.urlopen(req) - except URLError as e: - if hasattr(e, 'reason'): - ocf.logger.warning("Failed to reach the server: %s" % e.reason) - clusterHelper.setAttr(attr_globalPullState, "IDLE") - elif hasattr(e, 'code'): - ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code) - clusterHelper.setAttr(attr_globalPullState, "IDLE") - else: - data = resp.read() - ocf.logger.debug("_sendMetadataRequest: response = %s" % data) + + if retryCount > 0: + ocf.logger.debug("_sendMetadataRequest: retry enabled") + + successful = None + for retry in range(retryCount+1): + try: + resp = urllib2.urlopen(req, timeout=requestTimeout) + except Exception as e: + excType = e.__class__.__name__ + if excType == TimeoutError.__name__: + ocf.logger.warning("Request timed out after %s seconds Error: %s" % (requestTimeout, e)) + if excType == URLError.__name__: + if hasattr(e, 'reason'): + ocf.logger.warning("Failed to reach the server: %s" % e.reason) + elif hasattr(e, 'code'): + ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code) + + if retryCount > 1 and retry != retryCount: + ocf.logger.warning("Request failed, retry (%s/%s) wait %s seconds before retry (wait time)" % (retry + 1,retryCount,retryWaitTime)) + time.sleep(retryWaitTime) + + else: + data = resp.read() + ocf.logger.debug("_sendMetadataRequest: response = %s" % data) + successful = 1 + break + + # When no request was successful also with retry enabled, set the cluster to idle + if successful is None: + clusterHelper.setAttr(attr_globalPullState, "IDLE") if data: data = json.loads(data) ocf.logger.debug("_sendMetadataRequest: finished") return data @staticmethod def getInstanceInfo(): """ Fetch details about the current VM from Azure's Azure Metadata Service API """ ocf.logger.debug("getInstanceInfo: begin") - jsondata = azHelper._sendMetadataRequest(azHelper.instance_api) + jsondata = azHelper._sendMetadataRequest(azHelper.instance_api, None, azHelper.instance_api_version) ocf.logger.debug("getInstanceInfo: json = %s" % jsondata) if jsondata: ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"])) return attrDict(jsondata["compute"]) else: - ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info") + apiCall = "%s/%s?api-version=%s" % (azHelper.metadata_host, azHelper.instance_api, azHelper.instance_api_version) + ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info - call: %s" % apiCall) sys.exit(ocf.OCF_ERR_GENERIC) @staticmethod def pullScheduledEvents(): """ Retrieve all currently scheduled events via Azure Metadata Service API """ ocf.logger.debug("pullScheduledEvents: begin") - jsondata = azHelper._sendMetadataRequest(azHelper.events_api) + jsondata = azHelper._sendMetadataRequest(azHelper.events_api, None, azHelper.events_api_version) ocf.logger.debug("pullScheduledEvents: json = %s" % jsondata) - ocf.logger.debug("pullScheduledEvents: finished") - return attrDict(jsondata) + if jsondata: + ocf.logger.debug("pullScheduledEvents: finished") + return attrDict(jsondata) + else: + apiCall = "%s/%s?api-version=%s" % (azHelper.metadata_host, azHelper.events_api, azHelper.events_api_version) + ocf.ocf_exit_reason("pullScheduledEvents: Unable to get scheduledevents info - call: %s" % apiCall) + sys.exit(ocf.OCF_ERR_GENERIC) + @staticmethod def forceEvents(eventIDs): """ Force a set of events to start immediately """ ocf.logger.debug("forceEvents: begin") events = [] for e in eventIDs: events.append({ "EventId": e, }) postData = { "StartRequests" : events } ocf.logger.info("forceEvents: postData = %s" % postData) resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData)) ocf.logger.debug("forceEvents: finished") return ############################################################################## class clusterHelper: """ Helper functions for Pacemaker control via crm """ @staticmethod def _getLocation(node): """ Helper function to retrieve local/global attributes """ if node: return ["--node", node] else: return ["--type", "crm_config"] @staticmethod def _exec(command, *args): """ Helper function to execute a UNIX command """ args = list(args) ocf.logger.debug("_exec: begin; command = %s, args = %s" % (command, str(args))) def flatten(*n): return (str(e) for a in n for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),))) command = list(flatten([command] + args)) ocf.logger.debug("_exec: cmd = %s" % " ".join(command)) try: ret = subprocess.check_output(command) if type(ret) != str: ret = ret.decode() ocf.logger.debug("_exec: return = %s" % ret) return ret.rstrip() except Exception as err: ocf.logger.exception(err) return None @staticmethod def setAttr(key, value, node=None): """ Set the value of a specific global/local attribute in the Pacemaker cluster """ ocf.logger.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node)) if value: ret = clusterHelper._exec("crm_attribute", "--name", key, "--update", value, clusterHelper._getLocation(node)) else: ret = clusterHelper._exec("crm_attribute", "--name", key, "--delete", clusterHelper._getLocation(node)) ocf.logger.debug("setAttr: finished") return len(ret) == 0 @staticmethod def getAttr(key, node=None): """ Retrieve a global/local attribute from the Pacemaker cluster """ ocf.logger.debug("getAttr: begin; key = %s, node = %s" % (key, node)) val = clusterHelper._exec("crm_attribute", "--name", key, "--query", "--quiet", "--default", "", clusterHelper._getLocation(node)) ocf.logger.debug("getAttr: finished") if not val: return None return val if not val.isdigit() else int(val) @staticmethod def getAllNodes(): """ Get a list of hostnames for all nodes in the Pacemaker cluster """ ocf.logger.debug("getAllNodes: begin") nodes = [] nodeList = clusterHelper._exec("crm_node", "--list") for n in nodeList.split("\n"): nodes.append(n.split()[1]) ocf.logger.debug("getAllNodes: finished; return %s" % str(nodes)) return nodes @staticmethod def getHostNameFromAzName(azName): """ Helper function to get the actual host name from an Azure node name """ return clusterHelper.getAttr("hostName_%s" % azName) @staticmethod def removeHoldFromNodes(): """ Remove the ON_HOLD state from all nodes in the Pacemaker cluster """ ocf.logger.debug("removeHoldFromNodes: begin") for n in clusterHelper.getAllNodes(): if clusterHelper.getAttr(attr_curNodeState, node=n) == "ON_HOLD": clusterHelper.setAttr(attr_curNodeState, "AVAILABLE", node=n) ocf.logger.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n) ocf.logger.debug("removeHoldFromNodes: finished") return False @staticmethod def otherNodesAvailable(exceptNode): """ Check if there are any nodes (except a given node) in the Pacemaker cluster that have state AVAILABLE """ ocf.logger.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode) for n in clusterHelper.getAllNodes(): state = clusterHelper.getAttr(attr_curNodeState, node=n) state = stringToNodeState(state) if state else AVAILABLE if state == AVAILABLE and n != exceptNode.hostName: ocf.logger.info("otherNodesAvailable: at least %s is available" % n) ocf.logger.debug("otherNodesAvailable: finished") return True ocf.logger.info("otherNodesAvailable: no other nodes are available") ocf.logger.debug("otherNodesAvailable: finished") return False @staticmethod def transitionSummary(): """ Get the current Pacemaker transition summary (used to check if all resources are stopped when putting a node standby) """ # Is a global crm_simulate "too much"? Or would it be sufficient it there are no planned transitions for a particular node? # # crm_simulate -LS # Transition Summary: # * Promote rsc_SAPHana_HN1_HDB03:0 (Slave -> Master hsr3-db1) # * Stop rsc_SAPHana_HN1_HDB03:1 (hsr3-db0) # * Move rsc_ip_HN1_HDB03 (Started hsr3-db0 -> hsr3-db1) # * Start rsc_nc_HN1_HDB03 (hsr3-db1) # # Excepted result when there are no pending actions: # Transition Summary: ocf.logger.debug("transitionSummary: begin") summary = clusterHelper._exec("crm_simulate", "-LS") if not summary: ocf.logger.warning("transitionSummary: could not load transition summary") return "" if summary.find("Transition Summary:") < 0: ocf.logger.debug("transitionSummary: no transactions: %s" % summary) return "" j=summary.find('Transition Summary:') + len('Transition Summary:') l=summary.lower().find('executing cluster transition:') ret = list(filter(str.strip, summary[j:l].split("\n"))) ocf.logger.debug("transitionSummary: finished; return = %s" % str(ret)) return ret @staticmethod def listOperationsOnNode(node): """ Get a list of all current operations for a given node (used to check if any resources are pending) """ # hsr3-db1:/home/tniek # crm_resource --list-operations -N hsr3-db0 # rsc_azure-events-az (ocf::heartbeat:azure-events-az): Started: rsc_azure-events-az_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun 8 22:37:46 2018, exec=115ms): complete # rsc_azure-events-az (ocf::heartbeat:azure-events-az): Started: rsc_azure-events-az_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun 8 22:37:47 2018, exec=197ms): complete # rsc_SAPHana_HN1_HDB03 (ocf::suse:SAPHana): Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun 8 22:37:46 2018, exec=0ms): pending # rsc_SAPHanaTopology_HN1_HDB03 (ocf::suse:SAPHanaTopology): Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun 8 22:37:46 2018, exec=3214ms): complete ocf.logger.debug("listOperationsOnNode: begin; node = %s" % node) resources = clusterHelper._exec("crm_resource", "--list-operations", "-N", node) if len(resources) == 0: ret = [] else: ret = resources.split("\n") ocf.logger.debug("listOperationsOnNode: finished; return = %s" % str(ret)) return ret @staticmethod def noPendingResourcesOnNode(node): """ Check that there are no pending resources on a given node """ ocf.logger.debug("noPendingResourcesOnNode: begin; node = %s" % node) for r in clusterHelper.listOperationsOnNode(node): ocf.logger.debug("noPendingResourcesOnNode: * %s" % r) resource = r.split()[-1] if resource == "pending": ocf.logger.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource) ocf.logger.debug("noPendingResourcesOnNode: finished; return = False") return False ocf.logger.info("noPendingResourcesOnNode: no pending resources on node %s" % node) ocf.logger.debug("noPendingResourcesOnNode: finished; return = True") return True @staticmethod def allResourcesStoppedOnNode(node): """ Check that all resources on a given node are stopped """ ocf.logger.debug("allResourcesStoppedOnNode: begin; node = %s" % node) if clusterHelper.noPendingResourcesOnNode(node): if len(clusterHelper.transitionSummary()) == 0: ocf.logger.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node) ocf.logger.debug("allResourcesStoppedOnNode: finished; return = True") return True ocf.logger.info("allResourcesStoppedOnNode: transition summary is not empty") ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False") return False ocf.logger.info("allResourcesStoppedOnNode: still pending resources on node %s" % node) ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False") return False ############################################################################## AVAILABLE = 0 # Node is online and ready to handle events STOPPING = 1 # Standby has been triggered, but some resources are still running IN_EVENT = 2 # All resources are stopped, and event has been initiated via Azure Metadata Service ON_HOLD = 3 # Node has a pending event that cannot be started there are no other nodes available def stringToNodeState(name): if type(name) == int: return name if name == "STOPPING": return STOPPING if name == "IN_EVENT": return IN_EVENT if name == "ON_HOLD": return ON_HOLD return AVAILABLE def nodeStateToString(state): if state == STOPPING: return "STOPPING" if state == IN_EVENT: return "IN_EVENT" if state == ON_HOLD: return "ON_HOLD" return "AVAILABLE" ############################################################################## class Node: """ Core class implementing logic for a cluster node """ def __init__(self, ra): self.raOwner = ra self.azInfo = azHelper.getInstanceInfo() self.azName = self.azInfo.name self.hostName = socket.gethostname() self.setAttr("azName", self.azName) clusterHelper.setAttr("hostName_%s" % self.azName, self.hostName) def getAttr(self, key): """ Get a local attribute """ return clusterHelper.getAttr(key, node=self.hostName) def setAttr(self, key, value): """ Set a local attribute """ return clusterHelper.setAttr(key, value, node=self.hostName) def selfOrOtherNode(self, node): """ Helper function to distinguish self/other node """ return node if node else self.hostName def setState(self, state, node=None): """ Set the state for a given node (or self) """ node = self.selfOrOtherNode(node) ocf.logger.debug("setState: begin; node = %s, state = %s" % (node, nodeStateToString(state))) clusterHelper.setAttr(attr_curNodeState, nodeStateToString(state), node=node) ocf.logger.debug("setState: finished") def getState(self, node=None): """ Get the state for a given node (or self) """ node = self.selfOrOtherNode(node) ocf.logger.debug("getState: begin; node = %s" % node) state = clusterHelper.getAttr(attr_curNodeState, node=node) ocf.logger.debug("getState: state = %s" % state) ocf.logger.debug("getState: finished") if not state: return AVAILABLE return stringToNodeState(state) def setEventIDs(self, eventIDs, node=None): """ Set pending EventIDs for a given node (or self) """ node = self.selfOrOtherNode(node) ocf.logger.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs))) if eventIDs: eventIDStr = ",".join(eventIDs) else: eventIDStr = None clusterHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node) ocf.logger.debug("setEventIDs: finished") return def getEventIDs(self, node=None): """ Get pending EventIDs for a given node (or self) """ node = self.selfOrOtherNode(node) ocf.logger.debug("getEventIDs: begin; node = %s" % node) eventIDStr = clusterHelper.getAttr(attr_pendingEventIDs, node=node) if eventIDStr: eventIDs = eventIDStr.split(",") else: eventIDs = None ocf.logger.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs)) return eventIDs def updateNodeStateAndEvents(self, state, eventIDs, node=None): """ Set the state and pending EventIDs for a given node (or self) """ ocf.logger.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, nodeStateToString(state), str(eventIDs))) self.setState(state, node=node) self.setEventIDs(eventIDs, node=node) ocf.logger.debug("updateNodeStateAndEvents: finished") return state def putNodeStandby(self, node=None): """ Put self to standby """ node = self.selfOrOtherNode(node) ocf.logger.debug("putNodeStandby: begin; node = %s" % node) clusterHelper._exec("crm_attribute", "--node", node, "--name", attr_healthstate, "--update", "-1000000", "--lifetime=forever") ocf.logger.debug("putNodeStandby: finished") def isNodeInStandby(self, node=None): """ check if node is in standby """ node = self.selfOrOtherNode(node) ocf.logger.debug("isNodeInStandby: begin; node = %s" % node) isInStandy = False healthAttributeStr = clusterHelper.getAttr(attr_healthstate, node) if healthAttributeStr is not None: try: healthAttribute = int(healthAttributeStr) isInStandy = healthAttribute < 0 except ValueError: # Handle the exception ocf.logger.warn("Health attribute %s on node %s cannot be converted to an integer value" % (healthAttributeStr, node)) - + ocf.logger.debug("isNodeInStandby: finished - result %s" % isInStandy) return isInStandy def putNodeOnline(self, node=None): """ Put self back online """ node = self.selfOrOtherNode(node) ocf.logger.debug("putNodeOnline: begin; node = %s" % node) clusterHelper._exec("crm_attribute", "--node", node, "--name", "#health-azure", "--update", "0", "--lifetime=forever") ocf.logger.debug("putNodeOnline: finished") def separateEvents(self, events): """ Split own/other nodes' events """ ocf.logger.debug("separateEvents: begin; events = %s" % str(events)) localEvents = [] remoteEvents = [] for e in events: e = attrDict(e) if e.EventType not in self.raOwner.relevantEventTypes: continue if self.azName in e.Resources: localEvents.append(e) else: remoteEvents.append(e) ocf.logger.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents))) return (localEvents, remoteEvents) ############################################################################## class raAzEvents: """ Main class for resource agent """ def __init__(self, relevantEventTypes): self.node = Node(self) self.relevantEventTypes = relevantEventTypes def monitor(self): ocf.logger.debug("monitor: begin") - + events = azHelper.pullScheduledEvents() # get current document version curDocVersion = events.DocumentIncarnation lastDocVersion = self.node.getAttr(attr_lastDocVersion) ocf.logger.debug("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion)) # split events local/remote (localEvents, remoteEvents) = self.node.separateEvents(events.Events) # ensure local events are only executing once if curDocVersion == lastDocVersion: ocf.logger.info("monitor: already handled curDocVersion, skip") return ocf.OCF_SUCCESS - localAzEventIDs = set() + localAzEventIds = dict() for e in localEvents: - localAzEventIDs.add(e.EventId) + localAzEventIds[e.EventId] = json.dumps(e) curState = self.node.getState() clusterEventIDs = self.node.getEventIDs() ocf.logger.debug("monitor: curDocVersion has not been handled yet") - + if clusterEventIDs: # there are pending events set, so our state must be STOPPING or IN_EVENT i = 0; touchedEventIDs = False while i < len(clusterEventIDs): # clean up pending events that are already finished according to AZ - if clusterEventIDs[i] not in localAzEventIDs: + if clusterEventIDs[i] not in localAzEventIds.keys(): ocf.logger.info("monitor: remove finished local clusterEvent %s" % (clusterEventIDs[i])) clusterEventIDs.pop(i) touchedEventIDs = True else: i += 1 if len(clusterEventIDs) > 0: # there are still pending events (either because we're still stopping, or because the event is still in place) # either way, we need to wait if touchedEventIDs: ocf.logger.info("monitor: added new local clusterEvent %s" % str(clusterEventIDs)) self.node.setEventIDs(clusterEventIDs) else: ocf.logger.info("monitor: no local clusterEvents were updated") else: # there are no more pending events left after cleanup if clusterHelper.noPendingResourcesOnNode(self.node.hostName): # and no pending resources on the node -> set it back online ocf.logger.info("monitor: all local events finished -> clean up, put node online and AVAILABLE") curState = self.node.updateNodeStateAndEvents(AVAILABLE, None) self.node.putNodeOnline() clusterHelper.removeHoldFromNodes() # If Azure Scheduled Events are not used for 24 hours (e.g. because the cluster was asleep), it will be disabled for a VM. # When the cluster wakes up and starts using it again, the DocumentIncarnation is reset. # We need to remove it during cleanup, otherwise azure-events-az will not process the event after wakeup self.node.setAttr(attr_lastDocVersion, None) else: ocf.logger.info("monitor: all local events finished, but some resources have not completed startup yet -> wait") else: if curState == AVAILABLE: - if len(localAzEventIDs) > 0: + if len(localAzEventIds) > 0: if clusterHelper.otherNodesAvailable(self.node): - ocf.logger.info("monitor: can handle local events %s -> set state STOPPING" % (str(localAzEventIDs))) - curState = self.node.updateNodeStateAndEvents(STOPPING, localAzEventIDs) + ocf.logger.info("monitor: can handle local events %s -> set state STOPPING - %s" % (str(list(localAzEventIds.keys())), str(list(localAzEventIds.values())))) + curState = self.node.updateNodeStateAndEvents(STOPPING, localAzEventIds.keys()) else: - ocf.logger.info("monitor: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(localAzEventIDs)) + ocf.logger.info("monitor: cannot handle azEvents %s (only node available) -> set state ON_HOLD - %s" % (str(list(localAzEventIds.keys())), str(list(localAzEventIds.values())))) self.node.setState(ON_HOLD) else: ocf.logger.debug("monitor: no local azEvents to handle") if curState == STOPPING: eventIDsForNode = {} if clusterHelper.noPendingResourcesOnNode(self.node.hostName): if not self.node.isNodeInStandby(): ocf.logger.info("monitor: all local resources are started properly -> put node standby and exit") self.node.putNodeStandby() return ocf.OCF_SUCCESS for e in localEvents: ocf.logger.info("monitor: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources))) # before we can force an event to start, we need to ensure all nodes involved have stopped their resources if e.EventStatus == "Scheduled": allNodesStopped = True for azName in e.Resources: hostName = clusterHelper.getHostNameFromAzName(azName) state = self.node.getState(node=hostName) if state == STOPPING: # the only way we can continue is when node state is STOPPING, but all resources have been stopped if not clusterHelper.allResourcesStoppedOnNode(hostName): ocf.logger.info("monitor: (at least) node %s has still resources running -> wait" % hostName) allNodesStopped = False break elif state in (AVAILABLE, IN_EVENT, ON_HOLD): ocf.logger.info("monitor: node %s is still %s -> remote event needs to be picked up locally" % (hostName, nodeStateToString(state))) allNodesStopped = False break if allNodesStopped: ocf.logger.info("monitor: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId)) for n in e.Resources: hostName = clusterHelper.getHostNameFromAzName(n) if hostName in eventIDsForNode: eventIDsForNode[hostName].append(e.EventId) else: eventIDsForNode[hostName] = [e.EventId] elif e.EventStatus == "Started": ocf.logger.info("monitor: remote event already started") # force the start of all events whose nodes are ready (i.e. have no more resources running) if len(eventIDsForNode.keys()) > 0: eventIDsToForce = set([item for sublist in eventIDsForNode.values() for item in sublist]) ocf.logger.info("monitor: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce))) for node, eventId in eventIDsForNode.items(): self.node.updateNodeStateAndEvents(IN_EVENT, eventId, node=node) azHelper.forceEvents(eventIDsToForce) self.node.setAttr(attr_lastDocVersion, curDocVersion) else: ocf.logger.info("monitor: some local resources are not clean yet -> wait") ocf.logger.debug("monitor: finished") return ocf.OCF_SUCCESS ############################################################################## def setLoglevel(verbose): # set up writing into syslog loglevel = default_loglevel if verbose: opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1)) urllib2.install_opener(opener) loglevel = ocf.logging.DEBUG ocf.log.setLevel(loglevel) description = ( "Microsoft Azure Scheduled Events monitoring agent", """This resource agent implements a monitor for scheduled (maintenance) events for a Microsoft Azure VM. If any relevant events are found, it moves all Pacemaker resources away from the affected node to allow for a graceful shutdown. Deployment: crm configure primitive rsc_azure-events-az ocf:heartbeat:azure-events-az \ op monitor interval=10s crm configure clone cln_azure-events-az rsc_azure-events-az For further information on Microsoft Azure Scheduled Events, please refer to the following documentation: https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events """) def monitor_action(eventTypes): relevantEventTypes = set(eventTypes.split(",") if eventTypes else []) ra = raAzEvents(relevantEventTypes) return ra.monitor() def validate_action(eventTypes): if eventTypes: for event in eventTypes.split(","): if event not in ("Freeze", "Reboot", "Redeploy"): ocf.ocf_exit_reason("Event type not one of Freeze, Reboot, Redeploy: " + eventTypes) return ocf.OCF_ERR_CONFIGURED return ocf.OCF_SUCCESS def main(): agent = ocf.Agent("azure-events-az", shortdesc=description[0], longdesc=description[1]) agent.add_parameter( "eventTypes", shortdesc="List of resources to be considered", longdesc="A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy)", content_type="string", default="Reboot,Redeploy") agent.add_parameter( "verbose", shortdesc="Enable verbose agent logging", longdesc="Set to true to enable verbose logging", content_type="boolean", default="false") + agent.add_parameter( + "retry_count", + shortdesc="Azure IMDS webservice retry count", + longdesc="Set to any number bigger than zero to enable retry count", + content_type="integer", + default="3") + agent.add_parameter( + "retry_wait", + shortdesc="Configure a retry wait time", + longdesc="Set retry wait time in seconds", + content_type="integer", + default="20") + agent.add_parameter( + "request_timeout", + shortdesc="Configure a request timeout", + longdesc="Set request timeout in seconds", + content_type="integer", + default="15") agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS) agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS) agent.add_action("validate-all", timeout=20, handler=validate_action) agent.add_action("monitor", timeout=240, interval=10, handler=monitor_action) setLoglevel(ocf.is_true(ocf.get_parameter("verbose", "false"))) agent.run() if __name__ == '__main__': main() diff --git a/heartbeat/ocf.py b/heartbeat/ocf.py index dda2fed4b..571cd1966 100644 --- a/heartbeat/ocf.py +++ b/heartbeat/ocf.py @@ -1,486 +1,524 @@ # # Copyright (c) 2016 Red Hat, Inc, Oyvind Albrigtsen # All Rights Reserved. # # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# +# import sys, os, logging, syslog argv=sys.argv env=os.environ # # Common variables for the OCF Resource Agents supplied by # heartbeat. # OCF_SUCCESS=0 OCF_ERR_GENERIC=1 OCF_ERR_ARGS=2 OCF_ERR_UNIMPLEMENTED=3 OCF_ERR_PERM=4 OCF_ERR_INSTALLED=5 OCF_ERR_CONFIGURED=6 OCF_NOT_RUNNING=7 # Non-standard values. # # OCF does not include the concept of master/slave resources so we # need to extend it so we can discover a resource's complete state. # -# OCF_RUNNING_MASTER: +# OCF_RUNNING_MASTER: # The resource is in "master" mode and fully operational # OCF_FAILED_MASTER: # The resource is in "master" mode but in a failed state -# +# # The extra two values should only be used during a probe. # # Probes are used to discover resources that were started outside of # the CRM and/or left behind if the LRM fails. -# +# # They can be identified in RA scripts by checking for: # [ "${__OCF_ACTION}" = "monitor" -a "${OCF_RESKEY_CRM_meta_interval}" = "0" ] -# +# # Failed "slaves" should continue to use: OCF_ERR_GENERIC # Fully operational "slaves" should continue to use: OCF_SUCCESS # OCF_RUNNING_MASTER=8 OCF_FAILED_MASTER=9 ## Own logger handler that uses old-style syslog handler as otherwise ## everything is sourced from /dev/syslog class SyslogLibHandler(logging.StreamHandler): """ A handler class that correctly push messages into syslog """ def emit(self, record): syslog_level = { logging.CRITICAL:syslog.LOG_CRIT, logging.ERROR:syslog.LOG_ERR, logging.WARNING:syslog.LOG_WARNING, logging.INFO:syslog.LOG_INFO, logging.DEBUG:syslog.LOG_DEBUG, logging.NOTSET:syslog.LOG_DEBUG, }[record.levelno] msg = self.format(record) # syslog.syslog can not have 0x00 character inside or exception # is thrown syslog.syslog(syslog_level, msg.replace("\x00","\n")) return OCF_RESOURCE_INSTANCE = env.get("OCF_RESOURCE_INSTANCE") OCF_ACTION = env.get("__OCF_ACTION") if OCF_ACTION is None and len(argv) == 2: OCF_ACTION = argv[1] HA_DEBUG = env.get("HA_debug", 0) HA_DATEFMT = env.get("HA_DATEFMT", "%b %d %T ") HA_LOGFACILITY = env.get("HA_LOGFACILITY") HA_LOGFILE = env.get("HA_LOGFILE") HA_DEBUGLOG = env.get("HA_DEBUGLOG") log = logging.getLogger(os.path.basename(argv[0])) log.setLevel(logging.DEBUG) ## add logging to stderr if sys.stdout.isatty(): seh = logging.StreamHandler(stream=sys.stderr) if HA_DEBUG == 0: seh.setLevel(logging.WARNING) sehformatter = logging.Formatter('%(filename)s(%(OCF_RESOURCE_INSTANCE)s)[%(process)s]:\t%(asctime)s%(levelname)s: %(message)s', datefmt=HA_DATEFMT) seh.setFormatter(sehformatter) log.addHandler(seh) ## add logging to syslog if HA_LOGFACILITY: slh = SyslogLibHandler() if HA_DEBUG == 0: slh.setLevel(logging.WARNING) slhformatter = logging.Formatter('%(levelname)s: %(message)s') slh.setFormatter(slhformatter) log.addHandler(slh) ## add logging to file if HA_LOGFILE: lfh = logging.FileHandler(HA_LOGFILE) if HA_DEBUG == 0: lfh.setLevel(logging.WARNING) lfhformatter = logging.Formatter('%(filename)s(%(OCF_RESOURCE_INSTANCE)s)[%(process)s]:\t%(asctime)s%(levelname)s: %(message)s', datefmt=HA_DATEFMT) lfh.setFormatter(lfhformatter) log.addHandler(lfh) ## add debug logging to file if HA_DEBUGLOG and HA_LOGFILE != HA_DEBUGLOG: dfh = logging.FileHandler(HA_DEBUGLOG) if HA_DEBUG == 0: dfh.setLevel(logging.WARNING) dfhformatter = logging.Formatter('%(filename)s(%(OCF_RESOURCE_INSTANCE)s)[%(process)s]:\t%(asctime)s%(levelname)s: %(message)s', datefmt=HA_DATEFMT) dfh.setFormatter(dfhformatter) log.addHandler(dfh) logger = logging.LoggerAdapter(log, {'OCF_RESOURCE_INSTANCE': OCF_RESOURCE_INSTANCE}) _exit_reason_set = False def ocf_exit_reason(msg): """ Print exit error string to stderr. Allows the OCF agent to provide a string describing why the exit code was returned. """ global _exit_reason_set cookie = env.get("OCF_EXIT_REASON_PREFIX", "ocf-exit-reason:") sys.stderr.write("{}{}\n".format(cookie, msg)) sys.stderr.flush() logger.error(msg) _exit_reason_set = True def have_binary(name): """ True if binary exists, False otherwise. """ def _access_check(fn): return (os.path.exists(fn) and os.access(fn, os.F_OK | os.X_OK) and not os.path.isdir(fn)) if _access_check(name): return True path = env.get("PATH", os.defpath).split(os.pathsep) seen = set() for dir in path: dir = os.path.normcase(dir) if dir not in seen: seen.add(dir) name2 = os.path.join(dir, name) if _access_check(name2): return True return False def is_true(val): """ Convert an OCF truth value to a Python boolean. """ return val in ("yes", "true", "1", 1, "YES", "TRUE", "ja", "on", "ON", True) def is_probe(): """ A probe is defined as a monitor operation with an interval of zero. This is called by Pacemaker to check the status of a possibly not running resource. """ return (OCF_ACTION == "monitor" and ( env.get("OCF_RESKEY_CRM_meta_interval", "") == "0" or env.get("OCF_RESKEY_CRM_meta_interval", "") == "" )) def get_parameter(name, default=None): """ Extract the parameter value from the environment """ return env.get("OCF_RESKEY_{}".format(name), default) def distro(): """ Return name of distribution/platform. If possible, returns "name/version", else just "name". """ import subprocess import platform try: ret = subprocess.check_output(["lsb_release", "-si"]) if type(ret) != str: ret = ret.decode() distro = ret.strip() ret = subprocess.check_output(["lsb_release", "-sr"]) if type(ret) != str: ret = ret.decode() version = ret.strip() return "{}/{}".format(distro, version) except Exception: if os.path.exists("/etc/debian_version"): return "Debian" if os.path.exists("/etc/SuSE-release"): return "SUSE" if os.path.exists("/etc/redhat-release"): return "Redhat" return platform.system() class Parameter(object): def __init__(self, name, shortdesc, longdesc, content_type, unique, required, default): self.name = name self.shortdesc = shortdesc self.longdesc = longdesc self.content_type = content_type self.unique = unique self.required = required self.default = default def __str__(self): return self.to_xml() def to_xml(self): ret = '' + "\n" ret += '' + self.shortdesc + '' + "\n" ret += ' {ocf_version} {longdesc} {shortdesc} {parameters} {actions} """.format(name=self.name, version = self.version, ocf_version = self.ocf_version, longdesc=self.longdesc, shortdesc=self.shortdesc, parameters="".join(p.to_xml() for p in self.parameters), actions="".join(a.to_xml() for a in self.actions)) def run(self): run(self) def run(agent, handlers=None): """ Main loop implementation for resource agents. Does not return. Arguments: agent: Agent object. handlers: Dict of action name to handler function. Handler functions can take parameters as arguments, the run loop will read parameter values from the environment and pass to the handler. """ import inspect agent._handlers.update(handlers or {}) handlers = agent._handlers def check_required_params(): for p in agent.parameters: if p.required and get_parameter(p.name) is None: ocf_exit_reason("{}: Required parameter not set".format(p.name)) sys.exit(OCF_ERR_CONFIGURED) def call_handler(func): if hasattr(inspect, 'signature'): params = inspect.signature(func).parameters.keys() else: params = inspect.getargspec(func).args if 'self' in params: params.remove('self') def value_for_parameter(param): val = get_parameter(param) if val is not None: return val for p in agent.parameters: if p.name == param: return p.default arglist = [value_for_parameter(p) for p in params] try: rc = func(*arglist) if rc is None: rc = OCF_SUCCESS return rc except Exception as err: if not _exit_reason_set: ocf_exit_reason(str(err)) else: logger.error(str(err)) return OCF_ERR_GENERIC meta_data_action = False for action in agent.actions: if action.name == "meta-data": meta_data_action = True break if not meta_data_action: agent.add_action("meta-data", timeout=10) if len(sys.argv) == 2 and sys.argv[1] in ("-h", "--help"): sys.stdout.write("usage: %s {%s}\n\n" % (sys.argv[0], "|".join(sorted(handlers.keys()))) + "Expects to have a fully populated OCF RA compliant environment set.\n") sys.exit(OCF_SUCCESS) if OCF_ACTION is None: ocf_exit_reason("No action argument set") sys.exit(OCF_ERR_UNIMPLEMENTED) if OCF_ACTION in ('meta-data', 'usage', 'methods'): sys.stdout.write(agent.to_xml() + "\n") sys.exit(OCF_SUCCESS) check_required_params() if OCF_ACTION in handlers: rc = call_handler(handlers[OCF_ACTION]) sys.exit(rc) sys.exit(OCF_ERR_UNIMPLEMENTED) + if __name__ == "__main__": import unittest + import logging class TestMetadata(unittest.TestCase): def test_noparams_noactions(self): m = Agent("foo", shortdesc="shortdesc", longdesc="longdesc") self.assertEqual(""" - + 1.0 longdesc shortdesc """, str(m)) def test_params_actions(self): m = Agent("foo", shortdesc="shortdesc", longdesc="longdesc") m.add_parameter("testparam") m.add_action("start") self.assertEqual(str(m.actions[0]), '\n') + def test_retry_params_actions(self): + log= logging.getLogger( "test_retry_params_actions" ) + + m = Agent("foo", shortdesc="shortdesc", longdesc="longdesc") + m.add_parameter( + "retry_count", + shortdesc="Azure ims webservice retry count", + longdesc="Set to any number bigger than zero to enable retry count", + content_type="integer", + default="0") + m.add_parameter( + "retry_wait", + shortdesc="Configure a retry wait time", + longdesc="Set retry wait time in seconds", + content_type="integer", + default="20") + m.add_parameter( + "request_timeout", + shortdesc="Configure a request timeout", + longdesc="Set request timeout in seconds", + content_type="integer", + default="15") + + m.add_action("start") + + log.debug( "actions= %s", str(m.actions[0] )) + self.assertEqual(str(m.actions[0]), '\n') + + log.debug( "parameters= %s", str(m.parameters[0] )) + log.debug( "parameters= %s", str(m.parameters[1] )) + log.debug( "parameters= %s", str(m.parameters[2] )) + self.assertEqual(str(m.parameters[0]), '\nSet to any number bigger than zero to enable retry count\nAzure ims webservice retry count\n\n\n') + self.assertEqual(str(m.parameters[1]), '\nSet retry wait time in seconds\nConfigure a retry wait time\n\n\n') + self.assertEqual(str(m.parameters[2]), '\nSet request timeout in seconds\nConfigure a request timeout\n\n\n') + + logging.basicConfig( stream=sys.stderr ) unittest.main()