Page MenuHomeClusterLabs Projects

No OneTemporary

diff --git a/heartbeat/azure-events-az.in b/heartbeat/azure-events-az.in
index 59d095306..67c02c642 100644
--- a/heartbeat/azure-events-az.in
+++ b/heartbeat/azure-events-az.in
@@ -1,771 +1,771 @@
#!@PYTHON@ -tt
#
# Resource agent for monitoring Azure Scheduled Events
#
# License: GNU General Public License (GPL)
# (c) 2018 Tobias Niekamp, Microsoft Corp.
# and Linux-HA contributors
import os
import sys
import time
import subprocess
import json
try:
import urllib2
from urllib2 import URLError
except ImportError:
import urllib.request as urllib2
from urllib.error import URLError
import socket
from collections import defaultdict
OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT"))
sys.path.append(OCF_FUNCTIONS_DIR)
import ocf
##############################################################################

# Agent version, reported in the HTTP User-Agent to the metadata service.
VERSION = "0.10"
USER_AGENT = "Pacemaker-ResourceAgent/%s %s" % (VERSION, ocf.distro())

# Cluster attribute names used to coordinate event handling across nodes.
attr_globalPullState = "azure-events-az_globalPullState"  # global "pull in progress" flag
attr_lastDocVersion = "azure-events-az_lastDocVersion"    # last handled DocumentIncarnation (per node)
attr_curNodeState = "azure-events-az_curNodeState"        # per-node state machine value
attr_pendingEventIDs = "azure-events-az_pendingEventIDs"  # per-node comma-separated pending event IDs
attr_healthstate = "#health-azure"                        # Pacemaker node-health attribute

default_loglevel = ocf.logging.INFO
default_relevantEventTypes = set(["Reboot", "Redeploy"])

# Retry settings for pulling scheduled events.
global_pullMaxAttempts = 3
global_pullDelaySecs = 1
##############################################################################
class attrDict(defaultdict):
"""
A wrapper for accessing dict keys like an attribute
"""
def __init__(self, data):
super(attrDict, self).__init__(attrDict)
for d in data.keys():
self.__setattr__(d, data[d])
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(key)
def __setattr__(self, key, value):
self[key] = value
##############################################################################
class azHelper:
    """
    Helper class for Azure's metadata API (including Scheduled Events).
    All calls target the link-local IMDS endpoint, which requires the
    "Metadata: true" header.
    """
    metadata_host = "http://169.254.169.254/metadata"  # link-local IMDS endpoint
    instance_api = "instance"
    events_api = "scheduledevents"
    api_version = "2019-08-01"

    @staticmethod
    def _sendMetadataRequest(endpoint, postData=None):
        """
        Send a request to Azure's Metadata Service API.

        Returns the JSON-decoded response body, or "" when the request
        failed or the body was empty.  On failure the global pull state
        attribute is reset to IDLE so a pull can be retried later.
        """
        url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version)
        data = ""
        ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData))
        ocf.logger.debug("_sendMetadataRequest: url = %s" % url)

        # urllib needs the POST body as bytes.
        if postData and type(postData) != bytes:
            postData = postData.encode()

        req = urllib2.Request(url, postData)
        req.add_header("Metadata", "true")
        req.add_header("User-Agent", USER_AGENT)
        try:
            resp = urllib2.urlopen(req)
        except URLError as e:
            if hasattr(e, 'reason'):
                ocf.logger.warning("Failed to reach the server: %s" % e.reason)
                clusterHelper.setAttr(attr_globalPullState, "IDLE")
            elif hasattr(e, 'code'):
                ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code)
                clusterHelper.setAttr(attr_globalPullState, "IDLE")
        else:
            data = resp.read()
            ocf.logger.debug("_sendMetadataRequest: response = %s" % data)
        if data:
            data = json.loads(data)
        ocf.logger.debug("_sendMetadataRequest: finished")
        return data

    @staticmethod
    def getInstanceInfo():
        """
        Fetch details about the current VM from the Metadata Service;
        exits the agent with OCF_ERR_GENERIC when no data is returned.
        """
        ocf.logger.debug("getInstanceInfo: begin")

        jsondata = azHelper._sendMetadataRequest(azHelper.instance_api)
        ocf.logger.debug("getInstanceInfo: json = %s" % jsondata)

        if jsondata:
            ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"]))
            return attrDict(jsondata["compute"])
        else:
            ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info")
            sys.exit(ocf.OCF_ERR_GENERIC)

    @staticmethod
    def pullScheduledEvents():
        """
        Retrieve all currently scheduled events via the Metadata Service.
        """
        ocf.logger.debug("pullScheduledEvents: begin")

        jsondata = azHelper._sendMetadataRequest(azHelper.events_api)
        ocf.logger.debug("pullScheduledEvents: json = %s" % jsondata)

        ocf.logger.debug("pullScheduledEvents: finished")
        return attrDict(jsondata)

    @staticmethod
    def forceEvents(eventIDs):
        """
        Ask IMDS to start the given set of events immediately instead of
        waiting for their scheduled time.
        """
        ocf.logger.debug("forceEvents: begin")

        events = []
        for e in eventIDs:
            events.append({
                "EventId": e,
            })
        postData = {
            "StartRequests" : events
        }
        ocf.logger.info("forceEvents: postData = %s" % postData)
        # NOTE(review): the response is ignored; failures are only logged
        # inside _sendMetadataRequest.
        resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData))

        ocf.logger.debug("forceEvents: finished")
        return
