Learn important terminology before using parallel and distributed computing paradigms on the DNAnexus Platform.
There are many definitions of, and approaches to, parallelizing and distributing workloads in the cloud (here’s a particularly helpful Stack Exchange post on the subject). To keep our documentation easy to follow, when discussing concurrent computing paradigms we’ll refer to:
Parallel: Using multiple threads or logical cores to concurrently process a workload.
Distributed: Using multiple machines (in our case instances in the cloud) that communicate to concurrently process a workload.
Keep these formal definitions in mind as you read through the tutorials and learn how to compute concurrently on the DNAnexus Platform.
Distributed bash-interpreter apps use bash functions to declare entry points. Entry points are executed as subjobs on new workers with their own respective system requirements. This app has the following entry points specified as bash functions:
main
count_func
sum_reads
The main function takes the initial *.bam file, generates an index *.bai if needed, and obtains the list of regions from the *.bam file. The regions are then sent, in batches of 10, as input to the count_func entry point using the dx-jobutil-new-job command.
Job outputs from the count_func entry point are referenced as Job Based Object References (JBOR) and used as inputs for the sum_reads entry point.
The job output of the sum_reads entry point is used as the output of the main entry point via a JBOR reference in the dx-jobutil-add-output command.
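A minimal sketch of how this scatter and gather might be wired together in the main function is shown below. The input names (mappings_bam, regions), the batching with xargs, and the exact output names are illustrative assumptions, not the applet’s exact code.

```bash
# Scatter: launch one count_func subjob per batch of 10 regions
# ("$regions" is assumed to hold one region name per line)
count_jobs=()
while read -r batch; do
  job_id=$(dx-jobutil-new-job count_func \
    -imappings_bam="$mappings_bam" \
    -iregions="$batch")
  count_jobs+=("$job_id")
done < <(echo "$regions" | xargs -n 10)

# Gather: pass each subjob's counts_txt output to sum_reads as a JBOR array
sum_args=()
for job_id in "${count_jobs[@]}"; do
  sum_args+=("-icounts=${job_id}:counts_txt")
done
sum_job=$(dx-jobutil-new-job sum_reads "${sum_args[@]}")

# Forward the sum_reads output as this applet's own read_sum output
dx-jobutil-add-output read_sum "${sum_job}:read_sum" --class=jobref
```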
This entry point performs a SAMtools count of the 10 regions passed as input. This execution runs on a new worker, so variables from other functions (e.g. main()) are not accessible here.
Once the output file with counts is created, it is uploaded to the platform and assigned as the entry point’s job output counts_txt via the command dx-jobutil-add-output.
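A sketch of what count_func might look like, assuming the hypothetical inputs mappings_bam (a file) and regions (a space-separated list of region names):

```bash
# count_func runs on a fresh worker; variables set in main() are not visible here.
dx download "$mappings_bam" -o input.bam
samtools index input.bam

# $regions is deliberately unquoted so each region becomes its own argument
samtools view -c input.bam $regions > readcount.txt

# Upload the result and register it as this entry point's counts_txt output
counts_id=$(dx upload readcount.txt --brief)
dx-jobutil-add-output counts_txt "$counts_id" --class=file
```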
The main entry point triggers this subjob, providing the output of count_func as an input JBOR. This entry point gathers all the readcount.txt files generated by the count_func jobs and sums the totals.
This entry point returns read_sum as a JBOR, which the main function then references as its own job output.
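A sketch of the gather step, assuming counts is an array-of-files input where each file contains a single integer:

```bash
# sum_reads gathers the per-batch counts and totals them
total=0
for i in "${!counts[@]}"; do
  dx download "${counts[$i]}" -o "count_${i}.txt"
  total=$(( total + $(cat "count_${i}.txt") ))
done

echo "$total" > read_sum.txt
sum_id=$(dx upload read_sum.txt --brief)
dx-jobutil-add-output read_sum "$sum_id" --class=file
```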
View full source code on GitHub
The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.json file’s runSpec.execDepends field. For additional information, see the execDepends documentation.
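For reference, an execDepends declaration for SAMtools typically looks something like the following (abbreviated to the relevant field):

```json
{
  "runSpec": {
    "execDepends": [
      {"name": "samtools"}
    ]
  }
}
```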
Distributed bash-interpreter apps use bash functions to declare entry points. This app has the following entry points specified as bash functions:
main
count_func
sum_reads
Each entry point is executed on a new worker with its own system requirements. The instance type can be set in the dxapp.json file’s runSpec.systemRequirements:
The main function slices the initial *.bam file, generating an index *.bai if needed. The input *.bam is then sliced into smaller *.bam files containing only reads from canonical chromosomes. First, the main function downloads the BAM file and gets the headers.
Sliced *.bam files are uploaded and their file IDs are passed to the count_func entry point using the dx-jobutil-new-job command.
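A rough sketch of the slicing step, assuming the canonical chromosomes are taken from the BAM header’s @SQ lines and that the sliced file IDs are handed to count_func; the exact input wiring may differ from the applet’s source:

```bash
# Download the input BAM and index it so it can be sliced by region
dx download "$mappings_bam" -o input.bam
samtools index input.bam

# Slice out each canonical chromosome, upload the slice, and launch a subjob
count_jobs=()
for chr in $(samtools view -H input.bam | grep '^@SQ' | cut -f 2 | sed 's/^SN://' \
             | grep -E '^(chr)?([0-9]+|X|Y)$'); do
  samtools view -b input.bam "$chr" > "${chr}.bam"
  sliced_id=$(dx upload "${chr}.bam" --brief)
  count_jobs+=("$(dx-jobutil-new-job count_func -imappings_bam="$sliced_id")")
done
```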
Outputs from the count_func entry points are referenced as Job Based Object References (JBOR) and used as inputs for the sum_reads entry point.
The output of the sum_reads entry point is used as the output of the main entry point via a JBOR reference using the command dx-jobutil-add-output.
This entry point downloads the sliced *.bam and runs the command samtools view -c on it. The generated counts_txt output file is uploaded as the entry point’s job output via the command dx-jobutil-add-output.
The main entry point triggers this subjob, providing the output of count_func as an input. This entry point gathers all the files generated by the count_func jobs and sums them. This function returns read_sum_file as the entry point output.
This applet creates a count of reads from a BAM format file.
View full source code on GitHub
The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.json file’s runSpec.execDepends field. For additional information, please refer to the execDepends documentation.
Distributed python-interpreter apps use python decorators on functions to declare entry points. This app has the following entry points as decorated functions:
main
samtoolscount_bam
combine_files
Each entry point is executed on a new worker with its own system requirements. In this example, we split and merge our files on basic mem1_ssd1_x2 instances and perform our more intensive processing step on a mem1_ssd1_x4 instance. The instance type can be set in the dxapp.json file’s runSpec.systemRequirements:
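A representative systemRequirements block for the per-entry-point instance types described above might look like this (a sketch, not necessarily the tutorial’s exact file):

```json
{
  "runSpec": {
    "systemRequirements": {
      "main": {"instanceType": "mem1_ssd1_x2"},
      "samtoolscount_bam": {"instanceType": "mem1_ssd1_x4"},
      "combine_files": {"instanceType": "mem1_ssd1_x2"}
    }
  }
}
```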
The main function scatters by region bins based on user input. If no *.bai file is present, the applet generates an index *.bai.
Region bins are passed to the samtoolscount_bam entry point using the dxpy.new_dxjob function.
Outputs from the samtoolscount_bam entry points are used as inputs for the combine_files entry point. The output of the combine_files entry point is used as the output of the main entry point.
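As a rough sketch of how this wiring might look inside the main entry point: the input names mappings_bam and regions, the bin size, and the count_file output name are illustrative assumptions, while readcount_fileDX matches the subjob output described below.

```python
import dxpy

@dxpy.entry_point("main")
def main(mappings_bam, regions, regions_per_job=10):
    # Split the region list into bins of regions_per_job regions each
    region_bins = [regions[i:i + regions_per_job]
                   for i in range(0, len(regions), regions_per_job)]

    # Scatter: launch one samtoolscount_bam subjob per region bin
    subjobs = [
        dxpy.new_dxjob(
            fn_input={"mappings_bam": mappings_bam, "regions": region_bin},
            fn_name="samtoolscount_bam")
        for region_bin in region_bins]

    # Gather: hand each subjob's readcount_fileDX output to combine_files as a JBOR
    gather_job = dxpy.new_dxjob(
        fn_input={"counts": [j.get_output_ref("readcount_fileDX") for j in subjobs]},
        fn_name="combine_files")

    # The gather job's output is passed straight through as the output of main
    return {"count_file": gather_job.get_output_ref("count_file")}
```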
This entry point downloads its inputs and builds a samtools view -c command for each region in the input bin. The dictionary returned from dxpy.download_all_inputs() is used to reference input names and paths.
This entry point returns {"readcount_fileDX": readCountDXlink}, a JBOR referencing an uploaded text file. This approach to scatter-gather stores the results in files and uploads/downloads the information as needed, which exaggerates the pattern for tutorial purposes. You are able to pass types other than file, such as int.
The main entry point triggers this subjob, providing the output of samtoolscount_bam as an input. This entry point gathers all the files generated by the samtoolscount_bam jobs and sums them.
Important: While the main entry point triggers the processing and gathering entry points, keep in mind that the main entry point doesn’t do any heavy lifting or processing. Notice in the runSpec JSON above that we start with a lightweight instance, scale up for the processing entry point, then finally scale down for the gathering step.
This applet tutorial will perform a SAMtools count using parallel threads.
View full source code on GitHub
In order to take full advantage of the scalability that cloud computing offers, our scripts have to implement the correct methodologies. This applet tutorial will:
Install SAMtools
Download BAM file
Count regions in parallel
The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.json file’s runSpec.execDepends field. For additional information, please refer to the execDepends documentation.
The dxpy.download_all_inputs() function downloads all input files into the /home/dnanexus/in directory. A folder is created for each input, and the file(s) are downloaded to that directory. For convenience, the dxpy.download_all_inputs function returns a dictionary containing the following keys:
<var>_path (string): full absolute path to where the file was downloaded.
<var>_name (string): name of the file, including extension.
<var>_prefix (string): name of the file minus the longest matching pattern found in the dxapp.json I/O pattern field.
The path, name, and prefix key-value pattern is repeated for all applet file class inputs specified in the dxapp.json. In this example, our dictionary has the following key-value pairs:
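A sketch of how this looks in practice, assuming a single file input named mappings_bam (a hypothetical input name) whose uploaded file is called sample.bam:

```python
import dxpy

# Download every file input declared in dxapp.json into /home/dnanexus/in/<input name>/
inputs = dxpy.download_all_inputs()

# For a file input named "mappings_bam", the dictionary would contain entries
# along these lines (each value is a list, one element per file):
#   inputs["mappings_bam_path"]   -> ["/home/dnanexus/in/mappings_bam/sample.bam"]
#   inputs["mappings_bam_name"]   -> ["sample.bam"]
#   inputs["mappings_bam_prefix"] -> ["sample"]
bam_path = inputs["mappings_bam_path"][0]
```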
Before we can perform our parallel SAMtools count, we must determine the workload for each thread. We arbitrarily set our number of workers to 10 and set the workload per thread to 1 chromosome at a time. There are various ways to achieve multithreaded processing in Python. For the sake of simplicity, we use multiprocessing.dummy, a wrapper around Python’s threading module.
Each worker creates a string to be called in a subprocess.Popen call. We use the multiprocessing.dummy.Pool.map(<func>, <iterable>) function to call the helper function run_cmd for each string in the iterable of view commands. Because we perform our multithreaded processing using subprocess.Popen, we will not be alerted to any failed processes. We verify our closed workers in the verify_pool_status helper function.
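The helper names run_cmd and verify_pool_status come from the tutorial; the sketch below shows one way the pattern could fit together, with the BAM path and chromosome list assumed for illustration.

```python
import subprocess
from multiprocessing.dummy import Pool  # thread-backed Pool

def run_cmd(cmd):
    """Run one shell command; return (stdout, stderr, exit code)."""
    proc = subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    return stdout, stderr, proc.returncode

def verify_pool_status(results):
    """Fail loudly if any worker's command exited non-zero."""
    failures = [stderr for _, stderr, code in results if code != 0]
    if failures:
        raise RuntimeError("{} worker(s) failed".format(len(failures)))

bam_path = "input.bam"                                     # assumed local path
chromosomes = ["chr{}".format(i) for i in range(1, 11)]    # assumed: 1 region per worker
view_cmds = ["samtools view -c {} {}".format(bam_path, c) for c in chromosomes]

pool = Pool(10)                                            # 10 worker threads
results = pool.map(run_cmd, view_cmds)
pool.close()
pool.join()
verify_pool_status(results)
```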
Important: In this example we use subprocess.Popen to process and verify our results in verify_pool_status. In general, it is considered good practice to use Python’s built-in subprocess convenience functions; in this case, subprocess.check_call would achieve the same goal.
Each worker returns a read count of just one region in the BAM file. We sum the results and return the total as the job output. We use the dx-toolkit Python SDK’s dxpy.upload_local_file function to upload and generate a DXFile corresponding to our result file. For Python, job outputs have to be a dictionary of key-value pairs, with the keys being job output names as defined in the dxapp.json and the values being the output values for the corresponding output classes. For files, the output value is a DXLink; we use the dxpy.dxlink function to generate the appropriate DXLink value.
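Continuing the sketch above, the summing and upload step might look like this; the output name counts_txt is an assumption for illustration.

```python
import dxpy

# Sum the per-region counts produced by the worker threads above
total = sum(int(stdout.decode().strip()) for stdout, _, _ in results)

with open("read_count.txt", "w") as fh:
    fh.write("Total reads: {}\n".format(total))

# Upload the result file and return it as the job output declared in dxapp.json
counts_file = dxpy.upload_local_file("read_count.txt")
output = {"counts_txt": dxpy.dxlink(counts_file)}
```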
This applet tutorial will perform a SAMtools count using parallel threads.
View full source code on GitHub
In order to take full advantage of the scalability that cloud computing offers, our scripts have to implement the correct methodologies. This applet tutorial will:
Install SAMtools
Download BAM file
Split workload
Count regions in parallel
The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.json file’s runSpec.execDepends field.
This applet downloads all inputs at once using dxpy.download_all_inputs:
We process in parallel using the Python multiprocessing module, following the rather simple pattern shown below:
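The general shape of that pattern is sketched here; the per-region work and the region list are placeholders.

```python
from multiprocessing import Pool

def process_region(region):
    # per-region work goes here (e.g. a samtools view -c call)
    return region

regions = ["chr1", "chr2", "chr3"]   # illustrative workload
pool = Pool()                        # defaults to one worker per available core
results = pool.map(process_region, regions)
pool.close()
pool.join()
```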
This convenient pattern allows you to quickly orchestrate jobs on a worker. For a more detailed overview of the multiprocessing module, visit the Python docs.
We create several helpers in our applet script to manage our workload. One helper you may have seen before is run_cmd; we use this function to manage our subprocess calls:
Before we can split our workload, we need to know what regions are present in our BAM input file. We handle this initial parsing in the parse_sam_header_for_region function:
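A sketch of what such a helper could look like, parsing the @SQ lines of the BAM header; the tutorial’s actual implementation may differ in details.

```python
import subprocess

def parse_sam_header_for_region(bam_path):
    """Return the reference sequence (region) names listed in the BAM header."""
    header = subprocess.check_output(
        ["samtools", "view", "-H", bam_path]).decode()
    regions = []
    for line in header.splitlines():
        if line.startswith("@SQ"):
            for field in line.split("\t"):
                if field.startswith("SN:"):
                    regions.append(field[3:])
    return regions
```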
Once our workload is split and we’ve started processing, we wait for and review the status of each Pool worker. Then, we merge and output our results.
Note: The run_cmd function returns a tuple containing the stdout, stderr, and exit code of the subprocess call. We parse these outputs from our workers to determine whether the run failed or passed.
This applet performs a basic SAMtools count on a series of sliced (by canonical chromosome) BAM files in parallel using wait.
View full source code on GitHub
The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.json file’s runSpec.execDepends field.
The command set -e -x -o pipefail will assist you in debugging this applet:
-e causes the shell to exit immediately if a command returns a non-zero exit code.
-x prints commands as they are executed, which is very useful for tracking the job’s status or pinpointing the exact point of failure.
-o pipefail makes a pipeline’s return code the first non-zero exit code of any command in it. (Typically, the return code of a pipeline is the exit code of its last command, which can create difficult-to-debug problems.)
The *.bai file is an optional job input. You can check for an empty or unset var using the bash test [[ -z "${var}" ]]. Then, you can download or create a *.bai index as needed.
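For example, a sketch of that check, assuming the optional input is named mappings_bai and the BAM has already been downloaded as input.bam:

```bash
# If no index was provided, build one; otherwise download it next to the BAM
if [[ -z "${mappings_bai}" ]]; then
  samtools index input.bam              # creates input.bam.bai
else
  dx download "$mappings_bai" -o input.bam.bai
fi
```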
Bash’s job control system allows for easy management of multiple processes. In this example, you can run bash commands in the background while controlling the maximum number of concurrent executions in the foreground. Place a process in the background by appending the character & to a command.
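A minimal sketch of this background-and-wait pattern, assuming the sliced BAMs are named chr*.bam:

```bash
# Launch one samtools count per sliced BAM in the background...
for sliced_bam in chr*.bam; do
  samtools view -c "$sliced_bam" > "${sliced_bam%.bam}.count" &
done

# ...then wait for every background job to finish before summing
wait
cat ./*.count | awk '{sum += $1} END {print sum}' > read_count.txt
```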
Once the input BAM has been sliced, counted, and summed, the output counts_txt is uploaded using the command dx-upload-all-outputs. The directory structure required by dx-upload-all-outputs is shown below:
In your applet, upload all outputs by:
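A sketch of both the required layout and the upload step, assuming the result file is read_count.txt and the output is named counts_txt:

```bash
# dx-upload-all-outputs expects each output under $HOME/out/<output name>/:
#
#   $HOME/out/
#   └── counts_txt/
#       └── read_count.txt
#
mkdir -p "$HOME/out/counts_txt"
mv read_count.txt "$HOME/out/counts_txt/"
dx-upload-all-outputs
```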
This applet slices a BAM file by canonical chromosome then performs a parallelized samtools view -c using xargs. Type man xargs for general usage information.
View full source code on GitHub
The SAMtools compiled binary is placed directly in the <applet dir>/resources directory. Any files found in the resources/ directory will be uploaded so that they are present in the root directory of the worker. In our case:
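Assuming the binary is placed at resources/usr/bin/samtools (the src/code.sh name is illustrative), the applet directory would look something like this:

```
<applet dir>/
├── dxapp.json
├── src/
│   └── code.sh
└── resources/
    └── usr/
        └── bin/
            └── samtools
```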
When this applet is run on a worker, the contents of the resources/ folder will be placed in the worker’s root directory /:
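Under that assumption, the files then appear under the worker’s filesystem root:

```
/
└── usr/
    └── bin/
        └── samtools
```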
/usr/bin is part of the $PATH variable, so in our script, we can reference the samtools command directly, as in samtools view -c ...
First, we download our BAM file and slice it by canonical chromosome, writing the *.bam file names to a record file.
In order to split a BAM by regions, we need to have a *.bai index. You can either create an app(let) that takes the *.bai as an input or generate the *.bai in the applet. In this tutorial, we generate the *.bai in the applet, sorting the BAM first if necessary.
In the previous section, we recorded the name of each sliced BAM file in a record file. Now we will perform a samtools view -c on each slice, using the record file as input.
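A sketch of the xargs step, assuming the record file is named sliced_bams.txt with one BAM path per line:

```bash
# Run samtools view -c on each recorded slice, several slices at a time
xargs -P "$(nproc)" -I {} sh -c 'samtools view -c "$1" > "$1.count"' _ {} \
  < sliced_bams.txt

# Merge the per-slice counts into the final result
cat ./*.count | awk '{sum += $1} END {print sum}' > read_count.txt
```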
The results file is uploaded using the standard bash process:
Upload the file to the job execution’s container.
Provide the resulting DNAnexus link as the job’s output using the command dx-jobutil-add-output <output name>.
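For example (the output name counts_txt is an assumption for illustration):

```bash
# Upload the result to the job's container, then register it as the job output
counts_id=$(dx upload read_count.txt --brief)
dx-jobutil-add-output counts_txt "$counts_id" --class=file
```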