#! /usr/bin/env ccp4-python
#
# Copyright (C) 2005 Ronan Keegan
#
# This code is distributed under the terms and conditions of the
# CCP4 Program Suite Licence Agreement as a CCP4 Application.
# A copy of the CCP4 licence can be obtained by writing to the
# CCP4 Secretary, Daresbury Laboratory, Warrington WA4 4AD, UK.
#
# Mrbump script for submitting MR/refinement jobs to cluster queue systems
# Ronan Keegan - 2/02/07
#
import ast
import os, sys, string
import re
import subprocess
from subprocess import PIPE

from cluster_run import ClusterJob


class Cluster_submit:
    """
    A class to submit a job to a cluster batch queue (SGE, PBS or CONDOR).

    'mrbump_keywords' dictionary of keywords from command line (MRBUMP_keywords.py)
    """

    def __init__(self, mrbump_keywords):
        self.mrbump_keywords = mrbump_keywords
        self.q_type = ""
        self.qsub_command = ""
        self.qsub_options = ""
        self.commandline_tup = ()
        self.command_line = ""
        self.job_script = ""
        self.job_name = ""
        self.job_directory = ""
        self.job_number = 0
        self.logfile = ""
        # MRBUMP_DEBUG is parsed as a Python literal (e.g. "True", "1").
        # literal_eval is used instead of eval so the environment cannot
        # inject arbitrary code; a missing/unparsable value means "off".
        try:
            self.debug = ast.literal_eval(os.environ['MRBUMP_DEBUG'])
        except (KeyError, ValueError, SyntaxError):
            self.debug = False
        self.local_debug = False
        self.input_dict = {}

    def setCommandLine(self, line):
        """Set the shell command line run by SGE/PBS job scripts."""
        self.command_line = line

    def setCommandLineTup(self, cl_tup):
        """Set the tuple unpacked by the CONDOR path in submit()."""
        self.commandline_tup = cl_tup

    def setJobDirectory(self, directory):
        """Set the directory in which job scripts and logs are written."""
        self.job_directory = directory

    def setInputDictionary(self, input_dict):
        """Set the input dictionary handed to ClusterJob for CONDOR jobs."""
        self.input_dict = input_dict

    def submit(self, qtype, job_name, job_script_name, logfile_name, qsub_command="", qsub_options=""):
        """
        General submission setup.

        qtype           -- "SGE", "PBS" or "CONDOR"; anything else is reported as a setup error
        job_name        -- queue job name
        job_script_name -- script file name, created inside the job directory
        logfile_name    -- log file name, created inside the job directory
        qsub_command    -- submit command; defaults to "qsub" (SGE/PBS) or "condor_submit" (CONDOR)
        qsub_options    -- extra directive text inserted into SGE/PBS job scripts

        Setup problems are reported on stdout and the method returns without
        submitting. On submission the job id (if one was parsed) is stored in
        self.job_number.
        """
        # A few sanity checks
        if not self.job_directory:
            sys.stdout.write("Cluster sub log: Setup error - job directory not set\n")
            return
        if not os.path.isdir(self.job_directory):
            sys.stdout.write("Cluster sub log: Setup error - job directory does not exist\n")
            return
        if self.command_line == "":
            sys.stdout.write("Cluster sub log: Setup error - command line not set\n")
            return

        self.job_script = os.path.join(self.job_directory, job_script_name)
        self.job_name = job_name
        self.logfile = os.path.join(self.job_directory, logfile_name)
        self.qsub_options = qsub_options

        if qtype == "SGE":
            # Submit to a Sun Grid Engine queue
            self.q_type = "SGE"
            self.qsub_command = qsub_command or "qsub"
            self.submit_SGE()
        elif qtype == "PBS":
            self.q_type = "PBS"
            self.qsub_command = qsub_command or "qsub"
            self.submit_PBS()
        elif qtype == "CONDOR":
            self.q_type = "CONDOR"
            self.qsub_command = qsub_command or "condor_submit"
            self._submit_condor_via_clusterjob()
        else:
            sys.stdout.write("Cluster sub log: Setup error - unknown queue type %s\n" % qtype)

    def _submit_condor_via_clusterjob(self):
        """Run a CONDOR job by driving ClusterJob directly (non-shared filesystem).

        Usually for cluster jobs with a shared filesystem, you submit a job which runs
        "python cluster_run.py", which instantiates the "ClusterJob" class, which then
        runs the MR and refinement programs. With a non-shared filesystem there is no
        python interpreter on the execute host, so "ClusterJob" is instantiated here
        and does the job submission itself through submit_CONDOR.
        """
        cljob = ClusterJob()
        # Tell ClusterJob that we don't have a shared filesystem, so it can submit to the queue itself
        cljob.setSharedFilesystem(False)
        # So that all the submission code stays in this module, pass the submit function to ClusterJob
        cljob.setSubmitFunction(submit_CONDOR)
        # Any extra keywords from the MrBump command line
        cljob.setMrBumpKeywords(self.mrbump_keywords)
        # Set host filesystem to Windows
        cljob.setHostOS("nt")
        # Parse the input dictionary directly, rather than the input file
        cljob.parse_input_dict(self.input_dict)

        # Launch the clusterised job. RBODREF presumably selects an extra
        # rigid-body refinement stage (confirm against ClusterJob.run).
        RBODREF, molrep_keyfile, refineRB_keyfile, refine_keyfile, mr_program, refine_program = self.commandline_tup
        if RBODREF == True:
            self.job_number = cljob.run(mr_program, refine_program, molrep_keyfile,
                                        refineRB_keyfile=refineRB_keyfile, refine_keyfile=refine_keyfile)
        else:
            self.job_number = cljob.run(mr_program, refine_program, molrep_keyfile,
                                        refine_keyfile=refine_keyfile)

    def _script_tail(self):
        """Return the shared tail of an SGE/PBS script: cd into the job directory, run the command line."""
        return [
            "# cd to the directory where the jobs are to be run",
            "",
            "cd " + self.job_directory,
            "",
            "# Below are the programs to be run in this script:",
            "",
            self.command_line,
        ]

    def _write_job_script(self, script_lines):
        """Write the given lines (newline-terminated) to self.job_script."""
        with open(self.job_script, "w") as script:
            script.write("\n".join(script_lines) + "\n")

    def _run_qsub(self, queue_label, parse_job_number):
        """Run the submit command on the job script and report the outcome on stdout.

        parse_job_number(line) is applied to every output line. It returns an
        int job id or None; the last id found is stored in self.job_number.
        """
        pipe = os.popen(self.qsub_command + " " + self.job_script)
        for output_line in pipe:
            if self.debug:
                sys.stdout.write(output_line)
            job_number = parse_job_number(output_line)
            if job_number is not None:
                self.job_number = job_number
        # close() returns None when the command exited with status 0
        status = pipe.close()

        if status is None:
            sys.stdout.write("MR log: Job %s submitted to the %s batch queue successfully\n"
                             % (self.job_name, queue_label))
        else:
            sys.stdout.write("MR log: WARNING - There was a problem submitting job %s to the %s batch queue\n"
                             % (self.job_name, queue_label))
        sys.stdout.write("\n")

    def submit_PBS(self):
        """
        Submit a job to a PBS batch queue.
        """
        script_lines = ["#!/bin/sh", "#PBS -j oe", "#PBS -V"]
        # Extra queue directives, handled the same way as for SGE
        if self.qsub_options:
            script_lines.append("#PBS %s" % self.qsub_options)
        script_lines += [
            "#PBS -o " + self.logfile,
            "#PBS -N " + self.job_name,
            "",
        ]
        script_lines += self._script_tail()

        self._write_job_script(script_lines)
        self._run_qsub("PBS", _parse_pbs_job_number)

    def submit_SGE(self):
        """
        Submit a job to an SGE batch queue.
        """
        script_lines = ["#!/bin/sh", "#$ -j y", "#$ -S /bin/sh", "#$ -V"]
        if self.qsub_options:
            script_lines.append("#$ %s" % self.qsub_options)
        script_lines += [
            "#$ -o " + self.logfile,
            "#$ -N " + self.job_name,
            "",
            "#Re-set CCP4_SCR to the SGE TMPDIR",
            "export CCP4_SCR=$TMPDIR",
            "",
        ]
        script_lines += self._script_tail()

        self._write_job_script(script_lines)
        self._run_qsub("SGE", _parse_sge_job_number)