##############################################################################
class clusterHelper:
    """
    Helper functions for Pacemaker control via crm command-line tools
    (crm_attribute, crm_node, crm_simulate, crm_resource).
    """
    @staticmethod
    def _getLocation(node):
        """
        Return crm_attribute location arguments: node-local when a node
        is given, otherwise cluster-wide (crm_config).
        """
        if node:
            return ["--node", node]
        else:
            return ["--type", "crm_config"]

    @staticmethod
    def _exec(command, *args):
        """
        Run a UNIX command (args may contain nested lists/tuples, which
        are flattened) and return its stripped stdout as a str, or None
        when the command failed.
        """
        args = list(args)
        ocf.logger.debug("_exec: begin; command = %s, args = %s" % (command, str(args)))

        def flatten(*n):
            # Recursively flatten nested lists/tuples into an iterator of strings.
            return (str(e) for a in n
                    for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),)))
        command = list(flatten([command] + args))
        ocf.logger.debug("_exec: cmd = %s" % " ".join(command))
        try:
            ret = subprocess.check_output(command)
            if type(ret) != str:
                ret = ret.decode()
            ocf.logger.debug("_exec: return = %s" % ret)
            return ret.rstrip()
        except Exception as err:
            ocf.logger.exception(err)
            return None

    @staticmethod
    def setAttr(key, value, node=None):
        """
        Set (or, when value is falsy, delete) a global/local attribute in
        the Pacemaker cluster.  Returns True when crm_attribute produced
        no output.
        """
        ocf.logger.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node))
        if value:
            ret = clusterHelper._exec("crm_attribute",
                                      "--name", key,
                                      "--update", value,
                                      clusterHelper._getLocation(node))
        else:
            ret = clusterHelper._exec("crm_attribute",
                                      "--name", key,
                                      "--delete",
                                      clusterHelper._getLocation(node))
        ocf.logger.debug("setAttr: finished")
        # NOTE(review): _exec returns None when crm_attribute fails, which
        # would make len(ret) raise TypeError here — confirm crm_attribute
        # failures are acceptable as exceptions at this point.
        return len(ret) == 0

    @staticmethod
    def getAttr(key, node=None):
        """
        Retrieve a global/local attribute from the Pacemaker cluster.
        Returns None when unset/empty; purely numeric values are returned
        as int.
        """
        ocf.logger.debug("getAttr: begin; key = %s, node = %s" % (key, node))

        val = clusterHelper._exec("crm_attribute",
                                  "--name", key,
                                  "--query", "--quiet",
                                  "--default", "",
                                  clusterHelper._getLocation(node))
        ocf.logger.debug("getAttr: finished")
        if not val:
            return None
        return val if not val.isdigit() else int(val)

    @staticmethod
    def getAllNodes():
        """
        Get a list of hostnames for all nodes in the Pacemaker cluster
        (second column of `crm_node --list` output).
        """
        ocf.logger.debug("getAllNodes: begin")

        nodes = []
        nodeList = clusterHelper._exec("crm_node", "--list")
        for n in nodeList.split("\n"):
            nodes.append(n.split()[1])
        ocf.logger.debug("getAllNodes: finished; return %s" % str(nodes))

        return nodes

    @staticmethod
    def getHostNameFromAzName(azName):
        """
        Resolve an Azure node name to the cluster host name via the
        hostName_<azName> attribute published by Node.__init__.
        """
        return clusterHelper.getAttr("hostName_%s" % azName)

    @staticmethod
    def removeHoldFromNodes():
        """
        Remove the ON_HOLD state from all nodes in the Pacemaker cluster
        by setting them back to AVAILABLE.  Always returns False.
        """
        ocf.logger.debug("removeHoldFromNodes: begin")

        for n in clusterHelper.getAllNodes():
            if clusterHelper.getAttr(attr_curNodeState, node=n) == "ON_HOLD":
                clusterHelper.setAttr(attr_curNodeState, "AVAILABLE", node=n)
                ocf.logger.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n)

        ocf.logger.debug("removeHoldFromNodes: finished")
        return False

    @staticmethod
    def otherNodesAvailable(exceptNode):
        """
        Check if any node other than exceptNode (a Node instance) is in
        state AVAILABLE; an unset state attribute counts as AVAILABLE.
        """
        ocf.logger.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)

        for n in clusterHelper.getAllNodes():
            state = clusterHelper.getAttr(attr_curNodeState, node=n)
            state = stringToNodeState(state) if state else AVAILABLE
            if state == AVAILABLE and n != exceptNode.hostName:
                ocf.logger.info("otherNodesAvailable: at least %s is available" % n)
                ocf.logger.debug("otherNodesAvailable: finished")
                return True
        ocf.logger.info("otherNodesAvailable: no other nodes are available")
        ocf.logger.debug("otherNodesAvailable: finished")

        return False

    @staticmethod
    def transitionSummary():
        """
        Get the current Pacemaker transition summary (used to check if all
        resources are stopped when putting a node standby).  Returns "" on
        failure or when there are no planned transitions.
        """
        # <tniek> Is a global crm_simulate "too much"? Or would it be sufficient it there are no planned transitions for a particular node?
        # # crm_simulate -Ls
        # 	Transition Summary:
        # 	 * Promote rsc_SAPHana_HN1_HDB03:0      (Slave -> Master hsr3-db1)
        # 	 * Stop    rsc_SAPHana_HN1_HDB03:1      (hsr3-db0)
        # 	 * Move    rsc_ip_HN1_HDB03     (Started hsr3-db0 -> hsr3-db1)
        # 	 * Start   rsc_nc_HN1_HDB03     (hsr3-db1)
        # # Expected result when there are no pending actions:
        # 	Transition Summary:
        ocf.logger.debug("transitionSummary: begin")

        summary = clusterHelper._exec("crm_simulate", "-Ls")
        if not summary:
            ocf.logger.warning("transitionSummary: could not load transition summary")
            return ""
        if summary.find("Transition Summary:") < 0:
            ocf.logger.debug("transitionSummary: no transactions: %s" % summary)
            return ""
        summary = summary.split("Transition Summary:")[1]
        # NOTE(review): pop(0) takes the text on the same line as the
        # "Transition Summary:" header, which is typically empty even when
        # actions follow on later lines — confirm this is intended.
        ret = summary.split("\n").pop(0)

        ocf.logger.debug("transitionSummary: finished; return = %s" % str(ret))
        return ret

    @staticmethod
    def listOperationsOnNode(node):
        """
        Get a list of all current operations for a given node (used to
        check if any resources are pending).  One entry per output line of
        `crm_resource --list-operations`.
        """
        # hsr3-db1:/home/tniek # crm_resource --list-operations -N hsr3-db0
        # rsc_azure-events-az    (ocf::heartbeat:azure-events-az): Started: rsc_azure-events-az_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=115ms): complete
        # rsc_azure-events-az    (ocf::heartbeat:azure-events-az): Started: rsc_azure-events-az_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun  8 22:37:47 2018, exec=197ms): complete
        # rsc_SAPHana_HN1_HDB03  (ocf::suse:SAPHana): Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun  8 22:37:46 2018, exec=0ms): pending
        # rsc_SAPHanaTopology_HN1_HDB03  (ocf::suse:SAPHanaTopology): Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=3214ms): complete
        ocf.logger.debug("listOperationsOnNode: begin; node = %s" % node)

        resources = clusterHelper._exec("crm_resource", "--list-operations", "-N", node)
        if len(resources) == 0:
            ret = []
        else:
            ret = resources.split("\n")

        ocf.logger.debug("listOperationsOnNode: finished; return = %s" % str(ret))
        return ret

    @staticmethod
    def noPendingResourcesOnNode(node):
        """
        Check that no operation on the given node has status "pending"
        (the last whitespace-separated token of each operation line).
        """
        ocf.logger.debug("noPendingResourcesOnNode: begin; node = %s" % node)

        for r in clusterHelper.listOperationsOnNode(node):
            ocf.logger.debug("noPendingResourcesOnNode: * %s" % r)
            resource = r.split()[-1]
            if resource == "pending":
                ocf.logger.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource)
                ocf.logger.debug("noPendingResourcesOnNode: finished; return = False")
                return False
        ocf.logger.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
        ocf.logger.debug("noPendingResourcesOnNode: finished; return = True")

        return True

    @staticmethod
    def allResourcesStoppedOnNode(node):
        """
        Check that all resources on a given node are stopped: no pending
        operations AND an empty transition summary.
        """
        ocf.logger.debug("allResourcesStoppedOnNode: begin; node = %s" % node)

        if clusterHelper.noPendingResourcesOnNode(node):
            if len(clusterHelper.transitionSummary()) == 0:
                ocf.logger.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
                ocf.logger.debug("allResourcesStoppedOnNode: finished; return = True")
                return True
            ocf.logger.info("allResourcesStoppedOnNode: transition summary is not empty")
            ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
            return False

        ocf.logger.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
        ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
        return False
##############################################################################
# Node state machine values (stored as strings in the cluster attribute).
AVAILABLE = 0  # Node is online and ready to handle events
STOPPING = 1   # Standby has been triggered, but some resources are still running
IN_EVENT = 2   # All resources are stopped, and event has been initiated via Azure Metadata Service
ON_HOLD = 3    # Node has a pending event that cannot be started there are no other nodes available

# Bidirectional lookup tables between state names and numeric constants.
_STATE_BY_NAME = {"STOPPING": STOPPING, "IN_EVENT": IN_EVENT, "ON_HOLD": ON_HOLD}
_NAME_BY_STATE = {STOPPING: "STOPPING", IN_EVENT: "IN_EVENT", ON_HOLD: "ON_HOLD"}

def stringToNodeState(name):
    """Translate a state name to its numeric constant.

    Integers pass through unchanged; unknown names map to AVAILABLE.
    """
    if type(name) == int:
        return name
    return _STATE_BY_NAME.get(name, AVAILABLE)

def nodeStateToString(state):
    """Translate a numeric state to its name; unknown states map to "AVAILABLE"."""
    return _NAME_BY_STATE.get(state, "AVAILABLE")
