import copy import datetime import getpass import os import subprocess import sys import tarfile import threading import time as timep import traceback import SystemUtility import managerSSH import sge # NOTE: Please remember that you really need to do not use spaces with the redirect commands > and < for SGE and MOAB otherwise managerSSH could confuse redirects # with shell prompt # path_module= os.path.dirname(__file__) # Config = ConfigParser.ConfigParser() # Config.read(os.path.join(path_module,"setup.bor")) # PATH_REMOTE_SGEPY = Config.get("GRID", "path_remote_sgepy") # PATH_REMOTE_PYTHON_INTERPRETER = Config.get("GRID", "python_remote_interpreter") # PATH_LOCAL_PYTHON_INTERPRETER = Config.get("LOCAL", "python_local_interpreter") # if PATH_REMOTE_PYTHON_INTERPRETER.strip() in ["", None]: # PATH_REMOTE_PYTHON_INTERPRETER = "/usr/bin/python" # if PATH_LOCAL_PYTHON_INTERPRETER.strip() in ["", None]: # PATH_LOCAL_PYTHON_INTERPRETER = "/usr/bin/python" PATH_LOCAL_PYTHON_INTERPRETER = "/usr/bin/python" PATH_LOCAL_PYTHON_INTERPRETER = "/usr/bin/python" PATH_REMOTE_SGEPY = "" class Grid(object): """SuperClass of all the Grid Managers """ JOBS = {} # jobid:{timeStart:time,cmd:string,job:Job,gridCluster:nCluster,queue:nQueue} JOBS_DONE = {} remote_library_path = "" remote_mtz_path = "" remote_hkl_path = "" remote_mtzP1_path = "" remote_tncs_path = "" remote_norm_path = "" FILE_TO_COPY = [] JOB_TO_SUBMIT = [] CUMULATIVE_TRANSFERING = 1000 EXCEPTION_TO_CUMULATIVE = [".mtz", ".hkl", ".tncs", ".norm", ".cmd", "r.gz"] FILE_RECEIVED = {} actualDirectory = "" type_grid = "abstract" def __init__(self): self.isExchangeFileOpen = False self.exchangeConn = None self.promptSFTP = "> " self.lo = threading.RLock() # self.lock = ADT.LogLock("Grid.py",threading.Condition(self.lo)) self.lock = threading.Condition(self.lo) def getGridJobsToRemoteMachine(self, username, host, port, passkey, prompta, promptb, isnfs, listOfInitialOperations=[], home_frontend_directory="./"): if isinstance(passkey, dict): self.connection = managerSSH.Connection(host, username=username, password=passkey["password"], port=port) else: print "Trying connecting with passkey", passkey self.connection = managerSSH.Connection(host, username=username, private_key=passkey, port=port) self.promptA = prompta self.promptB = promptb self.username = username self.host = host self.port = port self.passkey = passkey self.listInitial = listOfInitialOperations self.isnfs = isnfs self.home_directory = home_frontend_directory buff = "" if len(listOfInitialOperations) == 0: self.channel = self.connection.interactive() # return "" else: for opera in listOfInitialOperations: if "sublocalssh" == opera[0]: self.channel, buffo = self.connection.open_sublocal_interactive_ssh(opera[1], opera[2], opera[3], opera[4], opera[5]) buff += buffo else: if hasattr(self, "channel"): buff += self.connection.send_command_to_channel(self.channel, opera[0], opera[1]) else: self.channel = self.connection.interactive() buff += self.connection.send_command_to_channel(self.channel, opera[0], opera[1]) buff += "\n" buff += self.open_exchange_file() return buff def open_exchange_file(self): buff = "" if not self.isExchangeFileOpen and self.exchangeConn == None: try: if self.isnfs: self.exchangeConn = self.connection.open_sftp_channel() self.isExchangeFileOpen = True for opera in self.listInitial: comma = opera[0].split() if len(comma) == 2 and comma[0] == "cd": self.change_remote_dir(comma[1]) else: self.exchangeConn = self.connection.open_sftp_channel() self.sftpsubcha = self.connection.interactive() buff += self.connection.send_command_to_channel(self.sftpsubcha, "rm -rf " + os.path.join(self.home_directory, "borges_middle"), self.promptA) self.exchangeConn.mkdir(os.path.join(self.home_directory, "borges_middle")) self.exchangeConn.chdir(os.path.join(self.home_directory, "borges_middle")) buff += self.connection.send_command_to_channel(self.sftpsubcha, "cd " + os.path.join(self.home_directory, "borges_middle"), self.promptA) for opera in self.listInitial: comma = opera[0].split() if "sublocalssh" == opera[0]: bluff = self.connection.send_command_to_channel(self.sftpsubcha, "sftp -oPort=" + str(opera[3]) + " " + str( opera[1]) + "@" + str(opera[2]), '\'s password: ', stopif=self.promptSFTP) if not isinstance(bluff, tuple): buff += bluff buff += self.connection.send_command_to_channel(self.sftpsubcha, opera[4], self.promptSFTP) else: buff += bluff[0] elif len(comma) == 2 and comma[0] == "cd": self.change_remote_dir(comma[1]) self.isExchangeFileOpen = True return buff except: print "Error on opening sftp connection" # print sys.exc_info() # traceback.print_exc() else: return "File Exchanger Protocol is already open!" def close_remote_connection(self): try: self.connection.close() print "Remote Connection Protocol closed!" except: print "Error on closing ssh connection" # print sys.exc_info() # traceback.print_exc(file=sys.stdout) def close_exchange_file(self): buff = "" if self.isExchangeFileOpen and self.exchangeConn != None: try: if self.isnfs: self.exchangeConn.close() self.isExchangeFileOpen = False else: buff += self.connection.send_command_to_channel(self.sftpsubcha, "exit", self.promptA) self.sftpsubcha.close() self.exchangeConn.chdir("..") # print self.exchangeConn.getcwd() self.exchangeConn.rmdir("./borges_middle") self.exchangeConn.close() self.isExchangeFileOpen = False print "File Exchanger Protocol closed!\n" + buff self.exchangeConn = None except: print "Error on closing sftp connection" # print sys.exc_info() # traceback.print_exc(file=sys.stdout) sys.stdout.flush() else: return "It is not opened!" def create_remote_link(self, fromfile, remotefile, stayalive=True): self.lock.acquire() assert self.isExchangeFileOpen buff = "" try: nomeu = "./" + os.path.basename(remotefile) if self.isnfs: # print "======",self.exchangeConn.getcwd() # print fromfile,nomeu o = self.exchangeConn.symlink(fromfile, nomeu) """ while 1: try: filestat=self.exchangeConn.stat(nomeu) #print "from frontend:", filestat break except: #print "From frontend: Cannot stat "+str(nomeu)+" Trying again..." timep.sleep(3) esegui = True while esegui: out = self.connection.send_command_to_channel(self.channel,'stat '+str(os.path.join(self.get_remote_pwd(),os.path.basename(remotefile))),self.promptB) #print "From submitter\n",out outlines = out.split() for word in outlines: if not word.strip().startswith("stat:"): esegui = False break """ else: buff += self.connection.send_command_to_channel(self.sftpsubcha, "symlink " + fromfile + " " + nomeu, self.promptSFTP) # buff += self.connection.send_command_to_channel(self.channel,"symlink "+fromfile+" "+nomeu,self.promptB) """ esegui = True while esegui: out = self.connection.send_command_to_channel(self.channel,'stat '+str(nomeu),self.promptB) print out outlines = out.splitlines() for linea in outlines: if not linea.strip().startswith("stat:"): esegui = False break """ # print buff except: print "Error on create a remote link" # print sys.exc_info() # traceback.print_exc() if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def create_remote_dir(self, remotedir, stayalive=True): self.lock.acquire() assert self.isExchangeFileOpen buff = "" try: if len(remotedir) >= 1 and remotedir[0] != "/" and not remotedir.startswith("./"): remotedir = "./" + remotedir buff += self.remove_remote_dir(remotedir) if self.isnfs: # print "mkdir -p ",remotedir self.exchangeConn.mkdir(remotedir) # buff += self.connection.send_command_to_channel(self.channel,"mkdir -p "+remotedir,self.promptB) else: buff += self.connection.send_command_to_channel(self.channel, "mkdir -p " + remotedir, self.promptB) except: print "Error on create remote directory", remotedir # print sys.exc_info() # traceback.print_exc() # print "********************" # print buff # print "********************" if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def change_remote_dir(self, remotedir, stayalive=True): self.lock.acquire() assert self.isExchangeFileOpen buff = "" try: if len(remotedir) >= 1 and remotedir[0] != "/" and not remotedir.startswith("./"): remotedir = "./" + remotedir if self.isnfs: # print "cd ",remotedir self.exchangeConn.chdir(remotedir) # timep.sleep(10) # NOTE: we do not use yy. We really need to take it? yy = self.get_remote_listdir() buff += self.connection.send_command_to_channel(self.channel, "cd " + str(os.path.abspath(self.get_remote_pwd())), self.promptB) else: buff += self.connection.send_command_to_channel(self.sftpsubcha, "cd " + remotedir, self.promptSFTP) buff += self.connection.send_command_to_channel(self.channel, "cd " + remotedir, self.promptB) # print buff except: print "Error on change working remote directory", remotedir # print sys.exc_info() # traceback.print_exc() if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def remove_remote_dir(self, remotedir, stayalive=True): self.lock.acquire() assert self.isExchangeFileOpen buff = "" try: remotedir = "./" + os.path.basename(os.path.normpath(remotedir)) if self.isnfs: # print "rm -rf ",remotedir # self.exchangeConn.rmdir(remotedir) buff += self.connection.send_command_to_channel(self.channel, "rm -rf " + remotedir, self.promptB) else: buff += self.connection.send_command_to_channel(self.channel, "rm -rf " + remotedir, self.promptB) except: print "Error on remove remote directory maybe it does not exist" # print sys.exc_info() # traceback.print_exc() if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def create_remote_file(self, remotefile, stayalive=True): self.lock.acquire() assert self.isExchangeFileOpen buff = "" try: if self.isnfs: a = self.exchangeConn.open(remotefile, mode='w') a.close() else: buff += self.connection.send_command_to_channel(self.sftpsubcha, "!touch " + remotefile, self.promptSFTP) buff += self.connection.send_command_to_channel(self.sftpsubcha, "put " + remotefile, self.promptSFTP) buff += self.connection.send_command_to_channel(self.sftpsubcha, "!rm " + remotefile, self.promptSFTP) # print buff except: print "Error on create remote file" # print sys.exc_info() # traceback.print_exc() if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def remove_remote_file(self, remotefile, stayalive=True): self.lock.acquire() assert self.isExchangeFileOpen buff = "" try: if self.isnfs: self.exchangeConn.remove(remotefile) else: buff += self.connection.send_command_to_channel(self.sftpsubcha, "rm -f" + remotefile, self.promptSFTP) except: print "Error on delete remote file", remotefile # print sys.exc_info() # traceback.print_exc(file=sys.stdout) if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def copy_directory(self, localdir, remotedir, stayalive=True): self.lock.acquire() nomeu = "./" + os.path.basename(os.path.normpath(remotedir)) buff = self.create_remote_dir(nomeu) buff += self.change_remote_dir(nomeu) tarro = tarfile.open(os.path.join(localdir, "transfert.tar.gz"), "w:gz") for root, subFolders, files in os.walk(localdir): for fileu in files: tarro.add(os.path.join(root, fileu), arcname="./" + fileu) tarro.close() buff += self.copy_local_file(os.path.join(localdir, "transfert.tar.gz"), "transfert.tar.gz") buff += self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff += self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join(localdir, "transfert.tar.gz")) buff += self.change_remote_dir("..") if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def copy_local_file(self, localfile, remotefile, remote_path_asitis=False, stayalive=True, force_cumulative=False, send_now=False): self.lock.acquire() aa = "" if send_now: aa = self.__real_copy_local_file(localfile, remotefile, stayalive=stayalive, remote_path_asitis=remote_path_asitis) elif os.path.basename(localfile)[-4:] in self.EXCEPTION_TO_CUMULATIVE and not force_cumulative: aa = self.__real_copy_local_file(localfile, remotefile, stayalive=stayalive, remote_path_asitis=remote_path_asitis) else: self.FILE_TO_COPY.append((localfile, remotefile)) aa = "Appending transfert file request for " + localfile + "\n" self.lock.release() return aa def __real_copy_local_file(self, localfile, remotefile, stayalive=True, remote_path_asitis=False): assert self.isExchangeFileOpen buff = "" nowDir = "" try: if remote_path_asitis: nowDir = self.get_remote_pwd() self.change_remote_dir(os.path.dirname(remotefile)) nomeu = "./" + os.path.basename(remotefile) if self.isnfs: # print "...........",self.exchangeConn.getcwd() o = self.exchangeConn.put(localfile, nomeu) while 1: try: filestat = self.exchangeConn.stat(nomeu) # print "from frontend:", filestat break except: # print "From frontend: Cannot stat "+str(nomeu)+" Trying again..." timep.sleep(3) esegui = True while esegui: out = self.connection.send_command_to_channel(self.channel, 'stat ' + str( os.path.join(self.get_remote_pwd(), os.path.basename(remotefile))), self.promptB) # print "From submitter\n",out outlines = out.split() for word in outlines: if not word.strip().startswith("stat:"): # "File:" is not ok for non english SO esegui = False break else: o = self.exchangeConn.put(localfile, nomeu) while 1: try: filestat = self.exchangeConn.stat(nomeu) # print filestat break except: print "Cannot stat " + str(nomeu) + " Trying again..." timep.sleep(3) buff += self.connection.send_command_to_channel(self.sftpsubcha, "put " + nomeu, self.promptSFTP) buff += self.connection.send_command_to_channel(self.sftpsubcha, "!rm " + nomeu, self.promptSFTP) esegui = True while esegui: out = self.connection.send_command_to_channel(self.channel, 'stat ' + str(nomeu), self.promptB) # print out outlines = out.splitlines() for linea in outlines: if not linea.strip().startswith("stat:"): esegui = False break # print buff except: print "Error on putting local file into remote file" # print sys.exc_info() # traceback.print_exc() finally: if not stayalive: buff += self.close_exchange_file() if remote_path_asitis: self.change_remote_dir(nowDir) return buff def chmod_remote_file(self, remotefile, mode, stayalive=True): self.lock.acquire() assert self.isExchangeFileOpen buff = "" try: nomeu = "./" + os.path.basename(remotefile) if self.isnfs: self.exchangeConn.chmod(nomeu, mode) else: buff += self.connection.send_command_to_channel(self.sftpsubcha, "!chmod " + str(mode) + " " + nomeu, self.promptSFTP) # print buff except: print "Error on getting remote file to local file" # print sys.exc_info() # traceback.print_exc() if not stayalive: buff += self.close_exchange_file() self.lock.release() return buff def get_remote_listdir(self, typef="*"): self.lock.acquire() # out = self.connection.send_command_to_channel(self.channel,'ls -1 '+str(typef)+' | sort -k1 -n',self.promptB) out = self.connection.send_command_to_channel(self.channel, 'echo $SHELL', self.promptB) shelltype = os.path.basename((out.splitlines()[-2])) if shelltype.upper() in ["TCSH"]: out = self.connection.send_command_to_channel(self.channel, 'foreach file (.)', "foreach? ") out = self.connection.send_command_to_channel(self.channel, 'ls -1 $file', "foreach? ") out = self.connection.send_command_to_channel(self.channel, 'end', self.promptB) elif shelltype.upper() in ["BASH", "SH"]: out = self.connection.send_command_to_channel(self.channel, 'for each_file in .; do echo "`ls -1 $each_file`"; done;', self.promptB) listus = out.splitlines() listus = listus[1:-1] ab = [] for li in listus: miax = li.strip() if typef == "*" and miax != "" and len(miax.split()) == 1: ab.append(miax) elif miax != "" and len(miax.split()) == 1 and miax.endswith(typef[1:]): ab.append(miax) self.lock.release() # print sorted(ab,self.__cmp_pdb) return sorted(ab, self.__cmp_pdb) def __cmp_pdb(self, a, b): try: ai = a.split(".")[0] bi = b.split(".")[0] if len(ai.split("_")) > 1: ai = int(ai.split("_")[0]) else: ai = int(ai) if len(bi.split("_")) > 1: bi = int(bi.split("_")[0]) else: bi = int(bi) except: ai = a bi = b return cmp(ai, bi) def get_remote_file(self, remotefile, localfile, stayalive=True, tryonetime=False, relaunch=False, command=None, conditioEND=None, testEND=None, only_get_this=False, lenght_ext=4): # print "requiring",remotefile,"into",localfile self.lock.acquire() buff = "" if remotefile in self.FILE_RECEIVED: del self.FILE_RECEIVED[remotefile] self.lock.release() return "File: " + remotefile + " received!" else: lista_files = [] while 1: try: self.actualDirectory = self.get_remote_pwd() lista_files = self.get_remote_listdir(typef="*" + remotefile[-1 * (lenght_ext):]) if len(lista_files) > 0 and remotefile in lista_files: # print "remotefile is",remotefile break else: # print "Nothing is ready yet, waiting 3 seconds..." self.lock.release() return False except: print sys.exc_info() traceback.print_exc() # RECONNECT print "----- TRYING TO RECONNECT ------" SystemUtility.remote_reconnection(self.actualDirectory) atleastone = False for cus in range(len(lista_files)): if cus > 300: break fileu = lista_files[cus] base_fileu = os.path.basename(fileu) justcheck = True if base_fileu == remotefile: justcheck = False elif not only_get_this: justcheck = True else: continue self.lock.release() # print "JUSTCHECK",justcheck,base_fileu,remotefile response = self.__real_get_remote_file(base_fileu, localfile, stayalive=stayalive, tryonetime=tryonetime, relaunch=relaunch, command=command, conditioEND=conditioEND, testEND=testEND, onlycheck=justcheck) self.lock.acquire() if isinstance(response, bool) and response: out = self.connection.send_command_to_channel(self.channel, 'tar -rf ' + str( os.path.join(self.get_remote_pwd(), '../receveid.tar')) + ' .././' + str( os.path.basename(os.path.dirname(localfile))) + "/" + str(base_fileu), self.promptB) self.FILE_RECEIVED[base_fileu] = 0 buff += self.remove_remote_file(fileu) atleastone = True elif isinstance(response, str): # print "IT SHOULD BE A JUSTCHECK FALSE" buff += self.remove_remote_file(fileu) # print "response: ",response # else: # print "FILE: ",fileu,"give ",response if atleastone: out = self.connection.send_command_to_channel(self.channel, 'gzip -9 ../receveid.tar', self.promptB) buff += self.__real_get_remote_file("../receveid.tar.gz", os.path.join(os.path.dirname(localfile), "../receveid.tar.gz"), stayalive=stayalive) # buff += self.__real_get_remote_file("../receveid.tar.gz",os.path.join(os.path.dirname(localfile),"receveid.tar.gz"),stayalive=stayalive) buff += self.remove_remote_file("../receveid.tar.gz") # print "================================================",buff tarro = tarfile.open(os.path.join(os.path.dirname(localfile), "../receveid.tar.gz"), "r:gz") tarro.extractall(path=os.path.join(os.path.dirname(localfile), "../")) tarro.close() os.remove(os.path.join(os.path.dirname(localfile), "../receveid.tar.gz")) """ tarro = tarfile.open(os.path.join(os.path.dirname(localfile),"receveid.tar.gz"), "r:gz") tarro.extractall(path=os.path.dirname(localfile)) tarro.close() os.remove(os.path.join(os.path.dirname(localfile),"receveid.tar.gz")) """ self.lock.release() # print "AAAAAAAAA",buff return buff def __real_get_remote_file(self, remotefile, localfile, stayalive=True, tryonetime=False, relaunch=False, command=None, conditioEND=None, testEND=None, onlycheck=False): assert self.isExchangeFileOpen buff = "" try: if not remotefile.startswith("../"): nomeu = "./" + os.path.basename(remotefile) else: nomeu = remotefile if self.isnfs: while 1: try: self.lock.acquire() filestat = self.exchangeConn.stat(nomeu) self.lock.release() # print "from frontend:", filestat break except: # print "Cannot stat "+str(nomeu)+" Trying again..." self.lock.release() if tryonetime or onlycheck: return False # print "Vado in sleep A" timep.sleep(3) esegui = True times = 0 correct = False while esegui: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, 'stat ' + str( os.path.join(self.get_remote_pwd(), nomeu)), self.promptB) outlines = out.split() self.lock.release() for word in outlines: if not word.strip().startswith("stat:"): esegui = False correct = True break if tryonetime: esegui = False if times == 10 and relaunch and command != None and not onlycheck: # rilancia self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, command, self.promptB) self.lock.release() timep.sleep(3) # print "Vado in sleep B" break times += 1 if onlycheck and not correct: # print "RETURNING FALSE AT STAGE A" return False elif tryonetime and not correct: # print "RETURNING FALSE AT STAGE Q" return False esegui = True if conditioEND != None: conditioEND = conditioEND.replace("*", os.path.join(self.get_remote_pwd(), nomeu[:-4])) correct = False while esegui: if conditioEND != None and testEND != None: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, conditioEND, self.promptB) edere = out.splitlines() # self.lock.notify() self.lock.release() # TEMPORARY # print out # print edere # print "------------",edere[-2].strip(),str(testEND),edere[-2].strip() == str(testEND) # print self.connection.send_command_to_channel(self.channel,"pwd\n",self.promptB) # print edere[-2].strip(),str(testEND) if edere[-2].strip() == str(testEND): esegui = False correct = True else: # print "Test falso non sono uguali vado in sleep" if onlycheck: # print "RETURNING FALSE AT STAGE MMMMMM" return False # print "Vado in sleep C" timep.sleep(3) else: esegui = False correct = True if onlycheck and not correct: # print "RETURNING FALSE AT STAGE B" return False if not onlycheck: self.lock.acquire() self.exchangeConn.get(nomeu, localfile) self.lock.release() else: # print "RETURNING TRUE AT STAGE MIAO" return True else: times = 0 esegui = True correct = False while esegui: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, 'stat ' + str( os.path.join(self.get_remote_pwd(), nomeu)), self.promptB) outlines = out.split() self.lock.release() for word in outlines: if not word.strip().startswith("stat:"): esegui = False correct = True break if tryonetime: esegui = False if times == 10 and relaunch and command != None and not onlycheck: # rilancia self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, command, self.promptB) self.lock.release() # print "Vado in sleep D" timep.sleep(3) break times += 1 if onlycheck and not correct: # print "RETURNING FALSE AT STAGE C" return False elif tryonetime and not correct: return False # print "Superato primo test" esegui = True if conditioEND != None: conditioEND = conditioEND.replace("*", os.path.join(self.get_remote_pwd(), nomeu[:-4])) correct = False while esegui: if conditioEND != None and testEND != None: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, conditioEND, self.promptB) edere = out.splitlines() self.lock.release() # TEMPORARY # print "uscito" # print out # print edere if edere[-2].strip() == str(testEND): esegui = False correct = True else: if onlycheck: # print "RETURNING FALSE AT STAGE OOOOO" # print out return False # print "Vado in sleep E" timep.sleep(3) else: esegui = False correct = True if onlycheck and not correct: # print "RETURNING FALSE AT STAGE D" return False # print "Superato secondo test" if not onlycheck: self.lock.acquire() buff += self.connection.send_command_to_channel(self.sftpsubcha, "get " + nomeu, self.promptSFTP) self.exchangeConn.get(os.path.basename(nomeu), localfile) buff += self.connection.send_command_to_channel(self.sftpsubcha, "!rm " + os.path.basename(nomeu), self.promptSFTP) self.lock.release() else: # print "Ritorno True" return True except: print "Error on getting remote file to local file" # print sys.exc_info() # traceback.print_exc() traceback.print_exc(file=sys.stdout) if not stayalive: self.lock.acquire() buff += self.close_exchange_file() self.lock.release() return buff def get_remote_pwd(self, stayalive=True): self.lock.acquire() # print "Ho il lock" assert self.isExchangeFileOpen # print "Supero l'assert" buff = "" current = "" try: if self.isnfs: current = self.exchangeConn.getcwd() else: buff += self.connection.send_command_to_channel(self.sftpsubcha, "pwd", self.promptSFTP) # print buff lion = buff.split() for pat in lion: if pat[0] == "/": current = pat break except: print "Error on getting remote working directory" # print sys.exc_info() # traceback.print_exc() if not stayalive: buff += self.close_exchange_file() self.lock.release() return current def submitJob(self, job): pass def getStatus(self, jobid, nqueue): pass def getGridQueue(self, cluster="", queue=""): pass def isGridAlive(self): pass def removeJob(self, jobid, nqueue): pass def removeCluster(self, jobid): pass def submitJobs(self, job, nqueue): pass def setRequirements(self, requiString): pass def setRank(self, rankString): pass def getCMD(self, jobid): pass class SLURMManager(Grid): """Manager for SLURM jobs """ def __init__(self, partition=''): super(SLURMManager, self).__init__() self.type_grid = "slurm" self.partition = partition def submitJob(self, job, isthelast=False, forcesubmit=False): return self.submitJobs(job, 1, is_array_job=False) def getStatus(self, jobid, nqueue): if jobid not in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys(): # print "JNP A version" return "JNP" # Job Never Performed elif jobid not in self.JOBS.keys() and jobid in self.JOBS_DONE.keys(): return "NRA" # Not Registered Anymore elif jobid in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys() and len(self.JOBS[jobid]) == 0: # print "JNP B version" return "JNP" # Job Never Performed while True: try: cluster = ((self.JOBS[jobid])[nqueue])["gridCluster"] nq = ((self.JOBS[jobid])[-1])["queue"] break except: pass if nqueue < 0: return "NCV" # Non Corresponding Values dizio = self.getGridQueue() if cluster not in dizio.keys(): ntot = ((self.JOBS[jobid])[nqueue])["qtotal"] ((self.JOBS[jobid])[nqueue]) = {} completed = True for tr in self.JOBS[jobid]: if len(tr.keys()) > 0: completed = False break if completed and len(self.JOBS[jobid]) == ntot: self.JOBS_DONE[jobid] = copy.deepcopy(self.JOBS[jobid]) del self.JOBS[jobid] return "NRA" # Not Registered Anymore return ((self.JOBS[jobid])[nqueue], dizio[cluster]) def getGridQueue(self, cluster="", queue=""): pass """ def getGridQueue(self,cluster="",queue=""): if not hasattr(self,"channel"): #p = subprocess.Popen(['qstat'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) #out, err = p.communicate() #dit = sge.getJobStatsFromUser(getpass.getuser()) liOut = sge.getQueueRawData() else: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel,PATH_REMOTE_SGEPY+' -qstat '+self.qname,self.promptB) self.lock.release() liOut = out.splitlines() #scrivere nel Log l'eventuale err. joc = {} #(cluster):{owner:user,submitted:time,runtime:time,status:stat,priorized:int,size:kb,cmd:command} toStart = False for linea in liOut: if linea.startswith("-----"): toStart = True continue if toStart: dati = (linea.strip()).split() if len(dati) < 7: toStart = False continue clus = int(dati[0]) prior = float(dati[1]) name = dati[2] user = dati[3] status = dati[4] submitted = dati[5]+" "+dati[6] #queue = dati[7] #master = int(dati[7]) joc[clus] = {"owner":user,"submitted":submitted,"status":status,"priorized":prior,"name":name} return joc """ def isGridAlive(self): pass def removeJob(self, jobid, nqueue): pass def removeCluster(self, jobid): pass def submitJobs(self, job, nqueue, is_array_job=True): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] while True: if len(threading.enumerate()) < 10: # print "Starting Thread Job..." new_t = SystemUtility.OutputThreading(self.__submitJobs, job, nqueue, is_array_job) new_t.start() # print "...started!" break else: print "Too many thread", len(threading.enumerate()) timep.sleep(3) return 0, nqueue def __submitJobs(self, job, nqueue, is_array_job): workDir = "" if hasattr(self, "channel"): workDir = self.get_remote_pwd() print "WORKING DIRECTORY:", workDir if job.getName() in self.JOBS.keys(): raise TypeError( 'Another job is yet registered with ' + job.getName() + " and it is impossible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") all_cmds = [] basecmd = """#! /bin/bash # Template for SLURM array job for ARCIMBOLDO_BORGES and ARCIMBOLDO_LITE # Cambiar el valor de wd, el rango es -t 1-n, empieza en 1 # Una vez empezados los parametros de SLURM, no poner lineas en blanco o deja de leerlos. """ text_partition = "" if self.partition != '': text_partition = "#SBATCH --partition=" + self.partition + "\n" if isinstance(job.initdir, str): basecmd += "#SBATCH --job-name=p" + str(job.getName()) + "\n" + text_partition + "#SBATCH --time=05:00:00\n" if is_array_job: basecmd += "#SBATCH --array=0-" + str(nqueue - 1) + "\n" + "#SBATCH --ntasks=1\n\n" elif isinstance(job.initdir, list): basecmd += "#SBATCH --job-name=p" + str( job.getName()) + "\n" + text_partition + "#SBATCH --time=05:00:00\n#SBATCH --ntasks=1\n\n" last_dire = "" glcount = 1 for i in range(nqueue): cmd = basecmd commandline = "" if isinstance(job.initdir, str): if job.initdir != "": commandline = "-D " + job.initdir + " -o /dev/null -e /dev/null" else: commandline = "-o /dev/null -e /dev/null --requeue" if i > 0: break elif isinstance(job.initdir, list): dire = "" nume = 0 summa = 0 for el in job.initdir: dire, nume = el summa += nume if i < summa: break # print "last_dire",last_dire,dire if last_dire != dire: cmd += "#SBATCH --array=" + str(i) + "-" + str(summa - 1) + "\n\n" commandline = "-D " + str(dire) + " -o /dev/null -e /dev/null --requeue" last_dire = dire else: continue # print "actual cmd" # print cmd cmd += "\nsrun " if not hasattr(self, "channel") and job.executable.endswith(".py"): cmd += '' + PATH_LOCAL_PYTHON_INTERPRETER + ' ' + job.executable elif job.executable.endswith(".py"): cmd += '' + PATH_REMOTE_PYTHON_INTERPRETER + ' ' + job.executable else: cmd += '' + job.executable if len(job.args) > 0: data = job.getArgs(True) if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += " " + data if len(job.stdIn) > 0: data = job.getStdIn(True) if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += "<" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") == job.getOutput(True, "output"): data = job.getOutput(True, "output") if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += ">&" + data elif len(job.listFilesOut) > 0: data = job.getOutput(True, "output") if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += ">" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") != job.getOutput(True, "output"): data = job.getOutput(True, "error") if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += "2>" + data cmd += '\n' f = open("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", "w") f.write(cmd) f.close() all_cmds.append(("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", commandline)) glcount += 1 sent = 0 tosend = nqueue self.JOBS[job.getName()] = [] nproc = 0 timep.sleep(60) # This should avoid NFS delay errors in telemachus for cmde in all_cmds: cmd, commandline = cmde try: if not hasattr(self, "channel"): nproc = tosend while True: p = subprocess.Popen(['sbatch'] + commandline.split() + [cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE) print "sbatch " + commandline + " " + cmd.replace('"', '').strip() out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" # print "======================ERROR====================" print err # print "======================ERROR====================" entered = False liout = out.splitlines() for lineacorr in liout: try: lir = int(lineacorr.strip().split()[3]) entered = True break except: continue if not entered: print "SLURM not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.strip().split()[3]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue,"queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break else: nproc = tosend # print "Request for lock in submit",cmd print "COPYING", cmd, "INTO", os.path.join(workDir, os.path.basename(cmd)) self.copy_local_file(cmd, os.path.join(workDir, os.path.basename(cmd)), send_now=True, remote_path_asitis=True) while True: self.lock.acquire() print "EXECUTING", 'sbatch ' + commandline + " " + os.path.join(workDir, os.path.basename(cmd)) out = self.connection.send_command_to_channel(self.channel, 'sbatch ' + commandline + " " + os.path.join( workDir, os.path.basename(cmd)), self.promptB) self.lock.release() # print "Lock free" # scrivere nel Log l'eventuale err. # print cmd # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" entered = False liout = out.splitlines() for lineacorr in liout: try: lir = int(lineacorr.strip().split()[3]) entered = True break except: continue if not entered: print "SLURM not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.strip().split()[3]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue, "queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break except: print "Error on submitting a job..." print sys.exc_info() traceback.print_exc(file=sys.stdout) timep.sleep(3) def setRequirements(self, requiString): pass def setRank(self, rankString): pass def getCMD(self, jobid): try: return (JOBS[jobid])["cmd"] except: raise TypeError('The Job is not registered to the GridManager') class MOABManager(Grid): """Manager for MOAB jobs """ def __init__(self, partition='', fraction=1.0): super(MOABManager, self).__init__() self.type_grid = "moab" self.partition = partition self.fraction = fraction def submitJob(self, job, isthelast=False, forcesubmit=False): return self.submitJobs(job, 1, is_array_job=False) def getStatus(self, jobid, nqueue): if jobid not in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys(): # print "JNP A version" return "JNP" # Job Never Performed elif jobid not in self.JOBS.keys() and jobid in self.JOBS_DONE.keys(): return "NRA" # Not Registered Anymore elif jobid in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys() and len(self.JOBS[jobid]) == 0: # print "JNP B version" return "JNP" # Job Never Performed while True: try: cluster = ((self.JOBS[jobid])[nqueue])["gridCluster"] nq = ((self.JOBS[jobid])[-1])["queue"] break except: pass if nqueue < 0: return "NCV" # Non Corresponding Values dizio = self.getGridQueue() if cluster not in dizio.keys(): ntot = ((self.JOBS[jobid])[nqueue])["qtotal"] ((self.JOBS[jobid])[nqueue]) = {} completed = True for tr in self.JOBS[jobid]: if len(tr.keys()) > 0: completed = False break if completed and len(self.JOBS[jobid]) == ntot: self.JOBS_DONE[jobid] = copy.deepcopy(self.JOBS[jobid]) del self.JOBS[jobid] return "NRA" # Not Registered Anymore return ((self.JOBS[jobid])[nqueue], dizio[cluster]) def getGridQueue(self, cluster="", queue=""): if not hasattr(self, "channel"): # p = subprocess.Popen(['qstat'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # out, err = p.communicate() # dit = sge.getJobStatsFromUser(getpass.getuser()) liOut = sge.getQueueRawData() else: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, PATH_REMOTE_SGEPY + ' -qstat ' + self.qname, self.promptB) self.lock.release() liOut = out.splitlines() # scrivere nel Log l'eventuale err. joc = {} # (cluster):{owner:user,submitted:time,runtime:time,status:stat,priorized:int,size:kb,cmd:command} toStart = False for linea in liOut: if linea.startswith("-----"): toStart = True continue if toStart: dati = (linea.strip()).split() if len(dati) < 7: toStart = False continue clus = int(dati[0]) prior = float(dati[1]) name = dati[2] user = dati[3] status = dati[4] submitted = dati[5] + " " + dati[6] # queue = dati[7] # master = int(dati[7]) joc[clus] = {"owner": user, "submitted": submitted, "status": status, "priorized": prior, "name": name} return joc def isGridAlive(self): pass def removeJob(self, jobid, nqueue): pass def removeCluster(self, jobid): pass def submitJobs(self, job, nqueue, is_array_job=True): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] while True: if len(threading.enumerate()) < 10: # print "Starting Thread Job..." new_t = SystemUtility.OutputThreading(self.__submitJobs, job, nqueue, is_array_job) new_t.start() # print "...started!" break else: print "Too many thread", len(threading.enumerate()) timep.sleep(3) return 0, nqueue def __submitJobs(self, job, nqueue, is_array_job): workDir = "" if hasattr(self, "channel"): workDir = self.get_remote_pwd() print "WORKING DIRECTORY:", workDir if job.getName() in self.JOBS.keys(): raise TypeError( 'Another job is yet registered with ' + job.getName() + " and it is impossible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") all_cmds = [] basecmd = """ #! /bin/bash # Template for SGE array job for ARCIMBOLDO-BORGES and ARCIMBOLDO # Cambiar el valor de wd, el rango es -t 1-n, empieza en 1 # Una vez empezados los parametros de SGE, no poner lineas en blanco o deja de leerlos. """ if isinstance(job.initdir, str): basecmd += "#SBATCH --job-name=p" + str( job.getName()) + "\n" + "#SBATCH --partition=" + self.partition + "\n#SBATCH --time=05:00:00\n" if is_array_job: basecmd += "#SBATCH --array=0-" + str(nqueue - 1) + "\n" + "#SBATCH --ntasks=1\n\n" elif isinstance(job.initdir, list): basecmd += "#SBATCH --job-name=p" + str( job.getName()) + "\n" + "#SBATCH --partition=" + self.partition + "\n" + "#SBATCH --time=05:00:00\n#SBATCH --ntasks=1\n\n" last_dire = "" glcount = 1 for i in range(nqueue): cmd = basecmd commandline = "" if isinstance(job.initdir, str): if job.initdir != "": commandline = "-d " + job.initdir + " -o /dev/null -e /dev/null -l walltime=1:00:00:00" else: commandline = "-o /dev/null -e /dev/null -r y -l walltime=1:00:00:00" if i > 0: break elif isinstance(job.initdir, list): dire = "" nume = 0 summa = 0 for el in job.initdir: dire, nume = el summa += nume if i < summa: break # print "last_dire",last_dire,dire if last_dire != dire: cmd += "#SBATCH --array=" + str(i) + "-" + str(summa - 1) + "\n\n" commandline = "-d " + str(dire) + " -o /dev/null -e /dev/null -r y -l walltime=1:00:00:00" last_dire = dire else: continue # print "actual cmd" # print cmd if not hasattr(self, "channel") and job.executable.endswith(".py"): cmd += '' + PATH_LOCAL_PYTHON_INTERPRETER + ' ' + job.executable elif job.executable.endswith(".py"): cmd += '' + PATH_REMOTE_PYTHON_INTERPRETER + ' ' + job.executable else: cmd += '' + job.executable if len(job.args) > 0: data = job.getArgs(True) if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += " " + data if len(job.stdIn) > 0: data = job.getStdIn(True) if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += "<" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") == job.getOutput(True, "output"): data = job.getOutput(True, "output") if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += ">&" + data elif len(job.listFilesOut) > 0: data = job.getOutput(True, "output") if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += ">" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") != job.getOutput(True, "output"): data = job.getOutput(True, "error") if is_array_job: data = data.replace("$(Process)", "$SLURM_ARRAY_TASK_ID") else: data = data.replace("$(Process)", "") cmd += "2>" + data cmd += '\n' f = open("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", "w") f.write(cmd) f.close() all_cmds.append(("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", commandline)) glcount += 1 sent = 0 tosend = nqueue self.JOBS[job.getName()] = [] nproc = 0 timep.sleep(60) # This should avoid NFS delay errors in telemachus for cmde in all_cmds: cmd, commandline = cmde try: if not hasattr(self, "channel"): if nproc == 0: while True: if float(self.fraction) == 1.0: nproc = tosend break (canI, nproc) = sge.canSubmitJobs(getpass.getuser(), qname=self.qname, fraction=self.fraction) # nproc = 4 #TEMPORARY FOR TESTING if nproc >= 1: if tosend < nproc: nproc = tosend break else: timep.sleep(3) while True: p = subprocess.Popen(['msub'] + commandline.split() + [cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # print cmd.replace('"','').strip() out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" # print "======================ERROR====================" print err # print "======================ERROR====================" entered = False liout = out.splitlines() for lineacorr in liout: try: lir = int(lineacorr.strip()) entered = True break except: continue if not entered: print "MOAB not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.split(".")[0]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue,"queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break else: if nproc == 0: while True: if float(self.fraction) == 1.0: nproc = tosend break self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, PATH_REMOTE_SGEPY + ' -qsub ' + str( self.qname) + " " + str( self.fraction) + "\n", self.promptB) self.lock.release() lis = out.splitlines() lineacorr = "" for lineacorr in lis: if lineacorr.strip().startswith("Can"): break canI = bool(lineacorr.split()[1]) nproc = int(lineacorr.split()[3]) # print "I can process:",canI,nproc # nproc = 4 #TEMPORARY FOR TESTING if nproc >= 1: if tosend < nproc: nproc = tosend break else: timep.sleep(3) # print "Request for lock in submit",cmd print "COPYING", cmd, "INTO", os.path.join(workDir, os.path.basename(cmd)) self.copy_local_file(cmd, os.path.join(workDir, os.path.basename(cmd)), send_now=True, remote_path_asitis=True) while True: self.lock.acquire() print "EXECUTING", 'msub ' + commandline + " " + os.path.join(workDir, os.path.basename(cmd)) out = self.connection.send_command_to_channel(self.channel, 'msub ' + commandline + " " + os.path.join( workDir, os.path.basename(cmd)), self.promptB) self.lock.release() # print "Lock free" # scrivere nel Log l'eventuale err. # print cmd # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" entered = False liout = out.splitlines() for lineacorr in liout: try: lir = int(lineacorr.strip()) entered = True break except: continue if not entered: print "MOAB not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.split(".")[0]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue, "queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break except: print "Error on submitting a job..." print sys.exc_info() traceback.print_exc(file=sys.stdout) timep.sleep(3) def setRequirements(self, requiString): pass def setRank(self, rankString): pass def getCMD(self, jobid): try: return (JOBS[jobid])["cmd"] except: raise TypeError('The Job is not registered to the GridManager') class TORQUEManager(Grid): """Manager for TORQUE jobs """ def __init__(self, qname='all.q', cores_per_node=4, parallel_jobs=100, maui=False): super(TORQUEManager, self).__init__() self.type_grid = "torque" self.qname = str(qname) self.maui = maui # while parallel_jobs%cores_per_node != 0 and parallel_jobs<1000: # parallel_jobs += 1 if parallel_jobs > 1000: print "ERROR: The minimum number of parallel jobs ", parallel_jobs, "that can be efficiently grouped in", cores_per_node, "cores per node is greater than 1000. for security reason is not allowed to send all those jobs in parallel! Contact the support of the ARCIMBOLDO Team, please." sys.exit(1) self.cores_per_node = cores_per_node self.parallel_jobs = parallel_jobs self.node_maximum = int(self.parallel_jobs / self.cores_per_node) def submitJob(self, job, isthelast=False, forcesubmit=False): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): if not forcesubmit and len(self.FILE_TO_COPY) > 0 and ( len(self.FILE_TO_COPY) >= self.CUMULATIVE_TRANSFERING or isthelast): tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] self.JOB_TO_SUBMIT.append(job) for ju in self.JOB_TO_SUBMIT: a = self.submitJob(ju, forcesubmit=True) self.JOB_TO_SUBMIT = [] return 0, 0 elif not forcesubmit and len(self.FILE_TO_COPY) > 0: self.JOB_TO_SUBMIT.append(job) return 0, 0 while True: if len(threading.enumerate()) < 10: # print "Starting Thread Job..." new_t = SystemUtility.OutputThreading(self.__submitJob, job) new_t.start() # print "...started!" break else: print "Too many thread", len(threading.enumerate()) timep.sleep(3) return 0, 1 def __submitJob(self, job): workDir = "" nqueue = 1 if hasattr(self, "channel"): workDir = self.get_remote_pwd() print "WORKING DIRECTORY:", workDir if job.getName() in self.JOBS.keys(): raise TypeError( 'Another job is yet registered with ' + job.getName() + " and it is impossible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") all_cmds = [] basecmd = """ #! /bin/bash # Template for TORQUE array job for ARCIMBOLDO-BORGES and ARCIMBOLDO # Cambiar el valor de wd, el rango es -t 1-n, empieza en 1 # Una vez empezados los parametros de SGE, no poner lineas en blanco o deja de leerlos. """ if isinstance(job.initdir, str): if job.initdir != "": basecmd += "#PBS -d " + str(job.initdir) + "\n" + "#PBS -N p" + str( job.getName()) + "\n" + "#PBS -q " + self.qname + "\n" + "#PBS -r y\n#PBS -o output.out\n#PBS -e error.err\n" else: basecmd += "#PBS -N p" + str( job.getName()) + "\n" + "#PBS -q " + self.qname + "\n" + "#PBS -r y\n#PBS -o output.out\n#PBS -e error.err\n" elif isinstance(job.initdir, list): basecmd += "#PBS -N p" + str( job.getName()) + "\n" + "#PBS -q " + self.qname + "\n" + "#PBS -r y\n#PBS -o output.out\n#PBS -e error.err\n" basecmd += "#PBS -l nodes=1:ppn=1\n\n" last_dire = "" glcount = 1 for i in range(nqueue): cmd = basecmd # print "actual cmd" # print cmd if not hasattr(self, "channel") and job.executable.endswith(".py"): cmd += ' ' + PATH_LOCAL_PYTHON_INTERPRETER + ' ' + job.executable elif job.executable.endswith(".py"): cmd += ' ' + PATH_REMOTE_PYTHON_INTERPRETER + ' ' + job.executable else: cmd += ' ' + job.executable if len(job.args) > 0: cmd += " " + job.getArgs(False) if len(job.stdIn) > 0: cmd += "<" + job.getStdIn(False) if len(job.listFilesOut) > 1 and job.getOutput(False, "error") == job.getOutput(False, "output"): data = job.getOutput(False, "output") cmd += ">&" + data elif len(job.listFilesOut) > 0: data = job.getOutput(False, "output") cmd += ">" + data if len(job.listFilesOut) > 1 and job.getOutput(False, "error") != job.getOutput(False, "output"): data = job.getOutput(False, "error") cmd += "2>" + data cmd += '\n' f = open("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", "w") f.write(cmd) f.close() all_cmds.append("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh") glcount += 1 sent = 0 tosend = nqueue self.JOBS[job.getName()] = [] nproc = 0 timep.sleep(60) # This should avoid NFS delay errors in telemachus for cmd in all_cmds: try: if not hasattr(self, "channel"): while True: p = subprocess.Popen(['qsub', cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # print cmd.replace('"','').strip() out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" # print "======================ERROR====================" # print err # print "======================ERROR====================" entered = False liout = out.splitlines() for lineacorr in liout: if lineacorr.strip().split(".")[0].isdigit(): entered = True break if not entered: print "TORQUE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.strip().split(".")[0]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue,"queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break else: # print "Request for lock in submit",cmd print "COPYING", cmd, "INTO", os.path.join(workDir, os.path.basename(cmd)) self.copy_local_file(cmd, os.path.join(workDir, os.path.basename(cmd)), send_now=True, remote_path_asitis=True) while True: self.lock.acquire() print "EXECUTING", 'nohup qsub ' + os.path.join(workDir, os.path.basename(cmd) + " | tee nohup.out") out = self.connection.send_command_to_channel(self.channel, 'nohup qsub ' + os.path.join(workDir, os.path.basename( cmd)) + " | tee nohup.out", self.promptB) self.lock.release() # print "Lock free" # scrivere nel Log l'eventuale err. # print cmd # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" entered = False liout = out.splitlines() for lineacorr in liout: if lineacorr.strip().split(".")[0].isdigit(): entered = True break if not entered: print "TORQUE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.strip().split(".")[0]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue, "queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break except: print "Error on submitting a job..." print sys.exc_info() traceback.print_exc(file=sys.stdout) timep.sleep(3) def getStatus(self, jobid, nqueue): if jobid not in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys(): # print "JNP A version" return "JNP" # Job Never Performed elif jobid not in self.JOBS.keys() and jobid in self.JOBS_DONE.keys(): return "NRA" # Not Registered Anymore elif jobid in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys() and len(self.JOBS[jobid]) == 0: # print "JNP B version" return "JNP" # Job Never Performed while True: try: cluster = ((self.JOBS[jobid])[nqueue])["gridCluster"] nq = ((self.JOBS[jobid])[-1])["queue"] break except: pass if nqueue < 0: return "NCV" # Non Corresponding Values dizio = self.getGridQueue() if cluster not in dizio.keys(): ntot = ((self.JOBS[jobid])[nqueue])["qtotal"] ((self.JOBS[jobid])[nqueue]) = {} completed = True for tr in self.JOBS[jobid]: if len(tr.keys()) > 0: completed = False break if completed and len(self.JOBS[jobid]) == ntot: self.JOBS_DONE[jobid] = copy.deepcopy(self.JOBS[jobid]) del self.JOBS[jobid] return "NRA" # Not Registered Anymore return ((self.JOBS[jobid])[nqueue], dizio[cluster]) def getGridQueue(self, cluster="", queue=""): if not hasattr(self, "channel"): # p = subprocess.Popen(['qstat'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # out, err = p.communicate() # dit = sge.getJobStatsFromUser(getpass.getuser()) liOut = sge.getQueueRawData() else: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, PATH_REMOTE_SGEPY + ' -qstat ' + self.qname, self.promptB) self.lock.release() liOut = out.splitlines() # scrivere nel Log l'eventuale err. joc = {} # (cluster):{owner:user,submitted:time,runtime:time,status:stat,priorized:int,size:kb,cmd:command} toStart = False for linea in liOut: if linea.startswith("-----"): toStart = True continue if toStart: dati = (linea.strip()).split() if len(dati) < 7: toStart = False continue clus = int(dati[0]) prior = float(dati[1]) name = dati[2] user = dati[3] status = dati[4] submitted = dati[5] + " " + dati[6] # queue = dati[7] # master = int(dati[7]) joc[clus] = {"owner": user, "submitted": submitted, "status": status, "priorized": prior, "name": name} return joc def isGridAlive(self): pass def removeJob(self, jobid, nqueue): pass def removeCluster(self, jobid): pass def submitJobs(self, job, nqueue): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] while True: if len(threading.enumerate()) < 10: # print "Starting Thread Job..." new_t = SystemUtility.OutputThreading(self.__submitJobs, job, nqueue) new_t.start() # print "...started!" break else: print "Too many thread", len(threading.enumerate()) timep.sleep(3) return 0, nqueue def __submitJobs(self, job, nqueue): workDir = "" if hasattr(self, "channel"): workDir = self.get_remote_pwd() print "WORKING DIRECTORY:", workDir if job.getName() in self.JOBS.keys(): raise TypeError( 'Another job is yet registered with ' + job.getName() + " and it is impossible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") all_cmds = [] basecmd = """ #! /bin/bash # Template for TORQUE array job for ARCIMBOLDO-BORGES and ARCIMBOLDO # Cambiar el valor de wd, el rango es -t 1-n, empieza en 1 # Una vez empezados los parametros de SGE, no poner lineas en blanco o deja de leerlos. """ if isinstance(job.initdir, str): if job.initdir != "": basecmd += "#PBS -d " + str(job.initdir) + "\n" + "#PBS -N p" + str( job.getName()) + "\n" + "#PBS -q " + self.qname + "\n" + "#PBS -r y\n#PBS -o output.out\n#PBS -e error.err\n" else: basecmd += "#PBS -N p" + str( job.getName()) + "\n" + "#PBS -q " + self.qname + "\n" + "#PBS -r y\n#PBS -o output.out\n#PBS -e error.err\n" elif isinstance(job.initdir, list): basecmd += "#PBS -N p" + str( job.getName()) + "\n" + "#PBS -q " + self.qname + "\n" + "#PBS -r y\n#PBS -o output.out\n#PBS -e error.err\n" last_dire = "" cmd_last = "" glcount = 1 for i in range(nqueue): cmd = basecmd if isinstance(job.initdir, str): if i > 0: break elif isinstance(job.initdir, list): dire = "" nume = 0 summa = 0 for el in job.initdir: dire, nume = el summa += nume if i < summa: break # print "====================nqueue:",nqueue,"summa",summa,"dire",dire if last_dire != dire: if (summa - i) == self.parallel_jobs: cmd += "#PBS -d " + str(dire) + "\n" + "#PBS -t " + str(i + 1) + "-" + str(summa) + "\n" if not self.maui: cmd += "#PBS -l nodes=" + str(int(self.node_maximum)) + ":ppn=" + str( int(self.cores_per_node)) + "\n\n" else: todo = int((summa - i) / self.cores_per_node) todo_rest = (summa - i) % self.cores_per_node if todo == 0: cmd += "#PBS -d " + str(dire) + "\n" + "#PBS -t " + str(i + 1) + "-" + str(summa) + "\n" if not self.maui: cmd += "#PBS -l nodes=" + str(1) + ":ppn=" + str(summa) + "\n\n" else: if todo_rest == 0: cmd += "#PBS -d " + str(dire) + "\n" + "#PBS -t " + str(i + 1) + "-" + str(summa) + "\n" if not self.maui: cmd += "#PBS -l nodes=" + str(todo) + ":ppn=" + str( int(self.cores_per_node)) + "\n\n" else: cmd_last = cmd cmd += "#PBS -d " + str(dire) + "\n" + "#PBS -t " + str(i + 1) + "-" + str( i + (todo * self.cores_per_node) + todo_rest) + "\n" if not self.maui: cmd += "#PBS -l nodes=" + str(todo) + ":ppn=" + str( int(self.cores_per_node)) + "\n\n" # cmd += "#PBS -d "+str(dire)+"\n"+"#PBS -t "+str((todo*self.cores_per_node)+1)+"-"+str((todo*self.cores_per_node)+1+todo_rest)+"\n" # if not self.maui: # cmd_last += "#PBS -l nodes="+str(1)+":ppn="+str(todo_rest)+"\n\n" last_dire = dire else: continue cmd_sec = "i=$(($PBS_ARRAYID - 1))\n" # print "actual cmd" # print cmd if not hasattr(self, "channel") and job.executable.endswith(".py"): cmd_sec += '' + PATH_LOCAL_PYTHON_INTERPRETER + ' ' + job.executable elif job.executable.endswith(".py"): cmd_sec += '' + PATH_REMOTE_PYTHON_INTERPRETER + ' ' + job.executable else: cmd_sec += '' + job.executable if len(job.args) > 0: data = job.getArgs(True) data = data.replace("$(Process)", "$i") cmd_sec += " " + data if len(job.stdIn) > 0: data = job.getStdIn(True) data = data.replace("$(Process)", "$i") cmd_sec += "<" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") == job.getOutput(True, "output"): data = job.getOutput(True, "output") data = data.replace("$(Process)", "$i") cmd_sec += ">&" + data elif len(job.listFilesOut) > 0: data = job.getOutput(True, "output") data = data.replace("$(Process)", "$i") cmd_sec += ">" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") != job.getOutput(True, "output"): data = job.getOutput(True, "error") data = data.replace("$(Process)", "$i") cmd_sec += "2>" + data cmd_sec += '\n' cmd += cmd_sec f = open("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", "w") f.write(cmd) f.close() all_cmds.append("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh") glcount += 1 if len(cmd_last) > 0: cmd_last += cmd_sec f = open("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", "w") f.write(cmd_last) f.close() all_cmds.append("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh") glcount += 1 sent = 0 tosend = nqueue self.JOBS[job.getName()] = [] nproc = 0 timep.sleep(60) # This should avoid NFS delay errors in telemachus for cmd in all_cmds: try: if not hasattr(self, "channel"): while True: p = subprocess.Popen(['qsub', cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # print cmd.replace('"','').strip() out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" # print "======================ERROR====================" # print err # print "======================ERROR====================" entered = False liout = out.splitlines() for lineacorr in liout: if lineacorr.strip().split(".")[0].split("[]")[0].isdigit(): entered = True break if not entered: print "TORQUE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.strip().split(".")[0].split("[]")[0]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue,"queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break else: # print "Request for lock in submit",cmd print "COPYING", cmd, "INTO", os.path.join(workDir, os.path.basename(cmd)) self.copy_local_file(cmd, os.path.join(workDir, os.path.basename(cmd)), send_now=True, remote_path_asitis=True) while True: self.lock.acquire() print "EXECUTING", 'nohup qsub ' + os.path.join(workDir, os.path.basename(cmd) + " | tee nohup.out") out = self.connection.send_command_to_channel(self.channel, 'nohup qsub ' + os.path.join(workDir, os.path.basename( cmd)) + " | tee nohup.out", self.promptB) self.lock.release() # print "Lock free" # scrivere nel Log l'eventuale err. # print cmd # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" entered = False liout = out.splitlines() # print "=======",liout for lineacorr in liout: if lineacorr.strip().split(".")[0].split("[]")[0].isdigit(): entered = True break if not entered: print "TORQUE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.strip().split(".")[0].split("[]")[0]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue, "queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break except: print "Error on submitting a job..." print sys.exc_info() traceback.print_exc(file=sys.stdout) timep.sleep(3) def setRequirements(self, requiString): pass def setRank(self, rankString): pass def getCMD(self, jobid): try: return (JOBS[jobid])["cmd"] except: raise TypeError('The Job is not registered to the GridManager') class SGEManager(Grid): """Manager for Sun Grid Engine jobs """ def __init__(self, qname='all.q', fraction=1.0): super(SGEManager, self).__init__() self.type_grid = "sge" self.qname = qname self.fraction = fraction def submitJob(self, job, isthelast=False, forcesubmit=False): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): if not forcesubmit and len(self.FILE_TO_COPY) > 0 and ( len(self.FILE_TO_COPY) >= self.CUMULATIVE_TRANSFERING or isthelast): tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] self.JOB_TO_SUBMIT.append(job) for ju in self.JOB_TO_SUBMIT: a = self.submitJob(ju, forcesubmit=True) self.JOB_TO_SUBMIT = [] return 0, 0 elif not forcesubmit and len(self.FILE_TO_COPY) > 0: self.JOB_TO_SUBMIT.append(job) return 0, 0 while True: if len(threading.enumerate()) < 10: # print "Starting Thread Job..." new_t = SystemUtility.OutputThreading(self.__submitJob, job) new_t.start() # print "...started!" break else: print "Too many thread", len(threading.enumerate()) timep.sleep(3) return 0, 1 def __submitJob(self, job): if job.getName() in self.JOBS.keys(): raise TypeError( 'Another job is yet registered with ' + job.getName() + " and it is impossible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") cmd = "" if job.initdir != "": cmd += "qsub -q " + self.qname + " -wd " + str(job.initdir) + " -r y -o /dev/null -e /dev/null -N p" + str( job.getName()) + " -b y" else: cmd += "qsub -q " + self.qname + " -cwd -o /dev/null -r y -e /dev/null -N p" + str(job.getName()) + " -b y" if not hasattr(self, "channel") and job.executable.endswith(".py"): cmd += ' "' + PATH_LOCAL_PYTHON_INTERPRETER + ' ' + job.executable elif job.executable.endswith(".py"): cmd += ' "' + PATH_REMOTE_PYTHON_INTERPRETER + ' ' + job.executable else: cmd += ' "' + job.executable if len(job.args) > 0: cmd += " " + job.getArgs(False) if len(job.stdIn) > 0: cmd += "<" + job.getStdIn(False) if len(job.listFilesOut) > 1 and job.getOutput(False, "error") == job.getOutput(False, "output"): data = job.getOutput(False, "output") cmd += ">&" + data elif len(job.listFilesOut) > 0: data = job.getOutput(False, "output") cmd += ">" + data if len(job.listFilesOut) > 1 and job.getOutput(False, "error") != job.getOutput(False, "output"): data = job.getOutput(False, "error") cmd += "2>" + data cmd += '"\n' self.JOBS[job.getName()] = [] nproc = 0 timep.sleep(60) # This should avoid NFS delay errors in telemachus try: if not hasattr(self, "channel"): if nproc == 0: while True: if float(self.fraction) == 1.0: nproc = 1 break # TODO: Get the output and process it ti extract nproc (canI, nproc) = sge.canSubmitJobs(getpass.getuser(), qname=self.qname, fraction=self.fraction) # nproc = 4 #TEMPORARY FOR TESTING if nproc >= 1: nproc = 1 break else: timep.sleep(3) while True: p = subprocess.Popen(cmd.replace('"', '').strip().split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" # print "======================ERROR====================" print err # print "======================ERROR====================" liout = out.splitlines() entered = False for lineacorr in liout: if lineacorr.strip().startswith("Your"): entered = True break if not entered: print "SGE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = 0 nCluster = int(lineacorr.split()[2]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":1,"queue":nQ}) nproc -= 1 print "job is: ", nCluster, nQ break else: if nproc == 0: while True: if float(self.fraction) == 1.0: nproc = 1 break self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, PATH_REMOTE_SGEPY + ' -qsub ' + str( self.qname) + " " + str(self.fraction) + "\n", self.promptB) self.lock.release() lis = out.splitlines() lineacorr = "" for lineacorr in lis: if lineacorr.strip().startswith("Can"): break canI = bool(lineacorr.split()[1]) nproc = int(lineacorr.split()[3]) # print "I can process:",canI,nproc # nproc = 4 #TEMPORARY FOR TESTING if nproc >= 1: nproc = 1 break else: timep.sleep(3) while True: # print "ASKED by submit" self.lock.acquire() # print "RECEVEID by submit" out = self.connection.send_command_to_channel(self.channel, cmd, self.promptB) self.lock.release() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" liout = out.splitlines() entered = False for lineacorr in liout: if lineacorr.strip().startswith("Your"): entered = True break if not entered: print "SGE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = 0 nCluster = int(lineacorr.split()[2]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":1,"queue":nQ}) nproc -= 1 print "job is: ", nCluster, nQ break except: print "Error on submitting a job..." print sys.exc_info() traceback.print_exc(file=sys.stdout) timep.sleep(3) def getStatus(self, jobid, nqueue): if jobid not in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys(): # print "JNP A version" return "JNP" # Job Never Performed elif jobid not in self.JOBS.keys() and jobid in self.JOBS_DONE.keys(): return "NRA" # Not Registered Anymore elif jobid in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys() and len(self.JOBS[jobid]) == 0: # print "JNP B version" return "JNP" # Job Never Performed while True: try: cluster = ((self.JOBS[jobid])[nqueue])["gridCluster"] nq = ((self.JOBS[jobid])[-1])["queue"] break except: pass if nqueue < 0: return "NCV" # Non Corresponding Values dizio = self.getGridQueue() if cluster not in dizio.keys(): ntot = ((self.JOBS[jobid])[nqueue])["qtotal"] ((self.JOBS[jobid])[nqueue]) = {} completed = True for tr in self.JOBS[jobid]: if len(tr.keys()) > 0: completed = False break if completed and len(self.JOBS[jobid]) == ntot: self.JOBS_DONE[jobid] = copy.deepcopy(self.JOBS[jobid]) del self.JOBS[jobid] return "NRA" # Not Registered Anymore return ((self.JOBS[jobid])[nqueue], dizio[cluster]) def getGridQueue(self, cluster="", queue=""): if not hasattr(self, "channel"): # p = subprocess.Popen(['qstat'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # out, err = p.communicate() # dit = sge.getJobStatsFromUser(getpass.getuser()) liOut = sge.getQueueRawData() else: self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, PATH_REMOTE_SGEPY + ' -qstat ' + self.qname, self.promptB) self.lock.release() liOut = out.splitlines() # scrivere nel Log l'eventuale err. joc = {} # (cluster):{owner:user,submitted:time,runtime:time,status:stat,priorized:int,size:kb,cmd:command} toStart = False for linea in liOut: if linea.startswith("-----"): toStart = True continue if toStart: dati = (linea.strip()).split() if len(dati) < 7: toStart = False continue clus = int(dati[0]) prior = float(dati[1]) name = dati[2] user = dati[3] status = dati[4] submitted = dati[5] + " " + dati[6] # queue = dati[7] # master = int(dati[7]) joc[clus] = {"owner": user, "submitted": submitted, "status": status, "priorized": prior, "name": name} return joc def isGridAlive(self): pass def removeJob(self, jobid, nqueue): pass def removeCluster(self, jobid): pass def submitJobs(self, job, nqueue): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] while True: if len(threading.enumerate()) < 10: # print "Starting Thread Job..." new_t = SystemUtility.OutputThreading(self.__submitJobs, job, nqueue) new_t.start() # print "...started!" break else: print "Too many thread", len(threading.enumerate()) timep.sleep(3) return 0, nqueue def __submitJobs(self, job, nqueue): workDir = "" if hasattr(self, "channel"): workDir = self.get_remote_pwd() print "WORKING DIRECTORY:", workDir if job.getName() in self.JOBS.keys(): raise TypeError( 'Another job is yet registered with ' + job.getName() + " and it is impossible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") all_cmds = [] basecmd = """ #! /bin/bash # Template for SGE array job for ARCIMBOLDO-BORGES and ARCIMBOLDO # Cambiar el valor de wd, el rango es -t 1-n, empieza en 1 # Una vez empezados los parametros de SGE, no poner lineas en blanco o deja de leerlos. """ basecmd += "#$ -S /bin/bash\n" if isinstance(job.initdir, str): if job.initdir != "": basecmd += "#$ -wd " + str(job.initdir) + "\n" + "#$ -N p" + str( job.getName()) + "\n" + "#$ -q " + self.qname + "\n" + "#$ -t 1-" + str( nqueue) + "\n" + "#$ -r y\n#$ -o /dev/null\n#$ -e /dev/null\n\n" else: basecmd += "#$ -cwd\n" + "#$ -N p" + str( job.getName()) + "\n" + "#$ -q " + self.qname + "\n" + "#$ -t 1-" + str( nqueue) + "\n" + "#$ -r y\n#$ -o /dev/null\n#$ -e /dev/null\n\n" elif isinstance(job.initdir, list): basecmd += "#$ -N p" + str( job.getName()) + "\n" + "#$ -q " + self.qname + "\n" + "#$ -r y\n#$ -o /dev/null\n#$ -e /dev/null\n\n" last_dire = "" glcount = 1 for i in range(nqueue): cmd = basecmd if isinstance(job.initdir, str): if i > 0: break elif isinstance(job.initdir, list): dire = "" nume = 0 summa = 0 for el in job.initdir: dire, nume = el summa += nume if i < summa: break # print "last_dire",last_dire,dire if last_dire != dire: cmd += "#$ -wd " + str(dire) + "\n" + "#$ -t " + str(i + 1) + "-" + str(summa) + "\n\n" last_dire = dire else: continue cmd += "i=$(($SGE_TASK_ID - 1))\n" # print "actual cmd" # print cmd if not hasattr(self, "channel") and job.executable.endswith(".py"): cmd += '' + PATH_LOCAL_PYTHON_INTERPRETER + ' ' + job.executable elif job.executable.endswith(".py"): cmd += '' + PATH_REMOTE_PYTHON_INTERPRETER + ' ' + job.executable else: cmd += '' + job.executable if len(job.args) > 0: data = job.getArgs(True) data = data.replace("$(Process)", "$i") cmd += " " + data if len(job.stdIn) > 0: data = job.getStdIn(True) data = data.replace("$(Process)", "$i") cmd += "<" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") == job.getOutput(True, "output"): data = job.getOutput(True, "output") data = data.replace("$(Process)", "$i") cmd += ">&" + data elif len(job.listFilesOut) > 0: data = job.getOutput(True, "output") data = data.replace("$(Process)", "$i") cmd += ">" + data if len(job.listFilesOut) > 1 and job.getOutput(True, "error") != job.getOutput(True, "output"): data = job.getOutput(True, "error") data = data.replace("$(Process)", "$i") cmd += "2>" + data cmd += '\n' f = open("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh", "w") f.write(cmd) f.close() all_cmds.append("./grid_jobs/" + job.getName() + "_" + str(glcount) + ".sh") glcount += 1 sent = 0 tosend = nqueue self.JOBS[job.getName()] = [] nproc = 0 timep.sleep(60) # This should avoid NFS delay errors in telemachus for cmd in all_cmds: try: if not hasattr(self, "channel"): if nproc == 0: while True: if float(self.fraction) == 1.0: nproc = tosend break (canI, nproc) = sge.canSubmitJobs(getpass.getuser(), qname=self.qname, fraction=self.fraction) # nproc = 4 #TEMPORARY FOR TESTING if nproc >= 1: if tosend < nproc: nproc = tosend break else: timep.sleep(3) while True: p = subprocess.Popen(['qsub', cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # print cmd.replace('"','').strip() out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" # print "======================ERROR====================" print err # print "======================ERROR====================" entered = False liout = out.splitlines() for lineacorr in liout: if lineacorr.strip().startswith("Your"): entered = True break if not entered: print "SGE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.split()[2]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue,"queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break else: if nproc == 0: while True: if float(self.fraction) == 1.0: nproc = tosend break self.lock.acquire() out = self.connection.send_command_to_channel(self.channel, PATH_REMOTE_SGEPY + ' -qsub ' + str( self.qname) + " " + str( self.fraction) + "\n", self.promptB) self.lock.release() lis = out.splitlines() lineacorr = "" for lineacorr in lis: if lineacorr.strip().startswith("Can"): break canI = bool(lineacorr.split()[1]) nproc = int(lineacorr.split()[3]) # print "I can process:",canI,nproc # nproc = 4 #TEMPORARY FOR TESTING if nproc >= 1: if tosend < nproc: nproc = tosend break else: timep.sleep(3) # print "Request for lock in submit",cmd print "COPYING", cmd, "INTO", os.path.join(workDir, os.path.basename(cmd)) self.copy_local_file(cmd, os.path.join(workDir, os.path.basename(cmd)), send_now=True, remote_path_asitis=True) while True: self.lock.acquire() print "EXECUTING", 'nohup qsub ' + os.path.join(workDir, os.path.basename(cmd) + " | tee nohup.out") out = self.connection.send_command_to_channel(self.channel, 'nohup qsub ' + os.path.join(workDir, os.path.basename( cmd)) + " | tee nohup.out", self.promptB) self.lock.release() # print "Lock free" # scrivere nel Log l'eventuale err. print cmd # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" entered = False liout = out.splitlines() for lineacorr in liout: if lineacorr.strip().startswith("Your"): entered = True break if not entered: print "SGE not available sleeping 10 seconds..." timep.sleep(10) continue nQ = sent nCluster = int(lineacorr.split()[2].split(".")[0]) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") # self.JOBS[job.getName()].append({"timeStart":time,"cmd":cmd,"job":job,"gridCluster":nCluster,"qtotal":nqueue, "queue":nQ}) tosend -= 1 sent += 1 nproc -= 1 print "job is: ", nCluster, nQ break except: print "Error on submitting a job..." print sys.exc_info() traceback.print_exc(file=sys.stdout) timep.sleep(3) def setRequirements(self, requiString): pass def setRank(self, rankString): pass def getCMD(self, jobid): try: return (JOBS[jobid])["cmd"] except: raise TypeError('The Job is not registered to the GridManager') class condorManager(Grid): """Manager for Condor jobs""" universe = "" notification = "" nice_user = "" should_transfer_files = "" when_to_transfer_output = "" requiString = "" rank = "" def __init__(self, universe="vanilla", notification="error", nice_user="false", should_transfer_files="IF_NEEDED", when_to_transfer_output="ON_EXIT"): super(condorManager, self).__init__() self.type_grid = "condor" self.universe = universe self.notification = notification self.nice_user = nice_user self.should_transfer_files = should_transfer_files self.when_to_transfer_output = when_to_transfer_output def submitJob(self, job, isthelast=False, forcesubmit=False): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): if len(self.FILE_TO_COPY) >= self.CUMULATIVE_TRANSFERING or isthelast: tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] for ju in self.JOB_TO_SUBMIT: a = self.submitJob(ju, forcesubmit=True) self.JOB_TO_SUBMIT = [] return 0, 1 elif not forcesubmit: self.JOB_TO_SUBMIT.append(job) return 0, 1 if job.getName() in self.JOBS.keys(): raise TypeError( 'Another job is yet registered with ' + job.getName() + " and it is impossible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") cmd = "" f = open("./grid_jobs/" + job.getName() + ".cmd", "w") if not hasattr(self, "channel") and job.executable.endswith(".py"): f.write("executable = " + PATH_LOCAL_PYTHON_INTERPRETER + "\n") cmd += "executable = " + PATH_LOCAL_PYTHON_INTERPRETER + "\n" elif job.executable.endswith(".py"): f.write("executable = " + PATH_LOCAL_PYTHON_INTERPRETER + "\n") cmd += "executable = " + PATH_REMOTE_PYTHON_INTERPRETER + "\n" else: f.write("executable = " + job.executable + "\n") cmd += "executable = " + job.executable + "\n" f.write("universe = " + self.universe + "\n") cmd += "universe = " + self.universe + "\n" if len(job.stdIn) > 0: f.write("input = " + job.getStdIn(False) + "\n") cmd += "input = " + job.getStdIn(False) + "\n" if self.nice_user != "": f.write("nice_user = " + self.nice_user + "\n") cmd += "nice_user = " + self.nice_user + "\n" if self.notification != "": f.write("notification = " + self.notification + "\n") cmd += "notification = " + self.notification + "\n" # if len(job.listFiles) > 0: # f.write("input = "+job.getInput()+"\n") if len(job.listFiles) > 0: f.write("transfer_input_files = " + job.getInput(False) + "\n") cmd += "transfer_input_files = " + job.getInput(False) + "\n" if self.should_transfer_files != "": f.write("should_transfer_files = " + self.should_transfer_files + "\n") cmd += "should_transfer_files = " + self.should_transfer_files + "\n" if self.when_to_transfer_output != "": f.write("when_to_transfer_output = " + self.when_to_transfer_output + "\n") cmd += "when_to_transfer_output = " + self.when_to_transfer_output + "\n" if self.requiString != "": f.write("requirements = " + self.requiString + "\n") cmd += "requirements = " + self.requiString + "\n" if self.rank != "": f.write("Rank = " + self.rank + "\n") cmd += "Rank = " + self.rank + "\n" if job.maxruntime != -1: f.write("maxRunTime = " + str(job.maxruntime) + "\n") cmd += "maxRunTime = " + str(job.maxruntime) + "\n" if job.periodicRemove != "": f.write("periodic_remove = " + job.periodicRemove + "\n") cmd += "periodic_remove = " + job.periodicRemove + "\n" if len(job.args) > 0 or job.executable.endswith(".py"): if job.executable.endswith(".py"): f.write("Arguments = " + job.executable + " " + job.getArgs(False) + "\n") cmd += "Arguments = " + job.executable + " " + job.getArgs(False) + "\n" else: f.write("Arguments = " + job.getArgs(False) + "\n") cmd += "Arguments = " + job.getArgs(False) + "\n" if job.initdir != "": f.write("initialdir = " + job.initdir + "\n") cmd += "initialdir = " + job.initdir + "\n" if len(job.listFilesOut) > 0: f.write("output = " + job.getOutput(False, "output") + "\n") cmd += "output = " + job.getOutput(False, "output") + "\n" if len(job.listFilesOut) > 1: f.write("error = " + job.getOutput(False, "error") + "\n") cmd += "error = " + job.getOutput(False, "error") + "\n" f.write("queue " + str(1) + "\n") cmd += "queue = " + str(1) + "\n" f.close() # timep.sleep(60) #This should avoid NFS delay errors in telemachus while True: try: if not hasattr(self, "channel"): p = subprocess.Popen(['condor_submit', "./grid_jobs/" + job.getName() + ".cmd"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" # print "======================ERROR====================" print err # print "======================ERROR====================" else: # timep.sleep(5) self.copy_local_file("./grid_jobs/" + job.getName() + ".cmd", job.getName() + ".cmd") out = self.connection.send_command_to_channel(self.channel, 'condor_submit ' + os.path.join(self.get_remote_pwd(), job.getName() + ".cmd"), self.promptB) # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" # print out # print "======================OUTPUT====================" liOut = out.split("job(s) submitted to cluster") liUno = liOut[0].splitlines() nQueue = int((liUno[-1]).strip()) liDue = liOut[1].split(".") nCluster = int((liDue[0]).strip()) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") self.JOBS[job.getName()] = {"timeStart": time, "cmd": cmd, "job": job, "gridCluster": nCluster, "queue": nQueue} print "job is: ", nCluster, nQueue break except: print "Error on submitting a job..." timep.sleep(3) return (nCluster, nQueue) def getStatus(self, jobid, nqueue): if jobid not in self.JOBS.keys() and jobid not in self.JOBS_DONE.keys(): return "JNP" # Job Never Performed elif jobid not in self.JOBS.keys() and jobid in self.JOBS_DONE.keys(): nq = (self.JOBS_DONE[jobid])["queue"] if nqueue == nq - 1: del self.JOBS_DONE[jobid] return "NRA" # Not Registered Anymore while True: try: cluster = (self.JOBS[jobid])["gridCluster"] # print "RAM reading: cluster ",cluster nq = (self.JOBS[jobid])["queue"] break except: # print "jobid ",jobid,nqueue,"not ready in RAM" # print (self.JOBS[jobid]) pass if nqueue < 0 or nqueue >= nq: return "NCV" # Non Corresponding Values try: dizio = self.getGridQueue(cluster=str(cluster), queue=str(nqueue)) except: return "JNP" if (cluster, nqueue) not in dizio: if (cluster, nq - 1) not in dizio: self.JOBS_DONE[jobid] = copy.deepcopy(self.JOBS[jobid]) del self.JOBS[jobid] return "NRA" # Not Registered Anymore return (self.JOBS[jobid], dizio[(cluster, nqueue)]) def getGridQueue(self, cluster="", queue=""): secarg = "" if cluster != "": secarg += str(cluster) if queue != "": secarg += "." + str(queue) if not hasattr(self, "channel"): p = subprocess.Popen(['condor_q', secarg], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() else: out = self.connection.send_command_to_channel(self.channel, 'condor_q ' + secarg, self.promptB) # scrivere nel Log l'eventuale err. joc = {} # (cluster,nid):{owner:user,submitted:time,runtime:time,status:stat,priorized:int,size:kb,cmd:command} liOut = out.splitlines() toStart = False for linea in liOut: if linea.startswith(" ID"): toStart = True continue if toStart: dati = linea.split() if len(dati) != 9: toStart = False continue clus, nq = dati[0].split(".") clus = int(clus) nq = int(nq) user = dati[1] submitted = dati[2] + " " + dati[3] runtime = dati[4] status = dati[5] prio = int(dati[6]) size = float(dati[7]) command = dati[8] joc[(clus, nq)] = {"owner": user, "submitted": submitted, "runtime": runtime, "status": status, "priorized": prio, "size": size, "command": command} return joc def isGridAlive(self): pass def removeJob(self, jobid, nqueue): if jobid not in self.JOBS.keys(): return "NRA" # Not Registered Anymore cluster = (self.JOBS[jobid])["gridCluster"] nq = (self.JOBS[jobid])["queue"] if nqueue < 0 or nqueue >= nq: return "NCV" # Non Corresponding Values if not hasattr(self, "channel"): p = subprocess.Popen(['condor_rm', '' + str(cluster) + '.' + str(nqueue)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" # print out # print "======================ERROR====================" return err else: out = self.connection.send_command_to_channel(self.channel, 'condor_rm ' + '' + str(cluster) + '.' + str(nqueue), self.promptB) # print "======================OUTPUT====================" # print out def removeCluster(self, jobid): if jobid not in self.JOBS.keys(): return "NRA" # Not Registered Anymore cluster = (self.JOBS[jobid])["gridCluster"] if not hasattr(self, "channel"): p = subprocess.Popen(['condor_rm', '' + str(cluster)], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" # print out # print "======================ERROR====================" return err else: out = self.connection.send_command_to_channel(self.channel, 'condor_rm ' + '' + str(cluster), self.promptB) # print "======================OUTPUT====================" # print out def submitJobs(self, job, nqueue): if not os.path.exists("./temp/"): os.makedirs("./temp/") if hasattr(self, "channel"): tarro = tarfile.open(os.path.join("./temp/", "transfert.tar.gz"), "w:gz") for from_fi, to_fi in self.FILE_TO_COPY: tarro.add(from_fi, arcname=to_fi) tarro.close() buff = self.copy_local_file(os.path.join("./temp/", "transfert.tar.gz"), "transfert.tar.gz") buff = self.connection.send_command_to_channel(self.channel, 'tar -zxf ' + str( os.path.join(self.get_remote_pwd(), 'transfert.tar.gz')), self.promptB) buff = self.remove_remote_file("transfert.tar.gz") os.remove(os.path.join("./temp/", "transfert.tar.gz")) self.FILE_TO_COPY = [] if job.getName() in self.JOBS.keys(): self.JOBS[job.getName()] = None # raise TypeError('Another job is yet registered with '+job.getName()+" and it is not possible to register two jobs with the same name.") if not os.path.exists("./grid_jobs/"): os.makedirs("./grid_jobs/") cmd = "" f = open("./grid_jobs/" + job.getName() + ".cmd", "w") f.write("executable = " + job.executable + "\n") cmd += "executable = " + job.executable + "\n" f.write("universe = " + self.universe + "\n") cmd += "universe = " + self.universe + "\n" if len(job.stdIn) > 0: f.write("input = " + job.getStdIn(True) + "\n") cmd += "input = " + job.getStdIn(True) + "\n" if self.nice_user != "": f.write("nice_user = " + self.nice_user + "\n") cmd += "nice_user = " + self.nice_user + "\n" if self.notification != "": f.write("notification = " + self.notification + "\n") cmd += "notification = " + self.notification + "\n" # if len(job.listFiles) > 0: # f.write("input = "+job.getInput()+"\n") if len(job.listFiles) > 0: f.write("transfer_input_files = " + job.getInput(True) + "\n") cmd += "transfer_input_files = " + job.getInput(True) + "\n" if self.should_transfer_files != "": f.write("should_transfer_files = " + self.should_transfer_files + "\n") cmd += "should_transfer_files = " + self.should_transfer_files + "\n" if self.when_to_transfer_output != "": f.write("when_to_transfer_output = " + self.when_to_transfer_output + "\n") cmd += "when_to_transfer_output = " + self.when_to_transfer_output + "\n" if self.requiString != "": f.write("requirements = " + self.requiString + "\n") cmd += "requirements = " + self.requiString + "\n" if self.rank != "": f.write("Rank = " + self.rank + "\n") cmd += "Rank = " + self.rank + "\n" if job.maxruntime != -1: f.write("maxRunTime = " + str(job.maxruntime) + "\n") cmd += "maxRunTime = " + str(job.maxruntime) + "\n" if job.periodicRemove != "": f.write("periodic_remove = " + job.periodicRemove + "\n") cmd += "periodic_remove = " + job.periodicRemove + "\n" if len(job.args) > 0 or job.executable.endswith(".py"): if job.executable.endswith(".py"): f.write("Arguments = " + job.executable + " " + job.getArgs(True) + "\n") cmd += "Arguments = " + job.executable + " " + job.getArgs(True) + "\n" else: f.write("Arguments = " + job.getArgs(True) + "\n") cmd += "Arguments = " + job.getArgs(True) + "\n" if len(job.listFilesOut) > 0: f.write("output = " + job.getOutput(True, "output") + "\n") cmd += "output = " + job.getOutput(True, "output") + "\n" if len(job.listFilesOut) > 1: f.write("error = " + job.getOutput(True, "error") + "\n") cmd += "error = " + job.getOutput(True, "error") + "\n" if job.initdir != "" and isinstance(job.initdir, str): f.write("initialdir = " + job.initdir + "\n") cmd += "initialdir = " + job.initdir + "\n" if isinstance(job.initdir, list): for el in job.initdir: f.write("initialdir = " + el[0] + "\n") cmd += "initialdir = " + el[0] + "\n" f.write("queue " + str(el[1]) + "\n") cmd += "queue = " + str(el[1]) + "\n" if isinstance(job.initdir, str): f.write("queue " + str(nqueue) + "\n") cmd += "queue = " + str(nqueue) + "\n" f.close() # timep.sleep(60) #This should avoid NFS delay errors in telemachus while True: try: if not hasattr(self, "channel"): p = subprocess.Popen(['condor_submit', "./grid_jobs/" + job.getName() + ".cmd"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate() # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" # print "======================ERROR====================" print err # print "======================ERROR====================" else: # timep.sleep(5) self.copy_local_file("./grid_jobs/" + job.getName() + ".cmd", job.getName() + ".cmd") out = self.connection.send_command_to_channel(self.channel, 'condor_submit ' + os.path.join(self.get_remote_pwd(), job.getName() + ".cmd"), self.promptB) # scrivere nel Log l'eventuale err. # print "======================OUTPUT====================" print out # print "======================OUTPUT====================" liOut = out.split("job(s) submitted to cluster") liUno = liOut[0].splitlines() nQueue = int((liUno[-1]).strip()) liDue = liOut[1].split(".") nCluster = int((liDue[0]).strip()) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M") self.JOBS[job.getName()] = {"timeStart": time, "cmd": cmd, "job": job, "gridCluster": nCluster, "queue": nQueue} print "job is: ", nCluster, nQueue break except: print "Error on submitting a job..." # print sys.exc_info() # traceback.print_exc() timep.sleep(3) return (nCluster, nQueue) def setRequirements(self, requiString): self.requiString = requiString def setRank(self, rankString): self.rank = rankString def getCMD(self, jobid): try: return (JOBS[jobid])["cmd"] except: raise TypeError('The Job is not registered to the GridManager') class gridJob: """A job for a Grid Manager""" nameId = "" executable = "" initdir = "" listFiles = [] listFilesOut = [] maxruntime = -1 periodicRemove = "" args = [] stdIn = [] ARG_FILE = [] STDI_FILE = [] change_FILE = [] change_FILE_OUT = [] def __init__(self, nameId): # check if nameId is a string, because it must be self.nameId = "" self.executable = "" self.initdir = "" self.listFiles = [] self.listFilesOut = [] self.maxruntime = -1 self.periodicRemove = "" self.args = [] self.stdIn = [] self.ARG_FILE = [] self.STDI_FILE = [] self.change_FILE = [] self.change_FILE_OUT = [] self.nameId = str(nameId) def getName(self): return self.nameId def setExecutable(self, exe): self.executable = str(exe) def setInitialDir(self, initdir): if isinstance(initdir, list): for s in range(len(initdir)): initdir[s] = (str(initdir[s][0]), initdir[s][1]) self.initdir = initdir else: self.initdir = str(initdir) def addInputFile(self, ifile, couldChange): self.listFiles.append(str(ifile)) self.change_FILE.append(couldChange) try: inde = self.args.index(str(ifile)) self.ARG_FILE.append(inde) except: pass try: inde = self.stdIn.index(str(ifile)) self.STDI_FILE.append(inde) except: pass def addOutputFile(self, ifile, couldChange): self.listFilesOut.append(str(ifile)) self.change_FILE_OUT.append(couldChange) try: inde = self.args.index(str(ifile)) self.ARG_FILE.append(inde) except: pass def setMaxRuntime(self, time): self.maxruntime = time def setPeriodicRemove(self, time): self.periodicRemove = time def setOutput(self, outPath): self.output = outPath def setArguments(self, argsList): self.args = argsList self.ARG_FILE = [] for inde in range(len(self.args)): ar = self.args[inde] if ar in self.listFiles: self.ARG_FILE.append(inde) elif ar in self.listFilesOut: self.ARG_FILE.append(inde) def setStdIn(self, stdIn): self.stdIn = stdIn self.STDI_FILE = [] for inde in range(len(self.stdIn)): ar = self.stdIn[inde] if ar in self.listFiles: self.STDI_FILE.append(inde) def getInput(self, isInQueue): if not isInQueue: return ', '.join(self.listFiles) else: liRe = [] for tr in range(len(self.listFiles)): fil = self.listFiles[tr] change = self.change_FILE[tr] if change: lou = fil.split(".") a = lou[0] + ("$(Process)") + "." + lou[1] liRe.append(a) else: liRe.append(fil) return ', '.join(liRe) def getOutput(self, isInQueue, mode): lfo = [] cfo = [] if mode == "output": lfo = self.listFilesOut[0:1] cfo = self.listFilesOut[0:1] elif mode == "error": lfo = self.listFilesOut[1:2] cfo = self.listFilesOut[1:2] if not isInQueue: return ', '.join(lfo) else: liRe = [] for tr in range(len(lfo)): fil = lfo[tr] change = cfo[tr] if change: lou = fil.split(".") a = lou[0] + ("$(Process)") + "." + lou[1] liRe.append(a) else: liRe.append(fil) return ', '.join(liRe) def getArgs(self, isInQueue): if not isInQueue: return ' '.join(self.args) else: liRe = [] for index in range(len(self.args)): if index in self.ARG_FILE: fil = self.args[index] indr = self.listFiles.index(fil) change = self.change_FILE[indr] if change: lou = fil.split(".") a = lou[0] + ("$(Process)") + "." + lou[1] liRe.append(a) else: liRe.append(fil) else: liRe.append(self.args[index]) return ' '.join(liRe) def getStdIn(self, isInQueue): if not isInQueue: return ' '.join(self.stdIn) else: liRe = [] for index in range(len(self.stdIn)): if index in self.STDI_FILE: fil = self.stdIn[index] indr = self.listFiles.index(fil) change = self.change_FILE[indr] if change: lou = fil.split(".") a = lou[0] + ("$(Process)") + "." + lou[1] liRe.append(a) else: liRe.append(fil) else: liRe.append(self.stdIn[index]) return ' '.join(liRe)