#!/usr/bin/env python # # Launcher script for GATK tools. Delegates to java -jar, spark-submit, or gcloud as appropriate, # and sets many important Spark and htsjdk properties before launch. # # If running a non-Spark tool, or a Spark tool in local mode, will search for GATK executables # as follows: # -If the GATK_LOCAL_JAR environment variable is set, uses that jar # -Otherwise if the GATK_RUN_SCRIPT created by "gradle installDist" exists, uses that # -Otherwise uses the newest local jar in the same directory as the script or the BIN_PATH # (in that order of precedence) # # If running a Spark tool, searches for GATK executables as follows: # -If the GATK_SPARK_JAR environment variable is set, uses that jar # -Otherwise uses the newest Spark jar in the same directory as the script or the BIN_PATH # (in that order of precedence) # import sys from subprocess import check_call, CalledProcessError, call import os import hashlib import signal import re script = os.path.dirname(os.path.realpath(__file__)) projectName = "gatk" BUILD_LOCATION = script +"/build/install/" + projectName + "/bin/" GATK_RUN_SCRIPT = BUILD_LOCATION + projectName GATK_LOCAL_JAR_ENV_VARIABLE = "GATK_LOCAL_JAR" GATK_SPARK_JAR_ENV_VARIABLE = "GATK_SPARK_JAR" BIN_PATH = script + "/build/libs" EXTRA_JAVA_OPTIONS_SPARK= "-DGATK_STACKTRACE_ON_USER_EXCEPTION=true " \ "-Dsamjdk.use_async_io_read_samtools=false " \ "-Dsamjdk.use_async_io_write_samtools=false " \ "-Dsamjdk.use_async_io_write_tribble=false " \ "-Dsamjdk.compression_level=2 " PACKAGED_LOCAL_JAR_OPTIONS= ["-Dsamjdk.use_async_io_read_samtools=false", "-Dsamjdk.use_async_io_write_samtools=true", "-Dsamjdk.use_async_io_write_tribble=false", "-Dsamjdk.compression_level=2"] DEFAULT_SPARK_ARGS_PREFIX = '--conf' DEFAULT_SPARK_ARGS = { "spark.kryoserializer.buffer.max" : "512m", "spark.driver.maxResultSize" : "0", "spark.driver.userClassPathFirst" : "false", "spark.io.compression.codec" : "lzf", "spark.yarn.executor.memoryOverhead" : "600", "spark.driver.extraJavaOptions" : EXTRA_JAVA_OPTIONS_SPARK, "spark.executor.extraJavaOptions" : EXTRA_JAVA_OPTIONS_SPARK } def createSparkConfArgs(javaOptions): sparkConfArgs = DEFAULT_SPARK_ARGS if javaOptions is not None: sparkConfArgs["spark.driver.extraJavaOptions"] = sparkConfArgs["spark.driver.extraJavaOptions"] + ' ' + javaOptions sparkConfArgs["spark.executor.extraJavaOptions"] = sparkConfArgs["spark.executor.extraJavaOptions"] + ' ' + javaOptions return DEFAULT_SPARK_ARGS_PREFIX, sparkConfArgs class GATKLaunchException(Exception): pass def signal_handler(signal, frame): sys.exit(1) def main(args): #suppress stack trace when killed by keyboard interrupt signal.signal(signal.SIGINT, signal_handler) try: if len(args) is 0 or (len(args) is 1 and (args[0] == "--help" or args[0] == "-h")): print("") print(" Usage template for all tools (uses --spark-runner LOCAL when used with a Spark tool)") print(" gatk AnyTool toolArgs") print("") print(" Usage template for Spark tools (will NOT work on non-Spark tools)") print(" gatk SparkTool toolArgs [ -- --spark-runner sparkArgs ]") print("") print(" Getting help") print(" gatk --list Print the list of available tools" ) print("") print(" gatk Tool --help Print help on a particular tool" ) print("") print(" Configuration File Specification") print(" --gatk-config-file PATH/TO/GATK/PROPERTIES/FILE") print("") print(" gatk forwards commands to GATK and adds some sugar for submitting spark jobs") print("") print(" --spark-runner controls how spark tools are run") print(" valid targets are:") print(" LOCAL: run using the in-memory spark runner") print(" SPARK: run using spark-submit on an existing cluster ") print(" --spark-master must be specified") print(" --spark-submit-command may be specified to control the Spark submit command") print(" arguments to spark-submit may optionally be specified after -- ") print(" GCS: run using Google cloud dataproc") print(" commands after the -- will be passed to dataproc") print(" --cluster must be specified after the --") print(" spark properties and some common spark-submit parameters will be translated ") print(" to dataproc equivalents") print("") print(" --dry-run may be specified to output the generated command line without running it") print(" --java-options 'OPTION1[ OPTION2=Y ... ]' optional - pass the given string of options to the ") print(" java JVM at runtime. ") print(" Java options MUST be passed inside a single string with space-separated values.") sys.exit(0) if len(args) is 1 and args[0] == "--list": args[0] = "--help" # if we're invoked with --list, invoke the GATK with --help dryRun = "--dry-run" in args if dryRun: dryRun = True args.remove("--dry-run") javaOptions = getValueForArgument(args, "--java-options") if javaOptions is not None: i = args.index("--java-options") del args[i] #remove javaOptions del args[i] #and its parameter sparkRunner = getValueForArgument(args, "--spark-runner") if sparkRunner is not None: i = args.index("--spark-runner") del args[i] #remove spark target del args[i] #and its parameter sparkSubmitCommand = getValueForArgument(args, "--spark-submit-command") if sparkSubmitCommand is not None: i = args.index("--spark-submit-command") del args[i] #remove sparkSubmitCommand target del args[i] #and its parameter (gatkArgs, sparkArgs) = getSplitArgs(args) sparkMaster = getValueForArgument(sparkArgs, "--spark-master") if sparkMaster is not None: i = sparkArgs.index("--spark-master") del sparkArgs[i] #remove spark target del sparkArgs[i] #and its parameter gatkArgs += ["--spark-master", sparkMaster] runGATK(sparkRunner, sparkSubmitCommand, dryRun, gatkArgs, sparkArgs, javaOptions) except GATKLaunchException as e: sys.stderr.write(str(e)+"\n") sys.exit(3) except CalledProcessError as e: sys.exit(e.returncode) def getSparkSubmitCommand(sparkSubmitCommand): if sparkSubmitCommand is None: sparkhome = os.environ.get("SPARK_HOME") if sparkhome is not None: return sparkhome +"/bin/spark-submit" else: return "spark-submit" else: return sparkSubmitCommand def getGCloudSubmitCommand(gcloudCmd): if gcloudCmd is None: gcloudHome = os.environ.get("GCLOUD_HOME") if gcloudHome is not None: return gcloudHome +"/gcloud" else: return "gcloud" else: return gcloudCmd def getLocalGatkRunCommand(javaOptions): localJarFromEnv = getJarFromEnv(GATK_LOCAL_JAR_ENV_VARIABLE) # Add java options to our packaged local jar options if javaOptions is not None: PACKAGED_LOCAL_JAR_OPTIONS.extend(javaOptions.split()) if localJarFromEnv is not None: return formatLocalJarCommand(localJarFromEnv) wrapperScript = getGatkWrapperScript(throwIfNotFound=False) if wrapperScript is not None: # Add options to JAVA_OPTS environment var for dispatch script if javaOptions is not None: envJavaOpts = os.environ.get('JAVA_OPTS') if envJavaOpts is not None: envJavaOpts = envJavaOpts + ' ' + javaOptions else: envJavaOpts = javaOptions os.environ['JAVA_OPTS'] = envJavaOpts return [wrapperScript] return formatLocalJarCommand(getLocalJar()) # will throw if local jar not found def formatLocalJarCommand(localJar): return ["java"] + PACKAGED_LOCAL_JAR_OPTIONS + [ "-jar", localJar] def getGatkWrapperScript(throwIfNotFound=True): if not os.path.exists(GATK_RUN_SCRIPT): if throwIfNotFound: raise GATKLaunchException("Missing GATK wrapper script: " + GATK_RUN_SCRIPT + "\nTo generate the wrapper run:\n\n " + script + "/gradlew installDist") else: return None sys.stderr.write("Using GATK wrapper script " + GATK_RUN_SCRIPT) return GATK_RUN_SCRIPT def getLocalJar(throwIfNotFound=True): localJar = findJar("local.jar", envVariableOverride=GATK_LOCAL_JAR_ENV_VARIABLE) if localJar is None and throwIfNotFound: raise GATKLaunchException("No local jar was found, please build one by running\n\n " + script + "/gradlew localJar\n" "or\n" " export " + GATK_LOCAL_JAR_ENV_VARIABLE + "=") return localJar def getSparkJar(throwIfNotFound=True): sparkJar = findJar("spark.jar", envVariableOverride=GATK_SPARK_JAR_ENV_VARIABLE) if sparkJar is None and throwIfNotFound: raise GATKLaunchException("No spark jar was found, please build one by running\n\n " + script + "/gradlew sparkJar\n" "or\n" " export " + GATK_SPARK_JAR_ENV_VARIABLE + "=") return sparkJar def findJar(jarSuffix, jarPrefix=projectName, envVariableOverride=None, jarSearchDirs=(script, BIN_PATH)): if envVariableOverride is not None: jarPathFromEnv = getJarFromEnv(envVariableOverride) if jarPathFromEnv is not None: return jarPathFromEnv for jarDir in jarSearchDirs: jar = getNewestJarInDir(jarDir, jarSuffix, jarPrefix) if jar is not None: sys.stderr.write("Using GATK jar " + jar) return jar return None def getJarFromEnv(envVariableName): jarPathFromEnv = os.environ.get(envVariableName) if jarPathFromEnv is not None: if not os.path.exists(jarPathFromEnv): raise GATKLaunchException(envVariableName + " was set to: " + jarPathFromEnv + " but this file doesn't exist. Please fix your environment") else: sys.stderr.write("Using GATK jar " + jarPathFromEnv + " defined in environment variable " + envVariableName) return jarPathFromEnv return None def getNewestJarInDir(dir, jarSuffix, jarPrefix): if not os.path.exists(dir): return None dirContents = os.listdir(dir) jarPattern = re.compile("^" + jarPrefix + ".*" + jarSuffix + "$") jars = [f for f in dirContents if jarPattern.match(f)] if len(jars) != 0: newestJar = max(jars, key=lambda x: os.stat(dir + "/" + x).st_mtime) return dir + "/" + newestJar return None def md5(file): hash = hashlib.md5() with open(file, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash.update(chunk) return hash.hexdigest() def cacheJarOnGCS(jar, dryRun): staging = os.environ.get("GATK_GCS_STAGING") if dryRun is True: return jar elif staging is None: sys.stderr.write( "\njar caching is disabled because GATK_GCS_STAGING is not set\n\n" "please set GATK_GCS_STAGING to a bucket you have write access too in order to enable jar caching\n" "add the following line to you .bashrc or equivalent startup script\n\n" " export GATK_GCS_STAGING=gs:///\n") return jar else: jarname = os.path.basename(jar) (name, ext) = os.path.splitext(jarname) jarmd5 = md5(jar) gcsjar = staging + name + "_"+ jarmd5 + ext try: if call(["gsutil", "-q", "stat", gcsjar]) is 0: sys.stderr.write("\nfound cached jar: " + gcsjar + "\n") return gcsjar else: if call(["gsutil", "cp", jar, gcsjar]) is 0: sys.stderr.write("\nuploaded " + jar + " -> " + gcsjar + "\n") return gcsjar else: sys.stderr.write("\nfailed to upload " + jar + " -> " + gcsjar + "\nThere may be something wrong with your bucket permissions or gsutil installation\n") return jar except OSError: sys.stderr.write("\nTried to execute gsutil to upload the jar but it wasn't available\n " "See https://cloud.google.com/sdk/#Quick_Start for instructions on installing gsutil\n\n") return jar def runGATK(sparkRunner, suppliedSparkSubmitCommand, dryrun, gatkArgs, sparkArgs, javaOptions): if sparkRunner is None or sparkRunner == "LOCAL": cmd = getLocalGatkRunCommand(javaOptions) + gatkArgs + sparkArgs runCommand(cmd, dryrun) elif sparkRunner == "SPARK": sparkSubmitCmd = getSparkSubmitCommand(suppliedSparkSubmitCommand) sparkConfArgsPrefix, sparkConfArgs = createSparkConfArgs(javaOptions) sparkConfArgList = [[sparkConfArgsPrefix, "%s=%s" % (a, sparkConfArgs[a])] for a in sparkConfArgs]; cmd = [ sparkSubmitCmd, "--master", getSparkMasterSpecified(gatkArgs)] \ + [s for args in sparkConfArgList for s in args] \ + sparkArgs \ + [getSparkJar()] \ + gatkArgs try: runCommand(cmd, dryrun) except OSError: raise GATKLaunchException("Tried to run %s but failed.\nMake sure %s is available in your path" % (sparkSubmitCmd, sparkSubmitCmd)) elif sparkRunner == "GCS": jarPath = cacheJarOnGCS(getSparkJar(), dryrun) gcloudCmd = getGCloudSubmitCommand(suppliedSparkSubmitCommand) # Note: For GCS we don't need the prefix for the sparkConfArgs, so we ignore it and only grab the second # return value of createSparkConfArgs sparkConfArgs = createSparkConfArgs(javaOptions)[1] dataprocargs = convertSparkSubmitToDataprocArgs(sparkConfArgs, sparkArgs) sys.stderr.write("\nReplacing spark-submit style args with dataproc style args\n\n" + " ".join(sparkArgs) +" -> " + " ".join(dataprocargs) +"\n" ) cmd = [ gcloudCmd, "dataproc", "jobs", "submit", "spark"] \ + dataprocargs \ + ["--jar", jarPath] \ + ["--"] + gatkArgs + ["--spark-master", "yarn"] try: runCommand(cmd, dryrun) except OSError: raise GATKLaunchException("Tried to run gcloud but failed.\nMake sure gcloud is available in your path and you are properly authenticated") else: raise GATKLaunchException("Value: " + sparkRunner + " is not a valid value for --spark-runner. Choose one of LOCAL, SPARK, GCS") def runCommand(cmd, dryrun): if dryrun: print("\nDry run:\n") # Display environment variables for dry run if len(os.environ) != 0: print(" Env:\n") print( '\n'.join([' %s = %s' % (v, os.environ[v]) for v in os.environ]) ) print(" Cmd:\n") print((" " + " ".join(cmd)+"\n")) else: sys.stderr.write( "\nRunning:\n") sys.stderr.write(" " + " ".join(cmd)+"\n") check_call(cmd) def getSplitArgs(args): inFirstGroup = True firstArgs = [] secondArgs = [] for arg in args: if arg == "--": if not inFirstGroup: raise GATKLaunchException("Argument '--' must only be specified once") inFirstGroup = False else: if inFirstGroup: firstArgs.append(arg) else: secondArgs.append(arg) return (firstArgs, secondArgs) def isDryRun(args): return "--dry-run" in args def getValueForArgument(args, argument): if argument in args: i = args.index(argument) if len(args) <= i+1: raise GATKLaunchException("Argument: " + argument + " requires a parameter") return args[i+1] return None def getSparkMasterSpecified(args): value = getValueForArgument(args, "--spark-master") if value is None: raise GATKLaunchException("The argument --spark-master must be specified") else: return value # translate select spark-submit parameters to their gcloud equivalent def convertSparkSubmitToDataprocArgs(sparkConfArgs, sparkArgs): replacements = {"--driver-memory": "spark.driver.memory", "--driver-cores": "spark.driver.cores", "--executor-memory": "spark.executor.memory", "--executor-cores": "spark.executor.cores", "--num-executors": "spark.executor.instances" } dataprocargs = [] filesToAdd = [] properties = [] try: # Arguments passed as the sparkConfArgs should be passed through as properties. # In practice yarn files will be added through the 'sparkArgs' argument that is parsed below, # so we don't need to check for them up here. properties.extend(['%s=%s' % (p, sparkConfArgs[p]) for p in sparkConfArgs]) # Iterate through sparkArgs i = 0 while i < len(sparkArgs): arg = sparkArgs[i] if arg == "--conf": i += 1 property = sparkArgs[i] if "spark.yarn.dist.files" in property: #intercept yarn files and pass it through --files instead files = property.split("=")[1] filesToAdd = filesToAdd + files.split(",") else: properties.append(sparkArgs[i]) elif not replacements.get(arg) is None: i += 1 propertyname = replacements.get(arg) properties.append(propertyname + "=" + sparkArgs[i]) else: dataprocargs.append(arg) i +=1 except IndexError: raise GATKLaunchException("Found an argument: " + arg + "with no matching value.") if not len(properties) is 0: dataprocargs.append("--properties") dataprocargs.append(",".join(properties)) if not len(filesToAdd) is 0: dataprocargs.append("--files") dataprocargs.append(",".join(filesToAdd)) return dataprocargs if __name__ == "__main__": main(sys.argv[1:])