Distributed Python-interpreter apps use Python decorators on functions to declare entry points. This applet has the following entry points as decorated functions:
main
samtoolscount_bam
combine_files
Each entry point is executed on a new worker with its own system requirements. In this example, we split and merge our files on basic mem1_ssd1_x2 instances and run our more intensive processing step on a mem1_ssd1_x4 instance. Instance types can be set per entry point in the dxapp.json runSpec.systemRequirements:
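The full dxapp.json isn't reproduced in this excerpt; a minimal sketch of the relevant portion, assuming the instance-type assignments described above:

```json
{
  "runSpec": {
    "systemRequirements": {
      "main": { "instanceType": "mem1_ssd1_x2" },
      "samtoolscount_bam": { "instanceType": "mem1_ssd1_x4" },
      "combine_files": { "instanceType": "mem1_ssd1_x2" }
    }
  }
}
```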
The main function scatters work into region bins based on user input. If no *.bai index file is present, the applet generates one.
```python
regions = parseSAM_header_for_region(filename)
split_regions = [regions[i:i + region_size]
                 for i in range(0, len(regions), region_size)]

if not index_file:
    mappings_bam, index_file = create_index_file(filename, mappings_bam)
```
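The helper create_index_file isn't shown in this excerpt. A minimal sketch of what it might do, assuming the index is built with samtools index and uploaded so subjobs can fetch it alongside the BAM; the body and return shape here are illustrative assumptions, not the tutorial's actual helper:

```python
import subprocess

import dxpy

def create_index_file(filename, mappings_bam):
    # Hypothetical sketch: fetch the BAM, index it with `samtools index`,
    # and upload the resulting *.bai so subjobs can download it.
    dxpy.download_dxfile(mappings_bam, filename)
    subprocess.check_call(['samtools', 'index', filename])
    index_file = dxpy.dxlink(
        dxpy.upload_local_file(filename + '.bai').get_id())
    return mappings_bam, index_file
```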
Region bins are passed to the samtoolscount_bam entry point using the dxpy.new_dxjob function.
```python
print('creating subjobs')
subjobs = [dxpy.new_dxjob(
               fn_input={"region_list": split,
                         "mappings_bam": mappings_bam,
                         "index_file": index_file},
               fn_name="samtoolscount_bam")
           for split in split_regions]

fileDXLinks = [subjob.get_output_ref("readcount_fileDX")
               for subjob in subjobs]
```
Outputs from the samtoolscount_bam subjobs are used as inputs for the combine_files entry point, and the output of combine_files is in turn returned as the output of the main entry point.
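Wired together, the tail of main might look like the sketch below. fileDXLinks and the entry-point names come from the code above, but the output field name count_file and the resultfn variable are assumptions for illustration:

```python
# Gather: pass the per-region count files (as JBORs) to combine_files.
postprocess_job = dxpy.new_dxjob(
    fn_input={"countDXlinks": fileDXLinks, "resultfn": resultfn},
    fn_name="combine_files")

# Return the gather job's output reference as main's own job output.
return {"count_file": postprocess_job.get_output_ref("countDXLink")}
```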
This entry point downloads its inputs and builds a samtools view -c command for each region in the input bin. The dictionary returned by dxpy.download_all_inputs() is used to reference input names and paths.
```python
# Imports at the top of the applet script (shown here for completeness).
import os
import shutil

import dxpy

def samtoolscount_bam(region_list, mappings_bam, index_file):
    """Processing function.

    Arguments:
        region_list (list[str]): Regions to count in BAM
        mappings_bam (dict): dxlink to input BAM
        index_file (dict): dxlink to input BAM index

    Returns:
        Dictionary containing a dxlink to the uploaded read counts file
    """
    #
    # Download inputs
    # -------------------------------------------------------------------
    # dxpy.download_all_inputs will download all input files into
    # the /home/dnanexus/in directory. A folder will be created for each
    # input and the file(s) will be downloaded to that directory.
    #
    # In this example our inputs dictionary has the following key, value
    # pairs. Note that the values are all lists:
    #   mappings_bam_path: [u'/home/dnanexus/in/mappings_bam/<bam filename>.bam']
    #   mappings_bam_name: [u'<bam filename>.bam']
    #   mappings_bam_prefix: [u'<bam filename>']
    #   index_file_path: [u'/home/dnanexus/in/index_file/<bam filename>.bam.bai']
    #   index_file_name: [u'<bam filename>.bam.bai']
    #   index_file_prefix: [u'<bam filename>']
    #
    inputs = dxpy.download_all_inputs()

    # The SAMtools view command requires the BAM and index file to be in
    # the same directory.
    shutil.move(inputs['mappings_bam_path'][0], os.getcwd())
    shutil.move(inputs['index_file_path'][0], os.getcwd())
    input_bam = inputs['mappings_bam_name'][0]

    #
    # Per region perform SAMtools count.
    # --------------------------------------------------------------
    # Output count for regions and return DXLink as job output to
    # allow other entry points to download job output.
    #
    with open('read_count_regions.txt', 'w') as f:
        for region in region_list:
            view_cmd = create_region_view_cmd(input_bam, region)
            region_proc_result = run_cmd(view_cmd)
            region_count = int(region_proc_result[0])
            f.write("Region {0}: {1}\n".format(region, region_count))

    readcountDXFile = dxpy.upload_local_file("read_count_regions.txt")
    readCountDXlink = dxpy.dxlink(readcountDXFile.get_id())
    return {"readcount_fileDX": readCountDXlink}
```
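The helpers create_region_view_cmd and run_cmd are not shown in this excerpt. A minimal sketch of what they might look like, assuming the samtools view -c invocation described above; the names appear in the tutorial code, but these bodies are illustrative assumptions:

```python
import subprocess

def create_region_view_cmd(input_bam, region):
    # `samtools view -c` prints the number of records in the given region.
    return ['samtools', 'view', '-c', input_bam, region]

def run_cmd(cmd):
    # Run the command and return its stdout split into lines, so callers
    # can read the count from the first line.
    output = subprocess.check_output(cmd, universal_newlines=True)
    return output.splitlines()
```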
This entry point returns {"readcount_fileDX": readCountDXlink}, a JBOR (job-based object reference) to the uploaded text file. This approach to scatter-gather stores the results in files and uploads/downloads the information as needed; it is deliberately file-heavy for tutorial purposes. You can also pass types other than file, such as int.
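For instance, a hypothetical entry point (not part of this tutorial) could pass ints directly in fn_input and return an int output, with no upload or download at all:

```python
import dxpy

@dxpy.entry_point('sum_counts')
def sum_counts(counts):
    # `counts` arrives directly as a list of ints in fn_input; the output
    # is a plain int, referenced by consumers like any other job output.
    return {"total": sum(counts)}

# A caller would wire it up the same way as a file output:
#   subjob = dxpy.new_dxjob(fn_input={"counts": [1, 2, 3]},
#                           fn_name="sum_counts")
#   total_ref = subjob.get_output_ref("total")
```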
combine_files
The main entry point triggers this subjob, providing the outputs of the samtoolscount_bam jobs as input. This entry point gathers all the files those jobs generated and sums the read counts.
```python
def combine_files(countDXlinks, resultfn):
    """The 'gather' subjob of the applet.

    Arguments:
        countDXlinks (list[dict]): list of DXLinks to process job output files.
        resultfn (str): Filename to use for job output file.

    Returns:
        DXLink for the main function to return as the job output.

    Note: Only the DXLinks are passed as parameters. Subjobs run on a
    fresh instance, so files must be downloaded to the machine.
    """
    if resultfn.endswith(".bam"):
        resultfn = resultfn[:-4] + '.txt'

    sum_reads = 0
    with open(resultfn, 'w') as f:
        for i, dxlink in enumerate(countDXlinks):
            dxfile = dxpy.DXFile(dxlink)
            filename = "countfile{0}".format(i)
            dxpy.download_dxfile(dxfile, filename)
            with open(filename, 'r') as fsub:
                for line in fsub:
                    sum_reads += parse_line_for_readcount(line)
                    f.write(line)
        f.write('Total Reads: {0}'.format(sum_reads))

    countDXFile = dxpy.upload_local_file(resultfn)
    countDXlink = dxpy.dxlink(countDXFile.get_id())
    return {"countDXLink": countDXlink}
```
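The helper parse_line_for_readcount isn't shown here. A minimal sketch, assuming the "Region <region>: <count>" line format written by samtoolscount_bam; this body is an illustrative assumption:

```python
def parse_line_for_readcount(line):
    # Lines look like "Region chr1:1-100000: 12345"; the count is the
    # final whitespace-separated token.
    return int(line.rsplit(None, 1)[-1])
```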
Important: While the main entry point triggers the processing and gathering entry points, keep in mind that it doesn't do any heavy lifting or processing itself. Notice in the runSpec JSON above that we start on a lightweight instance, scale up for the processing entry point, and finally scale down for the gathering step.