""" :mod: GFAL2_SRM2Storage ================= .. module: python :synopsis: SRM2 module based on the GFAL2_StorageBase class. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function # pylint: disable=invalid-name import six import errno import json import gfal2 # pylint: disable=import-error # from DIRAC from DIRAC import gLogger, gConfig, S_OK, S_ERROR from DIRAC.Resources.Storage.GFAL2_StorageBase import GFAL2_StorageBase from DIRAC.Resources.Storage.Utilities import checkArgumentFormat __RCSID__ = "3d9d586b0 (2021-08-03 16:28:29 +0200) Christophe Haen " class GFAL2_SRM2Storage(GFAL2_StorageBase): """ SRM2 SE class that inherits from GFAL2StorageBase """ _INPUT_PROTOCOLS = ['file', 'root', 'srm', 'gsiftp'] _OUTPUT_PROTOCOLS = ['file', 'root', 'dcap', 'gsidcap', 'rfio', 'srm', 'gsiftp'] def __init__(self, storageName, parameters): """ """ super(GFAL2_SRM2Storage, self).__init__(storageName, parameters) self.log = gLogger.getSubLogger("GFAL2_SRM2Storage") self.log.debug("GFAL2_SRM2Storage.__init__: Initializing object") self.pluginName = 'GFAL2_SRM2' # This attribute is used to know the file status (OFFLINE,NEARLINE,ONLINE) self._defaultExtendedAttributes = ['user.status'] # ## # Setting the default SRM parameters here. For methods where this # is not the default there is a method defined in this class, setting # the proper values and then calling the base class method. # ## self.gfal2requestLifetime = gConfig.getValue('/Resources/StorageElements/RequestLifeTime', 100) self.__setSRMOptionsToDefault() # This lists contains the list of protocols to ask to SRM to get a URL # It can be either defined in the plugin of the SE, or as a global option if 'ProtocolsList' in parameters: self.protocolsList = parameters['ProtocolsList'].split(',') else: self.log.debug("GFAL2_SRM2Storage: No protocols provided, using the default protocols.") self.protocolsList = self.defaultLocalProtocols self.log.debug('GFAL2_SRM2Storage: protocolsList = %s' % self.protocolsList) def __setSRMOptionsToDefault(self): ''' Resetting the SRM options back to default ''' self.ctx.set_opt_integer("SRM PLUGIN", "OPERATION_TIMEOUT", self.gfal2Timeout) if self.spaceToken: self.ctx.set_opt_string("SRM PLUGIN", "SPACETOKENDESC", self.spaceToken) self.ctx.set_opt_integer("SRM PLUGIN", "REQUEST_LIFETIME", self.gfal2requestLifetime) # Setting the TURL protocol to gsiftp because with other protocols we have authorisation problems # self.ctx.set_opt_string_list( "SRM PLUGIN", "TURL_PROTOCOLS", self.defaultLocalProtocols ) self.ctx.set_opt_string_list("SRM PLUGIN", "TURL_PROTOCOLS", ['gsiftp']) def _updateMetadataDict(self, metadataDict, attributeDict): """ Updating the metadata dictionary with srm specific attributes :param self: self reference :param dict: metadataDict we want add the SRM specific attributes to :param dict: attributeDict contains 'user.status' which we then fill in the metadataDict """ # 'user.status' is the extended attribute we are interested in user_status = attributeDict.get('user.status', '') metadataDict['Cached'] = int('ONLINE' in user_status) metadataDict['Migrated'] = int('NEARLINE' in user_status) metadataDict['Lost'] = int(user_status == 'LOST') metadataDict['Unavailable'] = int(user_status == 'UNAVAILABLE') metadataDict['Accessible'] = not metadataDict['Lost'] and metadataDict['Cached'] and not metadataDict['Unavailable'] def getTransportURL(self, path, protocols=False): """ obtain the tURLs for the supplied path and protocols :param self: self reference :param str path: path on storage :param mixed protocols: protocols to use :returns: Failed dict {path : error message} Successful dict {path : transport url} S_ERROR in case of argument problems """ res = checkArgumentFormat(path) if not res['OK']: return res urls = res['Value'] self.log.debug( 'GFAL2_SRM2Storage.getTransportURL: Attempting to retrieve tURL for %s paths' % len(urls)) failed = {} successful = {} if not protocols: listProtocols = self.protocolsList if not listProtocols: return S_ERROR( "GFAL2_SRM2Storage.getTransportURL: No local protocols defined and no defaults found.") elif isinstance(protocols, six.string_types): listProtocols = [protocols] elif isinstance(protocols, list): listProtocols = protocols else: return S_ERROR("getTransportURL: Must supply desired protocols to this plug-in.") # Compatibility because of castor returning a castor: url if you ask # for a root URL, and a root: url if you ask for a xroot url... if 'root' in listProtocols and 'xroot' not in listProtocols: listProtocols.insert(listProtocols.index('root'), 'xroot') elif 'xroot' in listProtocols and 'root' not in listProtocols: listProtocols.insert(listProtocols.index('xroot') + 1, 'root') if self.protocolParameters['Protocol'] in listProtocols: successful = {} failed = {} for url in urls: if self.isURL(url)['Value']: successful[url] = url else: failed[url] = 'getTransportURL: Failed to obtain turls.' return S_OK({'Successful': successful, 'Failed': failed}) for url in urls: res = self.__getSingleTransportURL(url, listProtocols) self.log.debug('res = %s' % res) if not res['OK']: failed[url] = res['Message'] else: successful[url] = res['Value'] return S_OK({'Failed': failed, 'Successful': successful}) def __getSingleTransportURL(self, path, protocols=False): """ Get the tURL from path with getxattr from gfal2 :param self: self reference :param str path: path on the storage :returns: S_OK( Transport_URL ) in case of success S_ERROR( errStr ) in case of a failure """ self.log.debug( 'GFAL2_SRM2Storage.__getSingleTransportURL: trying to retrieve tURL for %s' % path) if protocols: self.ctx.set_opt_string_list("SRM PLUGIN", "TURL_PROTOCOLS", protocols) res = self._getExtendedAttributes(path, attributes=['user.replicas']) self.__setSRMOptionsToDefault() if res['OK']: return S_OK(res['Value']['user.replicas']) errStr = 'GFAL2_SRM2Storage.__getSingleTransportURL: Extended attribute tURL is not set.' self.log.debug(errStr, res['Message']) return res def getOccupancy(self, *parms, **kws): """ Gets the GFAL2_SRM2Storage occupancy info. TODO: needs gfal2.15 because of bugs: https://its.cern.ch/jira/browse/DMC-979 https://its.cern.ch/jira/browse/DMC-977 It queries the srm interface for a given space token. Out of the results, we keep totalsize, guaranteedsize, and unusedsize all in B. """ if not self.spaceToken: self.log.info("getOccupancy: SpaceToken not defined for this SE. Falling back to the default getOccupancy.") return super(GFAL2_SRM2Storage, self).getOccupancy(*parms, **kws) # Gfal2 extended parameter name to query the space token occupancy spaceTokenAttr = 'spacetoken.description?%s' % self.protocolParameters['SpaceToken'] # gfal2 can take any srm url as a base. spaceTokenEndpoint = self.getURLBase(withWSUrl=True)['Value'] try: occupancyStr = self.ctx.getxattr(spaceTokenEndpoint, spaceTokenAttr) try: occupancyDict = json.loads(occupancyStr)[0] except ValueError: # https://its.cern.ch/jira/browse/DMC-977 # a closing bracket is missing, so we retry after adding it occupancyStr = occupancyStr[:-1] + '}]' occupancyDict = json.loads(occupancyStr)[0] # https://its.cern.ch/jira/browse/DMC-979 # We set totalsize to guaranteed size # (it is anyway true for all the SEs I could test) occupancyDict['totalsize'] = occupancyDict.get('guaranteedsize', 0) except (gfal2.GError, ValueError) as e: errStr = 'Something went wrong while checking for spacetoken occupancy.' self.log.verbose(errStr, e.message) return S_ERROR(getattr(e, 'code', errno.EINVAL), "%s %s" % (errStr, repr(e))) sTokenDict = {} sTokenDict['Total'] = float(occupancyDict.get('totalsize', '0')) sTokenDict['Free'] = float(occupancyDict.get('unusedsize', '0')) sTokenDict['SpaceReservation'] = self.protocolParameters['SpaceToken'] return S_OK(sTokenDict)