Parallel by Region (py)

This applet tutorial will perform a SAMtools count using parallel threads.

View full source code on GitHub

In order to take full advantage of the scalability that cloud computing offers, our scripts have to implement the correct methodologies. This applet tutorial will:

  1. Install SAMtools

  2. Download BAM file

  3. Split workload

  4. Count regions in parallel

How is the SAMtools dependency provided?

The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.json runSpec.execDepends field.

{
  "runSpec": {
    ...
    "execDepends": [
      {"name": "samtools"}
    ]
  }

Download Inputs

This applet downloads all inputs at once using dxpy.download_all_inputs:

inputs = dxpy.download_all_inputs()
# download_all_inputs returns a dictionary that contains mapping from inputs to file locations.
# Additionaly, helper keys, value pairs are added to the dicitonary, similar to bash helper functions
inputs
#     mappings_sorted_bam_path: [u'/home/dnanexus/in/mappings_sorted_bam/SRR504516.bam']
#     mappings_sorted_bam_name: u'SRR504516.bam'
#     mappings_sorted_bam_prefix: u'SRR504516'
#     mappings_sorted_bai_path: u'/home/dnanexus/in/mappings_sorted_bai/SRR504516.bam.bai'
#     mappings_sorted_bai_name: u'SRR504516.bam.bai'
#     mappings_sorted_bai_prefix: u'SRR504516'

Split workload

We process in parallel using the python multiprocessing module using a rather simple pattern shown below:

print("Number of cpus: {0}".format(cpu_count()))  # Get cpu count from multiprocessing
worker_pool = Pool(processes=cpu_count())         # Create a pool of workers, 1 for each core
results = worker_pool.map(run_cmd, collection)    # map run_cmds to a collection
                                                  # Pool.map will handle orchestrating the job
worker_pool.close()
worker_pool.join()  # Make sure to close and join workers when done

This convenient pattern allows you to quickly orchestrate jobs on a worker. For more detailed overview of the multiprocessing module, visit the python docs.

We create several helpers in our applet script to manage our workload. One helper you may have seen before is run_cmd; we use this function to manage or subprocess calls:

def run_cmd(cmd_arr):
    """Run shell command.
    Helper function to simplify the pool.map() call in our parallelization.
    Raises OSError if command specified (index 0 in cmd_arr) isn't valid
    """
    proc = subprocess.Popen(
        cmd_arr,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    exit_code = proc.returncode
    proc_tuple = (stdout, stderr, exit_code)
    return proc_tuple

Before we can split our workload, we need to know what regions are present in our BAM input file. We handle this initial parsing in the parse_sam_header_for_region function:

def parse_sam_header_for_region(bamfile_path):
    """Helper function to match SN regions contained in SAM header

    Returns:
        regions (list[string]): list of regions in bam header
    """
    header_cmd = ['samtools', 'view', '-H', bamfile_path]
    print('parsing SAM headers:', " ".join(header_cmd))
    headers_str = subprocess.check_output(header_cmd).decode("utf-8")
    rgx = re.compile(r'SN:(\S+)\s')
    regions = rgx.findall(headers_str)
    return regions

Once our workload is split and we’ve started processing, we wait and review the status of each Pool worker. Then, we merge and output our results.

# Write results to file
resultfn = inputs['mappings_sorted_bam_name'][0]
resultfn = (
    resultfn[:-4] + '_count.txt'
    if resultfn.endswith(".bam")
    else resultfn + '_count.txt')
with open(resultfn, 'w') as f:
    sum_reads = 0
    for res, reg in zip(results, regions):
        read_count = int(res[0])
        sum_reads += read_count
        f.write("Region {0}: {1}\n".format(reg, read_count))
    f.write("Total reads: {0}".format(sum_reads))

count_file = dxpy.upload_local_file(resultfn)
output = {}
output["count_file"] = dxpy.dxlink(count_file)
return output

Note: The run_cmd function returns a tuple containing the stdout, stderr, and exit code of the subprocess call. We parse these outputs from our workers to determine whether the run failed or passed.

def verify_pool_status(proc_tuples):
    """
    Helper to verify worker succeeded.

    As failed commands are detected we write the stderr from that command
    to the job_error.json file. This file will be printed to the platform
    job log on App failure.
    """
    all_succeed = True
    err_msgs = []
    for proc in proc_tuples:
        if proc[2] != 0:
            all_succeed = False
            err_msgs.append(proc[1])
    if err_msgs:
        raise dxpy.exceptions.AppInternalError(b"\n".join(err_msgs))

Last updated