def _parse_pbs_job_number(line):
    """Return the numeric id from a PBS qsub reply such as '1234.server', else None."""
    if not line[:1].isdigit():
        return None
    try:
        return int(line.split(".")[0])
    except ValueError:
        return None


def _parse_sge_job_number(line):
    """Return the third whitespace-separated token of an SGE 'submitted' line as an int, else None."""
    if "submitted" not in line:
        return None
    try:
        return int(line.split()[2])
    except (IndexError, ValueError):
        # e.g. job-array ids such as "123.1-4:1" are not plain integers
        return None


def submit_CONDOR(job_directory, input_file_list, output_file_list, command_line_dict,
                  keywords_dict, logfile_dict, runType_list, mrbump_keywords, local_debug):
    """
    Submit a job to a CONDOR batch queue.

    This is a generic function designed to run a set of executables under one CONDOR job.

    job_directory:     where the job will be set up (the process chdirs into it and stays there)
    input_file_list:   list of files to send out with the CONDOR job
    output_file_list:  list of files to get back from the CONDOR job (not used)
    command_line_dict: dict of command lines, keyed by runType
    keywords_dict:     dict of keywords for each command; key is runType, e.g. MOLREP, REFMAC
    logfile_dict:      dict of names of log files for each command, keyed by runType
    runType_list:      run types to execute, in order (keys into the three dicts above)
    mrbump_keywords:   MrBump keywords from command line, or None
    local_debug:       turns on additional debug printout

    Returns the CONDOR cluster id string, or None if submission failed.

    NB: This function is deliberately not a part of the Cluster_submit class, so it can be
    passed from one function to another.
    """
    # All file names below are relative to the job directory.
    # NOTE: this changes the process-wide working directory and does not restore it.
    os.chdir(job_directory)

    bat_file = "mrbump.bat"
    job_script = "mrbump.sub"

    keyword_file_dict = _write_condor_keyword_files(keywords_dict, local_debug)

    # Input files = 'input_file_list' param + keyword files
    transfer_files = list(input_file_list or []) + list(keyword_file_dict.values())
    submit_text = _condor_submit_script(bat_file, transfer_files, mrbump_keywords)
    with open(job_script, "w") as script:
        script.write(submit_text)
    if local_debug:
        sys.stdout.write("CONDOR job submission script:\n")
        sys.stdout.write(submit_text)
        sys.stdout.write("\n")

    _write_condor_batch_file(bat_file, runType_list, command_line_dict,
                             keyword_file_dict, logfile_dict, local_debug)

    return _condor_submit(job_script, local_debug)


