diff --git a/heartbeat/gcp-pd-move.in b/heartbeat/gcp-pd-move.in index f82bd25e5..e99cc71f8 100644 --- a/heartbeat/gcp-pd-move.in +++ b/heartbeat/gcp-pd-move.in @@ -1,376 +1,382 @@ #!@PYTHON@ -tt # - *- coding: utf- 8 - *- # # --------------------------------------------------------------------- # Copyright 2018 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # --------------------------------------------------------------------- # Description: Google Cloud Platform - Disk attach # --------------------------------------------------------------------- import json import logging import os import re import sys import time OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT")) sys.path.append(OCF_FUNCTIONS_DIR) import ocf from ocf import logger try: import googleapiclient.discovery except ImportError: pass if sys.version_info >= (3, 0): # Python 3 imports. import urllib.parse as urlparse import urllib.request as urlrequest else: # Python 2 imports. import urllib as urlparse import urllib2 as urlrequest CONN = None PROJECT = None ZONE = None +REGION = None LIST_DISK_ATTACHED_INSTANCES = None INSTANCE_NAME = None PARAMETERS = { 'disk_name': '', 'disk_scope': 'detect', 'disk_csek_file': '', 'mode': "READ_WRITE", 'device_name': '', 'stackdriver_logging': 'no', } MANDATORY_PARAMETERS = ['disk_name', 'disk_scope'] METADATA_SERVER = 'http://metadata.google.internal/computeMetadata/v1/' METADATA_HEADERS = {'Metadata-Flavor': 'Google'} METADATA = ''' 1.0 Resource Agent that can attach or detach a regional/zonal disk on current GCP instance. Requirements : - Disk has to be properly created as regional/zonal in order to be used correctly. Attach/Detach a persistent disk on current GCP instance The name of the GCP disk. Disk name Disk scope Network name Path to a Customer-Supplied Encryption Key (CSEK) key file Customer-Supplied Encryption Key file Attachment mode (READ_WRITE, READ_ONLY) Attachment mode An optional name that indicates the disk name the guest operating system will see. Optional device name Use stackdriver_logging output to global resource (yes, true, enabled) Use stackdriver_logging '''.format(PARAMETERS['disk_name'], PARAMETERS['disk_scope'], PARAMETERS['disk_csek_file'], PARAMETERS['mode'], PARAMETERS['device_name'], PARAMETERS['stackdriver_logging']) def get_metadata(metadata_key, params=None, timeout=None): """Performs a GET request with the metadata headers. Args: metadata_key: string, the metadata to perform a GET request on. params: dictionary, the query parameters in the GET request. timeout: int, timeout in seconds for metadata requests. Returns: HTTP response from the GET request. Raises: urlerror.HTTPError: raises when the GET request fails. """ timeout = timeout or 60 metadata_url = os.path.join(METADATA_SERVER, metadata_key) params = urlparse.urlencode(params or {}) url = '%s?%s' % (metadata_url, params) request = urlrequest.Request(url, headers=METADATA_HEADERS) request_opener = urlrequest.build_opener(urlrequest.ProxyHandler({})) return request_opener.open(request, timeout=timeout * 1.1).read().decode("utf-8") def populate_vars(): global CONN global INSTANCE_NAME global PROJECT global ZONE + global REGION global LIST_DISK_ATTACHED_INSTANCES # Populate global vars try: CONN = googleapiclient.discovery.build('compute', 'v1') except Exception as e: logger.error('Couldn\'t connect with google api: ' + str(e)) sys.exit(ocf.OCF_ERR_CONFIGURED) for param in PARAMETERS: value = os.environ.get('OCF_RESKEY_%s' % param, PARAMETERS[param]) if not value and param in MANDATORY_PARAMETERS: logger.error('Missing %s mandatory parameter' % param) sys.exit(ocf.OCF_ERR_CONFIGURED) elif value: PARAMETERS[param] = value try: INSTANCE_NAME = get_metadata('instance/name') except Exception as e: logger.error( 'Couldn\'t get instance name, is this running inside GCE?: ' + str(e)) sys.exit(ocf.OCF_ERR_CONFIGURED) PROJECT = get_metadata('project/project-id') if PARAMETERS['disk_scope'] in ['detect', 'regional']: ZONE = get_metadata('instance/zone').split('/')[-1] + REGION = ZONE[:-2] else: ZONE = PARAMETERS['disk_scope'] LIST_DISK_ATTACHED_INSTANCES = get_disk_attached_instances( PARAMETERS['disk_name']) def configure_logs(): # Prepare logging global logger logging.getLogger('googleapiclient').setLevel(logging.WARN) logging_env = os.environ.get('OCF_RESKEY_stackdriver_logging') if logging_env: logging_env = logging_env.lower() if any(x in logging_env for x in ['yes', 'true', 'enabled']): try: import google.cloud.logging.handlers client = google.cloud.logging.Client() handler = google.cloud.logging.handlers.CloudLoggingHandler( client, name=INSTANCE_NAME) handler.setLevel(logging.INFO) formatter = logging.Formatter('gcp:alias "%(message)s"') handler.setFormatter(formatter) ocf.log.addHandler(handler) logger = logging.LoggerAdapter( ocf.log, {'OCF_RESOURCE_INSTANCE': ocf.OCF_RESOURCE_INSTANCE}) except ImportError: logger.error('Couldn\'t import google.cloud.logging, ' 'disabling Stackdriver-logging support') def wait_for_operation(operation): while True: result = CONN.zoneOperations().get( project=PROJECT, zone=ZONE, operation=operation['name']).execute() if result['status'] == 'DONE': if 'error' in result: raise Exception(result['error']) return time.sleep(1) def get_disk_attached_instances(disk): def get_users_list(): fl = 'name="%s"' % disk request = CONN.disks().aggregatedList(project=PROJECT, filter=fl) while request is not None: response = request.execute() locations = response.get('items', {}) for location in locations.values(): for d in location.get('disks', []): if d['name'] == disk: return d.get('users', []) request = CONN.instances().aggregatedList_next( previous_request=request, previous_response=response) raise Exception("Unable to find disk %s" % disk) def get_only_instance_name(user): return re.sub('.*/instances/', '', user) return map(get_only_instance_name, get_users_list()) def is_disk_attached(instance): return instance in LIST_DISK_ATTACHED_INSTANCES def detach_disk(instance, disk_name): # Python API misses disk-scope argument. # Detaching a disk is only possible by using deviceName, which is retrieved # as a disk parameter when listing the instance information request = CONN.instances().get( project=PROJECT, zone=ZONE, instance=instance) response = request.execute() device_name = None for disk in response['disks']: - if disk_name in disk['source']: + if disk_name == re.sub('.*disks/',"",disk['source']): device_name = disk['deviceName'] break if not device_name: logger.error("Didn't find %(d)s deviceName attached to %(i)s" % { 'd': disk_name, 'i': instance, }) return request = CONN.instances().detachDisk( project=PROJECT, zone=ZONE, instance=instance, deviceName=device_name) wait_for_operation(request.execute()) def attach_disk(instance, disk_name): location = 'zones/%s' % ZONE + if PARAMETERS['disk_scope'] == 'regional': + location = 'regions/%s' % REGION + prefix = 'https://www.googleapis.com/compute/v1' body = { 'source': '%(prefix)s/projects/%(project)s/%(location)s/disks/%(disk)s' % { 'prefix': prefix, 'project': PROJECT, 'location': location, 'disk': disk_name, }, } # Customer-Supplied Encryption Key (CSEK) if PARAMETERS['disk_csek_file']: with open(PARAMETERS['disk_csek_file']) as csek_file: body['diskEncryptionKey'] = { 'rawKey': csek_file.read(), } if PARAMETERS['device_name']: body['deviceName'] = PARAMETERS['device_name'] if PARAMETERS['mode']: body['mode'] = PARAMETERS['mode'] force_attach = None if PARAMETERS['disk_scope'] == 'regional': # Python API misses disk-scope argument. force_attach = True else: # If this disk is attached to some instance, detach it first. for other_instance in LIST_DISK_ATTACHED_INSTANCES: logger.info("Detaching disk %(disk_name)s from other instance %(i)s" % { 'disk_name': PARAMETERS['disk_name'], 'i': other_instance, }) detach_disk(other_instance, PARAMETERS['disk_name']) request = CONN.instances().attachDisk( project=PROJECT, zone=ZONE, instance=instance, body=body, forceAttach=force_attach) wait_for_operation(request.execute()) def fetch_data(): configure_logs() populate_vars() def gcp_pd_move_start(): fetch_data() if not is_disk_attached(INSTANCE_NAME): logger.info("Attaching disk %(disk_name)s to %(instance)s" % { 'disk_name': PARAMETERS['disk_name'], 'instance': INSTANCE_NAME, }) attach_disk(INSTANCE_NAME, PARAMETERS['disk_name']) def gcp_pd_move_stop(): fetch_data() if is_disk_attached(INSTANCE_NAME): logger.info("Detaching disk %(disk_name)s to %(instance)s" % { 'disk_name': PARAMETERS['disk_name'], 'instance': INSTANCE_NAME, }) detach_disk(INSTANCE_NAME, PARAMETERS['disk_name']) def gcp_pd_move_status(): fetch_data() if is_disk_attached(INSTANCE_NAME): logger.debug("Disk %(disk_name)s is correctly attached to %(instance)s" % { 'disk_name': PARAMETERS['disk_name'], 'instance': INSTANCE_NAME, }) else: sys.exit(ocf.OCF_NOT_RUNNING) def main(): if len(sys.argv) < 2: logger.error('Missing argument') return command = sys.argv[1] if 'meta-data' in command: print(METADATA) return if command in 'start': gcp_pd_move_start() elif command in 'stop': gcp_pd_move_stop() elif command in ('monitor', 'status'): gcp_pd_move_status() else: configure_logs() logger.error('no such function %s' % str(command)) if __name__ == "__main__": main()