Running ADAM on Toil¶
Toil is a workflow management tool that supports running multi-tool workflows. Unlike traditional workflow managers that are limited to supporting jobs that run on a single node, Toil includes support for clusters of long-lived services through the Service Job abstraction. This abstraction enables workflows that mix Spark-based tools like ADAM with traditional, single-node tools. (Vivian et al. 2016) describes the Toil architecture and demonstrates the use of Toil at scale in the Amazon Web Services EC2 cloud. Toil can be run on various on-premises High Performance Computing schedulers, and on the Amazon EC2 and Microsoft Azure clouds. A quick start guide to deploying Toil in the cloud or in an on-premises cluster can be found in the Toil documentation.
toil-lib is a library built downstream from Toil that provides common functionality useful across varied genomics workflows. Two of its modules help to set up an Apache Spark cluster and to run an ADAM job:
toil_lib.spark
: This module contains all the code necessary to set up a set of Service Jobs that launch and run an Apache Spark cluster backed by the Apache Hadoop Distributed File System (HDFS).

toil_lib.tools.spark_tools
: This module contains functions that run ADAM in Toil using Docker, as well as Conductor, a tool for running transfers between HDFS and Amazon's S3 storage service.
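As a quick orientation, the functions used later in this section come from these two modules and can be imported as sketched below (module paths follow the descriptions above; exact contents may vary between toil-lib versions):

# Service-job helpers for standing up a Spark/HDFS cluster.
from toil_lib.spark import spawn_spark_cluster

# Wrappers that run ADAM and Conductor in Docker containers.
from toil_lib.tools.spark_tools import call_adam, call_conductor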
Several example workflows that run ADAM in Toil can be found in toil-scripts. These workflows include:
- adam-kmers: this workflow was demonstrated in (Vivian et al. 2016) and sets up a Spark cluster, which then runs ADAM's countKmers CLI.
- adam-pipeline: this workflow runs several stages in the ADAM transformAlignments CLI. This pipeline is the ADAM equivalent of the GATK's "Best Practices" read preprocessing pipeline. We then stitch this pipeline together with BWA-MEM and the GATK in the adam-gatk-pipeline.
An example workflow: toil_scripts.adam_kmers.count_kmers¶
For an example of how to use ADAM with Toil, let us look at the toil_scripts.adam_kmers.count_kmers module. This module has three parts:
- A main method that configures and launches a Toil workflow.
- A job function that launches both the Spark cluster service and the ADAM job.
- A child job function that calls ADAM and Conductor to transfer a BAM file from S3, convert that BAM file to Parquet, count k-mers, and upload the k-mer counts back to S3.
Configuring and launching Toil¶
Toil takes most of its configuration from the command line. To make this easy, Toil includes a function in the toil.job.Job class to register Toil's argument parsing code with the Python standard argparse library (https://docs.python.org/2/library/argparse.html). E.g., in count_kmers.py (https://github.com/BD2KGenomics/toil-scripts/blob/master/src/toil_scripts/adam_kmers/count_kmers.py#L183-L214), we set up our arguments and then add the Toil-specific arguments:
parser = argparse.ArgumentParser()

# add parser arguments
parser.add_argument('--input_path',
                    help='The full path to the input SAM/BAM/ADAM/FASTQ file.')
parser.add_argument('--output-path',
                    help='full path where final results will be output.')
parser.add_argument('--kmer-length',
                    help='Length to use for k-mer counting. Defaults to 20.',
                    default=20,
                    type=int)
parser.add_argument('--spark-conf',
                    help='Optional configuration to pass to Spark commands. Either this or --workers must be specified.',
                    default=None)
parser.add_argument('--memory',
                    help='Optional memory configuration for Spark workers/driver. This must be specified if --workers is specified.',
                    default=None,
                    type=int)
parser.add_argument('--cores',
                    help='Optional core configuration for Spark workers/driver. This must be specified if --workers is specified.',
                    default=None,
                    type=int)
parser.add_argument('--workers',
                    help='Number of workers to spin up in Toil. Either this or --spark-conf must be specified. If this is specified, --memory and --cores must be specified.',
                    default=None,
                    type=int)
parser.add_argument('--sudo',
                    help='Run docker containers with sudo. Defaults to False.',
                    default=False,
                    action='store_true')

Job.Runner.addToilOptions(parser)
Then, we parse the arguments and start Toil:
args = parser.parse_args()

Job.Runner.startToil(Job.wrapJobFn(kmer_dag,
                                   args.kmer_length,
                                   args.input_path,
                                   args.output_path,
                                   args.spark_conf,
                                   args.workers,
                                   args.cores,
                                   args.memory,
                                   args.sudo,
                                   checkpoint=True), args)
Note that we are passing the parsed arguments to the Job.Runner.startToil function. The other argument that we are passing is the Job that we would like Toil to run. In this example, Toil is wrapping the kmer_dag function (discussed in the next section) up as a Job. The Job.wrapJobFn call takes the kmer_dag function and all of the arguments that are being passed, and serializes them so they can be run locally or on a remote node.

Additionally, we pass the optional argument checkpoint=True. This argument indicates that the kmer_dag job function is a "checkpoint" job. If a job is a checkpoint job and any of its child jobs fail, then we are saying that the workflow can be successfully rerun from this point. In Toil, service jobs should always be launched from a checkpointed job in order to allow the service jobs to successfully resume after a service job failure.
More detailed information about launching a Toil workflow can be found in the Toil documentation.
Launching a Spark Service¶
In the toil_scripts.adam_kmers.count_kmers example, we wrap the kmer_dag function as a job, and then use this function to launch a Spark cluster as a set of service jobs using the toil_lib.spark module. Once we've done that, we also launch a job to run ADAM by starting the download_count_upload child job function. We launch the Spark service cluster by calling the spawn_spark_cluster function, which was imported from the toil_lib.spark module:
master_hostname = spawn_spark_cluster(job,
                                      workers,
                                      cores)
This function takes in three parameters:

job
: A handle to the currently running Toil Job. This is used to enqueue the service jobs needed to start the Spark cluster.

workers
: The number of Spark workers to allocate.

cores
: The number of cores to request per worker/leader node.
When called, this method does not return a hostname string. Rather, it returns a promise for the hostname string. This promise is not valid inside of the kmer_dag job, but will be valid in the child job (download_count_upload) that runs Spark. Toil cannot guarantee that the Spark Service job will start until after the job that enqueues it completes.
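As a generic illustration of how Toil promises behave (this is a hypothetical sketch, not code from count_kmers.py), the value returned by a child job can be handed to a later job before the child has run; Toil substitutes the concrete value when the receiving job executes:

from toil.job import Job

def parent(job):
    # rv() returns a promise for find_hostname's return value; inside this
    # job it is not yet a string.
    hostname_promise = job.addChildJobFn(find_hostname).rv()

    # The promise may be passed to a job that runs after the child completes.
    job.addFollowOnJobFn(use_hostname, hostname_promise)

def find_hostname(job):
    return 'spark-leader.example'

def use_hostname(job, hostname):
    # By the time this job runs, Toil has replaced the promise with the
    # concrete string returned by find_hostname.
    return 'spark://%s:7077' % hostname

if __name__ == '__main__':
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    Job.Runner.startToil(Job.wrapJobFn(parent), options)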
Finally, we enqueue the child job that runs ADAM and Conductor:
job.addChildJobFn(download_count_upload,
                  master_hostname,
                  input_file, output_file, kmer_length,
                  spark_conf, memory, sudo)
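Putting these pieces together, a condensed sketch of the kmer_dag job function looks roughly like the following (this is a simplified sketch rather than the verbatim function from count_kmers.py; argument names follow the fragments above):

def kmer_dag(job, kmer_length, input_path, output_path,
             spark_conf, workers, cores, memory, sudo):

    # Launch the Spark cluster as a set of Toil service jobs. The return
    # value is a promise for the Spark leader's hostname, not a string.
    master_hostname = spawn_spark_cluster(job, workers, cores)

    # Enqueue the child job that talks to the cluster; the hostname promise
    # is resolved by the time download_count_upload actually runs.
    job.addChildJobFn(download_count_upload,
                      master_hostname,
                      input_path, output_path, kmer_length,
                      spark_conf, memory, sudo)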
Detailed documentation for the toil_lib.spark module can be found in the toil-lib docs.
Running ADAM and other Spark applications¶
Once we have enqueued the Spark service jobs and the child job that interacts with the services, we can launch Spark applications from the child job. In our example application, our child job function does the following work:
if master_ip is not None:
    hdfs_dir = "hdfs://{0}:{1}/".format(master_ip, HDFS_MASTER_PORT)
else:
    _log.warn('Master IP is not set. If default filesystem is not set, jobs may fail.')
    hdfs_dir = ""

# if the file is not already in hdfs, copy it in
hdfs_input_file = hdfs_dir
if input_file.startswith("s3://"):

    # append the s3 file name to our hdfs path
    hdfs_input_file += input_file.split("/")[-1]

    # run the download
    _log.info("Downloading input file %s to %s.", input_file, hdfs_input_file)
    call_conductor(master_ip, input_file, hdfs_input_file,
                   memory=memory, override_parameters=spark_conf)

# do we need to convert to adam?
if (hdfs_input_file.endswith('.bam') or
        hdfs_input_file.endswith('.sam') or
        hdfs_input_file.endswith('.fq') or
        hdfs_input_file.endswith('.fastq')):

    hdfs_tmp_file = hdfs_input_file

    # change the file extension to adam
    hdfs_input_file = '.'.join(hdfs_input_file.split('.')[:-1] + ['adam'])

    # convert the file
    _log.info('Converting %s into ADAM format at %s.', hdfs_tmp_file, hdfs_input_file)
    call_adam(master_ip,
              ['transformAlignments',
               hdfs_tmp_file, hdfs_input_file],
              memory=memory, override_parameters=spark_conf)

# run k-mer counting
_log.info('Counting %d-mers in %s, and saving to %s.',
          kmer_length, hdfs_input_file, hdfs_output_file)
call_adam(master_ip,
          ['countKmers',
           hdfs_input_file, hdfs_output_file,
           str(kmer_length)],
          memory=memory, override_parameters=spark_conf)

# do we need to upload the file back? if so, run upload
if run_upload:
    _log.info("Uploading output file %s to %s.", hdfs_output_file, output_file)
    call_conductor(master_ip, hdfs_output_file, output_file,
                   memory=memory, override_parameters=spark_conf)
The call_adam and call_conductor functions are imported from the toil_lib.tools.spark_tools module. These functions run ADAM and Conductor using Docker containers from cgl-docker-lib. [1] These two functions launch the Docker containers using the call_docker function from the toil_lib.programs module, and do some basic configuration of the command line. In the ADAM example, all the user needs to pass is the exact arguments that they would like to run from the ADAM CLI; the Spark configuration parameters that are passed to the adam-submit script are automatically configured.
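As an illustration, invoking a different ADAM CLI command through the same wrapper only requires changing the argument list (flagstat is used here purely as a hypothetical example; any ADAM CLI command could be substituted):

# Compute flagstat-style statistics on the converted alignments. The Spark
# configuration parameters passed to adam-submit are filled in by call_adam,
# as described above.
call_adam(master_ip,
          ['flagstat', hdfs_input_file],
          memory=memory, override_parameters=spark_conf)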
As you may have noticed, all of this functionality is contained in a single Toil job. This is important for fault tolerance. Toil provides tolerance against data loss through the use of a file store, which manages the persistence of local files to a persistent store (e.g., S3). Since we store intermediate files in HDFS, thus bypassing the file store, our intermediate results are not persistent, and thus individual Spark applications are not atomic.
Using PySpark in Toil¶
As an aside, a nice benefit of Toil is that we can run PySpark jobs inline with Toil workflows. A small demo of this is seen in the toil_lib.spark unit tests:
def _count_child(job, masterHostname):

    # noinspection PyUnresolvedReferences
    from pyspark import SparkContext

    # start spark context and connect to cluster
    sc = SparkContext(master='spark://%s:7077' % masterHostname,
                      appName='count_test')

    # create an rdd containing 0-9999 split across 10 partitions
    rdd = sc.parallelize(xrange(10000), 10)

    # and now, count it
    assert rdd.count() == 10000
[1] These containers are published on Quay.