Distributed by Region (py)

This applet counts the reads in a BAM format file, distributing the work by genomic region.

View full source code on GitHub

How is the SAMtools dependency provided?

The SAMtools dependency is resolved by declaring it as an apt package in the dxapp.json runSpec.execDepends field:

  "runSpec": {
    ...
    "execDepends": [
      {"name": "samtools"}
    ]
  }

For additional information, refer to the execDepends documentation.
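
execDepends entries can also pin a package version; a minimal sketch, assuming apt-style version pinning (the version string below is illustrative, not a requirement of this applet):

  "execDepends": [
    {"name": "samtools", "package_manager": "apt", "version": "1.9-4"}
  ]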

Entry Points

Distributed Python-interpreter applets use Python decorators on functions to declare entry points (see the sketch after this list). This applet has the following entry points as decorated functions:

  • main

  • samtoolscount_bam

  • combine_files

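A minimal sketch of how these entry points are declared with dxpy (the parameter lists mirror the snippets below; the full applet derives main's exact signature from its inputSpec):

import dxpy

@dxpy.entry_point('main')
def main(mappings_bam, region_size, index_file=None):
    ...

@dxpy.entry_point('samtoolscount_bam')
def samtoolscount_bam(region_list, mappings_bam, index_file):
    ...

@dxpy.entry_point('combine_files')
def combine_files(countDXlinks, resultfn):
    ...

dxpy.run()  # dispatches to whichever entry point the job was created with
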
Each entry point is executed on a new worker with its own system requirements. In this example, the applet splits and merges files on basic mem1_ssd1_x2 instances and performs the more intensive counting step on a mem1_ssd1_x4 instance. Instance types are set in the dxapp.json's runSpec.systemRequirements:

  "runSpec": {
    ...
    "systemRequirements": {
      "main": {
        "instanceType": "mem1_ssd1_x2"
      },
      "samtoolscount_bam": {
        "instanceType": "mem1_ssd1_x4"
      },
      "combine_files": {
        "instanceType": "mem1_ssd1_x2"
      }
    },
    ...
  }
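
The per-entry-point defaults above can also be overridden when a subjob is created, via dxpy.new_dxjob's instance_type argument; a hedged sketch (the instance type shown is illustrative):

subjob = dxpy.new_dxjob(
    fn_input={"region_list": split,
              "mappings_bam": mappings_bam,
              "index_file": index_file},
    fn_name="samtoolscount_bam",
    instance_type="mem1_ssd1_x8")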

main

The main function parses region names from the BAM header and scatters them into bins whose size is set by user input. If no *.bai index file is provided, the applet generates one.

regions = parseSAM_header_for_region(filename)
split_regions = [regions[i:i + region_size]
                  for i in range(0, len(regions), region_size)]

if not index_file:
    mappings_bam, index_file = create_index_file(filename, mappings_bam)
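
For example, with 25 regions and region_size = 10, the comprehension yields bins of 10, 10, and 5 regions (a standalone illustration, not applet code):

regions = ['chr{0}'.format(i) for i in range(1, 26)]   # 25 region names
region_size = 10
split_regions = [regions[i:i + region_size]
                 for i in range(0, len(regions), region_size)]
assert [len(b) for b in split_regions] == [10, 10, 5]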

Region bins are passed to the samtoolscount_bam entry point via calls to dxpy.new_dxjob.

print('creating subjobs')
subjobs = [dxpy.new_dxjob(
            fn_input={"region_list": split,
                      "mappings_bam": mappings_bam,
                      "index_file": index_file},
            fn_name="samtoolscount_bam")
            for split in split_regions]

fileDXLinks = [subjob.get_output_ref("readcount_fileDX")
                for subjob in subjobs]

Outputs from the samtoolscount_bam jobs become inputs to the combine_files entry point, and the output of combine_files is returned as the output of main.

print('combining outputs')
postprocess_job = dxpy.new_dxjob(
    fn_input={"countDXlinks": fileDXLinks, "resultfn": filename},
    fn_name="combine_files")

countDXLink = postprocess_job.get_output_ref("countDXLink")

output = {}
output["count_file"] = countDXLink

return output

samtoolscount_bam

This entry point downloads its inputs, then builds and runs a samtools view -c command for each region in its bin. The dictionary returned by dxpy.download_all_inputs() is used to reference input names and paths.

def samtoolscount_bam(region_list, mappings_bam, index_file):
    """Processing function.

    Arguments:
        region_list (list[str]): Regions to count in BAM
        mappings_bam (dict): dxlink to input BAM
        index_file (dict): dxlink to the input BAM index (*.bai)

    Returns:
        Dictionary containing dxlinks to the uploaded read counts file
    """
    #
    # Download inputs
    # -------------------------------------------------------------------
    # dxpy.download_all_inputs will download all input files into
    # the /home/dnanexus/in directory.  A folder will be created for each
    # input and the file(s) will be downloaded to that directory.
    #
    # In this example, the dictionary has the following key-value pairs
    # Note that the values are all lists
    #     mappings_bam_path: [u'/home/dnanexus/in/mappings_bam/<bam filename>.bam']
    #     mappings_bam_name: [u'<bam filename>.bam']
    #     mappings_bam_prefix: [u'<bam filename>']
    #     index_file_path: [u'/home/dnanexus/in/index_file/<bam filename>.bam.bai']
    #     index_file_name: [u'<bam filename>.bam.bai']
    #     index_file_prefix: [u'<bam filename>']
    #

    inputs = dxpy.download_all_inputs()

    # The samtools view command requires the BAM and index file to be in the
    # same directory.
    shutil.move(inputs['mappings_bam_path'][0], os.getcwd())
    shutil.move(inputs['index_file_path'][0], os.getcwd())
    input_bam = inputs['mappings_bam_name'][0]

    #
    # Per region perform SAMtools count.
    # --------------------------------------------------------------
    # Output count for regions and return DXLink as job output to
    # allow other entry points to download job output.
    #

    with open('read_count_regions.txt', 'w') as f:
        for region in region_list:
            view_cmd = create_region_view_cmd(input_bam, region)
            region_proc_result = run_cmd(view_cmd)
            region_count = int(region_proc_result[0])
            f.write("Region {0}: {1}\n".format(region, region_count))
    readcountDXFile = dxpy.upload_local_file("read_count_regions.txt")
    readCountDXlink = dxpy.dxlink(readcountDXFile.get_id())

    return {"readcount_fileDX": readCountDXlink}
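
The loop above relies on two helpers defined elsewhere in the applet. A minimal sketch of what they could look like (the actual implementations are in the full source on GitHub):

import subprocess

def create_region_view_cmd(input_bam, region):
    # samtools view -c prints the number of reads in the given region
    return ['samtools', 'view', '-c', input_bam, region]

def run_cmd(cmd):
    # Return (stdout, stderr) as text so the caller can int() the count
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    return proc.communicate()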

This entry point returns {"readcount_fileDX": readCountDXlink}, a JBOR (job-based object reference) to the uploaded text file. Storing intermediate results in files and uploading/downloading them between jobs exaggerates the scatter-gather pattern for tutorial purposes; job outputs can also be types other than file, such as int.
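
For instance, a hypothetical variation could skip the intermediate file entirely and return the bin's total as an int output; main would gather the references the same way:

# Hypothetical variation: return the count itself rather than a file
return {"readcount": bin_total}

# In main, integer JBORs are gathered just like file JBORs:
count_refs = [subjob.get_output_ref("readcount") for subjob in subjobs]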

combine_files

The main entry point triggers this subjob, providing the outputs of the samtoolscount_bam jobs as input. This entry point gathers all the count files, sums their read counts, and writes a combined results file.

def combine_files(countDXlinks, resultfn):
    """The 'gather' subjob of the applet.

    Arguments:
        countDXlinks (list[dict]): DXLinks to the processing jobs' output files.
        resultfn (str): Filename to use for job output file.

    Returns:
        DXLink for the main function to return as the job output.

    Note: Only the DXLinks are passed as parameters.
    Subjobs run on fresh instances, so files must be downloaded to the machine.
    """
    if resultfn.endswith(".bam"):
        resultfn = resultfn[:-4] + '.txt'

    sum_reads = 0
    with open(resultfn, 'w') as f:
        for i, dxlink in enumerate(countDXlinks):
            dxfile = dxpy.DXFile(dxlink)
            filename = "countfile{0}".format(i)
            dxpy.download_dxfile(dxfile, filename)
            with open(filename, 'r') as fsub:
                for line in fsub:
                    sum_reads += parse_line_for_readcount(line)
                    f.write(line)
        f.write('Total Reads: {0}'.format(sum_reads))

    countDXFile = dxpy.upload_local_file(resultfn)
    countDXlink = dxpy.dxlink(countDXFile.get_id())

    return {"countDXLink": countDXlink}
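
parse_line_for_readcount is another helper from the full source; a minimal sketch, assuming lines of the form "Region chr1: 12345" as written by samtoolscount_bam:

import re

def parse_line_for_readcount(line):
    # Pull the trailing integer out of a "Region <name>: <count>" line
    match = re.search(r':\s*(\d+)\s*$', line)
    return int(match.group(1)) if match else 0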

Important: While the main entry point triggers the processing and gathering entry points, it doesn't do any heavy lifting itself. The runSpec JSON above shows a workflow that starts on a lightweight instance, scales up for the processing entry point, and scales back down for the gathering step.
