# Parallel by Region (py)

[View full source code on GitHub](https://github.com/dnanexus/dnanexus-example-applets/tree/master/Tutorials/python/samtools_count_para_reg_multiprocess_py)

To take full advantage of the scalability that cloud computing offers, your scripts must implement the correct methodologies. This applet tutorial shows you how to:

1. Install SAMtools
2. Download BAM file
3. Split workload
4. Count regions in parallel

## How is the SAMtools dependency provided?

The SAMtools dependency is resolved by declaring an [Apt-Get](https://help.ubuntu.com/community/AptGet/Howto/) package in the `dxapp.json` `runSpec.execDepends` field.

```json
{
  "runSpec": {
    ...
    "execDepends": [
      {"name": "samtools"}
    ]
  }
```

## Download Inputs

This applet downloads all inputs at once using `dxpy.download_all_inputs`:

```python
inputs = dxpy.download_all_inputs()
# download_all_inputs returns a dictionary that contains a mapping from inputs to file locations.
# Additionally, helper key-value pairs are added to the dictionary, similar to Bash helper functions.
inputs
#     mappings_sorted_bam_path: ['/home/dnanexus/in/mappings_sorted_bam/SRR504516.bam']
#     mappings_sorted_bam_name: ['SRR504516.bam']
#     mappings_sorted_bam_prefix: ['SRR504516']
#     mappings_sorted_bai_path: ['/home/dnanexus/in/mappings_sorted_bai/SRR504516.bam.bai']
#     mappings_sorted_bai_name: ['SRR504516.bam.bai']
#     mappings_sorted_bai_prefix: ['SRR504516']
```

## Split workload

This tutorial processes data in parallel using the Python `multiprocessing` module with a straightforward pattern shown below:

```python
# Get cpu count from multiprocessing
print("Number of cpus: {0}".format(cpu_count()))

# Create a pool of workers, 1 for each core
worker_pool = Pool(processes=cpu_count())

# Map run_cmds to a collection
# Pool.map handles orchestrating the job
results = worker_pool.map(run_cmd, collection)

# Make sure to close and join workers when done
worker_pool.close()
worker_pool.join()
```

This convenient pattern allows you to orchestrate jobs on a worker. For a more detailed overview of the `multiprocessing` module, visit the [Python docs](https://docs.python.org/3/library/multiprocessing.html).

The applet script includes helper functions to manage the workload. One helper is `run_cmd`, which manages subprocess calls:

```python
def run_cmd(cmd_arr):
    """Run shell command.
    Helper function to simplify the pool.map() call in our parallelization.
    Raises OSError if command specified (index 0 in cmd_arr) isn't valid
    """
    proc = subprocess.Popen(
        cmd_arr,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    exit_code = proc.returncode
    proc_tuple = (stdout, stderr, exit_code)
    return proc_tuple
```

Before splitting the workload, determine what regions are present in the BAM input file. This initial parsing is handled in the `parse_sam_header_for_region` function:

```python
def parse_sam_header_for_region(bamfile_path):
    """Helper function to match SN regions contained in SAM header

    Returns:
        regions (list[string]): list of regions in bam header
    """
    header_cmd = ['samtools', 'view', '-H', bamfile_path]
    print('parsing SAM headers:', " ".join(header_cmd))
    headers_str = subprocess.check_output(header_cmd).decode("utf-8")
    rgx = re.compile(r'SN:(\S+)\s')
    regions = rgx.findall(headers_str)
    return regions
```

Once the workload is split and processing has started, wait and review the status of each `Pool` worker. Then, merge and output the results.

```python
# Write results to file
resultfn = inputs['mappings_sorted_bam_name'][0]
resultfn = (
    resultfn[:-4] + '_count.txt'
    if resultfn.endswith(".bam")
    else resultfn + '_count.txt')
with open(resultfn, 'w') as f:
    sum_reads = 0
    for res, reg in zip(results, regions):
        read_count = int(res[0])
        sum_reads += read_count
        f.write("Region {0}: {1}\n".format(reg, read_count))
    f.write("Total reads: {0}".format(sum_reads))

count_file = dxpy.upload_local_file(resultfn)
output = {}
output["count_file"] = dxpy.dxlink(count_file)
return output
```

The `run_cmd` function returns a tuple containing the `stdout`, `stderr`, and exit code of the subprocess call. These outputs are parsed from the workers to determine whether the run failed or passed.

```python
def verify_pool_status(proc_tuples):
    """
    Helper to verify worker succeeded.

    As failed commands are detected, the `stderr` from that command is written
    to the job_error.json file. This file is printed to the Platform
    job log on App failure.
    """
    all_succeed = True
    err_msgs = []
    for proc in proc_tuples:
        if proc[2] != 0:
            all_succeed = False
            err_msgs.append(proc[1])
    if err_msgs:
        raise dxpy.exceptions.AppInternalError(b"\n".join(err_msgs))
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://documentation.dnanexus.com/getting-started/developer-tutorials/concurrent-computing-tutorials/parallel/parallel-by-region-py.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
