#
# utils.py
#
#
# Copyright 2017 EMBL-EBI, Hinxton outstation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import ftplib
import hashlib
import re
import os
import ssl
import subprocess
import sys
import urllib.request as urlrequest
import xml.etree.ElementTree as ElementTree
import urllib.error as urlerror

from configparser import SafeConfigParser
from http.client import HTTPSConnection

ASPERA_BIN = 'ascp'  # ascp binary
ASPERA_PRIVATE_KEY = '/path/to/aspera_dsa.openssh'  # ascp private key file
ASPERA_OPTIONS = ''  # set any extra aspera options
ASPERA_SPEED = '100M'  # set aspera download speed

SUPPRESSED = 'suppressed'
PUBLIC = 'public'

XML_FORMAT = 'xml'
EMBL_FORMAT = 'embl'
FASTA_FORMAT = 'fasta'
MASTER_FORMAT = 'master'
SUBMITTED_FORMAT = 'submitted'
FASTQ_FORMAT = 'fastq'
SRA_FORMAT = 'sra'

XML_EXT = '.xml'
EMBL_EXT = '.dat'
FASTA_EXT = '.fasta'
WGS_EMBL_EXT = '.dat.gz'
WGS_FASTA_EXT = '.fasta.gz'
WGS_MASTER_EXT = '.master.dat'

SEQUENCE = 'sequence'
CODING = 'coding'
WGS = 'wgs'
RUN = 'run'
EXPERIMENT = 'experiment'
ANALYSIS = 'analysis'
ASSEMBLY = 'assembly'
STUDY = 'study'
READ = 'read'
SAMPLE = 'sample'
TAXON = 'taxon'

VIEW_URL_BASE = 'https://www.ebi.ac.uk/ena/data/view/'
XML_DISPLAY = '&display=xml'
EMBL_DISPLAY = '&display=text'
FASTA_DISPLAY = '&display=fasta'

READ_RESULT_ID = 'read_run'
ANALYSIS_RESULT_ID = 'analysis'
WGS_RESULT_ID = 'wgs_set'
ASSEMBLY_RESULT_ID = 'assembly'
SEQUENCE_UPDATE_ID = 'sequence_update'
SEQUENCE_RELEASE_ID = 'sequence_release'

WGS_FTP_BASE = 'ftp://ftp.ebi.ac.uk/pub/databases/ena/wgs'
WGS_FTP_DIR = 'pub/databases/ena/wgs'

PORTAL_SEARCH_BASE = 'https://www.ebi.ac.uk/ena/portal/api/search?'
RUN_RESULT = 'result=read_run'
ANALYSIS_RESULT = 'result=analysis'
WGS_RESULT = 'result=wgs_set'
ASSEMBLY_RESULT = 'result=assembly'
SEQUENCE_UPDATE_RESULT = 'result=sequence_update'
SEQUENCE_RELEASE_RESULT = 'result=sequence_release'

FASTQ_FIELD = 'fastq_ftp'
SUBMITTED_FIELD = 'submitted_ftp'
SRA_FIELD = 'sra_ftp'
INDEX_FIELD = 'cram_index_ftp'
FASTQ_MD5_FIELD = 'fastq_md5'
SUBMITTED_MD5_FIELD = 'submitted_md5'
SRA_MD5_FIELD = 'sra_md5'
INDEX_MD5_FIELD = None
FASTQ_ASPERA_FIELD = 'fastq_aspera'
SUBMITTED_ASPERA_FIELD = 'submitted_aspera'
SRA_ASPERA_FIELD = 'sra_aspera'
INDEX_ASPERA_FIELD = 'cram_index_aspera'

sequence_pattern_1 = re.compile(r'^[A-Z]{1}[0-9]{5}(\.[0-9]+)?$')
sequence_pattern_2 = re.compile(r'^[A-Z]{2}[0-9]{6}(\.[0-9]+)?$')
wgs_sequence_pattern = re.compile(r'^[A-Z]{4}[0-9]{8,9}(\.[0-9]+)?$')
coding_pattern = re.compile(r'^[A-Z]{3}[0-9]{5}(\.[0-9]+)?$')
wgs_prefix_pattern = re.compile(r'^[A-Z]{4}[0-9]{2}$')
wgs_master_pattern = re.compile(r'^[A-Z]{4}[0-9]{2}[0]{6}$')
unversion_wgs_prefix_pattern = re.compile(r'^[A-Z]{4}$')
unversion_wgs_master_pattern = re.compile(r'^[A-Z]{4}[0]{8}$')
run_pattern = re.compile(r'^[EDS]RR[0-9]{6,7}$')
experiment_pattern = re.compile(r'^[EDS]RX[0-9]{6,7}$')
analysis_pattern = re.compile(r'^[EDS]RZ[0-9]{6,7}$')
assembly_pattern = re.compile(r'^GCA_[0-9]{9}(\.[0-9]+)?$')
study_pattern_1 = re.compile(r'^[EDS]RP[0-9]{6,7}$')
study_pattern_2 = re.compile(r'^PRJ[EDN][AB][0-9]+$')
sample_pattern_1 = re.compile(r'^SAM[ND][0-9]{8}$')
sample_pattern_2 = re.compile(r'^SAMEA[0-9]{6,}$')
sample_pattern_3 = re.compile(r'^[EDS]RS[0-9]{6,7}$')

enaBrowserTools_path = os.path.dirname(os.path.dirname(__file__))

def is_sequence(accession):
    return sequence_pattern_1.match(accession) or sequence_pattern_2.match(accession)

def is_wgs_sequence(accession):
    return wgs_sequence_pattern.match(accession)

def is_coding(accession):
    return coding_pattern.match(accession)

def is_wgs_set(accession):
    return wgs_prefix_pattern.match(accession) \
        or wgs_master_pattern.match(accession) \
        or unversion_wgs_prefix_pattern.match(accession) \
        or unversion_wgs_master_pattern.match(accession)

def is_unversioned_wgs_set(accession):
    return unversion_wgs_prefix_pattern.match(accession) \
        or unversion_wgs_master_pattern.match(accession)

def is_run(accession):
    return run_pattern.match(accession)

def is_experiment(accession):
    return experiment_pattern.match(accession)

def is_analysis(accession):
    return analysis_pattern.match(accession)

def is_assembly(accession):
    return assembly_pattern.match(accession)

def is_study(accession):
    return study_pattern_1.match(accession) or study_pattern_2.match(accession)

def is_sample(accession):
    return sample_pattern_1.match(accession) or sample_pattern_2.match(accession) \
        or sample_pattern_3.match(accession)

def is_primary_study(accession):
    return study_pattern_2.match(accession)

def is_secondary_study(accession):
    return study_pattern_1.match(accession)

def is_primary_sample(accession):
    return sample_pattern_1.match(accession) or sample_pattern_2.match(accession)

def is_secondary_sample(accession):
    return sample_pattern_3.match(accession)

def is_taxid(accession):
    try:
        int(accession)
        return True
    except ValueError:
        return False

def get_accession_type(accession):
    if is_study(accession):
        return STUDY
    elif is_run(accession):
        return RUN
    elif is_experiment(accession):
        return EXPERIMENT
    elif is_analysis(accession):
        return ANALYSIS
    elif is_assembly(accession):
        return ASSEMBLY
    elif is_wgs_set(accession):
        return WGS
    elif is_coding(accession):
        return CODING
    elif is_sequence(accession):
        return SEQUENCE
    elif is_taxid(accession):
        return TAXON
    return None
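# Illustrative examples of the classification above. The accessions are
# placeholders that merely match the patterns; they may or may not exist
# in ENA:
#   get_accession_type('ERR123456')        -> RUN
#   get_accession_type('SRX123456')        -> EXPERIMENT
#   get_accession_type('PRJEB1234')        -> STUDY
#   get_accession_type('GCA_000001405.1')  -> ASSEMBLY
#   get_accession_type('AAAK03')           -> WGS
#   get_accession_type('A12345')           -> SEQUENCE
#   get_accession_type('9606')             -> TAXON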
def accession_format_allowed(accession, output_format, aspera):
    if is_analysis(accession):
        return output_format == SUBMITTED_FORMAT
    if is_run(accession) or is_experiment(accession):
        if aspera:
            return output_format in [SUBMITTED_FORMAT, FASTQ_FORMAT, SRA_ASPERA_FIELD]
        else:
            return output_format in [SUBMITTED_FORMAT, FASTQ_FORMAT, SRA_FIELD]
    return output_format in [EMBL_FORMAT, FASTA_FORMAT]

def group_format_allowed(group, output_format, aspera):
    if group == ANALYSIS:
        return output_format == SUBMITTED_FORMAT
    if group == READ:
        if aspera:
            return output_format in [SUBMITTED_FORMAT, FASTQ_FORMAT, SRA_ASPERA_FIELD]
        else:
            return output_format in [SUBMITTED_FORMAT, FASTQ_FORMAT, SRA_FIELD]
    return output_format in [EMBL_FORMAT, FASTA_FORMAT]

# assumption is that accession and format have already been vetted before this method is called
def get_record_url(accession, output_format):
    if output_format == XML_FORMAT:
        return VIEW_URL_BASE + accession + XML_DISPLAY
    elif output_format == EMBL_FORMAT:
        return VIEW_URL_BASE + accession + EMBL_DISPLAY
    elif output_format == FASTA_FORMAT:
        return VIEW_URL_BASE + accession + FASTA_DISPLAY
    return None

def is_available(accession):
    if is_taxid(accession):
        url = get_record_url('Taxon:{0}'.format(accession), XML_FORMAT)
    else:
        url = get_record_url(accession, XML_FORMAT)
    try:
        response = urlrequest.urlopen(url)
        record = ElementTree.parse(response).getroot()
        return ('entry is not found' not in record.text) and (len(record.getchildren()) > 0)
    except urlerror.URLError as e:
        if 'CERTIFICATE_VERIFY_FAILED' in str(e):
            print('Error verifying SSL certificate. Have you run "Install Certificates" as part of your Python3 installation?')
            print('This is a commonly missed step in Python3 installation on a Mac.')
            print('Please run the following from a terminal window (update to your Python3 version as needed):')
            print('open "/Applications/Python 3.6/Install Certificates.command"')
        raise

def get_filename(base_name, output_format):
    if output_format == XML_FORMAT:
        return base_name + XML_EXT
    elif output_format == EMBL_FORMAT:
        return base_name + EMBL_EXT
    elif output_format == FASTA_FORMAT:
        return base_name + FASTA_EXT
    return None

def get_destination_file(dest_dir, accession, output_format):
    filename = get_filename(accession, output_format)
    if filename is not None:
        return os.path.join(dest_dir, filename)
    return None

def download_single_record(url, dest_file):
    urlrequest.urlretrieve(url, dest_file)

def download_record(dest_dir, accession, output_format, expanded=False):
    try:
        dest_file = get_destination_file(dest_dir, accession, output_format)
        url = get_record_url(accession, output_format)
        if expanded:
            url = url + '&expanded=true'
        download_single_record(url, dest_file)
        return True
    except Exception as e:
        print("Error downloading read record: {0}".format(e))
        return False
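# Illustrative use of download_record; the directory and accession are
# placeholders. With these arguments the call fetches
#   https://www.ebi.ac.uk/ena/data/view/A12345&display=fasta
# and writes it to downloads/A12345.fasta, returning True on success:
#   download_record('downloads', 'A12345', FASTA_FORMAT)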
def write_record(url, dest_file):
    try:
        response = urlrequest.urlopen(url)
        linenum = 1
        for line in response:
            if linenum == 1 and line.startswith(b'Entry:'):
                return False
            chars = dest_file.write(line)
            linenum += 1
        dest_file.flush()
        return True
    except Exception:
        return False

def get_ftp_file(ftp_url, dest_dir):
    try:
        filename = ftp_url.split('/')[-1]
        dest_file = os.path.join(dest_dir, filename)
        urlrequest.urlretrieve(ftp_url, dest_file)
        return True
    except Exception as e:
        print("Error with FTP transfer: {0}".format(e))
        return False

def get_aspera_file(aspera_url, dest_dir):
    try:
        filename = aspera_url.split('/')[-1]
        dest_file = os.path.join(dest_dir, filename)
        asperaretrieve(aspera_url, dest_dir, dest_file)
        return True
    except Exception as e:
        print("Error with Aspera transfer: {0}".format(e))
        return False

def get_md5(filepath):
    hash_md5 = hashlib.md5()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def check_md5(filepath, expected_md5):
    generated_md5 = get_md5(filepath)
    if expected_md5 != generated_md5:
        print('MD5 mismatch for downloaded file ' + filepath + '. Deleting file')
        print('generated md5', generated_md5)
        print('expected md5', expected_md5)
        os.remove(filepath)
        return False
    return True

def file_exists(file_url, dest_dir, md5):
    filename = file_url.split('/')[-1]
    local_file = os.path.join(dest_dir, filename)
    if os.path.isfile(local_file):
        generated_md5 = get_md5(local_file)
        if generated_md5 == md5:
            print('{0} already exists in local directory, skipping'.format(filename))
            return True
    return False

def get_ftp_file_with_md5_check(ftp_url, dest_dir, md5):
    try:
        filename = ftp_url.split('/')[-1]
        dest_file = os.path.join(dest_dir, filename)
        urlrequest.urlretrieve(ftp_url, dest_file)
        return check_md5(dest_file, md5)
    except Exception as e:
        sys.stderr.write("Error with FTP transfer: {0}\n".format(e))
        return False

def get_aspera_file_with_md5_check(aspera_url, dest_dir, md5):
    try:
        filename = aspera_url.split('/')[-1]
        dest_file = os.path.join(dest_dir, filename)
        success = asperaretrieve(aspera_url, dest_dir, dest_file)
        if success:
            return check_md5(dest_file, md5)
        return False
    except Exception as e:
        sys.stderr.write("Error with Aspera transfer: {0}\n".format(e))
        return False

def set_aspera_variables(filepath):
    try:
        parser = SafeConfigParser()
        parser.read(filepath)
        # set and check binary location
        global ASPERA_BIN
        ASPERA_BIN = parser.get('aspera', 'ASPERA_BIN')
        if not os.path.exists(ASPERA_BIN):
            print('Aspera binary ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_BIN))
            return False
        if not os.access(ASPERA_BIN, os.X_OK):
            print('You do not have permissions to execute the aspera binary ({0}). Defaulting to FTP transfer'.format(ASPERA_BIN))
            return False
        # set and check private key location
        global ASPERA_PRIVATE_KEY
        ASPERA_PRIVATE_KEY = parser.get('aspera', 'ASPERA_PRIVATE_KEY')
        if not os.path.exists(ASPERA_PRIVATE_KEY):
            print('Private key file ({0}) does not exist. Defaulting to FTP transfer'.format(ASPERA_PRIVATE_KEY))
            return False
        # set non-file variables
        global ASPERA_SPEED
        ASPERA_SPEED = parser.get('aspera', 'ASPERA_SPEED')
        global ASPERA_OPTIONS
        ASPERA_OPTIONS = parser.get('aspera', 'ASPERA_OPTIONS')
        return True
    except Exception as e:
        sys.stderr.write("ERROR: cannot read aspera settings from {0}.\n".format(filepath))
        sys.stderr.write("{0}\n".format(e))
        sys.exit(1)
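# For reference, set_aspera_variables expects an INI file with an [aspera]
# section containing the four keys read above. A minimal sketch; the paths
# are placeholders and must point at a real ascp installation:
#
#   [aspera]
#   ASPERA_BIN = /path/to/ascp
#   ASPERA_PRIVATE_KEY = /path/to/aspera_dsa.openssh
#   ASPERA_SPEED = 100M
#   ASPERA_OPTIONS =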
def set_aspera(aspera_filepath):
    aspera = True
    if aspera_filepath is not None:
        if os.path.exists(aspera_filepath):
            aspera = set_aspera_variables(aspera_filepath)
        else:
            print('Cannot find {0} file, defaulting to FTP transfer'.format(aspera_filepath))
            aspera = False
    elif os.environ.get('ENA_ASPERA_INIFILE'):
        aspera = set_aspera_variables(os.environ.get('ENA_ASPERA_INIFILE'))
    else:
        if os.path.exists(os.path.join(enaBrowserTools_path, 'aspera_settings.ini')):
            aspera = set_aspera_variables(os.path.join(enaBrowserTools_path, 'aspera_settings.ini'))
        else:
            print('Cannot find aspera_settings.ini file, defaulting to FTP transfer')
            aspera = False
    return aspera

def asperaretrieve(url, dest_dir, dest_file):
    try:
        logdir = os.path.abspath(os.path.join(dest_dir, "logs"))
        print('Creating', logdir)
        create_dir(logdir)
        aspera_line = "{bin} -QT -L {logs} -l {speed} -P33001 {aspera} -i {key} era-fasp@{file} {outdir}"
        aspera_line = aspera_line.format(
            bin=ASPERA_BIN,
            outdir=dest_dir,
            logs=logdir,
            file=url,
            aspera=ASPERA_OPTIONS,
            key=ASPERA_PRIVATE_KEY,
            speed=ASPERA_SPEED,
        )
        print(aspera_line)
        subprocess.call(aspera_line, shell=True)  # this blocks so we're fine to wait and return True
        return True
    except Exception as e:
        sys.stderr.write("Error with Aspera transfer: {0}\n".format(e))
        return False

def get_wgs_file_ext(output_format):
    if output_format == EMBL_FORMAT:
        return WGS_EMBL_EXT
    elif output_format == FASTA_FORMAT:
        return WGS_FASTA_EXT
    elif output_format == MASTER_FORMAT:
        return WGS_MASTER_EXT

def get_wgs_ftp_url(wgs_set, status, output_format):
    base_url = WGS_FTP_BASE + '/' + status + '/' + wgs_set[:2].lower() + '/' + wgs_set
    return base_url + get_wgs_file_ext(output_format)

def get_nonversioned_wgs_ftp_url(wgs_set, status, output_format):
    ftp_url = 'ftp.ebi.ac.uk'
    base_dir = WGS_FTP_DIR + '/' + status + '/' + wgs_set[:2].lower()
    base_url = WGS_FTP_BASE + '/' + status + '/' + wgs_set[:2].lower()
    ftp = ftplib.FTP(ftp_url)
    ftp.login()
    ftp.cwd(base_dir)
    supp = ftp.nlst()
    ftp.close()
    files = [f for f in supp if f.startswith(wgs_set) and f.endswith(get_wgs_file_ext(output_format))]
    if len(files) == 0:
        return None
    else:
        return base_url + '/' + max(files)
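# Illustrative WGS URL construction; the set prefix is a placeholder:
#   get_wgs_ftp_url('AAAK03', PUBLIC, EMBL_FORMAT)
#     -> 'ftp://ftp.ebi.ac.uk/pub/databases/ena/wgs/public/aa/AAAK03.dat.gz'
# get_nonversioned_wgs_ftp_url instead lists the FTP directory and returns
# the highest-sorting matching file, or None if nothing matches.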
def get_report_from_portal(url):
    request = urlrequest.Request(url)
    gcontext = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    return urlrequest.urlopen(request, context=gcontext)

def download_report_from_portal(url, dest_file):
    response = get_report_from_portal(url)
    f = open(dest_file, 'wb')
    for line in response:
        chars = f.write(line)
    f.flush()
    f.close()

def get_accession_query(accession):
    query = 'query='
    if is_run(accession):
        query += 'run_accession="{0}"'.format(accession)
    elif is_experiment(accession):
        query += 'experiment_accession="{0}"'.format(accession)
    elif is_analysis(accession):
        query += 'analysis_accession="{0}"'.format(accession)
    return query

def get_ftp_file_fields(accession):
    fields = 'fields='
    fields += SUBMITTED_FIELD + ',' + SUBMITTED_MD5_FIELD
    if is_analysis(accession):
        return fields
    fields += ',' + INDEX_FIELD
    fields += ',' + SRA_FIELD + ',' + SRA_MD5_FIELD
    fields += ',' + FASTQ_FIELD + ',' + FASTQ_MD5_FIELD
    return fields

def get_aspera_file_fields(accession):
    fields = 'fields='
    fields += SUBMITTED_ASPERA_FIELD + ',' + SUBMITTED_MD5_FIELD
    if is_analysis(accession):
        return fields
    fields += ',' + INDEX_ASPERA_FIELD
    fields += ',' + SRA_ASPERA_FIELD + ',' + SRA_MD5_FIELD
    fields += ',' + FASTQ_ASPERA_FIELD + ',' + FASTQ_MD5_FIELD
    return fields

def get_file_fields(accession, aspera):
    if aspera:
        return get_aspera_file_fields(accession)
    else:
        return get_ftp_file_fields(accession)

def get_result(accession):
    if is_run(accession) or is_experiment(accession):
        return RUN_RESULT
    else:  # is_analysis(accession)
        return ANALYSIS_RESULT

def get_file_search_query(accession, aspera):
    return PORTAL_SEARCH_BASE + get_accession_query(accession) + '&' + \
        get_result(accession) + '&' + \
        get_file_fields(accession, aspera) + '&limit=0'
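# Illustrative search URL produced by get_file_search_query for a run
# accession over FTP (the accession is a placeholder; wrapped here for
# readability):
#   https://www.ebi.ac.uk/ena/portal/api/search?query=run_accession="ERR123456"
#     &result=read_run
#     &fields=submitted_ftp,submitted_md5,cram_index_ftp,sra_ftp,sra_md5,fastq_ftp,fastq_md5
#     &limit=0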
def split_filelist(filelist_string):
    if filelist_string.strip() == '':
        return []
    return filelist_string.strip().split(';')

def parse_file_search_result_line(line, accession, output_format):
    cols = line.split('\t')
    data_acc = cols[0].strip()
    sub_filelist = split_filelist(cols[1])
    sub_md5list = split_filelist(cols[2])
    if is_analysis(accession):
        return data_acc, sub_filelist, sub_md5list, []
    indexlist = split_filelist(cols[3])
    sra_filelist = split_filelist(cols[4])
    sra_md5list = split_filelist(cols[5])
    fastq_filelist = split_filelist(cols[6])
    fastq_md5list = split_filelist(cols[7])
    if (output_format is None and len(sub_filelist) > 0):
        output_format = SUBMITTED_FORMAT
    elif (output_format is None and len(sra_filelist) > 0):
        output_format = SRA_FORMAT
    elif output_format is None:
        output_format = FASTQ_FORMAT
    if output_format == SUBMITTED_FORMAT:
        return data_acc, sub_filelist, sub_md5list, indexlist
    if output_format == SRA_FORMAT:
        return data_acc, sra_filelist, sra_md5list, []
    return data_acc, fastq_filelist, fastq_md5list, []

def create_dir(dir_path):
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)

def get_group_query(accession, subtree):
    query = 'query='
    if is_primary_study(accession):
        query += 'study_accession="{0}"'.format(accession)
    elif is_secondary_study(accession):
        query += 'secondary_study_accession="{0}"'.format(accession)
    elif is_primary_sample(accession):
        query += 'sample_accession="{0}"'.format(accession)
    elif is_secondary_sample(accession):
        query += 'secondary_sample_accession="{0}"'.format(accession)
    elif is_taxid(accession):
        if subtree:
            query += 'tax_tree({0})'.format(accession)
        else:
            query += 'tax_eq({0})'.format(accession)
    return query

def get_group_fields(group):
    fields = 'fields='
    if group in [SEQUENCE, WGS, ASSEMBLY]:
        fields += 'accession'
    elif group == READ:
        fields += 'run_accession'
    elif group == ANALYSIS:
        fields += 'analysis_accession'
    return fields

def get_group_result(group):
    if group == READ:
        return RUN_RESULT
    elif group == ANALYSIS:
        return ANALYSIS_RESULT
    elif group == WGS:
        return WGS_RESULT
    elif group == ASSEMBLY:
        return ASSEMBLY_RESULT

def get_group_search_query(group, result, accession, subtree):
    return PORTAL_SEARCH_BASE + get_group_query(accession, subtree) + '&' \
        + result + '&' + get_group_fields(group) + '&limit=0'

def get_experiment_search_query(run_accession):
    return PORTAL_SEARCH_BASE + 'query=run_accession="' + run_accession + '"' \
        + '&result=read_run&fields=experiment_accession&limit=0'

def is_empty_dir(target_dir):
    for dirpath, dirnames, files in os.walk(target_dir):
        if files:
            return False
    return True

def print_error():
    sys.stderr.write('ERROR: Something unexpected went wrong; please try again.\n')
    sys.stderr.write('If the problem persists, please contact datasubs@ebi.ac.uk for assistance, with the above error details.\n')

def get_divisor(record_cnt):
    if record_cnt <= 10000:
        return 1000
    elif record_cnt <= 50000:
        return 5000
    return 10000

def record_start_line(line, output_format):
    if output_format == FASTA_FORMAT:
        return line.startswith(b'>')
    elif output_format == EMBL_FORMAT:
        return line.startswith(b'ID ')
    else:
        return False

def extract_acc_from_line(line, output_format):
    if output_format == FASTA_FORMAT:
        return line.split(b'|')[1]
    elif output_format == EMBL_FORMAT:
        return line.split()[1][:-1]
    else:
        return ''
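# Illustrative behaviour of the record helpers above, assuming typical ENA
# record headers (the accession shown is a placeholder):
#   record_start_line(b'>ENA|A12345|A12345.1 description', FASTA_FORMAT)     -> True
#   extract_acc_from_line(b'>ENA|A12345|A12345.1 description', FASTA_FORMAT) -> b'A12345'
#   record_start_line(b'ID   A12345; SV 1; linear; ...', EMBL_FORMAT)        -> True
#   extract_acc_from_line(b'ID   A12345; SV 1; linear; ...', EMBL_FORMAT)    -> b'A12345'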