########################################################################
# File :   HTCondorCEComputingElement.py
# Author : A.S.
########################################################################

"""HTCondorCE Computing Element

Allows direct submission to HTCondorCE Computing Elements with a SiteDirector Agent

**Configuration Parameters**

Configuration for the HTCondorCE submission can be done via the configuration system. See the page about
configuring :ref:`resourcesComputing` for where the options can be placed.

WorkingDirectory:
   Location to store the pilot and condor log files locally. It should exist on the server and be accessible (both
   readable and writable). Temporary files, such as condor submit files, are also kept here. This option is only read
   from the global Resources/Computing/HTCondorCE location.

DaysToKeepRemoteLogs:
   How long (in days) to keep the log files on the remote schedd before they are removed

DaysToKeepLogs:
   How long (in days) to keep the log files locally before they are removed

ExtraSubmitString:
   Additional options for the condor submit file, separate options with '\\n', for example::

     request_cpus = 8 \\n periodic_remove = ...

   CERN provides additional features on top of the standard HTCondor implementation, among them an attribute to
   limit the allocation runtime (`+MaxRuntime`) that does not exist in standard HTCondor, which has no explicit
   way to define a runtime limit (`maxCPUTime` acts as the limit). On CERN HTCondor CEs, such CERN-specific
   attributes can be passed via the `ExtraSubmitString` configuration parameter.
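
   For instance, to pass such a runtime limit together with a CPU request on one of these CEs
   (the values below are placeholders)::

     +MaxRuntime = 7200 \\n request_cpus = 8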

UseLocalSchedd:
   If False, submit directly to the remote condor schedd daemon; in that case one does not need to run
   condor daemons on the submit machine.
   If True, the condor grid middleware (condor_submit, condor_history, condor_q, condor_rm) is required.

**Proxy renewal or lifetime**

When not using a local condor_schedd, add ``delegate_job_GSI_credentials_lifetime = 0`` to the ``ExtraSubmitString``.

When using a local condor_schedd, consult the HTCondor documentation for enabling the proxy refresh.
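
A possible combination of the options above for a remote schedd setup (illustrative values only; see
:ref:`resourcesComputing` for where the options should be placed)::

  WorkingDirectory = /opt/dirac/pro/runit/WorkloadManagement/SiteDirectorHT
  DaysToKeepLogs = 15
  DaysToKeepRemoteLogs = 1
  UseLocalSchedd = False
  ExtraSubmitString = delegate_job_GSI_credentials_lifetime = 0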

**Code Documentation**
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# Note: if you read this documentation in the source code and not via the Sphinx-generated
# documentation, there should only be one backslash when setting the option,
# but "\n" gets rendered as a line break by Sphinx

import six
import os
import tempfile
# TODO: This should be modernised to use subprocess(32)
try:
  import commands
except ImportError:
  # Python 3's subprocess module contains a compatibility layer
  import subprocess as commands
import datetime
import errno
import threading

from DIRAC import S_OK, S_ERROR, gConfig
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.Core.Utilities.Grid import executeGridCommand
from DIRAC.Core.Utilities.File import mkDir
from DIRAC.Core.Utilities.List import breakListIntoChunks

# BEWARE: this import makes it impossible to instantiate this CE client side
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
from DIRAC.WorkloadManagementSystem.Client import PilotStatus

from DIRAC.Core.Utilities.File import makeGuid
from DIRAC.Core.Utilities.Subprocess import Subprocess

from DIRAC.Resources.Computing.BatchSystems.Condor import parseCondorStatus, treatCondorHistory

__RCSID__ = "fadc0c734 (2021-08-05 09:30:04 +0200) aldbr <alex.811@hotmail.fr>"

CE_NAME = 'HTCondorCE'
MANDATORY_PARAMETERS = ['Queue']
DEFAULT_WORKINGDIRECTORY = '/opt/dirac/pro/runit/WorkloadManagement/SiteDirectorHT'
DEFAULT_DAYSTOKEEPREMOTELOGS = 1
DEFAULT_DAYSTOKEEPLOGS = 15


def logDir(ceName, stamp):
  """ Return path to log and output files for pilot.

  :param str ceName: Name of the CE
  :param str stamp: pilot stamp from/for jobRef
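
  For example, ``logDir("ce.example.org", "abc12XYZ")`` returns ``ce.example.org/a/bc``.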
  """
  return os.path.join(ceName, stamp[0], stamp[1:3])


def condorIDAndPathToResultFromJobRef(jobRef):
  """ Extract tuple of jobURL and jobID from the jobRef string.
  The condorID as well as the path leading to the job results are also extracted from the jobID.

  :param str jobRef: PilotJobReference of the following form: ``htcondorce://<ceName>/<condorID>:::<pilotStamp>``

  :return: tuple composed of the jobURL, the path to the job results and the condorID of the given jobRef
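
  For example, ``htcondorce://ce.example.org/123.0:::abc12XYZ`` yields
  ``("htcondorce://ce.example.org/123.0", "ce.example.org/a/bc", "123.0")``.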
  """
  splits = jobRef.split(":::")
  jobURL = splits[0]
  stamp = splits[1] if len(splits) > 1 else ''
  _, _, ceName, condorID = jobURL.split("/")

  # Reconstruct the path leading to the result (log, output)
  # Construction of the path can be found in submitJob()
  pathToResult = logDir(ceName, stamp) if len(stamp) >= 3 else ''

  return jobURL, pathToResult, condorID


def findFile(workingDir, fileName, pathToResult=None):
  """ Find a file in a file system.

  :param str workingDir: the directory in which to search for the file
  :param str fileName: the name of the file to find
  :param str pathToResult: the path to follow from workingDir to find the file

  :return: list of paths leading to the file
  """

  # If pathToResult is defined, we just have to check that the path exists
  if pathToResult:
    path = os.path.join(workingDir, pathToResult, fileName)
    if os.path.exists(path):
      # We put the path in a list to be consistent
      return S_OK([path])

  # If pathToResult is not defined or not correct,
  # we have to search for the file in workingDir and may get multiple results
  res = Subprocess().systemCall("find %s -name '%s'" % (workingDir, fileName), shell=True)
  if not res['OK']:
    return res
  paths = res['Value'][1].splitlines()
  if not paths:
    return S_ERROR(errno.ENOENT, "Could not find %s in directory %s" % (fileName, workingDir))
  return S_OK(paths)


def getCondorLogFile(pilotRef):
  """ Return the location of the logFile belonging to the pilot reference.
  """
  _jobUrl, pathToResult, condorID = condorIDAndPathToResultFromJobRef(pilotRef)
  # FIXME: This gets called from the WMSAdministrator, so we don't have the same
  # working directory as the SiteDirector unless we force it. There is also
  # no CE instantiated when this function is called, so we can only pick this option up from one place.
  workingDirectory = gConfig.getValue("Resources/Computing/HTCondorCE/WorkingDirectory",
                                      DEFAULT_WORKINGDIRECTORY)

  resLog = findFile(workingDirectory, '%s.log' % condorID, pathToResult)
  return resLog


class HTCondorCEComputingElement(ComputingElement):
  """ HTCondorCE computing element class
      implementing the functions submitJob, getJobOutput
  """

  # static variables to ensure single cleanup every minute
  _lastCleanupTime = datetime.datetime.utcnow()
  _cleanupLock = threading.Lock()

  #############################################################################
  def __init__(self, ceUniqueID):
    """ Standard constructor.
    """
    super(HTCondorCEComputingElement, self).__init__(ceUniqueID)

    self.ceType = CE_NAME
    self.submittedJobs = 0
    self.mandatoryParameters = MANDATORY_PARAMETERS
    self.pilotProxy = ''
    self.queue = ''
    self.outputURL = 'gsiftp://localhost'
    self.gridEnv = ''
    self.proxyRenewal = 0
    self.daysToKeepLogs = DEFAULT_DAYSTOKEEPLOGS
    self.daysToKeepRemoteLogs = DEFAULT_DAYSTOKEEPREMOTELOGS
    self.extraSubmitString = ''
    # see note on getCondorLogFile, why we can only use the global setting
    self.workingDirectory = gConfig.getValue("Resources/Computing/HTCondorCE/WorkingDirectory",
                                             DEFAULT_WORKINGDIRECTORY)
    self.useLocalSchedd = True
    self.remoteScheddOptions = ""

  #############################################################################
  def __writeSub(self, executable, nJobs, location, processors):
    """ Create the Sub File for submission.

    :param str executable: name of the script to execute
    :param int nJobs: number of desired jobs
    :param str location: directory that should contain the result of the jobs
    :param int processors: number of CPU cores to allocate
    """

    self.log.debug("Working directory: %s " % self.workingDirectory)
    mkDir(os.path.join(self.workingDirectory, location))

    self.log.debug("InitialDir: %s" % os.path.join(self.workingDirectory, location))

    self.log.debug("ExtraSubmitString:\n### \n %s \n###" % self.extraSubmitString)

    fd, name = tempfile.mkstemp(suffix='.sub', prefix='HTCondorCE_', dir=self.workingDirectory)
    subFile = os.fdopen(fd, 'w')

    executable = os.path.join(self.workingDirectory, executable)

    # This expression removes finished jobs and their outputs from the remote schedd
    # It is only applied when a local schedd is not used
    periodicRemove = "periodic_remove = "
    periodicRemove += "(JobStatus == 4) && "
    periodicRemove += "(time() - EnteredCurrentStatus) > (%s * 24 * 3600)" % self.daysToKeepRemoteLogs

    localScheddOptions = """
ShouldTransferFiles = YES
WhenToTransferOutput = ON_EXIT_OR_EVICT
""" if self.useLocalSchedd else periodicRemove

    targetUniverse = "grid" if self.useLocalSchedd else "vanilla"

    sub = """
executable = %(executable)s
universe = %(targetUniverse)s
use_x509userproxy = true
output = $(Cluster).$(Process).out
error = $(Cluster).$(Process).err
log = $(Cluster).$(Process).log
environment = "HTCONDOR_JOBID=$(Cluster).$(Process)"
initialdir = %(initialDir)s
grid_resource = condor %(ceName)s %(ceName)s:9619
transfer_output_files = ""
request_cpus = %(processors)s
%(localScheddOptions)s

kill_sig=SIGTERM

%(extraString)s

Queue %(nJobs)s

""" % dict(executable=executable,
           nJobs=nJobs,
           processors=processors,
           ceName=self.ceName,
           extraString=self.extraSubmitString,
           initialDir=os.path.join(self.workingDirectory, location),
           localScheddOptions=localScheddOptions,
           targetUniverse=targetUniverse,
           )
    subFile.write(sub)
    subFile.close()
    return name

  def _reset(self):
    self.queue = self.ceParameters['Queue']
    self.outputURL = self.ceParameters.get('OutputURL', 'gsiftp://localhost')
    self.gridEnv = self.ceParameters.get('GridEnv')
    self.daysToKeepLogs = self.ceParameters.get('DaysToKeepLogs', DEFAULT_DAYSTOKEEPLOGS)
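    # The configured ExtraSubmitString contains literal '\n' sequences (see the module docstring);
    # unicode_escape turns them into real newlines so that each option ends up on its own line in the submit file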
    self.extraSubmitString = str(self.ceParameters.get('ExtraSubmitString', '').encode().decode('unicode_escape'))
    self.daysToKeepRemoteLogs = self.ceParameters.get('DaysToKeepRemoteLogs', DEFAULT_DAYSTOKEEPREMOTELOGS)
    self.useLocalSchedd = self.ceParameters.get('UseLocalSchedd', self.useLocalSchedd)
    if isinstance(self.useLocalSchedd, six.string_types):
      if self.useLocalSchedd == "False":
        self.useLocalSchedd = False

    self.remoteScheddOptions = "" if self.useLocalSchedd else "-pool %s:9619 -name %s " % (self.ceName, self.ceName)

    self.log.debug("Using local schedd: %r " % self.useLocalSchedd)
    self.log.debug("Remote scheduler option: '%s' " % self.remoteScheddOptions)
    return S_OK()

  #############################################################################
  def submitJob(self, executableFile, proxy, numberOfJobs=1):
    """ Method to submit job
    """

    self.log.verbose("Executable file path: %s" % executableFile)
    if not os.access(executableFile, os.R_OK | os.X_OK):
      os.chmod(executableFile, 0o755)

    # The submitted pilots are going to have a common part of the stamp to construct a path to retrieve results
    # Then they also have an individual part to make them unique
    jobStamps = []
    commonJobStampPart = makeGuid()[:3]
    for _i in range(numberOfJobs):
      jobStamp = commonJobStampPart + makeGuid()[:5]
      jobStamps.append(jobStamp)

    # We randomize the location of the pilot output and log, because there are just too many of them
    location = logDir(self.ceName, commonJobStampPart)
    nProcessors = self.ceParameters.get('NumberOfProcessors', 1)
    subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors)

    cmd = ['condor_submit', '-terse', subName]
    # the options for submitting to a remote schedd are different from the other remoteScheddOptions
    scheddOptions = [] if self.useLocalSchedd else ['-pool', '%s:9619' % self.ceName, '-remote', self.ceName]
    for op in scheddOptions:
      cmd.insert(-1, op)

    result = executeGridCommand(self.proxy, cmd, self.gridEnv)
    self.log.verbose(result)
    os.unlink(subName)
    if not result['OK']:
      self.log.error("Failed to submit jobs to htcondor", result['Message'])
      return result

    if result['Value'][0]:
      # We have got a non-zero status code
      errorString = result['Value'][2] if result['Value'][2] else result['Value'][1]
      return S_ERROR('Pilot submission failed with error: %s ' % errorString.strip())

    pilotJobReferences = self.__getPilotReferences(result['Value'][1].strip())
    if not pilotJobReferences['OK']:
      return pilotJobReferences
    pilotJobReferences = pilotJobReferences['Value']

    self.log.verbose("JobStamps: %s " % jobStamps)
    self.log.verbose("pilotRefs: %s " % pilotJobReferences)

    result = S_OK(pilotJobReferences)
    result['PilotStampDict'] = dict(zip(pilotJobReferences, jobStamps))
    self.log.verbose("Result for submission: %s " % result)
    return result

  def killJob(self, jobIDList):
    """ Kill the specified jobs
    """
    if not jobIDList:
      return S_OK()
    if isinstance(jobIDList, six.string_types):
      jobIDList = [jobIDList]

    self.log.verbose("KillJob jobIDList: %s" % jobIDList)

    for jobRef in jobIDList:
      job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
      self.log.verbose("Killing pilot %s " % job)
      status, stdout = commands.getstatusoutput('condor_rm %s %s' % (self.remoteScheddOptions, jobID))
      if status != 0:
        return S_ERROR("Failed to kill pilot %s: %s" % (job, stdout))

    return S_OK()

#############################################################################
  def getCEStatus(self):
    """ Method to return information on running and pending jobs.
    """
    result = S_OK()
    result['SubmittedJobs'] = 0
    result['RunningJobs'] = 0
    result['WaitingJobs'] = 0

    # getWaitingPilots
    condDict = {'DestinationSite': self.ceName,
                'Status': PilotStatus.PILOT_WAITING_STATES}
    res = PilotAgentsDB().countPilots(condDict)
    if res['OK']:
      result['WaitingJobs'] = int(res['Value'])
    else:
      self.log.warn("Failure getting pilot count for %s: %s " % (self.ceName, res['Message']))

    # getRunningPilots
    condDict = {'DestinationSite': self.ceName,
                'Status': 'Running'}
    res = PilotAgentsDB().countPilots(condDict)
    if res['OK']:
      result['RunningJobs'] = int(res['Value'])
    else:
      self.log.warn("Failure getting pilot count for %s: %s " % (self.ceName, res['Message']))

    return result

  def getJobStatus(self, jobIDList):
    """ Get the status information for the given list of jobs
    """
    self.__cleanup()

    self.log.verbose("Job ID List for status: %s " % jobIDList)
    if isinstance(jobIDList, six.string_types):
      jobIDList = [jobIDList]

    resultDict = {}
    condorIDs = {}
    # Get all condorIDs so we can just call condor_q and condor_history once
    for jobRef in jobIDList:
      job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
      condorIDs[job] = jobID

    qList = []
    for _condorIDs in breakListIntoChunks(condorIDs.values(), 100):

      # This will return lines of the form '1245.75 3', i.e. '<ClusterId>.<ProcId> <JobStatus>'
      status, stdout_q = commands.getstatusoutput('condor_q %s %s -af:j JobStatus ' % (self.remoteScheddOptions,
                                                                                       ' '.join(_condorIDs)))
      if status != 0:
        return S_ERROR(stdout_q)
      _qList = stdout_q.strip().split('\n')
      qList.extend(_qList)

      # FIXME: condor_history only supports 'j' for autoformat from version 8.5.3 onwards;
      # plain '-af' separates fields with whitespace, so this returns lines of the form '1245 75 3'
      # and the first two fields need to be concatenated with a dot
      condorHistCall = 'condor_history %s %s -af ClusterId ProcId JobStatus' % (self.remoteScheddOptions,
                                                                                ' '.join(_condorIDs))

      treatCondorHistory(condorHistCall, qList)

    for job, jobID in condorIDs.items():

      pilotStatus = parseCondorStatus(qList, jobID)
      if pilotStatus == 'HELD':
        # make sure the pilot stays dead and gets taken out of the condor_q
        _rmStat, _rmOut = commands.getstatusoutput('condor_rm %s %s ' % (self.remoteScheddOptions, jobID))
        # self.log.debug( "condor job killed: job %s, stat %s, message %s " % ( jobID, rmStat, rmOut ) )
        pilotStatus = PilotStatus.ABORTED

      resultDict[job] = pilotStatus

    self.log.verbose("Pilot Statuses: %s " % resultDict)
    return S_OK(resultDict)

  def getJobOutput(self, jobID, _localDir=None):
    """ TODO: condor can copy the output automatically back to the
    submission, so we just need to pick it up from the proper folder
    """
    self.log.verbose("Getting job output for jobID: %s " % jobID)
    _job, pathToResult, condorID = condorIDAndPathToResultFromJobRef(jobID)
    # FIXME: the WMSAdministrator does not know about the
    # SiteDirector WorkingDirectory, it might not even run on the
    # same machine
    # workingDirectory = self.ceParameters.get( 'WorkingDirectory', DEFAULT_WORKINGDIRECTORY )

    if not self.useLocalSchedd:
      iwd = None

      # TOREMOVE: once v7r0 is in widespread use, remove the following block, which was only useful
      # when the path to the output was not deterministic
      status, stdout_q = commands.getstatusoutput('condor_q %s %s -af SUBMIT_Iwd' % (self.remoteScheddOptions,
                                                                                     condorID))
      self.log.verbose('condor_q:', stdout_q)
      if status == 0 and self.workingDirectory in stdout_q:
        iwd = stdout_q
        pathToResult = iwd

      # Use the path extracted from the pilotID
      if iwd is None:
        iwd = os.path.join(self.workingDirectory, pathToResult)

      try:
        mkDir(iwd)
      except OSError as e:
        errorMessage = "Failed to create the pilot output directory"
        self.log.exception(errorMessage, iwd)
        return S_ERROR(e.errno, '%s (%s)' % (errorMessage, iwd))

      cmd = ['condor_transfer_data', '-pool', '%s:9619' % self.ceName, '-name', self.ceName, condorID]
      result = executeGridCommand(self.proxy, cmd, self.gridEnv)
      self.log.verbose(result)

      errorMessage = "Failed to get job output from htcondor"
      if not result['OK']:
        self.log.error(errorMessage, result['Message'])
        return result
      # Even if result is OK, the actual exit code of cmd can still be an error
      if result['OK'] and result['Value'][0] != 0:
        varMessage = result['Value'][1].strip()
        self.log.error(errorMessage, varMessage)
        return S_ERROR('%s: %s' % (errorMessage, varMessage))

    output = ''
    error = ''

    resOut = findFile(self.workingDirectory, '%s.out' % condorID, pathToResult)
    if not resOut['OK']:
      self.log.error("Failed to find output file for condor job", jobID)
      return resOut
    outputfilename = resOut['Value'][0]

    resErr = findFile(self.workingDirectory, '%s.err' % condorID, pathToResult)
    if not resErr['OK']:
      self.log.error("Failed to find error file for condor job", jobID)
      return resErr
    errorfilename = resErr['Value'][0]

    try:
      with open(outputfilename) as outputfile:
        output = outputfile.read()
    except IOError as e:
      self.log.error("Failed to open outputfile", str(e))
      return S_ERROR("Failed to get pilot output")
    try:
      with open(errorfilename) as errorfile:
        error = errorfile.read()
    except IOError as e:
      self.log.error("Failed to open errorfile", str(e))
      return S_ERROR("Failed to get pilot error")

    return S_OK((output, error))

  def __getPilotReferences(self, jobString):
    """ Get the references from the condor_submit output.
    Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "

    :param str jobString: the output of condor_submit

    :return: job references such as htcondorce://<CE name>/<clusterID>.<i>
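
    For example, with ``ceName = "ce.example.org"``, the output ``" 107.0 - 107.2 "`` yields
    ``["htcondorce://ce.example.org/107.0", "htcondorce://ce.example.org/107.1", "htcondorce://ce.example.org/107.2"]``.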
    """
    self.log.verbose("getPilotReferences: %s" % jobString)
    clusterIDs = jobString.split('-')
    if len(clusterIDs) != 2:
      return S_ERROR("Something wrong with the condor_submit output: %s" % jobString)
    clusterIDs = [clu.strip() for clu in clusterIDs]
    self.log.verbose("Cluster IDs parsed: %s " % clusterIDs)
    try:
      clusterID = clusterIDs[0].split('.')[0]
      numJobs = clusterIDs[1].split('.')[1]
    except IndexError:
      return S_ERROR("Something wrong with the condor_submit output: %s" % jobString)
    cePrefix = "htcondorce://%s/" % self.ceName
    jobReferences = ["%s%s.%s" % (cePrefix, clusterID, i) for i in range(int(numJobs) + 1)]
    return S_OK(jobReferences)

  def __cleanup(self):
    """ Clean the working directory of old jobs"""

    # FIXME: again some issue with the working directory...
    # workingDirectory = self.ceParameters.get( 'WorkingDirectory', DEFAULT_WORKINGDIRECTORY )

    if not HTCondorCEComputingElement._cleanupLock.acquire(False):
      return

    now = datetime.datetime.utcnow()
    if (now - HTCondorCEComputingElement._lastCleanupTime).total_seconds() < 60:
      HTCondorCEComputingElement._cleanupLock.release()
      return

    HTCondorCEComputingElement._lastCleanupTime = now

    self.log.debug("Cleaning working directory: %s" % self.workingDirectory)

    # Remove all files starting with DIRAC_ that are older than 120 minutes. Condor will
    # push files on submission, but it takes at least a few seconds until this
    # happens, so we cannot unlink directly after condor_submit.
    status, stdout = commands.getstatusoutput('find -O3 %s -maxdepth 1 -mmin +120 -name "DIRAC_*" -delete ' %
                                              self.workingDirectory)
    if status:
      self.log.error("Failure during HTCondorCE __cleanup", stdout)

    # Remove all out/err/log files older than "DaysToKeepLogs" days in the working directory.
    # This is not restricted to a single CE; we do a global cleanup.
    findPars = dict(workDir=self.workingDirectory, days=self.daysToKeepLogs)
    status, stdout = commands.getstatusoutput(
        r'find %(workDir)s -mtime +%(days)s -type f \( -name "*.out" -o -name "*.err" -o -name "*.log" \) -delete ' %
        findPars)
    if status:
      self.log.error("Failure during HTCondorCE __cleanup", stdout)
    self._cleanupLock.release()