##############################################################################
class Node:
    """
    Core class implementing logic for a cluster node: maps the Azure VM
    name to the Pacemaker host name and manages the node's state machine
    (AVAILABLE/STOPPING/IN_EVENT/ON_HOLD) plus its pending event IDs via
    cluster attributes.
    """
    def __init__(self, ra):
        self.raOwner = ra
        self.azInfo = azHelper.getInstanceInfo()
        self.azName = self.azInfo.name
        self.hostName = socket.gethostname()
        # Publish the azName<->hostName mapping so any node can resolve
        # Azure resource names (see clusterHelper.getHostNameFromAzName).
        self.setAttr("azName", self.azName)
        clusterHelper.setAttr("hostName_%s" % self.azName, self.hostName)

    def getAttr(self, key):
        """
        Get a local attribute
        """
        return clusterHelper.getAttr(key, node=self.hostName)

    def setAttr(self, key, value):
        """
        Set a local attribute
        """
        return clusterHelper.setAttr(key, value, node=self.hostName)

    def selfOrOtherNode(self, node):
        """
        Helper function to distinguish self/other node
        """
        return node if node else self.hostName

    def setState(self, state, node=None):
        """
        Set the state for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setState: begin; node = %s, state = %s" % (node, nodeStateToString(state)))

        clusterHelper.setAttr(attr_curNodeState, nodeStateToString(state), node=node)

        ocf.logger.debug("setState: finished")

    def getState(self, node=None):
        """
        Get the state for a given node (or self); an unset attribute
        defaults to AVAILABLE.
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getState: begin; node = %s" % node)

        state = clusterHelper.getAttr(attr_curNodeState, node=node)
        ocf.logger.debug("getState: state = %s" % state)
        ocf.logger.debug("getState: finished")
        if not state:
            return AVAILABLE
        return stringToNodeState(state)

    def setEventIDs(self, eventIDs, node=None):
        """
        Set pending EventIDs for a given node (or self); a falsy value
        clears the attribute.
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))

        if eventIDs:
            eventIDStr = ",".join(eventIDs)
        else:
            eventIDStr = None
        clusterHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)

        ocf.logger.debug("setEventIDs: finished")
        return

    def getEventIDs(self, node=None):
        """
        Get pending EventIDs for a given node (or self); returns None when
        none are pending.
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getEventIDs: begin; node = %s" % node)

        eventIDStr = clusterHelper.getAttr(attr_pendingEventIDs, node=node)
        if eventIDStr:
            eventIDs = eventIDStr.split(",")
        else:
            eventIDs = None

        ocf.logger.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
        return eventIDs

    def updateNodeStateAndEvents(self, state, eventIDs, node=None):
        """
        Set the state and pending EventIDs for a given node (or self) in
        one call; returns the new state.
        """
        ocf.logger.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, nodeStateToString(state), str(eventIDs)))

        self.setState(state, node=node)
        self.setEventIDs(eventIDs, node=node)

        ocf.logger.debug("updateNodeStateAndEvents: finished")
        return state

    def putNodeStandby(self, node=None):
        """
        Drive all resources off the node by setting the Pacemaker node
        health attribute to a large negative score.
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeStandby: begin; node = %s" % node)

        clusterHelper._exec("crm_attribute",
                            "--node", node,
                            "--name", attr_healthstate,
                            "--update", "-1000000",
                            "--lifetime=forever")

        ocf.logger.debug("putNodeStandby: finished")

    def isNodeInStandby(self, node=None):
        """
        Check if the node is in standby, i.e. its health attribute is
        negative; a non-numeric value is logged and treated as not standby.
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("isNodeInStandby: begin; node = %s" % node)
        isInStandy = False

        healthAttributeStr = clusterHelper.getAttr(attr_healthstate, node)
        if healthAttributeStr is not None:
            try:
                healthAttribute = int(healthAttributeStr)
                isInStandy = healthAttribute < 0
            except ValueError:
                # Handle the exception
                ocf.logger.warn("Health attribute %s on node %s cannot be converted to an integer value" % (healthAttributeStr, node))

        ocf.logger.debug("isNodeInStandby: finished - result %s" % isInStandy)
        return isInStandy

    def putNodeOnline(self, node=None):
        """
        Reset the node health attribute to 0 so resources may run on the
        node again.
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeOnline: begin; node = %s" % node)

        # NOTE(review): uses the literal "#health-azure" while
        # putNodeStandby uses attr_healthstate — same value today, but
        # consider using the constant here too.
        clusterHelper._exec("crm_attribute",
                            "--node", node,
                            "--name", "#health-azure",
                            "--update", "0",
                            "--lifetime=forever")

        ocf.logger.debug("putNodeOnline: finished")

    def separateEvents(self, events):
        """
        Split events into (localEvents, remoteEvents) relative to this
        VM's Azure name, keeping only the configured relevant event types.
        """
        ocf.logger.debug("separateEvents: begin; events = %s" % str(events))

        localEvents = []
        remoteEvents = []
        for e in events:
            e = attrDict(e)

            if e.EventType not in self.raOwner.relevantEventTypes:
                continue
            if self.azName in e.Resources:
                localEvents.append(e)
            else:
                remoteEvents.append(e)

        ocf.logger.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents)))
        return (localEvents, remoteEvents)
##############################################################################
class raAzEvents:
    """
    Main class for resource agent: pulls Azure Scheduled Events on every
    monitor run and drives the per-node state machine to evacuate nodes
    before a relevant event (Reboot/Redeploy/...) starts.
    """
    def __init__(self, relevantEventTypes):
        self.node = Node(self)
        self.relevantEventTypes = relevantEventTypes

    def monitor(self):
        ocf.logger.debug("monitor: begin")

        events = azHelper.pullScheduledEvents()

        # get current document version
        curDocVersion = events.DocumentIncarnation
        lastDocVersion = self.node.getAttr(attr_lastDocVersion)
        ocf.logger.debug("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))

        # split events local/remote
        (localEvents, remoteEvents) = self.node.separateEvents(events.Events)

        # ensure local events are only executing once
        if curDocVersion == lastDocVersion:
            ocf.logger.info("monitor: already handled curDocVersion, skip")
            return ocf.OCF_SUCCESS

        localAzEventIDs = set()
        for e in localEvents:
            localAzEventIDs.add(e.EventId)

        curState = self.node.getState()
        clusterEventIDs = self.node.getEventIDs()

        ocf.logger.debug("monitor: curDocVersion has not been handled yet")

        if clusterEventIDs:
            # there are pending events set, so our state must be STOPPING or IN_EVENT
            i = 0; touchedEventIDs = False
            while i < len(clusterEventIDs):
                # clean up pending events that are already finished according to AZ
                if clusterEventIDs[i] not in localAzEventIDs:
                    ocf.logger.info("monitor: remove finished local clusterEvent %s" % (clusterEventIDs[i]))
                    clusterEventIDs.pop(i)
                    touchedEventIDs = True
                else:
                    i += 1
            if len(clusterEventIDs) > 0:
                # there are still pending events (either because we're still stopping, or because the event is still in place)
                # either way, we need to wait
                if touchedEventIDs:
                    ocf.logger.info("monitor: added new local clusterEvent %s" % str(clusterEventIDs))
                    self.node.setEventIDs(clusterEventIDs)
                else:
                    ocf.logger.info("monitor: no local clusterEvents were updated")
            else:
                # there are no more pending events left after cleanup
                if clusterHelper.noPendingResourcesOnNode(self.node.hostName):
                    # and no pending resources on the node -> set it back online
                    ocf.logger.info("monitor: all local events finished -> clean up, put node online and AVAILABLE")
                    curState = self.node.updateNodeStateAndEvents(AVAILABLE, None)
                    self.node.putNodeOnline()
                    clusterHelper.removeHoldFromNodes()
                    # If Azure Scheduled Events are not used for 24 hours (e.g. because the cluster was asleep), it will be disabled for a VM.
                    # When the cluster wakes up and starts using it again, the DocumentIncarnation is reset.
                    # We need to remove it during cleanup, otherwise azure-events-az will not process the event after wakeup
                    self.node.setAttr(attr_lastDocVersion, None)
                else:
                    ocf.logger.info("monitor: all local events finished, but some resources have not completed startup yet -> wait")
        else:
            if curState == AVAILABLE:
                if len(localAzEventIDs) > 0:
                    if clusterHelper.otherNodesAvailable(self.node):
                        ocf.logger.info("monitor: can handle local events %s -> set state STOPPING" % (str(localAzEventIDs)))
                        curState = self.node.updateNodeStateAndEvents(STOPPING, localAzEventIDs)
                    else:
                        ocf.logger.info("monitor: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(localAzEventIDs))
                        self.node.setState(ON_HOLD)
                else:
                    ocf.logger.debug("monitor: no local azEvents to handle")

        if curState == STOPPING:
            eventIDsForNode = {}
            if clusterHelper.noPendingResourcesOnNode(self.node.hostName):
                if not self.node.isNodeInStandby():
                    ocf.logger.info("monitor: all local resources are started properly -> put node standby and exit")
                    self.node.putNodeStandby()
                    return ocf.OCF_SUCCESS

                # NOTE(review): this iterates localEvents although the log
                # message says "remote event" — confirm which is intended.
                for e in localEvents:
                    ocf.logger.info("monitor: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources)))
                    # before we can force an event to start, we need to ensure all nodes involved have stopped their resources
                    if e.EventStatus == "Scheduled":
                        allNodesStopped = True
                        for azName in e.Resources:
                            hostName = clusterHelper.getHostNameFromAzName(azName)
                            state = self.node.getState(node=hostName)
                            if state == STOPPING:
                                # the only way we can continue is when node state is STOPPING, but all resources have been stopped
                                if not clusterHelper.allResourcesStoppedOnNode(hostName):
                                    ocf.logger.info("monitor: (at least) node %s has still resources running -> wait" % hostName)
                                    allNodesStopped = False
                                    break
                            elif state in (AVAILABLE, IN_EVENT, ON_HOLD):
                                ocf.logger.info("monitor: node %s is still %s -> remote event needs to be picked up locally" % (hostName, nodeStateToString(state)))
                                allNodesStopped = False
                                break
                        if allNodesStopped:
                            ocf.logger.info("monitor: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId))
                            for n in e.Resources:
                                hostName = clusterHelper.getHostNameFromAzName(n)
                                if hostName in eventIDsForNode:
                                    eventIDsForNode[hostName].append(e.EventId)
                                else:
                                    eventIDsForNode[hostName] = [e.EventId]
                    elif e.EventStatus == "Started":
                        ocf.logger.info("monitor: remote event already started")

                # force the start of all events whose nodes are ready (i.e. have no more resources running)
                if len(eventIDsForNode.keys()) > 0:
                    eventIDsToForce = set([item for sublist in eventIDsForNode.values() for item in sublist])
                    ocf.logger.info("monitor: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce)))
                    for node, eventId in eventIDsForNode.items():
                        self.node.updateNodeStateAndEvents(IN_EVENT, eventId, node=node)
                    azHelper.forceEvents(eventIDsToForce)
                    self.node.setAttr(attr_lastDocVersion, curDocVersion)
            else:
                ocf.logger.info("monitor: some local resources are not clean yet -> wait")

        ocf.logger.debug("monitor: finished")
        return ocf.OCF_SUCCESS
##############################################################################
def setLoglevel(verbose):
    """Configure syslog logging; verbose also turns on HTTP wire debugging."""
    if verbose:
        # Dump the raw metadata-service HTTP traffic as well.
        urllib2.install_opener(urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1)))
        level = ocf.logging.DEBUG
    else:
        level = default_loglevel
    ocf.log.setLevel(level)
# (shortdesc, longdesc) pair consumed by ocf.Agent in main().
description = (
    "Microsoft Azure Scheduled Events monitoring agent",
    """This resource agent implements a monitor for scheduled