def _write_condor_keyword_files(keywords_dict, local_debug):
    """Write one 'keywords_<runType>.tmp' file per run type; return {runType: file name}."""
    keyword_file_dict = {}
    for runType, keywords in keywords_dict.items():
        keyword_file = "keywords_" + runType + ".tmp"
        keyword_file_dict[runType] = keyword_file
        with open(keyword_file, "w") as keyword_fp:
            keyword_fp.write(keywords)
        if local_debug:
            sys.stdout.write("CONDOR: Keyword file for " + runType + ":\n")
            sys.stdout.write(keywords)
            sys.stdout.write("\n")
    return keyword_file_dict


def _condor_submit_script(bat_file, transfer_files, mrbump_keywords):
    """Return the text of the CONDOR submit description file."""
    text = ("universe = vanilla\n"
            "executable = %s\n"
            "transfer_executable = True\n"
            "transfer_input_files = " % bat_file)
    text += ",".join(" " + input_file for input_file in transfer_files)
    text += ("\n"
             "should_transfer_files = YES\n"
             "when_to_transfer_output = ON_EXIT\n"
             "# Blank Line\n")
    # Rank/requirements come from the MrBump keywords. Without keywords they
    # are omitted instead of crashing on a None attribute lookup.
    if mrbump_keywords is not None:
        for keyword in (mrbump_keywords.CRKEYWORD or []):
            text += keyword + "\n"
        text += "rank = %s\nrequirements = %s\n" % (mrbump_keywords.CRRANK, mrbump_keywords.CRREQUIRES)
    text += ("output = output.txt\n"
             "error = error.txt\n"
             "log = log.txt\n"
             "# Blank Line\n"
             "#transfer_files = ALWAYS\n"
             "# Blank Line\n"
             "queue")
    return text


