import requests import json import warnings from typing import Union import copy from datetime import datetime from dateutil.relativedelta import relativedelta import os import math import time from collections import OrderedDict import re warnings.filterwarnings('ignore') class vropsAPI(object): """Module for the vropsAPI for Capacity Management: Author: Daniel Tomlinson Team: Capacity & Monitoring Date: October 2019 VROPS documentation: https://sc1prapvro01/suite-api/docs/rest/ index.html StatKeys for VMS: https://docs.vmware.com/en/vRealize-Operations-Manager/6.7/com.vmware.vcom.metrics.doc/GUID-1322F5A4-DA1D-481F-BBEA-99B228E96AF2.html Usage ----- Authenticate a session: vrops = vropsAPI.authenticate( 'https://sc1prapvro01/', 'username', 'authSource', 'password', verify=False, ) Get all clusters: Query VROPS for available clusters: vrops.getClusters() vrops.getClusterIdentifiers() Get dict of all cluster IDs and cluster names: allClustersDict = vrops.allClusters Get list of all cluster names: allClustersList = vrops.getList(vrops.allClusters) Get all hosts: Query VROPS for available hosts: From a single cluster: vrops.getHostsFromCluster(cluster='SC1PRCONTXWHCUXCCL01') vrops.getHostIdentifiers() From a list of clusters: Get dict of all host IDs and host names: allHostsDisct =vrops.allHosts Get list of host names: allHostsList = vrops.getList(vrops.allHosts) Get all VMs: Query VROPS for available VMs: For a single host: vrops.getVMSFromHost('sc1hsesx148.prod.williamhill.plc') vrops.getVMSIdentifiers() For a list of hosts: vrops.getVMSFromHost(allHostsList) vrops.getVMSIdentifiers() Get dict of all VM IDs and VM names: allVMSDict = vrops.allVMS Get list of all VMs: allVMSList = vrops.getList(vrops.allVMS) Get epoch time relative to another time: Similar to Splunks relative_time command: 1. Can go back N hours/minutes etc. 2. Can set the hour/minute etc. to a specified value (snapping) vrops.epochRelativeTime(epochTime, **kwargs) **kwargs: epochTime: int - start time year: int = datetime.now().year # set year month: int = datetime.now().month # set month day: int = datetime.now().day # set day hour: int = datetime.now().hour # set hour minute: int = datetime.now().minute # set minute second: int = datetime.now().second # set second years: int = 0 # go back/forward N years months: int = 0 # go back/forward N months days: int = 0 # go back/forward N days hours: int = 0 # go back/forward N hours minutes: int = 0 # go back/forward N minutes seconds: int = 0 # go back/forward N seconds Usage: Get epoch 5 minutes ago: vrops.epochRelativeTime(vrops.epochNow, minutes=-5) Get epoch at start of current hour: vrops.epochRelativeTime( vrops.epochNow, hour=0, minute=0, second=0, ) Get epoch 1 week ago at start of day: vrops.epochRelativeTime( vrops.epochNow, days=-7 hour=0, minute=0, second=0, ) Get stats from VMs: Pull back results: Last 30 minutes, 5 minute intervals, average for CPU average and ready %: vrops.getStatsFromVMS( begin=vrops.epochRelativeTime(vrops.epochNow, minutes=-30), end=vrops.epochNow, intervalType='MINUTES', intervalQuantifier='5', rollUpType='AVG', resourceId=list(vrops.allVMS.values()), statKey=['cpu|usage_average', 'cpu|readyPct'], ) Attributes: intervalType: (see https://sc1prapvro01/suite-api/docs/rest/ models.html#repr-1190589417) intervalQuantifier: int rollUpType: (see: https://sc1prapvro01/suite-api/ docs/rest/models.html#repr-1735704374) resourceId: string or list of vrops resourceIds (not names) statKey: vrops api metrics (see https://docs.vmware.com/en/ vRealize-Operations-Manager/6.7/com.vmware.vcom.metrics.doc/ GUID-1322F5A4-DA1D-481F-BBEA-99B228E96AF2.html) Print results: for i in range(0, vrops.totalVMS): print(vrops.vmsResources['values'][i]) Save to disk as json: vrops.saveToDisk(vrops.vmsResources) Attributes ---------- vropsURL: str URL of the VROPS instance "https://sc1prapvro01/" """ defaultHeaders = { 'Accept': 'application/json', 'Content-Type': 'application/json', 'Cache-Control': 'no-cache', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'cache-control': 'no-cache', } def __init__(self, vropsURL: str, authToken: str, verify: bool = True): super(vropsAPI, self).__init__() self.vropsURL = vropsURL self.authToken = authToken self.verify = verify self.headers = vropsAPI.defaultHeaders self.headers['Authorization'] = f'vRealizeOpsToken {self.authToken}' @classmethod def authenticate( cls, vropsURL: str, username: str, authSource: str, password: str, verify: bool = True, ): vropsURLauth = vropsAPI.getVropsURL(vropsURL, 'authenticationURL') payload = {} for key, value in zip( ['username', 'authSource', 'password'], [username, authSource, password], ): payload[key] = value authToken = vropsAPI.getAuthenticationToken( vropsURLauth, payload, verify ) return cls(vropsURL, authToken, verify) @staticmethod def getVropsURL(vropsURL: str, endpointKey: str) -> str: endpoints = { 'authenticationURL': 'suite-api/api/auth/token/acquire', 'resourcesURL': 'suite-api/api/resources', 'statsURL': 'suite-api/api/resources/stats/query', } if endpoints[endpointKey] not in vropsURL: if vropsURL[-1] != '/': vropsURL = vropsURL + '/' vropsURL = vropsURL + endpoints[endpointKey] else: vropsURL = vropsURL + endpoints[endpointKey] return vropsURL @staticmethod def pythonToJSON(pythonObject: any, indent=4) -> str: return json.dumps(pythonObject, indent=indent) @staticmethod def jsonToPython(jsonObject: str) -> any: return json.loads(jsonObject) @staticmethod def getAuthenticationToken( vropsURL: str, payload: dict, verify=True ) -> str: payload = vropsAPI.pythonToJSON(payload) vropsURL = vropsAPI.getVropsURL(vropsURL, 'authenticationURL') response = requests.request( 'POST', vropsURL, data=payload, headers=vropsAPI.defaultHeaders, verify=verify, ) print(response) authToken = vropsAPI.jsonToPython(response.text)['token'] return authToken @staticmethod def getIdentifiers( identifierDict: dict, vropsJSON: dict, length: int, resourceKindKey: str, ) -> dict: for i in range(0, length): if ( vropsJSON['resourceList'][i]['resourceKey']['resourceKindKey'] == resourceKindKey ): identifierDict[ vropsJSON['resourceList'][i]['resourceKey']['name'] ] = vropsJSON['resourceList'][i]['identifier'] else: pass return identifierDict @staticmethod def getKeysList(pythonDict: dict) -> list: pythonList = [] for i in pythonDict.keys(): pythonList.append(i) return pythonList @staticmethod def getValuesList(pythonDict: dict) -> list: pythonList = [] for i in pythonDict.values(): pythonList.append(i) return pythonList @staticmethod def epochRelativeTime( epochTime: int, year: int = datetime.now().year, month: int = datetime.now().month, day: int = datetime.now().day, hour: int = datetime.now().hour, minute: int = datetime.now().minute, second: int = datetime.now().second, years: int = 0, months: int = 0, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, ) -> int: delta = relativedelta( year=year, month=month, day=day, hour=hour, minute=minute, second=second, years=years, months=months, days=days, hours=hours, minutes=minutes, seconds=seconds, ) if len(str(epochTime)) >= 12: epochTime /= 1000 relativeTime = datetime.fromtimestamp(epochTime) + delta relativeTime = math.ceil(relativeTime.timestamp() * 1000) return relativeTime def getClusters(self): queryString = {'resourceKind': 'ClusterComputeResource'} vropsURL = vropsAPI.getVropsURL(self.vropsURL, 'resourcesURL') response = requests.request( "GET", vropsURL, headers=self.headers, params=queryString, verify=self.verify, ) print(response) self._clusterResources = vropsAPI.jsonToPython(response.text) return self def getClusterIdentifiers(self): self.totalClusters = len(self._clusterResources['resourceList']) self.allClusters = {} self.getIdentifiers( self.allClusters, self._clusterResources, self.totalClusters, 'ClusterComputeResource', ) return self def getHostsFromCluster(self, cluster='SC1PRCONTXWHCUXCCL01'): self.cluster = cluster url = ( f'{self.vropsURL}/suite-api/api/resources/' f'{self.allClusters[cluster]}/relationships' ) self.chosenCluster = cluster response = requests.request( "GET", url, headers=self.headers, verify=self.verify ) print(response) self._hostResources = vropsAPI.jsonToPython(response.text) return self def getHostIdentifiers(self): self.totalHosts = len(self._hostResources['resourceList']) self.allHosts = {} self.getIdentifiers( self.allHosts, self._hostResources, self.totalHosts, 'HostSystem' ) return self def getVMSFromHost(self, host: Union[str, list]): if isinstance(host, list): self.vmType = 'list' self._vmsResourcesRelationships = [] self.urlList = [] response = [] for item in host: self.urlList.append( ( f'{self.vropsURL}suite-api/api/resources/' f'{self.allHosts[item]}/relationships' ) ) for urlItem in self.urlList: response.append( requests.request( 'GET', urlItem, headers=self.headers, verify=self.verify, ) ) print(response) for i in range(0, len(response)): self._vmsResourcesRelationships.append( vropsAPI.jsonToPython(response[i].text) ) if isinstance(host, str): hostToList = [] hostToList.append(host) print(hostToList) return self.getVMSFromHost(host=hostToList) return self def _getHostInformation(self, i: int, j: int): pattern = r'(?:.*resources\/)([^\/]+)' vm = self._vmsResourcesRelationships[i]['resourceList'][j][ 'resourceKey' ]['name'] host = self.urlList[i] match = re.findall(pattern, host) for key, value in self.allHosts.items(): if match[0] == value: self.VMSHostsNames[vm] = key return self def getVMSIdentifiers(self): self.VMSHostsNames = {} self.allVMS = OrderedDict() if self.vmType == 'list': self.countVMS = [] self.countVMSFiltered = [] for i in range(0, len(self._vmsResourcesRelationships)): counter = 0 for j in range( 0, len(self._vmsResourcesRelationships[i]['resourceList']) ): if ( self._vmsResourcesRelationships[i]['resourceList'][j][ 'resourceKey' ]['resourceKindKey'] ) == 'VirtualMachine': counter += 1 self._getHostInformation(i, j) self.countVMS.append( len(self._vmsResourcesRelationships[i]['resourceList']) ) self.countVMSFiltered.append(counter) for i in range(0, len(self._vmsResourcesRelationships)): self.getIdentifiers( self.allVMS, self._vmsResourcesRelationships[i], self.countVMS[i], 'VirtualMachine', ) if self.vmType == 'string': counter = 0 self.countVMS = len( self._vmsResourcesRelationships['resourceList'] ) for j in range(0, self.countVMS): if ( self._vmsResourcesRelationships['resourceList'][j][ 'resourceKey' ]['resourceKindKey'] ) == 'VirtualMachine': counter += 1 self.countVMSFiltered = counter self.getIdentifiers( self.allVMS, self._vmsResourcesRelationships, self.countVMS, 'VirtualMachine', ) return self def getStatsFromVMS( self, begin: int, end: int, intervalType: str, intervalQuantifier: str, rollUpType: str, resourceId: list, statKey: Union[str, list], ): argList = copy.deepcopy(locals()) del argList['self'] vropsURL = self.getVropsURL(self.vropsURL, 'statsURL') payload = self.pythonToJSON(argList, indent=0) response = requests.request( 'POST', vropsURL, headers=self.headers, data=payload, verify=self.verify, ) # print(response.text) # raise Exception self._vmsResources = OrderedDict(self.jsonToPython(response.text)) for key, value in self.allVMS.items(): for i in range(0, len(self._vmsResources['values'])): if self._vmsResources['values'][i]['resourceId'] == value: self._vmsResources['values'][i] = OrderedDict( self._vmsResources['values'][i] ) self._vmsResources['values'][i]['name'] = key self._vmsResources['values'][i][ 'host' ] = self.VMSHostsNames[key] self._vmsResources['values'][i][ 'cluster' ] = self.chosenCluster for item in ['cluster', 'host', 'name']: self._vmsResources['values'][i].move_to_end( item, last=False ) return self @staticmethod def saveToDisk( pythonObject: any, path: str = os.getcwd(), filePrefix: str = '', type: str = 'json', indent: int = 4, breakLine: bool = False, ) -> None: timeNow = datetime.now().strftime('%d-%m-%Y_%H-%M-%S') fileName = f'{path}/{filePrefix}-{timeNow}.json' if breakLine: if not isinstance(pythonObject, list): raise TypeError( 'You must pass a list when using' ' breakLine=True' ) else: with open(fileName, 'a+') as outputFile: try: outputFile.write( json.dump( pythonObject, outputFile, indent=indent ) ) except TypeError: pass else: with open(fileName, 'w+') as outputFile: json.dump(pythonObject, outputFile, indent=indent) def exportVMData(self): self.export = [] loopLength = len(self._vmsResources['values']) for i in range(0, loopLength): statKeyLength = len( self._vmsResources['values'][i]['stat-list']['stat'] ) timeLength = len( self._vmsResources['values'][i]['stat-list']['stat'][0][ 'timestamps' ] ) for k in range(0, statKeyLength): for j in range(0, timeLength): self.export.append( { 'name': self._vmsResources['values'][i][ 'name' ], 'host': self._vmsResources['values'][i][ 'host' ], 'cluster': self.chosenCluster, 'timestamp': str(self._vmsResources['values'][i][ 'stat-list' ]['stat'][0]['timestamps'][j]), 'value': str(self._vmsResources['values'][i][ 'stat-list' ]['stat'][k]['data'][j]), 'statKey': self._vmsResources['values'][i][ 'stat-list' ]['stat'][k]['statKey']['key'], 'rollUpType': self._vmsResources['values'][i][ 'stat-list' ]['stat'][k]['rollUpType'], 'intervalQuantifier': str(self._vmsResources[ 'values' ][i]['stat-list']['stat'][k]['intervalUnit'][ 'quantifier' ]), 'intervalType': str(self._vmsResources['values'][ i ]['stat-list']['stat'][0]['intervalUnit'][ 'intervalType' ]), } ) return self @property def totalVMS(self): if isinstance(self.countVMSFiltered, list): self.__totalVMS = sum(self.countVMSFiltered) elif isinstance(self.countVMSFiltered, int): self.__totalVMS = self.countVMSFiltered return self.__totalVMS @property def epochNow(self): self.__epochNow = math.ceil(time.time() * 1000) return self.__epochNow @property def epochToday(self): now = datetime.now() self.__epochtoday = now + relativedelta(hour=0, minute=0, second=0) self.__epochToday = math.ceil(time.time() * 1000) return self.__epochToday @property def allVMS(self): return self.__allVMS @allVMS.setter def allVMS(self, allVMS): if not isinstance(allVMS, dict): raise TypeError( 'You must pass a dictionary with a key of the name' f' and a value of the VROPS ID, not {type(allVMS)}.' ) else: pass self.__allVMS = allVMS print('Successfully imported the dictionary.') return self.__allVMS @property def VMSHostsNames(self): return self.__VMSHostsNames @VMSHostsNames.setter def VMSHostsNames(self, VMSHostsNames): if not isinstance(VMSHostsNames, dict): raise TypeError( 'You must pass a dictionary with a key of the name' f' and a value of the VROPS ID, not {type(VMSHostsNames)}.' ) else: pass self.__VMSHostsNames = VMSHostsNames print('Successfully imported the dictionary.') return self.__VMSHostsNames @property def chosenCluster(self): return self.__chosenCluster @chosenCluster.setter def chosenCluster(self, chosenCluster): if not isinstance(chosenCluster, str): raise TypeError( 'You must pass a dictionary with a key of the name' f' and a value of the VROPS ID, not {type(chosenCluster)}.' ) else: pass self.__chosenCluster = chosenCluster return self.__chosenCluster