""" Class for the LCG File Catalog Client """ from __future__ import print_function from __future__ import absolute_import from __future__ import division __RCSID__ = "$Id" from past.builtins import long import six from stat import * import os import re import time import DIRAC from DIRAC import S_OK, S_ERROR, gLogger from DIRAC.Resources.Catalog.Utilities import checkCatalogArguments from DIRAC.Core.Utilities.Time import fromEpoch from DIRAC.Core.Utilities.List import breakListIntoChunks from DIRAC.Core.Security.ProxyInfo import getProxyInfo, formatProxyInfoAsString from DIRAC.ConfigurationSystem.Client.Helpers.Registry import ( getDNForUsername, getVOMSAttributeForGroup, getVOForGroup, getVOOption, ) from DIRAC.Resources.Catalog.FileCatalogClientBase import FileCatalogClientBase lfc = None importedLFC = None #################################################################### # # These are some functions used by all methods in the class # def setLfnReplicas(lfn, replicas, successful, failed): if not lfn: return if replicas: successful[lfn] = replicas.copy() replicas.clear() elif lfn not in failed: failed[lfn] = "No active replica" def getClientCertInfo(): res = getProxyInfo(False, False) if not res["OK"]: gLogger.error("getClientCertInfo: Failed to get client proxy information.", res["Message"]) return res proxyInfo = res["Value"] gLogger.debug(formatProxyInfoAsString(proxyInfo)) if "group" not in proxyInfo: errStr = "getClientCertInfo: Proxy information does not contain the group." gLogger.error(errStr) return S_ERROR(errStr) if "VOMS" not in proxyInfo: proxyInfo["VOMS"] = getVOMSAttributeForGroup(proxyInfo["group"]) errStr = "getClientCertInfo: Proxy information does not contain the VOMs information." gLogger.warn(errStr) res = getDNForUsername(proxyInfo["username"]) if not res["OK"]: errStr = "getClientCertInfo: Error getting known proxies for user." gLogger.error(errStr, res["Message"]) return S_ERROR(errStr) diracGroup = proxyInfo.get("group", "Unknown") vo = getVOForGroup(diracGroup) vomsVO = getVOOption(vo, "VOMSVO", "") resDict = { "DN": proxyInfo["identity"], "Role": proxyInfo["VOMS"], "User": proxyInfo["username"], "AllDNs": res["Value"], "Group": diracGroup, "VO": vo, "VOMSVO": vomsVO, } return S_OK(resDict) def existsGuid(guid): """Check if the guid exists""" fstat = lfc.lfc_filestatg() error = lfc.lfc_statg("", guid, fstat) return returnCode(error and lfc.cvar.serrno != 2, not error) def getDNFromUID(userID): buff = " " * (lfc.CA_MAXNAMELEN + 1) res = lfc.lfc_getusrbyuid(userID, buff) if res == 0: dn = buff[: buff.find("\x00")] gLogger.debug("LcgFileCatalogClient.getDNFromUID: UID %s maps to %s." % (userID, dn)) return S_OK(dn) else: errStr = "LcgFileCatalogClient.getDNFromUID: Failed to get DN from UID" gLogger.error(errStr, "%s %s" % (userID, lfc.sstrerror(lfc.cvar.serrno))) return S_ERROR(errStr) def getRoleFromGID(groupID, path=None): buff = " " * (lfc.CA_MAXNAMELEN + 1) res = lfc.lfc_getgrpbygid(groupID, buff) if res == 0: role = buff[: buff.find("\x00")] if role == "lhcb": role = "lhcb/Role=user" gLogger.debug("LcgFileCatalogClient.getRoleFromGID: GID %s maps to %s." % (groupID, role)) return S_OK(role) else: errStr = "LcgFileCatalogClient:getRoleFromGID: Failed to get role from GID" gLogger.error(errStr, "%s %s%s" % (groupID, ("(%s) " % path) if path else "", lfc.sstrerror(lfc.cvar.serrno))) return S_ERROR() def addReplica(guid, pfn, se, master): fid = lfc.lfc_fileid() status = "U" f_type = "D" poolname = "" fs = "" error = lfc.lfc_addreplica(guid, fid, se, pfn, status, f_type, poolname, fs) # If replica exists, re-register it as one may have changed some parameters (status, se) if lfc.sstrerror(lfc.cvar.serrno) == "File exists": retCode = removeReplica(pfn) if not retCode["OK"]: return retCode error = lfc.lfc_addreplica(guid, fid, se, pfn, status, f_type, poolname, fs) return returnCode(error) def removeReplica(pfn): fid = lfc.lfc_fileid() error = lfc.lfc_delreplica("", fid, pfn) return returnCode(error and error != 2) def setReplicaStatus(pfn, status): return returnCode(lfc.lfc_setrstatus(pfn, status)) def modReplica(pfn, newse): return returnCode(lfc.lfc_modreplica(pfn, "", "", newse)) def closeDirectory(oDirectory): return returnCode(lfc.lfc_closedir(oDirectory)) def getDNUserID(dn): error, users = lfc.lfc_getusrmap() userid = None for userMap in users if not error else []: if userMap.username == dn: userid = userMap.userid break return returnCode(userid is None, userid, errMsg="DN does not exist" if not error else "") def addUserDN(userID, dn): error = lfc.lfc_enterusrmap(userID, dn) # 17 is if dn already exists, then OK return returnCode(error and lfc.cvar.serrno != 17) def returnCode(error, value="", errMsg=""): if not error: return S_OK(value) elif errMsg: return S_ERROR(errMsg) else: return S_ERROR(lfc.sstrerror(lfc.cvar.serrno)) ##################################################### # # LFC catalog client class # ##################################################### class LcgFileCatalogClient(FileCatalogClientBase): READ_METHODS = FileCatalogClientBase.READ_METHODS + [ "isLink", "readLink", "isFile", "getFileMetadata", "getReplicas", "getReplicaStatus", "getFileSize", "isDirectory", "getDirectoryReplicas", "listDirectory", "getDirectoryMetadata", "getDirectorySize", "getDirectoryContents", "resolveDataset", "getLFNForPFN", "getUserDirectory", ] WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + [ "createLink", "removeLink", "addFile", "addReplica", "removeReplica", "removeFile", "setReplicaStatus", "setReplicaHost", "createDirectory", "removeDirectory", "removeDataset", "removeFileFromDataset", "createDataset", "changePathOwner", "changePathMode", ] NO_LFN_METHODS = FileCatalogClientBase.NO_LFN_METHODS + [ "getUserDirectory", "createUserDirectory", "createUserMapping", "removeUserDirectory", ] ADMIN_METHODS = FileCatalogClientBase.ADMIN_METHODS + [ "getUserDirectory", "createUserDirectory", "createUserMapping", "removeUserDirectory", ] def __init__(self, **options): global lfc, importedLFC if importedLFC is None: try: import lfcthr as lfc # This is necessary to make the LFC client thread safe. lfc.init() importedLFC = True gLogger.debug("LcgFileCatalogClient.__init__: Successfully imported lfc module.") except ImportError: importedLFC = False gLogger.exception("LcgFileCatalogClient.__init__: Failed to import lfc module.") self.valid = importedLFC infosys = options.get("LcgGfalInfosys") if not infosys and "LCG_GFAL_INFOSYS" in os.environ: # if not in CS take from environ infosys = os.environ["LCG_GFAL_INFOSYS"] host = options.get("MasterHost") if not host and "LFC_HOST" in os.environ: # if not in CS take from environ host = os.environ["LFC_HOST"] self.host = host if host: os.environ["LFC_HOST"] = host if infosys: os.environ["LCG_GFAL_INFOSYS"] = infosys if "LFC_CONRETRYINT" not in os.environ: os.environ["LFC_CONRETRYINT"] = "5" if "LFC_CONNTIMEOUT" not in os.environ: os.environ["LFC_CONNTIMEOUT"] = "5" if "LFC_CONRETRY" not in os.environ: os.environ["LFC_CONRETRY"] = "5" self.prefix = "/grid" self.session = False self.transaction = False def isOK(self): return self.valid def getName(self): return S_OK(self.name) #################################################################### # # These are the methods for session/transaction manipulation # def __openSession(self): """Open the LFC client/server session""" if self.session: # Another thread is holding a session, can't create it return 0 else: sessionName = "DIRAC_%s.%s at %s at time %s" % ( DIRAC.majorVersion, DIRAC.minorVersion, DIRAC.siteName(), time.time(), ) rc = lfc.lfc_startsess(self.host, sessionName) self.session = rc == 0 # if there was an error, return -1, to be tested just after the call... return 1 if self.session else -1 def __closeSession(self): """Close the LFC client/server session""" if self.session: lfc.lfc_endsess() self.session = False def __startTransaction(self): """Begin transaction for one time commit""" if not self.transaction: transactionName = "Transaction: DIRAC_%s.%s at %s at time %s" % ( DIRAC.majorVersion, DIRAC.minorVersion, DIRAC.siteName(), time.time(), ) lfc.lfc_starttrans(self.host, transactionName) self.transaction = True def __abortTransaction(self): """Abort transaction""" if self.transaction: lfc.lfc_aborttrans() self.transaction = False def __endTransaction(self): """End transaction gracefully""" if self.transaction: lfc.lfc_endtrans() self.transaction = False def setAuthorizationId(self, dn): """Set authorization id for the proxy-less LFC communication""" lfc.lfc_client_setAuthorizationId(0, 0, "GSI", dn) #################################################################### # # The following are read methods for paths # @checkCatalogArguments def exists(self, lfns): """Check if the path exists""" created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn, guid in lfns.items(): res = self.__existsLfn(lfn) if not res["OK"]: failed[lfn] = res["Message"] elif res["Value"]: successful[lfn] = lfn elif not isinstance(guid, six.string_types): successful[lfn] = False else: res = existsGuid(guid) if not res["OK"]: failed[lfn] = res["Message"] elif not res["Value"]: successful[lfn] = False else: successful[lfn] = self.__getLfnForGUID(guid)["Value"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) def __getPathAccess(self, path): """Determine the permissions using the lfc function lfc_access""" permDict = {"Read": 1, "Write": 2, "Execute": 4} resDict = {} for p in permDict: code = permDict[p] value = lfc.lfc_access(self.__fullLfn(path), code) if value == 0: resDict[p] = True else: resDict[p] = False return S_OK(resDict) @checkCatalogArguments def hasAccess(self, lfns, opType): if opType in LcgFileCatalogClient.READ_METHODS: opType = "Read" elif opType in LcgFileCatalogClient.WRITE_METHODS: opType = "Write" res = self.getPathPermissions(lfns) if not res["OK"]: return res perms = res["Value"] failed = perms["Failed"] successful = dict((path, perms["Successful"][path].get(opType, False)) for path in perms["Successful"]) return S_OK({"Successful": successful, "Failed": failed}) @checkCatalogArguments def getPathPermissions(self, lfns): """Determine the VOMs based ACL information for a supplied path""" created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for path in lfns: res = self.__getBasePath(path) if not res["OK"]: failed[path] = res["Message"] else: basePath = res["Value"] res = self.__getPathAccess(basePath) if not res["OK"]: failed[path] = res["Message"] else: lfcPerm = res["Value"] res = self.__getACLInformation(basePath) if not res["OK"]: failed[path] = res["Message"] else: # Evaluate access rights val = res["Value"] try: lfcPerm["user"] = val["user"] lfcPerm["group"] = val["group"] lfcPerm["world"] = val["world"] lfcPerm["DN"] = val["DN"] lfcPerm["Role"] = val["Role"] except KeyError: print("key not found: __getACLInformation returned incomplete dictionary", KeyError) failed[path] = lfcPerm continue # ACLs are just an additional information, therefore here it is successful successful[path] = lfcPerm if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following are read methods for files # @checkCatalogArguments def isFile(self, lfns): # If we have less than three lfns to query a session doesn't make sense created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn in lfns: res = self.__getPathStat(lfn) if not res["OK"]: failed[lfn] = res["Message"] elif S_ISREG(res["Value"].filemode): successful[lfn] = True else: successful[lfn] = False if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getFileMetadata(self, lfns, ownership=False): """Returns the file metadata associated to a supplied LFN""" # If we have less than three lfns to query a session doesn't make sense created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn in lfns: res = self.__getPathStat(lfn) if not res["OK"]: failed[lfn] = res["Message"] else: fstat = res["Value"] successful[lfn] = {} successful[lfn]["Size"] = fstat.filesize successful[lfn]["ChecksumType"] = fstat.csumtype successful[lfn]["Checksum"] = fstat.csumvalue successful[lfn]["GUID"] = fstat.guid successful[lfn]["Status"] = fstat.status successful[lfn]["CreationDate"] = fromEpoch(fstat.ctime) successful[lfn]["ModificationDate"] = fromEpoch(fstat.mtime) successful[lfn]["NumberOfLinks"] = fstat.nlink successful[lfn]["Mode"] = S_IMODE(fstat.filemode) if ownership: res = getDNFromUID(fstat.uid) if res["OK"]: successful[lfn]["OwnerDN"] = res["Value"] else: successful[lfn]["OwnerDN"] = None res = getRoleFromGID(fstat.gid, path=lfn) if res["OK"]: successful[lfn]["OwnerRole"] = res["Value"] else: successful[lfn]["OwnerRole"] = None if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getFileSize(self, lfns): """Get the size of a supplied file""" # If we have less than three lfns to query a session doesn't make sense created = False if len(lfns) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn in lfns: res = self.__getPathStat(lfn) if not res["OK"]: failed[lfn] = res["Message"] else: successful[lfn] = res["Value"].filesize if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getReplicas(self, lfns, allStatus=False): """Returns replicas for an LFN or list of LFNs""" lfnChunks = breakListIntoChunks(lfns, 1000) # If we have less than three groups to query a session doesn't make sense created = False if len(lfnChunks) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfnList in lfnChunks: fullLfnList = [self.__fullLfn(lfn) for lfn in lfnList if lfn] value, replicaList = lfc.lfc_getreplicasl(fullLfnList, "") if value != 0: for lfn in lfnList: reason = lfc.sstrerror(lfc.cvar.serrno) if "Could not secure the connection" in reason: # This is a fatal error return S_ERROR("Could not secure the connection") elif "Bad credentials" in reason: return S_ERROR("Bad Credentials") continue guid = "" it = iter(lfnList) replicas = {} # This is useless but makes pylinit happy as lfn is defined in the loop when the guid changes lfn = None for oReplica in replicaList: if oReplica.errcode != 0: if (oReplica.guid == "") or (oReplica.guid != guid): setLfnReplicas(lfn, replicas, successful, failed) lfn = it.next() failed[lfn] = lfc.sstrerror(oReplica.errcode) guid = oReplica.guid elif oReplica.sfn == "": setLfnReplicas(lfn, replicas, successful, failed) lfn = it.next() failed[lfn] = "File has zero replicas" guid = oReplica.guid else: # This is where we change lfn for good! if oReplica.guid != guid: setLfnReplicas(lfn, replicas, successful, failed) lfn = it.next() guid = oReplica.guid if (oReplica.status != "P") or allStatus: se = oReplica.host pfn = oReplica.sfn # .strip() replicas[se] = pfn # This is for the last file in the list setLfnReplicas(lfn, replicas, successful, failed) if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getReplicaStatus(self, lfns): # If we have less than three lfns to query a session doesn't make sense created = False if len(lfns) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn, se in lfns.items(): res = self.__getFileReplicaStatus(lfn, se) if not res["OK"]: failed[lfn] = res["Message"] else: successful[lfn] = res["Value"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getLFNForPFN(self, pfns): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for pfn in pfns: res = self.__getLFNForPFN(pfn) if not res["OK"]: failed[pfn] = res["Message"] else: successful[pfn] = res["Value"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following a read methods for directories # @checkCatalogArguments def isDirectory(self, lfns): """Determine whether the path is a directory""" # If we have less than three lfns to query a session doesn't make sense created = False if len(lfns) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn in lfns: res = self.__getPathStat(lfn) if not res["OK"]: failed[lfn] = res["Message"] elif S_ISDIR(res["Value"].filemode): successful[lfn] = True else: successful[lfn] = False if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getDirectoryMetadata(self, lfns): # If we have less than three lfns to query a session doesn't make sense created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn in lfns: res = self.__getPathStat(lfn) if not res["OK"]: failed[lfn] = res["Message"] else: fstat = res["Value"] successful[lfn] = {} successful[lfn]["Size"] = fstat.filesize successful[lfn]["ChecksumType"] = fstat.csumtype successful[lfn]["Checksum"] = fstat.csumvalue successful[lfn]["GUID"] = fstat.guid successful[lfn]["Status"] = fstat.status successful[lfn]["CreationDate"] = fromEpoch(fstat.ctime) successful[lfn]["ModificationDate"] = fromEpoch(fstat.mtime) successful[lfn]["NumberOfSubPaths"] = fstat.nlink res = getDNFromUID(fstat.uid) if res["OK"]: successful[lfn]["OwnerDN"] = res["Value"] else: successful[lfn]["OwnerDN"] = None res = getRoleFromGID(fstat.gid, path=lfn) if res["OK"]: successful[lfn]["OwnerRole"] = res["Value"] else: successful[lfn]["OwnerRole"] = None successful[lfn]["Mode"] = S_IMODE(fstat.filemode) if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getDirectoryReplicas(self, lfns, allStatus=False): """This method gets all of the pfns in the directory""" created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for path in lfns: res = self.__getDirectoryContents(path) if not res["OK"]: failed[path] = res["Message"] else: pathReplicas = {} files = res["Value"]["Files"] for lfn, fileDict in files.items(): pathReplicas[lfn] = {} for se, seDict in fileDict["Replicas"].items(): pfn = seDict["PFN"] status = seDict["Status"] if (status != "P") or allStatus: pathReplicas[lfn][se] = pfn successful[path] = pathReplicas if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def listDirectory(self, lfns, verbose=False): """Returns the result of __getDirectoryContents for multiple supplied paths""" created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for path in lfns: res = self.__getDirectoryContents(path, verbose) if res["OK"]: successful[path] = res["Value"] else: failed[path] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def getDirectorySize(self, lfns, longOutput=False, rawFiles=False): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for path in lfns.keys(): res = self.__getDirectorySize(path, longOutput=longOutput) if res["OK"]: successful[path] = res["Value"] else: failed[path] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following are read methods for links # @checkCatalogArguments def isLink(self, links): # If we have less than three lfns to query a session doesn't make sense failed = {} successful = {} created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") for link in links: res = self.__getLinkStat(link) if not res["OK"]: failed[link] = res["Message"] elif S_ISLNK(res["Value"].filemode): successful[link] = True else: successful[link] = False if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def readLink(self, links): # If we have less than three lfns to query a session doesn't make sense created = False if len(links) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for link in links: res = self.__readLink(link) if res["OK"]: successful[link] = res["Value"] else: failed[link] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following are read methods for datasets # @checkCatalogArguments def resolveDataset(self, datasets, allStatus=False): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") successful = {} failed = {} for datasetName in datasets: res = self.__getDirectoryContents(datasetName) if not res["OK"]: failed[datasetName] = res["Message"] else: # linkDict = res['Value']['Links'] linkDict = res["Value"]["Files"] datasetFiles = {} for link, fileMetadata in linkDict.items(): # target = fileMetadata[link]['MetaData']['Target'] target = link datasetFiles[target] = fileMetadata["Replicas"] successful[datasetName] = datasetFiles if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following a write methods for files # @checkCatalogArguments def addFile(self, lfns): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} baseDirs = [] for lfn in lfns: baseDir = os.path.dirname(lfn) if baseDir in baseDirs: continue baseDirs.append(baseDir) res = self.__executeOperation(baseDir, "exists") # If we failed to find out whether the directory exists if not res["OK"]: continue # If the directory exists if res["Value"]: continue # Make the directories recursively if needed res = self.__makeDirs(baseDir) # If we failed to make the directory for the file if not res["OK"]: continue lfc.lfc_umask(0000) for lfnList in breakListIntoChunks(lfns, 1000): fileChunk = [] for lfn in list(lfnList): lfnInfo = lfns[lfn] pfn = lfnInfo["PFN"] size = lfnInfo["Size"] se = lfnInfo["SE"] guid = lfnInfo["GUID"] checksum = lfnInfo["Checksum"] res = self.__checkAddFile(lfn, pfn, size, se, guid, checksum) if not res["OK"]: # Error failed[lfn] = res["Message"] lfnList.remove(lfn) elif not res["Value"]: # File already exists adn is consistent successful[lfn] = True lfnList.remove(lfn) else: # File doesn't exist, create it oFile = lfc.lfc_filereg() oFile.lfn = self.__fullLfn(lfn) oFile.sfn = pfn oFile.size = size oFile.mode = 0o664 oFile.server = se oFile.guid = guid oFile.csumtype = "AD" oFile.status = "U" oFile.csumvalue = lfnInfo["Checksum"] fileChunk.append(oFile) if not lfnList: continue error, errCodes = lfc.lfc_registerfiles(fileChunk) if error or (len(errCodes) != len(lfnList)): for lfn in lfnList: failed[lfn] = lfc.sstrerror(lfc.cvar.serrno) continue for index in range(len(errCodes)): lfn = lfnList[index] errCode = errCodes[index] if errCode == 0: successful[lfn] = True elif errCode == 17: failed[lfn] = "The supplied GUID is already used" res = self.__getLfnForGUID(guid) if res["OK"]: failed[lfn] = "The supplied GUID is already used by %s" % res["Value"] else: failed[lfn] = lfc.sstrerror(errCode) if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def addReplica(self, lfns): """This adds a replica to the catalogue.""" created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn, info in lfns.items(): pfn = info["PFN"] se = info["SE"] if "Master" not in info: master = False else: master = info["Master"] res = self.__getLFNGuid(lfn) if not res["OK"]: failed[lfn] = res["Message"] else: guid = res["Value"] res = addReplica(guid, pfn, se, master) if res["OK"]: successful[lfn] = True else: failed[lfn] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def removeFile(self, lfns): """Remove the supplied path""" created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") res = self.exists(lfns) if not res["OK"]: if created: self.__closeSession() return res failed = res["Value"]["Failed"] successful = {} for lfn, exists in res["Value"]["Successful"].items(): if not exists: successful[lfn] = True else: res = self.__unlinkPath(lfn) if res["OK"]: successful[lfn] = True else: failed[lfn] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def removeReplica(self, lfns): created = False if len(lfns) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") res = self.getReplicas(lfns) # We need the PFNs of the input lfn (list) if not res["OK"]: return res for lfn, lfnrep in res["Value"]["Successful"].items(): if "PFN" not in lfns[lfn]: # Update only if the PFN was not supplied lfns[lfn]["PFN"] = lfnrep[lfns[lfn]["SE"]] failed = {} for lfn, message in res["Value"]["Failed"].items(): if "PFN" not in lfns[lfn]: # Change only if PFN is not there failed[lfn] = message # The replicas are not available, mark the lfn as failed lfns.pop(lfn) # and remove them successful = {} for lfn, info in lfns.items(): if ("PFN" not in info) or ("SE" not in info): failed[lfn] = "Required parameters not supplied" else: pfn = info["PFN"] se = info["SE"] res = removeReplica(pfn) if res["OK"]: successful[lfn] = True else: if res["Message"] == "No such file or directory": # The PFN didn't exist, but maybe it wsa changed... res1 = self.getReplicas(lfn) if res1["OK"]: pfn1 = res1["Value"]["Successful"].get(lfn, {}).get(se) if pfn1 and pfn1 != pfn: res = removeReplica(pfn1) if res["OK"]: successful[lfn] = True else: failed[lfn] = res["Message"] lfnRemoved = list(successful) if len(lfnRemoved) > 0: res = self.getReplicas(lfnRemoved, True) zeroReplicaFiles = [] if not res["OK"]: if created: self.__closeSession() return res else: for lfn, repDict in res["Value"]["Successful"].items(): if len(repDict) == 0: zeroReplicaFiles.append(lfn) if len(zeroReplicaFiles) > 0: res = self.removeFile(zeroReplicaFiles) if not res["OK"]: if created: self.__closeSession() return res if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def setReplicaStatus(self, lfns): created = False if len(lfns) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn, info in lfns.items(): pfn = info["PFN"] status = info["Status"] res = setReplicaStatus(pfn, status[0]) if res["OK"]: successful[lfn] = True else: failed[lfn] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def setReplicaProblematic(self, lfns, revert=False): """ Set replicas to problematic. :param lfn lfns: has to be formated this way : { lfn : { se1 : pfn1, se2 : pfn2, ...}, ...} :param revert: If True, remove the problematic flag :return: { Successful : { lfn : [ ses ] }, Failed : { lfn : { se : msg } } } """ # This method does a batch treatment because the setReplicaStatus can only take one replica per lfn at once # # Illustration : # # lfns {'L2': {'S1': 'P3'}, 'L3': {'S3': 'P5', 'S2': 'P4', 'S4': 'P6'}, 'L1': {'S2': 'P2', 'S1': 'P1'}} # # loop1: lfnSEs {'L2': ['S1'], 'L3': ['S3', 'S2', 'S4'], 'L1': ['S2', 'S1']} # loop1 : batch {'L2': {'Status': 'P', 'SE': 'S1', 'PFN': 'P3'}, 'L3': {'Status': 'P', 'SE': 'S4', 'PFN': 'P6'}, # 'L1': {'Status': 'P', 'SE': 'S1', 'PFN': 'P1'}} # # loop2: lfnSEs {'L2': [], 'L3': ['S3', 'S2'], 'L1': ['S2']} # loop2 : batch {'L3': {'Status': 'P', 'SE': 'S2', 'PFN': 'P4'}, 'L1': {'Status': 'P', 'SE': 'S2', 'PFN': 'P2'}} # # loop3: lfnSEs {'L3': ['S3'], 'L1': []} # loop3 : batch {'L3': {'Status': 'P', 'SE': 'S3', 'PFN': 'P5'}} # # loop4: lfnSEs {'L3': []} # loop4 : batch {} successful = {} failed = {} status = "-" if revert else "P" # { lfn : [ se1, se2, ...], ...} lfnsSEs = dict((lfn, [se for se in lfns[lfn]]) for lfn in lfns) while lfnsSEs: # { lfn : { 'SE' : se1, 'PFN' : pfn1, 'Status' : status }, ... } batch = {} for lfn in list(lfnsSEs): # If there are still some Replicas (SE) for the given LFN, we put it in the next batch # else we remove the entry from the lfnsSEs dict if lfnsSEs[lfn]: se = lfnsSEs[lfn].pop() batch[lfn] = {"SE": se, "PFN": lfns[lfn][se], "Status": status} else: del lfnsSEs[lfn] # Happens when there is nothing to treat anymore if not batch: break res = self.setReplicaStatus(batch) if not res["OK"]: for lfn in batch: failed.setdefault(lfn, {})[batch[lfn]["SE"]] = res["Message"] continue for lfn in res["Value"]["Failed"]: failed.setdefault(lfn, {})[batch[lfn]["SE"]] = res["Value"]["Failed"][lfn] for lfn in res["Value"]["Successful"]: successful.setdefault(lfn, []).append(batch[lfn]["SE"]) return S_OK({"Successful": successful, "Failed": failed}) @checkCatalogArguments def setReplicaHost(self, lfns): """This modifies the replica metadata for the SE.""" created = False if len(lfns) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for lfn, info in lfns.items(): pfn = info["PFN"] newse = info["NewSE"] res = modReplica(pfn, newse) if res["OK"]: successful[lfn] = True else: failed[lfn] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following are write methods for directories # @checkCatalogArguments def removeDirectory(self, lfns, recursive=False): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") res = self.exists(lfns) if not res["OK"]: if created: self.__closeSession() return res failed = res["Value"]["Failed"] successful = {} for lfn, exists in res["Value"]["Successful"].items(): if not exists: successful[lfn] = True continue if recursive: res = self.__removeDirs(lfn) else: res = self.__removeDirectory(lfn) if res["OK"]: successful[lfn] = True else: failed[lfn] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def createDirectory(self, lfns): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for path in lfns: res = self.__makeDirs(path) if res["OK"]: successful[path] = True else: failed[path] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following are write methods for links # @checkCatalogArguments def createLink(self, links): # If we have less than three lfns to query a session doesn't make sense created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for link, target in links.items(): res = self.__makeDirs(os.path.dirname(link)) if not res["OK"]: failed[link] = res["Message"] else: res = self.__makeLink(link, target) if not res["OK"]: failed[link] = res["Message"] else: successful[link] = target if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def removeLink(self, links): # If we have less than three lfns to query a session doesn't make sense created = False if len(links) > 2: created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") failed = {} successful = {} for link in links: res = self.__unlinkPath(link) if not res["OK"]: failed[link] = res["Message"] else: successful[link] = True if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # The following are write methods for datasets # @checkCatalogArguments def createDataset(self, datasets): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") successful = {} failed = {} for datasetName, lfns in datasets.items(): res = self.__executeOperation(datasetName, "exists") if not res["OK"]: return res elif res["Value"]: return S_ERROR("LcgFileCatalogClient.createDataset: This dataset already exists.") res = self.__createDataset(datasetName, lfns) if res["OK"]: successful[datasetName] = True else: self.__executeOperation(datasetName, "removeDataset") failed[datasetName] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def removeDataset(self, datasets): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") successful = {} failed = {} for datasetName in datasets: res = self.__removeDataset(datasetName) if not res["OK"]: failed[datasetName] = res["Message"] else: successful[datasetName] = True if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def removeFileFromDataset(self, datasets): created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") successful = {} failed = {} for datasetName, lfns in datasets.items(): res = self.__removeFilesFromDataset(datasetName, lfns) if not res["OK"]: failed[datasetName] = res["Message"] else: successful[datasetName] = True if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # These are the internal methods to be used by all methods # def __executeOperation(self, path, method): """Executes the requested functionality with the supplied path""" fcn = None if hasattr(self, method) and callable(getattr(self, method)): fcn = getattr(self, method) if not fcn: return S_ERROR("Unable to invoke %s, it isn't a member function of LcgFileCatalogClient" % method) res = fcn(path) if isinstance(path, dict): path = list(path)[0] if not res["OK"]: return res elif path not in res["Value"]["Successful"]: return S_ERROR(res["Value"]["Failed"][path]) return S_OK(res["Value"]["Successful"][path]) def __getLFNForPFN(self, pfn): fstat = lfc.lfc_filestatg() error = lfc.lfc_statr(pfn, fstat) return returnCode(error, self.__getLfnForGUID(fstat.guid) if not error else None) def __existsLfn(self, lfn): """Check whether the supplied LFN exists""" error = lfc.lfc_access(self.__fullLfn(lfn), 0) return returnCode(error and lfc.cvar.serrno != 2, error == 0) @checkCatalogArguments def getLFNForGUID(self, guids): guidLFN = {} failed = {} for guid in guids: # I somehow have the feeling that lfnlist[0] of __getLfnForGUID # could throw an exception if the guid does not exist in the DB # but... not touching this black magic try: guidLFN[guid] = self.__getLfnForGUID(guid)["Value"] except Exception as _e: failed[guid] = "GUID does not exist" return S_OK({"Successful": guidLFN, "Failed": failed}) def __getLfnForGUID(self, guid): """Resolve the LFN for a supplied GUID""" if not guid: return S_OK() linkList = lfc.lfc_list() lfnlist = [] listlinks = lfc.lfc_listlinks("", guid, lfc.CNS_LIST_BEGIN, linkList) while listlinks: ll = listlinks.path if re.search("^" + self.prefix, ll): ll = listlinks.path.replace(self.prefix, "", 1) lfnlist.append(ll) listlinks = lfc.lfc_listlinks("", guid, lfc.CNS_LIST_CONTINUE, linkList) else: lfc.lfc_listlinks("", guid, lfc.CNS_LIST_END, linkList) return S_OK(lfnlist[0]) def __getBasePath(self, path): exists = False while not exists: res = self.__executeOperation(path, "exists") if not res["OK"]: return res else: exists = res["Value"] if not exists: path = os.path.dirname(path) return S_OK(path) def __getACLInformation(self, path): results, objects = lfc.lfc_getacl(self.__fullLfn(path), 256) # lfc.CNS_ACL_GROUP_OBJ) if results == -1: errStr = "LcgFileCatalogClient.__getACLInformation: Failed to obtain all path ACLs." gLogger.error(errStr, "%s %s" % (path, lfc.sstrerror(lfc.cvar.serrno))) return S_ERROR(errStr) permissionsDict = {} for obj in objects: if obj.a_type == lfc.CNS_ACL_USER_OBJ: res = getDNFromUID(obj.a_id) if not res["OK"]: return res permissionsDict["DN"] = res["Value"] permissionsDict["user"] = obj.a_perm elif obj.a_type == lfc.CNS_ACL_GROUP_OBJ: res = getRoleFromGID(obj.a_id, path=path) if not res["OK"]: return res role = res["Value"] permissionsDict["Role"] = role permissionsDict["group"] = obj.a_perm elif obj.a_type == lfc.CNS_ACL_OTHER: permissionsDict["world"] = obj.a_perm else: errStr = "LcgFileCatalogClient.__getACLInformation: ACL type not considered." gLogger.debug(errStr, obj.a_type) gLogger.verbose( "LcgFileCatalogClient.__getACLInformation: %s owned by %s:%s." % (path, permissionsDict["DN"], permissionsDict["Role"]) ) return S_OK(permissionsDict) def __getPathStat(self, path="", guid=""): if path: path = self.__fullLfn(path) fstat = lfc.lfc_filestatg() error = lfc.lfc_statg(path, guid, fstat) return returnCode(error, fstat) def __getFileReplicas(self, lfn, allStatus): error, replicaObjects = lfc.lfc_getreplica(self.__fullLfn(lfn), "", "") return returnCode( error or not replicaObjects, dict([(replica.host, replica.sfn) for replica in replicaObjects if allStatus or replica.status != "P"]) if not error else None, errMsg="File has zero replicas" if not error else "", ) def __getFileReplicaStatus(self, lfn, se): error, replicaObjects = lfc.lfc_getreplica(self.__fullLfn(lfn), "", "") status = None for replica in replicaObjects if not error else []: if se == replica.host: status = replica.status break return returnCode(status is None, status, errMsg="No replica at supplied site" if not error else "") def __checkAddFile(self, lfn, pfn, size, se, guid, checksum): res = self.__getPathStat(lfn) if not res["OK"]: if res["Message"] != "No such file or directory": return S_ERROR("Failed to find pre-existance of LFN") else: # File exists, check if consistent with supplied parameters fstat = res["Value"] errStr = "" if fstat.guid != guid: errStr = "This LFN %s is already registered with another GUID" % lfn elif fstat.filesize != size: errStr = "This LFN %s is already registered with another size" % lfn elif fstat.csumvalue.upper() != checksum.upper(): errStr = "This LFN %s is already registered with another adler32" % lfn if errStr: return S_ERROR(errStr) res = self.__getFileReplicas(lfn, True) if not res["OK"]: return S_ERROR("Failed to obtain replicas for existing LFN %s" % lfn) replicas = res["Value"] if replicas.get(se) != pfn: return S_ERROR("This LFN %s is already registered with another SE/PFN" % lfn) return S_OK(False) # We reach here only if the file doesn't exist, which is what we look for!! # Now we check the arguments try: errStr = "" size = long(size) except Exception: errStr = "The size of the file must be an 'int','long' or 'string'" if not se: errStr = "The SE for the file was not supplied." elif not pfn: errStr = "The PFN for the file was not supplied." elif not lfn: errStr = "The LFN for the file was not supplied." elif not guid: errStr = "The GUID for the file was not supplied." elif not checksum: errStr = "The adler32 for the file was not supplied." if errStr: return S_ERROR(errStr) return S_OK(True) def __unlinkPath(self, lfn): return returnCode(lfc.lfc_unlink(self.__fullLfn(lfn))) def __removeDirectory(self, path): return returnCode(lfc.lfc_rmdir(self.__fullLfn(path))) def __removeDirs(self, path): """Black magic contained within...""" res = self.__getDirectoryContents(path) if not res["OK"]: return res subDirs = res["Value"]["SubDirs"] files = res["Value"]["Files"] for subDir in subDirs: res = self.__removeDirs(subDir) if not res["OK"]: return res if files: return S_ERROR("Directory not empty") return self.__removeDirectory(path) def __makeDirs(self, path, mode=0o775): """Black magic contained within....""" dirName = os.path.dirname(path) res = self.__executeOperation(path, "exists") if not res["OK"]: return res if res["Value"]: return S_OK() res = self.__executeOperation(dirName, "exists") if not res["OK"]: return res if res["Value"]: res = self.__makeDirectory(path, mode) else: res = self.__makeDirs(dirName, mode) res = self.__makeDirectory(path, mode) return res def __makeDirectory(self, path, mode): lfc.lfc_umask(0000) return returnCode(lfc.lfc_mkdir(self.__fullLfn(path), mode)) def __openDirectory(self, path): value = lfc.lfc_opendirg(self.__fullLfn(path), "") return returnCode(not value, value) def __getDirectoryContents(self, path, verbose=False): """Returns a dictionary containing all of the contents of a directory. This includes the metadata associated to files (replicas, size, guid, status) and the subdirectories found. """ # First check that the directory exists res = self.__executeOperation(path, "exists") if not res["OK"]: return res if not res["Value"]: return S_ERROR("No such file or directory") res = self.__getPathStat(path) if not res["OK"]: return res nbfiles = res["Value"].nlink res = self.__openDirectory(path) if not res["OK"]: return res oDirectory = res["Value"] subDirs = {} links = {} files = {} loop = list(range(nbfiles + 1)) while loop.pop(): result = lfc.lfc_readdirxr(oDirectory, "") if not result: # In some rare cases we reach the end of oDirectory, before nbfiles iterations (!!!) break entry, fileInfo = result pathMetadata = {} pathMetadata["Mode"] = S_IMODE(entry.filemode) subPath = "%s/%s" % (path, entry.d_name) if verbose: statRes = self.__getPathStat(subPath) if statRes["OK"]: oPath = statRes["Value"] pathMetadata["Size"] = oPath.filesize pathMetadata["ChecksumType"] = oPath.csumtype pathMetadata["Checksum"] = oPath.csumvalue pathMetadata["GUID"] = oPath.guid pathMetadata["Status"] = oPath.status pathMetadata["CreationDate"] = fromEpoch(oPath.ctime) pathMetadata["ModificationDate"] = fromEpoch(oPath.mtime) pathMetadata["NumberOfLinks"] = oPath.nlink pathMetadata["LastAccess"] = oPath.atime res = getDNFromUID(oPath.uid) if res["OK"]: pathMetadata["OwnerDN"] = res["Value"] else: pathMetadata["OwnerDN"] = None res = getRoleFromGID(oPath.gid, path=subPath) if res["OK"]: pathMetadata["OwnerRole"] = res["Value"] else: pathMetadata["OwnerRole"] = None if S_ISDIR(entry.filemode): subDirs[subPath] = pathMetadata else: replicaDict = {} if fileInfo: for replica in fileInfo: replicaDict[replica.host] = {"PFN": replica.sfn, "Status": replica.status} pathMetadata["Size"] = entry.filesize pathMetadata["GUID"] = entry.guid if S_ISLNK(entry.filemode): res = self.__executeOperation(subPath, "readLink") if res["OK"]: pathMetadata["Target"] = res["Value"] links[subPath] = {} links[subPath]["MetaData"] = pathMetadata links[subPath]["Replicas"] = replicaDict elif S_ISREG(entry.filemode): files[subPath] = {} files[subPath]["Replicas"] = replicaDict files[subPath]["MetaData"] = pathMetadata pathDict = {} res = closeDirectory(oDirectory) pathDict = {"Files": files, "SubDirs": subDirs, "Links": links} return S_OK(pathDict) def __getDirectorySize(self, path, longOutput=False): res = self.__executeOperation(path, "exists") if not res["OK"]: return res if not res["Value"]: return S_ERROR("No such file or directory") res = self.__getPathStat(path) if not res["OK"]: return res nbfiles = res["Value"].nlink res = self.__openDirectory(path) if not res["OK"]: return res oDirectory = res["Value"] pathDict = {"SubDirs": {}, "ClosedDirs": [], "Files": 0, "TotalSize": 0, "SiteUsage": {}} loop = list(range(nbfiles + 1)) while loop.pop(): entry, fileInfo = lfc.lfc_readdirxr(oDirectory, "") if S_ISDIR(entry.filemode): subDir = "%s/%s" % (path, entry.d_name) permissions = S_IMODE(entry.filemode) if (not permissions & S_IWUSR) and (not permissions & S_IWGRP) and (not permissions & S_IWOTH): pathDict["ClosedDirs"].append(subDir) modTime = time.ctime() statRes = self.__getPathStat(subDir) if statRes["OK"]: modTime = fromEpoch(statRes["Value"].mtime) pathDict["SubDirs"][subDir] = modTime else: fileSize = entry.filesize pathDict["TotalSize"] += fileSize pathDict["Files"] += 1 if not fileInfo: gLogger.error( "LcgFileCatalogClient.__getDirectorySize: File found with no replicas", "%s/%s" % (path, entry.d_name), ) else: for replica in fileInfo: if replica.host not in pathDict["SiteUsage"]: pathDict["SiteUsage"][replica.host] = {"Files": 0, "Size": 0} pathDict["SiteUsage"][replica.host]["Size"] += fileSize pathDict["SiteUsage"][replica.host]["Files"] += 1 res = closeDirectory(oDirectory) return S_OK(pathDict) def __getLinkStat(self, link): lstat = lfc.lfc_filestat() return returnCode(lfc.lfc_lstat(self.__fullLfn(link), lstat), lstat) def __readLink(self, link): buff = " " * (lfc.CA_MAXPATHLEN + 1) chars = lfc.lfc_readlink(self.__fullLfn(link), buff, lfc.CA_MAXPATHLEN) if chars > 0: error = 0 chars = buff[:chars].replace(self.prefix, "", 1).replace("\x00", "") else: error = 1 return returnCode(error, chars) def __makeLink(self, source, target): return returnCode(lfc.lfc_symlink(self.__fullLfn(target), self.__fullLfn(source))) def __getLFNGuid(self, lfn): """Get the GUID for the given lfn""" fstat = lfc.lfc_filestatg() return returnCode(lfc.lfc_statg(self.__fullLfn(lfn), "", fstat), fstat.guid) def __createDataset(self, datasetName, lfns): res = self.__makeDirs(datasetName) if not res["OK"]: return res links = {} for lfn in lfns: res = self.__getLFNGuid(lfn) if not res["OK"]: return res else: link = "%s/%s" % (datasetName, res["Value"]) links[link] = lfn res = self.createLink(links) if len(res["Value"]["Successful"]) == len(links): return S_OK() totalError = "" for link, error in res["Value"]["Failed"].items(): gLogger.error("LcgFileCatalogClient.__createDataset: Failed to create link", "for %s: %s" % (link, error)) totalError = "%s\n %s : %s" % (totalError, link, error) return S_ERROR(totalError) def __removeDataset(self, datasetName): res = self.__getDirectoryContents(datasetName) if not res["OK"]: return res links = list(res["Value"]["Files"]) res = self.removeLink(links) if not res["OK"]: return res elif len(res["Value"]["Failed"]): return S_ERROR("Failed to remove all links") else: res = self.__executeOperation(datasetName, "removeDirectory") return res def __removeFilesFromDataset(self, datasetName, lfns): links = [] for lfn in lfns: res = self.__getLFNGuid(lfn) if not res["OK"]: return res guid = res["Value"] linkPath = "%s/%s" % (datasetName, guid) links.append(linkPath) res = self.removeLink(links) if not res["OK"]: return res if len(res["Value"]["Successful"]) == len(links): return S_OK() totalError = "" for link, error in res["Value"]["Failed"].items(): gLogger.error( "LcgFileCatalogClient.__removeFilesFromDataset: Failed to remove link", "%s: %s" % (link, error) ) totalError = "%s %s : %s" % (totalError, link, error) return S_ERROR(totalError) #################################################################### # # These are the methods required for the admin interface # def getUserDirectory(self, usernames): """Takes a list of users and determines whether their directories already exist""" result = getClientCertInfo() if not result["OK"]: return result vo = result["Value"]["VO"] usernameDict = {} for username in usernames: userDirectory = "/%s/user/%s/%s" % (vo, username[0], username) usernameDict[userDirectory] = username res = self.exists(list(usernameDict)) if not res["OK"]: return res failed = {} for directory, reason in res["Value"]["Failed"].items(): failed[usernameDict[directory]] = reason successful = {} for directory, exists in res["Value"]["Successful"].items(): successful[usernameDict[directory]] = exists resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) def createUserDirectory(self, usernames): """Creates the user directory""" result = getClientCertInfo() if not result["OK"]: return result vo = result["Value"]["VO"] successful = {} failed = {} created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") for username in usernames: userDirectory = "/%s/user/%s/%s" % (vo, username[0], username) res = self.__makeDirs(userDirectory, 0o755) if res["OK"]: successful[username] = userDirectory else: failed[username] = res["Message"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) def removeUserDirectory(self, username): """Remove the user directory and remove the user mapping""" created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") res = self.getUserDirectory(username) if not res["OK"]: return res failed = {} for username, error in res["Value"]["Failed"].items(): failed[username] = error directoriesToRemove = {} successful = {} for username, directory in res["Value"]["Successful"].items(): if not directory: successful[username] = True else: directoriesToRemove[directory] = username res = self.removeDirectory(list(directoriesToRemove)) if not res["OK"]: return res for directory, error in res["Value"]["Failed"].items(): failed[directoriesToRemove[directory]] = error for directory in res["Value"]["Successful"]: successful[directoriesToRemove[directory]] = True if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def changePathOwner(self, paths): """Change the ownership of the directory to the user associated to the supplied DN""" successful = {} failed = {} created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") for dirPath, dn in paths.items(): res = getDNUserID(dn) if not res["OK"]: failed[dirPath] = res["Message"] else: userID = res["Value"] res = self.__changeOwner(dirPath, userID) if not res["OK"]: failed[dirPath] = res["Message"] else: successful[dirPath] = True if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) @checkCatalogArguments def changePathMode(self, paths): """Change the ownership of the directory to the user associated to the supplied DN""" successful = {} failed = {} created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") for dirPath, mode in paths.items(): res = self.__changeMode(dirPath, mode) if not res["OK"]: failed[dirPath] = res["Message"] else: successful[dirPath] = True if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) def createUserMapping(self, userDNs): """Create a user with the supplied DN and return the userID""" successful = {} failed = {} created = self.__openSession() if created < 0: return S_ERROR("Error opening LFC session") for userDN, uid in userDNs.items(): if not uid: uid = -1 res = addUserDN(uid, userDN) if not res["OK"]: failed[userDN] = res["Message"] else: res = getDNUserID(userDN) if not res["OK"]: failed[userDN] = res["Message"] else: successful[userDN] = res["Value"] if created: self.__closeSession() resDict = {"Failed": failed, "Successful": successful} return S_OK(resDict) #################################################################### # # These are the internal methods used for the admin interface # def __changeOwner(self, lfn, userID): return returnCode(lfc.lfc_chown(self.__fullLfn(lfn), userID, -1)) def __changeMod(self, lfn, mode): return returnCode(lfc.lfc_chmod(self.__fullLfn(lfn), mode)) def __fullLfn(self, lfn): return str(self.prefix + lfn) # THIS IS NOT YET WORKING