#!/usr/bin/env ccp4-python # # Copyright (C) 2016 Adam Simpkin # # This code is distributed under the terms and conditions of the # CCP4 Program Suite Licence Agreement as a CCP4 Application. # A copy of the CCP4 licence can be obtained by writing to the # CCP4 Secretary, Daresbury Laboratory, Warrington WA4 4AD, UK. # # Class for running Shelxc/d/e in Ample for c-alpha tracing # # Adam Simpkin 15/06/2016 # System import argparse import cmd import os import shlex import shutil import string import subprocess import sys import unittest # Custom import clipper import parse_shelxe import MRBUMP_Buccaneer import MRBUMP_Shelxe import MRBUMP_phs2mtz def which(program): def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None if "CCP4_AMPLE_ROOT" in os.environ.keys() and "CCP4" in os.environ.keys(): ample_root = os.environ["CCP4_AMPLE_ROOT"] elif "CCP4" in os.environ.keys(): ample_root = os.path.join(os.environ["CCP4"], "share", "ample") else: raise RuntimeError('CCP4 not found') if not which("shelxe"): raise RuntimeError('shelxe not found') def run_job(command_line, logfile, key=""): """ Generic job runner """ if os.name == "nt": process_args = shlex.split(command_line, posix=False) p = subprocess.Popen(process_args, bufsize=0, shell="False", stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) else: process_args = shlex.split(command_line) p = subprocess.Popen(process_args, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.STDOUT) (child_stdin, child_stdout_and_stderr) = (p.stdin, p.stdout) # Write the keyword input child_stdin.write(key) child_stdin.close() # Watch the output for successful termination out=child_stdout_and_stderr.readline() log=open(logfile, "w") while out: out=child_stdout_and_stderr.readline() log.write(out) child_stdout_and_stderr.close() log.close() class Shelxe(object): def __init__(self, optd=None): # Files self.hkl_in_file = None self.mtz_in_file = None self.name_in_file = None self.native = None self.pdb_in_file = None self.sca_in_file = None self.seq_in_file = None self.shelx_d_in_file = None self.shelx_e_ins_file = None self.shelx_e_lst_file = None self.working_dir = None # Crystallographic parameters self.cell_parameters = None self.density_modification = 20 self.heavy_atom = None self.space_group = None self.n_sites = None self.n_tries = 100 self.resolution = None self.solvent = None self.trace_cycles = 20 # log files self.mtz_2_sca_log = "mtz2sca.log" self.shelx_c_log = "shelxc.log" self.shelx_d_log = "shelxd.log" self.shelx_e_log = "shelxe.log" self.shelx_e_log_i = "shelxe_i.log" # mtz column names self.fp = "FP" self.sigfp = "SIGFP" self.free = "FreeR_flag" self.run_buccaneer = False #script files self.script = "" if optd: self.init(optd) if self.mtz_in_file: self.set_crystal_data(self.mtz_in_file) return def init(self, optd): """Store dictionary entries as class variables""" if 'mtzin' in optd.keys() and optd['mtzin'] and os.path.exists(optd['mtzin']): self.mtz_in_file = os.path.abspath(optd['mtzin']) else: raise RuntimeError("MTZ not defined") if 'pdbin' in optd.keys() and optd['pdbin'] and os.path.exists(optd['pdbin']): self.pdb_in_file = os.path.abspath(optd['pdbin']) else: raise RuntimeError("PDB not defined") if 'wdir' in optd.keys() and optd['wdir'] and os.path.exists(optd['wdir']): self.working_dir = os.path.relpath(optd['wdir']) else: self.working_dir = os.path.relpath(os.getcwd()) if 'Buccaneer' in optd.keys() and optd['Buccaneer']: self.RUN_BUCCANEER = optd['Buccaneer'] if self.run_buccaneer != False: if 'seqin' in optd.keys() and optd['seqin'] and os.path.exists(optd['seqin']): self.seq_in_file = os.path.abspath(optd['seqin']) else: raise RuntimeError("Sequence not defined") if 'a' in optd.keys() and optd['a']: self.trace_cycles = optd['a'] if 'FIND' in optd.keys() and optd['FIND']: self.n_sites = optd['FIND'] if 'SFAC' in optd.keys() and optd['SFAC']: self.heavy_atom = optd['SFAC'] if 'm' in optd.keys() and optd['m']: self.density_modification = optd['m'] if 'native' in optd.keys() and optd['native']: self.native = optd['native'] if 'NTRY' in optd.keys() and optd['NTRY']: self.n_tries = optd['NTRY'] if 's' in optd.keys() and optd['s']: self.solvent = optd['s'] if 'FP' in optd.keys() and optd['FP']: self.fp = optd['FP'] if 'SIGFP' in optd.keys() and optd['SIGFP']: self.sigfp = optd['SIGFP'] if 'free' in optd.keys() and optd['free']: self.free = optd['free'] return def set_crystal_data(self, mtz_in_file): """Set crystallographic parameters from mtz file""" hkl_info=clipper.HKL_info() mtz_file=clipper.CCP4MTZfile() mtz_file.open_read(mtz_in_file) mtz_file.import_hkl_info(hkl_info) sg, cell = hkl_info.spacegroup(), hkl_info.cell() self.space_group = sg.symbol_hm() self.resolution = "%.2lf" % hkl_info.resolution().limit() self.cell_parameters = "%.2lf %.2lf %.2lf %.2lf %.2lf %.2lf" % (cell.a(), cell.b(), cell.c(), cell.alpha_deg(), cell.beta_deg(), cell.gamma_deg()) return def produce_script(self): """Wrapper function to produce a script 1) mtz_2_sca 2) shelx_c 3) shelx_d 4) shelx_e """ assert (self.mtz_in_file and self.pdb_in_file), "Cannot find required files" # Define some extra files file_root, _ = os.path.splitext(self.mtz_in_file) file_name = os.path.basename(file_root) path = os.path.join(self.working_dir, file_name) self.name_in_file = file_name self.sca_in_file = path + ".sca" self.shelx_d_in_file = path + "_fa" self.shelx_e_ins_file = path + "_fa.ins" self.shelx_e_lst_file = path + "_fa.lst" self.hkl_in_file= path + ".hkl" self.script_file = path + ".sh" # Convert mtz to sca file mtz_2_sca = self.mtz_2_sca(self.mtz_in_file, self.sca_in_file) # Get the shelx_c command assert (self.cell_parameters and self.n_sites and self.n_tries and self.space_group), \ "Crystallographic data not defined" shelx_c = self.shelx_c(self.name_in_file, self.sca_in_file, self.cell_parameters, self.space_group, self.heavy_atom, self.n_sites, self.n_tries) # Get the shelx_d command shelx_d = self.shelx_d(self.shelx_d_in_file) #Get the shelx_e command assert (self.solvent and self.resolution), "Crystallographic data not defined" shelx_e = self.shelx_e(self.solvent, self.resolution, self.pdb_in_file, self.shelx_e_ins_file, self.shelx_e_lst_file, self.hkl_in_file, self.native, self.trace_cycles, self.density_modification) with open(self.script_file, "w") as out: out.write(self.script) return def mtz_2_sca(self, mtz_in_file, sca_in_file): """ Run mtz2sca to convert the MTZ file for Shelxc """ mtz_2_sca_command = self._mtz_2_sca(mtz_in_file, sca_in_file ) run_job(mtz_2_sca_command, self.mtz_2_sca_log) self.script += mtz_2_sca_command + os.linesep return self.script def _mtz_2_sca(self, mtz_in_file, sca_in_file): cmd = ["mtz2sca", mtz_in_file, sca_in_file ] command_line = " ".join(map(str,cmd)) + os.linesep return command_line def shelx_c(self, name_in_file, sca_in_file, cell_paras, space_group, heavy_atom, n_sites, n_tries): """ Run Shelxc to prepare files for Shelxd """ # Give a header output sys.stdout.write("##############################################\n") sys.stdout.write("Running Shelxc to prepare files for Shelxd...\n") sys.stdout.write("##############################################\n") sys.stdout.write("\n") shelx_c_command, shelx_c_key = self._shelx_c(name_in_file, sca_in_file, cell_paras, space_group, heavy_atom, n_sites, n_tries ) self.script += shelx_c_command + " << eof" + os.linesep + shelx_c_key + \ "eof" + os.linesep self.script = self.script.format(sca_in=sca_in_file, CELL=cell_paras, SPAG=(space_group.replace(" ", "")), ATOM=heavy_atom, SITES=n_sites, TRIES=n_tries ) run_job(shelx_c_command, self.shelx_c_log, shelx_c_key) return self.script def _shelx_c(self, name_in_file, sca_in_file, cell_paras, space_group, heavy_atom, n_sites, n_tries): cmd = ["shelxc", name_in_file ] key_lst = ["SAD ", sca_in_file, os.linesep, "CELL ", cell_paras, os.linesep, "SPAG ", space_group.replace(" ", ""), os.linesep, "SFAC ", heavy_atom, os.linesep, "FIND ", n_sites, os.linesep, "NTRY ", n_tries ] key = "".join(map(str, key_lst)) + os.linesep command_line = " ".join(map(str,cmd)) return command_line, key def shelx_d(self, shelx_d_in_file): """ Run Shelxd to prepare files for Shelxe """ # Give a header output sys.stdout.write("###################################################\n") sys.stdout.write("Running Shelxd to locate heavy atoms for Shelxe...\n") sys.stdout.write("###################################################\n") sys.stdout.write("\n") shelx_d_command = self._shelx_d(shelx_d_in_file ) self.script += shelx_d_command + os.linesep run_job(shelx_d_command, self.shelx_d_log) def _shelx_d(self, shelx_d_in_file): cmd = ["shelxd", shelx_d_in_file ] command_line = " ".join(map(str,cmd)) + os.linesep return command_line def shelx_e(self, solvent, resolution, pdb_in_file, shelx_e_ins_file, shelx_e_lst_file, hkl_in_file, native, traceCycles, densityModification, shelxeEXE="shelxe"): """ Run shelxe """ # Give a header output sys.stdout.write("############################################################\n") sys.stdout.write("Running Shelxe to do phase improvement and c-alpha trace...\n") sys.stdout.write("############################################################\n") sys.stdout.write("\n") # Set up input files for shelxe if os.path.isfile("shelxe-input_fa.ins"): os.remove("shelxe-input_fa.ins") self.script += "cp {0} {1}".format(shelx_e_ins_file, "shelxe-input_fa.ins") + os.linesep shutil.copyfile(shelx_e_ins_file, "shelxe-input_fa.ins") if os.path.isfile("shelxe-input_fa.lst"): os.remove("shelxe-input_fa.lst") self.script += "cp {0} {1}".format(shelx_e_lst_file, "shelxe-input_fa.lst") + os.linesep shutil.copyfile(shelx_e_lst_file, "shelxe-input_fa.lst") if os.path.isfile("shelxe-input.hkl"): os.remove("shelxe-input.hkl") self.script += "cp {0} {1}".format(hkl_in_file, "shelxe-input.hkl") + os.linesep shutil.copyfile(hkl_in_file, "shelxe-input.hkl") if os.path.isfile("shelxe-input_fa.hkl"): os.remove("shelxe-input_fa.hkl") self.script += "cp {0} {1}".format(hkl_in_file, "shelxe-input_fa.hkl") + os.linesep shutil.copyfile(hkl_in_file, "shelxe-input_fa.hkl") if os.path.isfile("shelxe-input.pda"): os.remove("shelxe-input.pda") self.script += "cp {0} {1}".format(pdb_in_file, "shelxe-input.pda") + os.linesep shutil.copyfile(pdb_in_file, "shelxe-input.pda") if native != None: old_native = native native = os.path.abspath("shelxe-input.ent") shutil.copyfile(old_native, native) # two scripts - inverted space group for invert_space_group in [False, True]: #shelx_e_command = self._shelx_e(solvent, # resolution, # pdb_in_file, # shelx_e_ins_file, # shelx_e_lst_file, # hkl_in_file, # traceCycles, # densityModification, # shelxeEXE, # native, # invert_space_group, #) log = self.shelx_e_log_i if invert_space_group else self.shelx_e_log #run_job(shelx_e_command, log) # Set the output pdb and phs file from SHELXE pdboutFile="shelxe-output.pdb" phsoutFile="shelxe-output.phs" # Create an instance of the mrbump shelxe module SHELXE=MRBUMP_Shelxe.Shelxe() # Create the directory for this hand if not invert_space_group: invertDIR=os.path.join(self.working_dir, "hand1") else: invertDIR=os.path.join(self.working_dir, "hand2") # If the directory exists then delete it and remake it if os.path.isdir(invertDIR): shutil.rmtree(invertDIR) os.mkdir(invertDIR) # Copy in the native data mtz file baseFilename=os.path.splitext(os.path.split(self.mtz_in_file)[-1])[0] shutil.copyfile(pdb_in_file, os.path.join(invertDIR, "shelxe-input.pda")) shutil.copyfile(baseFilename + "_fa.hkl", os.path.join(invertDIR, "shelxe-input_fa.hkl")) # Set the SHELXE working dir and make the native hkl from from the input mtz SHELXE.setShelxWorkingDIR(invertDIR) SHELXE.runMtz2Various(fp=self.fp, sigfp=self.sigfp, free=self.free) # Run SHELXE for this hand SHELXE.runShelxe(solvent, resolution, "PHASER", pdb_in_file, self.mtz_in_file, \ pdboutFile=pdboutFile, phsoutFile=phsoutFile, traceCycles=self.trace_cycles, \ fp=self.fp, sigfp=self.sigfp, free=self.free, \ shelxeEXE="shelxe", WEB_PATH_START=0, native=None, PHASES=True, \ invert_space_group=invert_space_group) # Convert the phs file to an mtz if os.path.isfile(os.path.join(invertDIR,"shelxe-output.phs")) and os.path.isfile(os.path.join(invertDIR,"shelxe-output.pdb")): currDIR=os.getcwd() os.chdir(invertDIR) p2m = MRBUMP_phs2mtz.PHS2MTZ() p2m.phs2mtz(phsin="shelxe-output.phs", pdbin="shelxe-output.pdb", hklout="shelxe-output.mtz", workingDIR=os.getcwd(), hklref=self.mtz_in_file, f=self.fp, sigf=self.sigfp, freeLabel=self.free, resolution=float(resolution), debug=False) os.chdir(currDIR) else: sys.stdout.write("Warning: no PHS file found for SHELXE in hand directory %s:\n" % invertDIR) if self.run_buccaneer: self.buccaneer(working_dir=invertDIR) return self.script def _shelx_e(self, solvent, resolution, pdb_in_file, shelx_e_ins_file, shelx_e_lst_file, hkl_in_file, trace_cycles, density_modification, shelxe_exe, native, invert_space_group): if native != None: shutil.copyfile(native, "shelxe-input.ent") cmd = [shelxe_exe, "shelxe-input.pda", " shelxe-input_fa", " -a", str(trace_cycles), " -s", str(solvent), " -m", str(density_modification), " -x", " -h" ] else: cmd = [shelxe_exe, " shelxe-input.pda", " shelxe-input_fa", " -a", str(trace_cycles), " -s", str(solvent), " -m", str(density_modification), " -h" ] free_lunch = " -e1.0 -l5" if resolution <= 2.0 else None if free_lunch: cmd.append(free_lunch) if invert_space_group: cmd.append(" -i") command_line = "".join(map(str,cmd)) + os.linesep return command_line def buccaneer(self, working_dir): """Runs buccaneer after shelxe""" buccaneer = MRBUMP_Buccaneer.Buccaneer() buccaneer.runBuccaneer(seqinFile=self.seq_in_file, mtzinFile="shelxe-output.mtz", pdboutFile="buccaneer-output.pdb", workingDIR=working_dir, F=self.fp, SIGF=self.sigfp, FreeR_flag=self.free, PHIC="PHI_SHELXE", FOM="FOM_SHELXE", cycles=5, pdbinFile="shelxe-output.pdb") return if __name__ == "__main__": options = argparse.ArgumentParser() #General arguments options.add_argument("-pdbin", type=str, help="pdb input file") options.add_argument("-mtzin", type=str, help="mtz input file") options.add_argument("-s", type=float, help="For SHELXE, Solvent content of the crystal") options.add_argument("-a", type=str, help="For SHELXE, N cycles for autotracing") options.add_argument("-m", type=str, help="For SHELXE, N iterations of density modification per global cycle") options.add_argument("-FIND", type=float, help="For SHELXC, Number of sites to find") options.add_argument("-SFAC", type=str, help="For SHELXC, Heavy atom to find") options.add_argument("-native", type=str, help="Native structure for diagnostics, requires PDB file xx.ent with same origin") options.add_argument("-NTRY", type=float, help="For SHELXC, Number of random tries is starting from random atoms") options.add_argument("-wdir", type=str, help="working directory") # arguments for buccaneer options.add_argument("-Buccaneer", type=bool, help="Run buccaneer True/False") options.add_argument("-seqin", type=str, help="Sequence input file (xxx.seq)") # FP, SIGFP, and Free to be parsed in options.add_argument("-FP", type=str, help="Column label for FP") options.add_argument("-SIGFP", typ=str, help="Column label for SIGFP") options.add_argument("-free", type=str, help="Column label for free") args = vars(options.parse_args()) SHELX = Shelxe(args) SHELX.produce_script() class TestCases(unittest.TestCase): def setUp(self): self.SHELX=Shelxe() def test_mtz2sca(self): mtz = os.path.join(ample_root, "tests", "testfiles", "1dtx.mtz") sca = os.path.join(ample_root, "tests", "testfiles", "1dtx.sca") reference_cmd = "mtz2sca {mtz} {sca}".format(mtz=mtz, sca=sca) + os.linesep cmd = self.SHELX._mtz_2_sca(mtz, sca) self.assertEqual(reference_cmd, cmd) def test_runShelxc(self): sca = "1dtx.sca" nmin = "1dtx" CELL = "49.70 57.90 74.17 90.0 90.0 90.0" SFAC = "S" SPAG = "P21 21 21" FIND = float(6) NTRY = float(100) reference_cmd = "shelxc {name}" reference_cmd += "SAD {sca}" + os.linesep reference_cmd += "CELL {para}" + os.linesep reference_cmd += "SPAG {space}" + os.linesep reference_cmd += "SFAC {atom}" + os.linesep reference_cmd += "FIND {sites}" + os.linesep reference_cmd += "NTRY {tries}" + os.linesep reference_cmd = reference_cmd.format(name=nmin, sca=sca, para=CELL, space=(SPAG.replace(" ", "")), atom=SFAC, sites=FIND, tries=NTRY) cmd = self.SHELX._shelx_c(nmin, sca, CELL, SPAG, SFAC, FIND, NTRY) cmd = "".join(map(str,cmd)) self.assertEqual(reference_cmd, cmd) def test_runShelxd(self): file_in = "1dtx_fa" reference_cmd = "shelxd {file_in}".format(file_in=file_in) + os.linesep cmd = self.SHELX._shelx_d(file_in) self.assertEqual(reference_cmd, cmd) def test_runShelxe(self): solvent = float(0.5) resolution = float(1.5) pdbin = "1dtx.pdb" ins_file = "1dtx_fa.ins" lst_file = "1dtx_fa.lst" hkl_in = "1dtx_fa.hkl" trace_cycles = "20" density_modification = "20" shelxe_exe = "shelxe" native = None invert_space_group = True reference_cmd = "shelxe" + ' shelxe-input.pda shelxe-input_fa -a' \ + "20" + ' -s' + str(solvent) + ' -m' + "20" + ' -h -e1.0 -l5 -i' \ + os.linesep cmd = self.SHELX._shelx_e(solvent, resolution, pdbin, ins_file, lst_file, hkl_in, trace_cycles, density_modification, shelxe_exe, native, invert_space_group) self.assertEqual(reference_cmd, cmd) def test_runShelxe_2(self): solvent = float(0.5) resolution = float(1.5) pdbin = "1dtx.pdb" ins_file = "1dtx_fa.ins" lst_file = "1dtx_fa.lst" hkl_in = "1dtx_fa.hkl" trace_cycles = "20" density_modification = "20" shelxe_exe = "shelxe" native = None invert_space_group = False reference_cmd = "shelxe" + ' shelxe-input.pda shelxe-input_fa -a' \ + "20" + ' -s' + str(solvent) + ' -m' + "20" + ' -h -e1.0 -l5' \ + os.linesep cmd = self.SHELX._shelx_e(solvent, resolution, pdbin, ins_file, lst_file, hkl_in, trace_cycles, density_modification, shelxe_exe, native, invert_space_group) self.assertEqual(reference_cmd, cmd) def test_runShelxe_3(self): solvent = float(0.5) resolution = float(2.5) pdbin = "1dtx.pdb" ins_file = "1dtx_fa.ins" lst_file = "1dtx_fa.lst" hkl_in = "1dtx_fa.hkl" trace_cycles = "20" density_modification = "20" shelxe_exe = "shelxe" native = None invert_space_group = True reference_cmd = "shelxe" + ' shelxe-input.pda shelxe-input_fa -a' \ + "20" + ' -s' + str(solvent) + ' -m' + "20" + ' -h -i' \ + os.linesep cmd = self.SHELX._shelx_e(solvent, resolution, pdbin, ins_file, lst_file, hkl_in, trace_cycles, density_modification, shelxe_exe, native, invert_space_group) self.assertEqual(reference_cmd, cmd)