Parallel by Region (py)

This applet tutorial will perform a SAMtools count using parallel threads.

To take full advantage of the scalability that cloud computing offers, our scripts have to implement the right patterns. This applet tutorial will:

  1. Install SAMtools

  2. Download the BAM file

  3. Split the workload

  4. Count regions in parallel

How is the SAMtools dependency provided?

The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.json file's runSpec.execDepends field.

{
  "runSpec": {
    ...
    "execDepends": [
      {"name": "samtools"}
    ]
  }
}
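
execDepends entries use the Apt package manager by default, but they can also pin a version or pull from other package managers. The snippet below is a minimal sketch, not part of this applet, assuming you also wanted a pip-installed dependency; the pysam version shown is purely illustrative.

{
  "runSpec": {
    ...
    "execDepends": [
      {"name": "samtools"},
      {"name": "pysam", "package_manager": "pip", "version": "0.15.4"}
    ]
  }
}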

Download Inputs

This applet downloads all inputs at once using dxpy.download_all_inputs:

inputs = dxpy.download_all_inputs()
# download_all_inputs returns a dictionary that maps each input to its local file location(s).
# Additionally, helper key-value pairs are added to the dictionary, similar to the bash helper variables
inputs
#     mappings_sorted_bam_path: [u'/home/dnanexus/in/mappings_sorted_bam/SRR504516.bam']
#     mappings_sorted_bam_name: u'SRR504516.bam'
#     mappings_sorted_bam_prefix: u'SRR504516'
#     mappings_sorted_bai_path: u'/home/dnanexus/in/mappings_sorted_bai/SRR504516.bam.bai'
#     mappings_sorted_bai_name: u'SRR504516.bam.bai'
#     mappings_sorted_bai_prefix: u'SRR504516'
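
For context, here is a minimal sketch of how these values might be used inside the applet's Python entry point. The input names (mappings_sorted_bam, mappings_sorted_bai) are assumptions that would need to match the applet's dxapp.json inputSpec; adjust them to your own spec.

import dxpy

@dxpy.entry_point('main')
def main(mappings_sorted_bam, mappings_sorted_bai):
    # Download every file input in one call; the *_path and *_name values are lists
    inputs = dxpy.download_all_inputs()
    bam_path = inputs['mappings_sorted_bam_path'][0]
    bam_name = inputs['mappings_sorted_bam_name'][0]
    print("Downloaded {0} to {1}".format(bam_name, bam_path))
    return {}

dxpy.run()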

Split workload

We process regions in parallel with the Python multiprocessing module, using the simple pattern shown below:

from multiprocessing import Pool, cpu_count  # Worker pool and CPU count helpers

print("Number of cpus: {0}".format(cpu_count()))  # Get CPU count from multiprocessing
worker_pool = Pool(processes=cpu_count())         # Create a pool of workers, 1 for each core
results = worker_pool.map(run_cmd, collection)    # Map run_cmd over the collection of commands;
                                                  # Pool.map handles orchestrating the jobs
worker_pool.close()
worker_pool.join()  # Make sure to close and join workers when done
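
Here, collection is simply a list of command arrays, one per unit of work. As a hypothetical sketch of how it could be built for this tutorial, assuming regions has already been parsed from the BAM header (see parse_sam_header_for_region below) and bam_path is the local path to the downloaded BAM:

# One `samtools view -c` (count) command per region found in the BAM header
collection = [['samtools', 'view', '-c', bam_path, region] for region in regions]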

We create several helpers in our applet script to manage our workload. One helper you may have seen before is run_cmd; we use this function to manage our subprocess calls:

def run_cmd(cmd_arr):
    """Run shell command.
    Helper function to simplify the pool.map() call in our parallelization.
    Raises OSError if command specified (index 0 in cmd_arr) isn't valid
    """
    proc = subprocess.Popen(
        cmd_arr,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    exit_code = proc.returncode
    proc_tuple = (stdout, stderr, exit_code)
    return proc_tuple
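
As a usage sketch, calling run_cmd directly for a single region (the region name 'chr1' is illustrative) and unpacking the returned tuple would look like this; in the applet, Pool.map drives these calls instead:

stdout, stderr, exit_code = run_cmd(['samtools', 'view', '-c', bam_path, 'chr1'])
if exit_code == 0:
    print("chr1 read count: {0}".format(int(stdout)))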

Before we can split our workload, we need to know which regions are present in our BAM input file. We handle this initial parsing in the parse_sam_header_for_region function:

def parse_sam_header_for_region(bamfile_path):
    """Helper function to match SN regions contained in SAM header

    Returns:
        regions (list[string]): list of regions in bam header
    """
    header_cmd = ['samtools', 'view', '-H', bamfile_path]
    print('parsing SAM headers:', " ".join(header_cmd))
    headers_str = subprocess.check_output(header_cmd).decode("utf-8")
    rgx = re.compile(r'SN:(\S+)\s')
    regions = rgx.findall(headers_str)
    return regions
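
For reference, samtools view -H prints @SQ header lines of the form @SQ SN:<region> LN:<length>; the SN:(\S+) regex captures each region name. Calling the helper therefore yields something like the sketch below (the chromosome names depend on the reference your BAM was aligned to):

regions = parse_sam_header_for_region(bam_path)
# e.g. ['chr1', 'chr2', 'chr3', ..., 'chrX', 'chrY', 'chrM']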

Once our workload is split and we’ve started processing, we wait and review the status of each Pool worker. Then, we merge and output our results.

# Write results to file
resultfn = inputs['mappings_sorted_bam_name'][0]
resultfn = (
    resultfn[:-4] + '_count.txt'
    if resultfn.endswith(".bam")
    else resultfn + '_count.txt')
with open(resultfn, 'w') as f:
    sum_reads = 0
    for res, reg in zip(results, regions):
        read_count = int(res[0])
        sum_reads += read_count
        f.write("Region {0}: {1}\n".format(reg, read_count))
    f.write("Total reads: {0}".format(sum_reads))

count_file = dxpy.upload_local_file(resultfn)
output = {}
output["count_file"] = dxpy.dxlink(count_file)
return output
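
The count_file returned here must correspond to an entry in the applet's output specification. A minimal sketch of the matching dxapp.json outputSpec, assuming the count file is the applet's only output:

{
  "outputSpec": [
    {
      "name": "count_file",
      "class": "file"
    }
  ]
}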

Note: The run_cmd function returns a tuple containing the stdout, stderr, and exit code of the subprocess call. We inspect these outputs from each worker in the verify_pool_status helper to determine whether the run passed or failed.

def verify_pool_status(proc_tuples):
    """
    Helper to verify worker succeeded.

    As failed commands are detected we write the stderr from that command
    to the job_error.json file. This file will be printed to the platform
    job log on App failure.
    """
    all_succeed = True
    err_msgs = []
    for proc in proc_tuples:
        if proc[2] != 0:
            all_succeed = False
            err_msgs.append(proc[1])
    if err_msgs:
        raise dxpy.exceptions.AppInternalError(b"\n".join(err_msgs))
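
As a sketch, this check would be called in the main entry point right after the worker pool is joined, using the variable names from the snippets above:

worker_pool.close()
worker_pool.join()
verify_pool_status(results)  # Raises AppInternalError with collected stderr if any region count failed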

This convenient pattern allows you to quickly orchestrate jobs on a worker. For a more detailed overview of the multiprocessing module, visit the Python docs.

View full source code on GitHub