(maintenance) events for a Microsoft Azure VM.
If any relevant events are found, it moves all Pacemaker resources
away from the affected node to allow for a graceful shutdown.
Deployment:
crm configure primitive rsc_azure-events-az ocf:heartbeat:azure-events-az \
op monitor interval=10s
crm configure clone cln_azure-events-az rsc_azure-events-az
For further information on Microsoft Azure Scheduled Events, please
refer to the following documentation:
https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
""")
def monitor_action(eventTypes):
    """OCF monitor action: build the agent and run one monitor pass."""
    relevant = set(eventTypes.split(",")) if eventTypes else set()
    return raAzEvents(relevant).monitor()
def validate_action(eventTypes):
    """OCF validate-all action: every configured event type must be supported."""
    supported = ("Freeze", "Reboot", "Redeploy")
    for event in (eventTypes.split(",") if eventTypes else []):
        if event not in supported:
            ocf.ocf_exit_reason("Event type not one of Freeze, Reboot, Redeploy: " + eventTypes)
            return ocf.OCF_ERR_CONFIGURED
    return ocf.OCF_SUCCESS
def main():
    """Build the OCF agent metadata, register parameters/actions, and run."""
    agent = ocf.Agent("azure-events-az", shortdesc=description[0], longdesc=description[1])
    agent.add_parameter(
        "eventTypes",
        shortdesc="List of resources to be considered",
        longdesc="A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy)",
        content_type="string",
        default="Reboot,Redeploy")
    agent.add_parameter(
        "verbose",
        shortdesc="Enable verbose agent logging",
        longdesc="Set to true to enable verbose logging",
        content_type="boolean",
        default="false")
    # start/stop are no-ops: all real work happens in the monitor action.
    agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("validate-all", timeout=20, handler=validate_action)
    agent.add_action("monitor", timeout=240, interval=10, handler=monitor_action)
    setLoglevel(ocf.is_true(ocf.get_parameter("verbose", "false")))
    agent.run()
# Script entry point.
if __name__ == '__main__':
    main()
diff --git a/heartbeat/azure-events.in b/heartbeat/azure-events.in
index 66e129060..5ad658df9 100644
--- a/heartbeat/azure-events.in
+++ b/heartbeat/azure-events.in
@@ -1,846 +1,846 @@
#!@PYTHON@ -tt
#
# Resource agent for monitoring Azure Scheduled Events
#
# License: GNU General Public License (GPL)
# (c) 2018 Tobias Niekamp, Microsoft Corp.
# and Linux-HA contributors
import os
import sys
import time
import subprocess
import json
try:
import urllib2
from urllib2 import URLError
except ImportError:
import urllib.request as urllib2
from urllib.error import URLError
import socket
from collections import defaultdict
OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT"))
sys.path.append(OCF_FUNCTIONS_DIR)
import ocf
##############################################################################
# Agent version, reported in the HTTP User-Agent to the metadata service.
VERSION = "0.10"
USER_AGENT = "Pacemaker-ResourceAgent/%s %s" % (VERSION, ocf.distro())

# Cluster attribute names used to coordinate event handling across nodes.
attr_globalPullState = "azure-events_globalPullState"  # global "pull in progress" flag
attr_lastDocVersion = "azure-events_lastDocVersion"    # last handled DocumentIncarnation (per node)
attr_curNodeState = "azure-events_curNodeState"        # per-node state machine value
attr_pendingEventIDs = "azure-events_pendingEventIDs"  # per-node comma-separated pending event IDs

default_loglevel = ocf.logging.INFO
default_relevantEventTypes = set(["Reboot", "Redeploy"])

# Retry settings for pulling scheduled events.
global_pullMaxAttempts = 3
global_pullDelaySecs = 1
##############################################################################
class attrDict(defaultdict):
"""
A wrapper for accessing dict keys like an attribute
"""
def __init__(self, data):
super(attrDict, self).__init__(attrDict)
for d in data.keys():
self.__setattr__(d, data[d])
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(key)
def __setattr__(self, key, value):
self[key] = value
##############################################################################
class azHelper:
    """
    Helper class for Azure's metadata API (including Scheduled Events).
    All calls target the link-local IMDS endpoint, which requires the
    "Metadata: true" header.
    """
    metadata_host = "http://169.254.169.254/metadata"  # link-local IMDS endpoint
    instance_api = "instance"
    events_api = "scheduledevents"
    api_version = "2019-08-01"

    @staticmethod
    def _sendMetadataRequest(endpoint, postData=None):
        """
        Send a request to Azure's Metadata Service API.

        Returns the JSON-decoded response body, or "" when the request
        failed or the body was empty.  On failure the global pull state
        attribute is reset to IDLE so a pull can be retried later.
        """
        url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version)
        data = ""
        ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData))
        ocf.logger.debug("_sendMetadataRequest: url = %s" % url)

        # urllib needs the POST body as bytes.
        if postData and type(postData) != bytes:
            postData = postData.encode()

        req = urllib2.Request(url, postData)
        req.add_header("Metadata", "true")
        req.add_header("User-Agent", USER_AGENT)
        try:
            resp = urllib2.urlopen(req)
        except URLError as e:
            if hasattr(e, 'reason'):
                ocf.logger.warning("Failed to reach the server: %s" % e.reason)
                clusterHelper.setAttr(attr_globalPullState, "IDLE")
            elif hasattr(e, 'code'):
                ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code)
                clusterHelper.setAttr(attr_globalPullState, "IDLE")
        else:
            data = resp.read()
            ocf.logger.debug("_sendMetadataRequest: response = %s" % data)
        if data:
            data = json.loads(data)
        ocf.logger.debug("_sendMetadataRequest: finished")
        return data

    @staticmethod
    def getInstanceInfo():
        """
        Fetch details about the current VM from the Metadata Service;
        exits the agent with OCF_ERR_GENERIC when no data is returned.
        """
        ocf.logger.debug("getInstanceInfo: begin")

        jsondata = azHelper._sendMetadataRequest(azHelper.instance_api)
        ocf.logger.debug("getInstanceInfo: json = %s" % jsondata)

        if jsondata:
            ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"]))
            return attrDict(jsondata["compute"])
        else:
            ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info")
            sys.exit(ocf.OCF_ERR_GENERIC)

    @staticmethod
    def pullScheduledEvents():
        """
        Retrieve all currently scheduled events via the Metadata Service.
        """
        ocf.logger.debug("pullScheduledEvents: begin")

        jsondata = azHelper._sendMetadataRequest(azHelper.events_api)
        ocf.logger.debug("pullScheduledEvents: json = %s" % jsondata)

        ocf.logger.debug("pullScheduledEvents: finished")
        return attrDict(jsondata)

    @staticmethod
    def forceEvents(eventIDs):
        """
        Ask IMDS to start the given set of events immediately instead of
        waiting for their scheduled time.
        """
        ocf.logger.debug("forceEvents: begin")

        events = []
        for e in eventIDs:
            events.append({
                "EventId": e,
            })
        postData = {
            "StartRequests" : events
        }
        ocf.logger.info("forceEvents: postData = %s" % postData)
        # NOTE(review): the response is ignored; failures are only logged
        # inside _sendMetadataRequest.
        resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData))

        ocf.logger.debug("forceEvents: finished")
        return
##############################################################################
class clusterHelper:
"""
Helper functions for Pacemaker control via crm
"""
@staticmethod
def _getLocation(node):
"""
Helper function to retrieve local/global attributes
"""
if node:
return ["--node", node]
else:
return ["--type", "crm_config"]
@staticmethod
def _exec(command, *args):
"""
Helper function to execute a UNIX command
"""
args = list(args)
ocf.logger.debug("_exec: begin; command = %s, args = %s" % (command, str(args)))
def flatten(*n):
return (str(e) for a in n
for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),)))
command = list(flatten([command] + args))
ocf.logger.debug("_exec: cmd = %s" % " ".join(command))
try:
ret = subprocess.check_output(command)
if type(ret) != str:
ret = ret.decode()
ocf.logger.debug("_exec: return = %s" % ret)
return ret.rstrip()
except Exception as err:
ocf.logger.exception(err)
return None
@staticmethod
def setAttr(key, value, node=None):
"""
Set the value of a specific global/local attribute in the Pacemaker cluster
"""
ocf.logger.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node))
if value:
ret = clusterHelper._exec("crm_attribute",
"--name", key,
"--update", value,
clusterHelper._getLocation(node))
else:
ret = clusterHelper._exec("crm_attribute",
"--name", key,
"--delete",
clusterHelper._getLocation(node))
ocf.logger.debug("setAttr: finished")
return len(ret) == 0
@staticmethod
def getAttr(key, node=None):
"""
Retrieve a global/local attribute from the Pacemaker cluster
"""
ocf.logger.debug("getAttr: begin; key = %s, node = %s" % (key, node))
val = clusterHelper._exec("crm_attribute",
"--name", key,
"--query", "--quiet",
"--default", "",
clusterHelper._getLocation(node))
ocf.logger.debug("getAttr: finished")
if not val:
return None
return val if not val.isdigit() else int(val)
@staticmethod
def getAllNodes():
"""
Get a list of hostnames for all nodes in the Pacemaker cluster
"""
ocf.logger.debug("getAllNodes: begin")
nodes = []
nodeList = clusterHelper._exec("crm_node", "--list")
for n in nodeList.split("\n"):
nodes.append(n.split()[1])
ocf.logger.debug("getAllNodes: finished; return %s" % str(nodes))
return nodes
@staticmethod
def getHostNameFromAzName(azName):
    """
    Resolve an Azure node name to the cluster host name that was
    registered for it (see Node.__init__), or None if unknown.
    """
    attrName = "hostName_%s" % azName
    return clusterHelper.getAttr(attrName)
@staticmethod
def removeHoldFromNodes():
    """
    Flip every node currently in state ON_HOLD back to AVAILABLE.
    Always returns False.
    """
    ocf.logger.debug("removeHoldFromNodes: begin")
    for nodeName in clusterHelper.getAllNodes():
        if clusterHelper.getAttr(attr_curNodeState, node=nodeName) != "ON_HOLD":
            continue
        clusterHelper.setAttr(attr_curNodeState, "AVAILABLE", node=nodeName)
        ocf.logger.info("removeHoldFromNodes: removed ON_HOLD from node %s" % nodeName)
    ocf.logger.debug("removeHoldFromNodes: finished")
    return False
@staticmethod
def otherNodesAvailable(exceptNode):
    """
    Return True if at least one cluster node other than exceptNode
    (a Node instance) is in state AVAILABLE.
    """
    ocf.logger.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)
    for candidate in clusterHelper.getAllNodes():
        rawState = clusterHelper.getAttr(attr_curNodeState, node=candidate)
        # an unset state attribute counts as AVAILABLE
        nodeState = stringToNodeState(rawState) if rawState else AVAILABLE
        if candidate != exceptNode.hostName and nodeState == AVAILABLE:
            ocf.logger.info("otherNodesAvailable: at least %s is available" % candidate)
            ocf.logger.debug("otherNodesAvailable: finished")
            return True
    ocf.logger.info("otherNodesAvailable: no other nodes are available")
    ocf.logger.debug("otherNodesAvailable: finished")
    return False
@staticmethod
def transitionSummary():
    """
    Get the current Pacemaker transition summary (used to check if all
    resources are stopped when putting a node standby).

    Returns the text following "Transition Summary:" up to the first
    newline, or "" when the summary could not be loaded or contains no
    summary header. Callers only test the result for emptiness via
    len(), so a string (never False) must be returned.
    """
    # <tniek> Is a global crm_simulate "too much"? Or would it be sufficient it there are no planned transitions for a particular node?
    # # crm_simulate -Ls
    # 	Transition Summary:
    # 	 * Promote rsc_SAPHana_HN1_HDB03:0      (Slave -> Master hsr3-db1)
    # 	 * Stop    rsc_SAPHana_HN1_HDB03:1      (hsr3-db0)
    # 	 * Move    rsc_ip_HN1_HDB03     (Started hsr3-db0 -> hsr3-db1)
    # 	 * Start   rsc_nc_HN1_HDB03     (hsr3-db1)
    # # Excepted result when there are no pending actions:
    # 	Transition Summary:
    ocf.logger.debug("transitionSummary: begin")
    summary = clusterHelper._exec("crm_simulate", "-Ls")
    if not summary:
        ocf.logger.warning("transitionSummary: could not load transition summary")
        return ""
    if summary.find("Transition Summary:") < 0:
        # newer Pacemaker omits the header when there are no transitions
        ocf.logger.debug("transitionSummary: no transactions: %s" % summary)
        return ""
    summary = summary.split("Transition Summary:")[1]
    ret = summary.split("\n").pop(0)
    ocf.logger.debug("transitionSummary: finished; return = %s" % str(ret))
    return ret
@staticmethod
def listOperationsOnNode(node):
    """
    Get a list of all current operations for a given node (used to check
    if any resources are pending). Returns [] when there are none or
    when crm_resource failed.
    """
    # hsr3-db1:/home/tniek # crm_resource --list-operations -N hsr3-db0
    # rsc_azure-events    (ocf::heartbeat:azure-events):  Started: rsc_azure-events_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=115ms): complete
    # rsc_azure-events    (ocf::heartbeat:azure-events):  Started: rsc_azure-events_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun  8 22:37:47 2018, exec=197ms): complete
    # rsc_SAPHana_HN1_HDB03       (ocf::suse:SAPHana):    Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun  8 22:37:46 2018, exec=0ms): pending
    # rsc_SAPHanaTopology_HN1_HDB03       (ocf::suse:SAPHanaTopology):    Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=3214ms): complete
    ocf.logger.debug("listOperationsOnNode: begin; node = %s" % node)
    resources = clusterHelper._exec("crm_resource", "--list-operations", "-N", node)
    # _exec() returns None on failure; the previous "len(resources)" raised
    # TypeError in that case. Treat failure like an empty operation list.
    if not resources:
        ret = []
    else:
        ret = resources.split("\n")
    ocf.logger.debug("listOperationsOnNode: finished; return = %s" % str(ret))
    return ret
@staticmethod
def noPendingResourcesOnNode(node):
    """
    Check that there are no pending resource operations on a given node.

    Returns False as soon as one operation line ends in "pending",
    True otherwise.
    """
    ocf.logger.debug("noPendingResourcesOnNode: begin; node = %s" % node)
    for op in clusterHelper.listOperationsOnNode(node):
        ocf.logger.debug("noPendingResourcesOnNode: * %s" % op)
        if not op.strip():
            # ignore blank lines so split()[-1] cannot raise IndexError
            continue
        # the last whitespace-separated token of an operation line is its status
        status = op.split()[-1]
        if status == "pending":
            # log the operation itself; the previous code logged the literal
            # token "pending" instead of the affected resource operation
            ocf.logger.info("noPendingResourcesOnNode: found resource %s that is still pending" % op)
            ocf.logger.debug("noPendingResourcesOnNode: finished; return = False")
            return False
    ocf.logger.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
    ocf.logger.debug("noPendingResourcesOnNode: finished; return = True")
    return True
@staticmethod
def allResourcesStoppedOnNode(node):
    """
    Check that all resources on a given node are stopped: no pending
    operations and an empty Pacemaker transition summary.
    """
    ocf.logger.debug("allResourcesStoppedOnNode: begin; node = %s" % node)
    if not clusterHelper.noPendingResourcesOnNode(node):
        ocf.logger.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
        ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
        return False
    if len(clusterHelper.transitionSummary()) != 0:
        ocf.logger.info("allResourcesStoppedOnNode: transition summary is not empty")
        ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
        return False
    ocf.logger.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
    ocf.logger.debug("allResourcesStoppedOnNode: finished; return = True")
    return True
##############################################################################

# Node state machine values; persisted in the cluster as strings via
# nodeStateToString()/stringToNodeState().
AVAILABLE = 0  # Node is online and ready to handle events
STOPPING = 1   # Standby has been triggered, but some resources are still running
IN_EVENT = 2   # All resources are stopped, and event has been initiated via Azure Metadata Service
ON_HOLD = 3    # Node has a pending event that cannot be started there are no other nodes available

_NAME_TO_STATE = {
    "STOPPING": STOPPING,
    "IN_EVENT": IN_EVENT,
    "ON_HOLD": ON_HOLD,
}
_STATE_TO_NAME = {
    STOPPING: "STOPPING",
    IN_EVENT: "IN_EVENT",
    ON_HOLD: "ON_HOLD",
}

def stringToNodeState(name):
    """
    Map a state name to its numeric constant.

    Numeric input (e.g. a value already converted by getAttr()) is passed
    through unchanged; unknown names fall back to AVAILABLE.
    """
    # isinstance is the idiomatic type check (was: type(name) == int)
    if isinstance(name, int):
        return name
    return _NAME_TO_STATE.get(name, AVAILABLE)

def nodeStateToString(state):
    """
    Map a numeric node state to its canonical name; unknown values fall
    back to "AVAILABLE".
    """
    return _STATE_TO_NAME.get(state, "AVAILABLE")
##############################################################################
class Node:
    """
    Core class implementing logic for a cluster node.

    Tracks the node's event state machine (AVAILABLE/STOPPING/IN_EVENT/
    ON_HOLD) and its pending Azure event IDs in Pacemaker node attributes,
    and moves the node in/out of standby around scheduled events.
    """
    def __init__(self, ra):
        # ra: the owning raAzEvents instance (provides relevantEventTypes)
        self.raOwner = ra
        self.azInfo = azHelper.getInstanceInfo()
        self.azName = self.azInfo.name
        self.hostName = socket.gethostname()
        # publish the Azure-name -> host-name mapping cluster-wide so other
        # nodes can translate the names found in remote events
        # (see clusterHelper.getHostNameFromAzName)
        self.setAttr("azName", self.azName)
        clusterHelper.setAttr("hostName_%s" % self.azName, self.hostName)

    def getAttr(self, key):
        """
        Get a local attribute
        """
        return clusterHelper.getAttr(key, node=self.hostName)

    def setAttr(self, key, value):
        """
        Set a local attribute
        """
        return clusterHelper.setAttr(key, value, node=self.hostName)

    def selfOrOtherNode(self, node):
        """
        Helper function to distinguish self/other node
        """
        return node if node else self.hostName

    def setState(self, state, node=None):
        """
        Set the state for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setState: begin; node = %s, state = %s" % (node, nodeStateToString(state)))
        clusterHelper.setAttr(attr_curNodeState, nodeStateToString(state), node=node)
        ocf.logger.debug("setState: finished")

    def getState(self, node=None):
        """
        Get the state for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getState: begin; node = %s" % node)
        state = clusterHelper.getAttr(attr_curNodeState, node=node)
        ocf.logger.debug("getState: state = %s" % state)
        ocf.logger.debug("getState: finished")
        # an unset state attribute means the node never entered an event
        if not state:
            return AVAILABLE
        return stringToNodeState(state)

    def setEventIDs(self, eventIDs, node=None):
        """
        Set pending EventIDs for a given node (or self)
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))
        if eventIDs:
            eventIDStr = ",".join(eventIDs)
        else:
            # a falsy value makes clusterHelper.setAttr() delete the attribute
            eventIDStr = None
        clusterHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)
        ocf.logger.debug("setEventIDs: finished")
        return

    def getEventIDs(self, node=None):
        """
        Get pending EventIDs for a given node (or self).
        Returns a list of IDs, or None when none are set.
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("getEventIDs: begin; node = %s" % node)
        eventIDStr = clusterHelper.getAttr(attr_pendingEventIDs, node=node)
        if eventIDStr:
            eventIDs = eventIDStr.split(",")
        else:
            eventIDs = None
        ocf.logger.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
        return eventIDs

    def updateNodeStateAndEvents(self, state, eventIDs, node=None):
        """
        Set the state and pending EventIDs for a given node (or self)
        """
        ocf.logger.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, nodeStateToString(state), str(eventIDs)))
        self.setState(state, node=node)
        self.setEventIDs(eventIDs, node=node)
        ocf.logger.debug("updateNodeStateAndEvents: finished")
        return state

    def putNodeStandby(self, node=None):
        """
        Put self to standby
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeStandby: begin; node = %s" % node)
        clusterHelper._exec("crm_attribute",
                            "-t", "nodes",
                            "-N", node,
                            "-n", "standby",
                            "-v", "on",
                            "--lifetime=forever")
        ocf.logger.debug("putNodeStandby: finished")

    def putNodeOnline(self, node=None):
        """
        Put self back online
        """
        node = self.selfOrOtherNode(node)
        ocf.logger.debug("putNodeOnline: begin; node = %s" % node)
        clusterHelper._exec("crm_attribute",
                            "-t", "nodes",
                            "-N", node,
                            "-n", "standby",
                            "-v", "off",
                            "--lifetime=forever")
        ocf.logger.debug("putNodeOnline: finished")

    def separateEvents(self, events):
        """
        Split own/other nodes' events.
        Returns (localEvents, remoteEvents); irrelevant event types are dropped.
        """
        ocf.logger.debug("separateEvents: begin; events = %s" % str(events))
        localEvents = []
        remoteEvents = []
        for e in events:
            e = attrDict(e)
            if e.EventType not in self.raOwner.relevantEventTypes:
                continue
            # an event is "local" when our Azure name appears in its resources
            if self.azName in e.Resources:
                localEvents.append(e)
            else:
                remoteEvents.append(e)
        ocf.logger.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents)))
        return (localEvents, remoteEvents)

    def removeOrphanedEvents(self, azEvents):
        """
        Remove remote events that are already finished
        """
        ocf.logger.debug("removeOrphanedEvents: begin; azEvents = %s" % str(azEvents))
        azEventIDs = set()
        for e in azEvents:
            azEventIDs.add(e.EventId)
        # for all nodes except self ...
        for n in clusterHelper.getAllNodes():
            if n == self.hostName:
                continue
            curState = self.getState(node=n)
            # ... that still show in an event or shutting down resources ...
            if curState in (STOPPING, IN_EVENT):
                ocf.logger.info("removeOrphanedEvents: node %s has state %s" % (n, curState))
                clusterEventIDs = self.getEventIDs(node=n)
                # NOTE(review): assumes getEventIDs() is non-None whenever the
                # node state is STOPPING/IN_EVENT -- confirm; a None here
                # would raise in the loop below
                stillActive = False
                # ... but don't have any more events running according to Azure, ...
                for p in clusterEventIDs:
                    if p in azEventIDs:
                        ocf.logger.info("removeOrphanedEvents: (at least) event %s on node %s has not yet finished" % (str(p), n))
                        stillActive = True
                        break
                if not stillActive:
                    # ... put them back online.
                    ocf.logger.info("removeOrphanedEvents: clusterEvents %s on node %s are not in azEvents %s -> bring node back online" % (str(clusterEventIDs), n, str(azEventIDs)))
                    self.putNodeOnline(node=n)
        ocf.logger.debug("removeOrphanedEvents: finished")

    def handleRemoteEvents(self, azEvents):
        """
        Handle a list of events (as provided by Azure Metadata Service) for other nodes
        """
        ocf.logger.debug("handleRemoteEvents: begin; hostName = %s, events = %s" % (self.hostName, str(azEvents)))
        if len(azEvents) == 0:
            ocf.logger.debug("handleRemoteEvents: no remote events to handle")
            ocf.logger.debug("handleRemoteEvents: finished")
            return
        eventIDsForNode = {}  # host name -> list of EventIds ready to be forced
        # iterate through all current events as per Azure
        for e in azEvents:
            ocf.logger.info("handleRemoteEvents: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources)))
            # before we can force an event to start, we need to ensure all nodes involved have stopped their resources
            if e.EventStatus == "Scheduled":
                allNodesStopped = True
                for azName in e.Resources:
                    hostName = clusterHelper.getHostNameFromAzName(azName)
                    state = self.getState(node=hostName)
                    if state == STOPPING:
                        # the only way we can continue is when node state is STOPPING, but all resources have been stopped
                        if not clusterHelper.allResourcesStoppedOnNode(hostName):
                            ocf.logger.info("handleRemoteEvents: (at least) node %s has still resources running -> wait" % hostName)
                            allNodesStopped = False
                            break
                    elif state in (AVAILABLE, IN_EVENT, ON_HOLD):
                        ocf.logger.info("handleRemoteEvents: node %s is still %s -> remote event needs to be picked up locally" % (hostName, nodeStateToString(state)))
                        allNodesStopped = False
                        break
                if allNodesStopped:
                    ocf.logger.info("handleRemoteEvents: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId))
                    for n in e.Resources:
                        hostName = clusterHelper.getHostNameFromAzName(n)
                        if hostName in eventIDsForNode:
                            eventIDsForNode[hostName].append(e.EventId)
                        else:
                            eventIDsForNode[hostName] = [e.EventId]
            elif e.EventStatus == "Started":
                ocf.logger.info("handleRemoteEvents: remote event already started")
        # force the start of all events whose nodes are ready (i.e. have no more resources running)
        if len(eventIDsForNode.keys()) > 0:
            eventIDsToForce = set([item for sublist in eventIDsForNode.values() for item in sublist])
            ocf.logger.info("handleRemoteEvents: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce)))
            for node, eventId in eventIDsForNode.items():
                self.updateNodeStateAndEvents(IN_EVENT, eventId, node=node)
            azHelper.forceEvents(eventIDsToForce)
        ocf.logger.debug("handleRemoteEvents: finished")

    def handleLocalEvents(self, azEvents):
        """
        Handle a list of own events (as provided by Azure Metadata Service).

        Returns mayUpdateDocVersion: True only when it is safe for the
        caller to record the current DocumentIncarnation as handled.
        """
        ocf.logger.debug("handleLocalEvents: begin; hostName = %s, azEvents = %s" % (self.hostName, str(azEvents)))
        azEventIDs = set()
        for e in azEvents:
            azEventIDs.add(e.EventId)
        curState = self.getState()
        clusterEventIDs = self.getEventIDs()
        mayUpdateDocVersion = False
        ocf.logger.info("handleLocalEvents: current state = %s; pending local clusterEvents = %s" % (nodeStateToString(curState), str(clusterEventIDs)))
        # check if there are currently/still events set for the node
        if clusterEventIDs:
            # there are pending events set, so our state must be STOPPING or IN_EVENT
            i = 0; touchedEventIDs = False
            while i < len(clusterEventIDs):
                # clean up pending events that are already finished according to AZ
                if clusterEventIDs[i] not in azEventIDs:
                    ocf.logger.info("handleLocalEvents: remove finished local clusterEvent %s" % (clusterEventIDs[i]))
                    clusterEventIDs.pop(i)
                    touchedEventIDs = True
                else:
                    i += 1
            if len(clusterEventIDs) > 0:
                # there are still pending events (either because we're still stopping, or because the event is still in place)
                # either way, we need to wait
                if touchedEventIDs:
                    ocf.logger.info("handleLocalEvents: added new local clusterEvent %s" % str(clusterEventIDs))
                    self.setEventIDs(clusterEventIDs)
                else:
                    ocf.logger.info("handleLocalEvents: no local clusterEvents were updated")
            else:
                # there are no more pending events left after cleanup
                if clusterHelper.noPendingResourcesOnNode(self.hostName):
                    # and no pending resources on the node -> set it back online
                    ocf.logger.info("handleLocalEvents: all local events finished -> clean up, put node online and AVAILABLE")
                    curState = self.updateNodeStateAndEvents(AVAILABLE, None)
                    self.putNodeOnline()
                    clusterHelper.removeHoldFromNodes()
                    # repeat handleLocalEvents() since we changed status to AVAILABLE
                else:
                    ocf.logger.info("handleLocalEvents: all local events finished, but some resources have not completed startup yet -> wait")
        else:
            # there are no pending events set for us (yet)
            if curState == AVAILABLE:
                if len(azEventIDs) > 0:
                    if clusterHelper.otherNodesAvailable(self):
                        ocf.logger.info("handleLocalEvents: can handle local events %s -> set state STOPPING" % (str(azEventIDs)))
                        # this will also set mayUpdateDocVersion = True
                        curState = self.updateNodeStateAndEvents(STOPPING, azEventIDs)
                    else:
                        # we are the last AVAILABLE node; park the event until
                        # another node frees up (see removeHoldFromNodes)
                        ocf.logger.info("handleLocalEvents: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(azEventIDs))
                        self.setState(ON_HOLD)
                else:
                    ocf.logger.debug("handleLocalEvents: no local azEvents to handle")
        if curState == STOPPING:
            if clusterHelper.noPendingResourcesOnNode(self.hostName):
                ocf.logger.info("handleLocalEvents: all local resources are started properly -> put node standby")
                self.putNodeStandby()
                mayUpdateDocVersion = True
            else:
                ocf.logger.info("handleLocalEvents: some local resources are not clean yet -> wait")
        ocf.logger.debug("handleLocalEvents: finished; mayUpdateDocVersion = %s" % str(mayUpdateDocVersion))
        return mayUpdateDocVersion
##############################################################################
class raAzEvents:
    """
    Main class for resource agent
    """
    def __init__(self, relevantEventTypes):
        # relevantEventTypes: set of event-type names this agent reacts to
        self.node = Node(self)
        self.relevantEventTypes = relevantEventTypes

    def monitor(self):
        """
        Run one monitoring cycle: pull Scheduled Events from the Azure
        Metadata Service (serialized cluster-wide via attr_globalPullState)
        and process local and remote events. Always returns OCF_SUCCESS.
        """
        ocf.logger.debug("monitor: begin")
        pullFailedAttemps = 0
        while True:
            # check if another node is pulling at the same time;
            # this should only be a concern for the first pull, as setting up Scheduled Events may take up to 2 minutes.
            if clusterHelper.getAttr(attr_globalPullState) == "PULLING":
                pullFailedAttemps += 1
                if pullFailedAttemps == global_pullMaxAttempts:
                    # give up for this cycle; the next monitor run retries
                    ocf.logger.warning("monitor: exceeded maximum number of attempts (%d) to pull events" % global_pullMaxAttempts)
                    ocf.logger.debug("monitor: finished")
                    return ocf.OCF_SUCCESS
                else:
                    ocf.logger.info("monitor: another node is pulling; retry in %d seconds" % global_pullDelaySecs)
                    time.sleep(global_pullDelaySecs)
                    continue

            # we can pull safely from Azure Metadata Service
            clusterHelper.setAttr(attr_globalPullState, "PULLING")
            events = azHelper.pullScheduledEvents()
            clusterHelper.setAttr(attr_globalPullState, "IDLE")

            # get current document version
            curDocVersion = events.DocumentIncarnation
            lastDocVersion = self.node.getAttr(attr_lastDocVersion)
            ocf.logger.debug("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))

            # split events local/remote
            (localEvents, remoteEvents) = self.node.separateEvents(events.Events)

            # ensure local events are only executing once
            if curDocVersion != lastDocVersion:
                ocf.logger.debug("monitor: curDocVersion has not been handled yet")
                # handleLocalEvents() returns True if mayUpdateDocVersion is True;
                # this is only the case if we can ensure there are no pending events
                if self.node.handleLocalEvents(localEvents):
                    ocf.logger.info("monitor: handleLocalEvents completed successfully -> update curDocVersion")
                    self.node.setAttr(attr_lastDocVersion, curDocVersion)
                else:
                    ocf.logger.debug("monitor: handleLocalEvents still waiting -> keep curDocVersion")
            else:
                ocf.logger.info("monitor: already handled curDocVersion, skip")

            # remove orphaned remote events and then handle the remaining remote events
            self.node.removeOrphanedEvents(remoteEvents)
            self.node.handleRemoteEvents(remoteEvents)
            break
        ocf.logger.debug("monitor: finished")
        return ocf.OCF_SUCCESS
##############################################################################
def setLoglevel(verbose):
    """
    Configure agent logging: verbose=True switches to DEBUG level and
    additionally traces metadata-service HTTP requests.
    """
    if verbose:
        # also dump the HTTP traffic of the urllib opener
        urllib2.install_opener(urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1)))
        loglevel = ocf.logging.DEBUG
    else:
        loglevel = default_loglevel
    ocf.log.setLevel(loglevel)
# (shortdesc, longdesc) pair consumed by ocf.Agent() in main()
description = (
    "Microsoft Azure Scheduled Events monitoring agent",
    """This resource agent implements a monitor for scheduled
(maintenance) events for a Microsoft Azure VM.
If any relevant events are found, it moves all Pacemaker resources
away from the affected node to allow for a graceful shutdown.
Usage:
[OCF_RESKEY_eventTypes=VAL] [OCF_RESKEY_verbose=VAL] azure-events ACTION
action (required): Supported values: monitor, help, meta-data
eventTypes (optional): List of event types to be considered
relevant by the resource agent (comma-separated).
Supported values: Freeze,Reboot,Redeploy
Default = Reboot,Redeploy
/ verbose (optional): If set to true, displays debug info.
Default = false
Deployment:
crm configure primitive rsc_azure-events ocf:heartbeat:azure-events \
op monitor interval=10s
crm configure clone cln_azure-events rsc_azure-events
For further information on Microsoft Azure Scheduled Events, please
refer to the following documentation:
https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
""")
def monitor_action(eventTypes):
    """
    Handler for the OCF "monitor" action: run one monitoring cycle with
    the configured set of relevant event types.
    """
    if eventTypes:
        relevantEventTypes = set(eventTypes.split(","))
    else:
        relevantEventTypes = set()
    agentInstance = raAzEvents(relevantEventTypes)
    return agentInstance.monitor()
def validate_action(eventTypes):
    """
    Handler for the OCF "validate-all" action: verify that every
    configured event type is one of the supported values.
    """
    if eventTypes:
        supported = ("Freeze", "Reboot", "Redeploy")
        for candidate in eventTypes.split(","):
            if candidate in supported:
                continue
            ocf.ocf_exit_reason("Event type not one of Freeze, Reboot, Redeploy: " + eventTypes)
            return ocf.OCF_ERR_CONFIGURED
    return ocf.OCF_SUCCESS
def main():
    """
    Entry point: declare the OCF agent metadata (parameters and actions)
    and dispatch the requested action via the ocf helper module.
    """
    agent = ocf.Agent("azure-events", shortdesc=description[0], longdesc=description[1])
    agent.add_parameter(
        "eventTypes",
        shortdesc="List of resources to be considered",
        longdesc="A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy)",
        content_type="string",
        default="Reboot,Redeploy")
    agent.add_parameter(
        "verbose",
        shortdesc="Enable verbose agent logging",
        longdesc="Set to true to enable verbose logging",
        content_type="boolean",
        default="false")
    # start/stop are no-ops: all work happens in the recurring monitor action
    agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
    agent.add_action("validate-all", timeout=20, handler=validate_action)
    agent.add_action("monitor", timeout=240, interval=10, handler=monitor_action)
    setLoglevel(ocf.is_true(ocf.get_parameter("verbose", "false")))
    agent.run()

if __name__ == '__main__':
    main()

File Metadata

Mime Type
text/x-diff
Expires
Tue, Feb 25, 8:25 AM (1 d, 13 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1455738
Default Alt Text
(57 KB)

Event Timeline