def _write_condor_batch_file(bat_file, runType_list, command_line_dict,
                             keyword_file_dict, logfile_dict, local_debug):
    """Write the DOS-format batch file (explicit CRLF line endings) executed by the CONDOR job."""
    with open(bat_file, "w") as batch:
        # CCP4 environment rooted at the job's working directory (%CD%)
        batch.write('set CLIB=%CD%\r\n')
        batch.write('set CLIBD=%CD%\\data\r\n')
        batch.write('set CLIBD_MON=%CD%\\data\\monomers\\\r\n')
        batch.write('set CINCL=%CD%\r\n')
        batch.write('set CCP4=%CD%\r\n')
        batch.write('set CCP4_SCR=%CD%\r\n')
        batch.write('set SYMINFO=%CD%\\data\\syminfo.lib\r\n')
        if local_debug:
            batch.write('set\r\n')
            batch.write('echo "START"\r\n')
            batch.write('time /T\r\n')
            batch.write('dir\r\n')
            batch.write('echo "UNZIP"\r\n')
        batch.write('unzip2.vbs\r\n')
        if local_debug:
            batch.write('echo "FINISHED UNZIP"\r\n')
            batch.write('time /T\r\n')
            batch.write('dir\r\n')
            batch.write('echo "monomers:"\r\n')
            batch.write('dir %CLIBD_MON%\r\n')
            batch.write('echo "data:"\r\n')
            batch.write('dir %CLIBD%\r\n')
            batch.write('echo "BEFORE MOLREP"\r\n')
            batch.write('time /T\r\n')
        # One "command < keywords > log" line per run type, in the given order
        for runType in runType_list:
            if local_debug:
                batch.write('echo "BEFORE %s"\r\ndir\r\n' % runType)
                batch.write("time /T\r\n")
            batch.write("%s < %s > %s\r\n" % (command_line_dict.get(runType, ""),
                                              keyword_file_dict.get(runType, ""),
                                              logfile_dict.get(runType, "")))
            if local_debug:
                batch.write('echo "AFTER %s"\r\ndir\r\n' % runType)
                batch.write("time /T\r\n")
        if local_debug:
            batch.write('echo "END"\r\n')


def _condor_submit(job_script, local_debug):
    """Run condor_submit on job_script and return the cluster id string, or None on failure."""
    # job_script is a fixed local file name, so shell=True takes no untrusted input here
    qsub_command = 'condor_submit ' + job_script
    proc = subprocess.Popen(qsub_command, shell=True, stdout=PIPE, universal_newlines=True)
    # communicate() also waits for the child, so no zombie process is left behind
    output = proc.communicate()[0] or ""

    job_number = None
    if local_debug:
        sys.stdout.write("\n\nCONDOR submission output:\n")
    for outline in output.splitlines(True):
        if local_debug:
            sys.stdout.write(outline)
        jobnumber_reobj = re.search(r"1 job\(s\) submitted to cluster (\d+)\.", outline)
        if jobnumber_reobj is not None:
            job_number = jobnumber_reobj.group(1)

    if job_number is not None:
        sys.stdout.write("CONDOR log: Job %s submitted to the CONDOR batch queue successfully\n" % job_number)
    else:
        sys.stdout.write("CONDOR log: WARNING - There was a problem submitting job to the CONDOR batch queue\n")
    sys.stdout.write("\n")

    return job_number