""" Utilities to create replication transformations """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import six from DIRAC.TransformationSystem.Client.Transformation import Transformation from DIRAC import gLogger, S_OK, S_ERROR def createDataTransformation( flavour, targetSE, sourceSE, metaKey, metaValue, extraData=None, extraname="", groupSize=1, plugin="Broadcast", tGroup=None, tBody=None, enable=False, ): """Creates the replication transformation based on the given parameters. :param str flavour: Flavour of replication to create: Replication or Moving :param targetSE: Destination for files :type targetSE: python:list or str :param sourceSE: Origin of files :type sourceSE: python:list or str :param int metaKey: Meta key to identify input files :param int metaValue: Meta value to identify input files :param dict metaData: Additional meta data to use to identify input files :param str extraname: addition to the transformation name, only needed if the same transformation was already created :param int groupSize: number of files per transformation taks :param str plugin: plugin to use :param str tGroup: transformation group to set :param tBody: transformation body to set :param bool enable: if true submit the transformation, otherwise dry run :returns: S_OK (with the transformation object, if successfully added), S_ERROR """ metadata = {metaKey: metaValue} if isinstance(extraData, dict): metadata.update(extraData) gLogger.debug("Using %r for metadata search" % metadata) if isinstance(targetSE, six.string_types): targetSE = [targetSE] if isinstance(sourceSE, (list, tuple)): sourceSE = "%s" % (",".join(sourceSE)) gLogger.debug("Using plugin: %r" % plugin) if flavour not in ("Replication", "Moving"): return S_ERROR("Unsupported flavour %s" % flavour) transVerb = {"Replication": "Replicate", "Moving": "Move"}[flavour] transGroup = {"Replication": "Replication", "Moving": "Moving"}[flavour] if not tGroup else tGroup trans = Transformation() transName = "%s_%s_%s" % (transVerb, str(metaValue), ",".join(targetSE)) if extraname: transName += "_%s" % extraname trans.setTransformationName(transName) description = "%s files for %s %s to %s" % (transVerb, metaKey, str(metaValue), ",".join(targetSE)) trans.setDescription(description[:255]) trans.setLongDescription(description) trans.setType("Replication") trans.setTransformationGroup(transGroup) trans.setGroupSize(groupSize) trans.setPlugin(plugin) transBody = ( { "Moving": [ ("ReplicateAndRegister", {"SourceSE": sourceSE, "TargetSE": targetSE}), ("RemoveReplica", {"TargetSE": sourceSE}), ], "Replication": "", # empty body }[flavour] if tBody is None else tBody ) trans.setBody(transBody) trans.setInputMetaQuery(metadata) if sourceSE: res = trans.setSourceSE(sourceSE) if not res["OK"]: return S_ERROR("SourceSE not valid: %s" % res["Message"]) res = trans.setTargetSE(targetSE) if not res["OK"]: return S_ERROR("TargetSE not valid: %s" % res["Message"]) if not enable: gLogger.always("Dry run, not creating transformation") return S_OK() res = trans.addTransformation() if not res["OK"]: return res gLogger.verbose(res) trans.setStatus("Active") trans.setAgentType("Automatic") gLogger.always("Successfully created replication transformation") return S_OK(trans)