""" DIRAC API Base Class """ from __future__ import print_function from __future__ import absolute_import from __future__ import division import six import pprint import sys from DIRAC import gLogger, gConfig, S_OK, S_ERROR from DIRAC.Core.Security.ProxyInfo import getProxyInfo, formatProxyInfoAsString from DIRAC.Core.Utilities.Version import getCurrentVersion from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites __RCSID__ = "c59810311 (2021-10-01 10:44:25 +0200) Chris Burr " COMPONENT_NAME = "API" def _printFormattedDictList(dictList, fields, uniqueField, orderBy): """Will print ordered the supplied field of a list of dictionaries :param list dictList: list of dictionaries :param list fields: fields :param str uniqueField: unique field :param str orderBy: ordered """ orderDict = {} fieldWidths = {} dictFields = {} for myDict in dictList: for field in fields: fieldValue = myDict[field] if field not in fieldWidths: fieldWidths[field] = len(str(field)) if len(str(fieldValue)) > fieldWidths[field]: fieldWidths[field] = len(str(fieldValue)) orderValue = myDict[orderBy] if orderValue not in orderDict: orderDict[orderValue] = [] orderDict[orderValue].append(myDict[uniqueField]) dictFields[myDict[uniqueField]] = myDict headString = "%s" % fields[0].ljust(fieldWidths[fields[0]] + 5) for field in fields[1:]: headString = "%s %s" % (headString, field.ljust(fieldWidths[field] + 5)) print(headString) for orderValue in sorted(orderDict): uniqueFields = orderDict[orderValue] for uniqueField in sorted(uniqueFields): myDict = dictFields[uniqueField] outStr = "%s" % str(myDict[fields[0]]).ljust(fieldWidths[fields[0]] + 5) for field in fields[1:]: outStr = "%s %s" % (outStr, str(myDict[field]).ljust(fieldWidths[field] + 5)) print(outStr) # TODO: some of these can just be functions, and moved out of here class API(object): """An utilities class for APIs""" ############################################################################# def __init__(self): """c'tor""" self._printFormattedDictList = _printFormattedDictList self.log = gLogger.getSubLogger(COMPONENT_NAME) self.section = COMPONENT_NAME self.pPrint = pprint.PrettyPrinter() # Global error dictionary self.errorDict = {} self.setup = gConfig.getValue("/DIRAC/Setup", "Unknown") self.diracInfo = getCurrentVersion()["Value"] self._siteSet = set(getSites().get("Value", [])) ############################################################################# def __getstate__(self): """Return a copied dictionary containing all the attributes of the API. Called when pickling the object. Also used in copy.deepcopy. :return: dictionary of attributes """ from DIRAC.FrameworkSystem.private.standardLogging.Logging import Logging state = dict(self.__dict__) # Replace the Logging instance by its name because it is not copyable # because of the thread locks if isinstance(state["log"], Logging): state["log"] = state["log"].getSubName() # In Python 3 PrettyPrinter can't be pickled del state["pPrint"] return state def __setstate__(self, state): """Parameter the Job with an attributes dictionary. Called when un-pickling the object. :params state: attributes dictionary """ self.__dict__.update(state) # Build the Logging instance again because it can not be in the dictionary # due to the thread locks if isinstance(state["log"], six.string_types): self.log = gLogger.getSubLogger(state["log"]) self.pPrint = pprint.PrettyPrinter() ############################################################################# def _errorReport(self, error, message=None): """Internal function to return errors and exit with an S_ERROR() :param str error: error :param str message: message :return: S_ERROR(str) """ if not message: message = error self.log.warn(error) return S_ERROR(message) ############################################################################# def _prettyPrint(self, myObject): """Helper function to pretty print an object. :param object myObject: an object to pring """ print(self.pPrint.pformat(myObject)) ############################################################################# def _checkSiteIsValid(self, site): """Internal function to check that a site name is valid.""" if isinstance(site, (list, set, dict)): site = set(site) - self._siteSet if not site: return S_OK() elif site in self._siteSet: return S_OK() return self._reportError("Specified site %s is not in list of defined sites" % str(site)) ############################################################################# def _getCurrentUser(self): """Get current user :return: S_OK(dict)/S_ERROR() """ res = getProxyInfo(False, False) if not res["OK"]: return self._errorReport("No proxy found in local environment", res["Message"]) proxyInfo = res["Value"] gLogger.debug(formatProxyInfoAsString(proxyInfo)) if "group" not in proxyInfo: return self._errorReport("Proxy information does not contain the group", res["Message"]) res = getDNForUsername(proxyInfo["username"]) if not res["OK"]: return self._errorReport("Failed to get proxies for user", res["Message"]) return S_OK(proxyInfo["username"]) ############################################################################# def _reportError(self, message, name="", **kwargs): """Internal Function. Gets caller method name and arguments, formats the information and adds an error to the global error dictionary to be returned to the user. :param str message: message :param str name: name :return: S_ERROR(str) """ className = name if not name: className = __name__ methodName = sys._getframe(1).f_code.co_name arguments = [] for key in kwargs: if kwargs[key]: arguments.append("%s = %s ( %s )" % (key, kwargs[key], type(kwargs[key]))) finalReport = """Problem with %s.%s() call: Arguments: %s Message: %s """ % ( className, methodName, "/".join(arguments), message, ) if methodName in self.errorDict: tmp = self.errorDict[methodName] tmp.append(finalReport) self.errorDict[methodName] = tmp else: self.errorDict[methodName] = [finalReport] self.log.verbose(finalReport) return S_ERROR(finalReport)