In order to take full advantage of the scalability that cloud computing offers, our scripts have to implement the correct methodologies. This applet tutorial will:
Install SAMtools
Download BAM file
Split workload
Count regions in parallel
How is the SAMtools dependency provided?
The SAMtools dependency is resolved by declaring an Apt-Get package in the dxapp.jsonrunSpec.execDepends field.
This applet downloads all inputs at once using dxpy.download_all_inputs:
inputs = dxpy.download_all_inputs()# download_all_inputs returns a dictionary that contains mapping from inputs to file locations.# Additionaly, helper keys, value pairs are added to the dicitonary, similar to bash helper functionsinputs# mappings_sorted_bam_path: [u'/home/dnanexus/in/mappings_sorted_bam/SRR504516.bam']# mappings_sorted_bam_name: u'SRR504516.bam'# mappings_sorted_bam_prefix: u'SRR504516'# mappings_sorted_bai_path: u'/home/dnanexus/in/mappings_sorted_bai/SRR504516.bam.bai'# mappings_sorted_bai_name: u'SRR504516.bam.bai'# mappings_sorted_bai_prefix: u'SRR504516'
Split workload
We process in parallel using the python multiprocessing module using a rather simple pattern shown below:
print("Number of cpus: {0}".format(cpu_count()))# Get cpu count from multiprocessingworker_pool =Pool(processes=cpu_count())# Create a pool of workers, 1 for each coreresults = worker_pool.map(run_cmd, collection)# map run_cmds to a collection# Pool.map will handle orchestrating the jobworker_pool.close()worker_pool.join()# Make sure to close and join workers when done
This convenient pattern allows you to quickly orchestrate jobs on a worker. For more detailed overview of the multiprocessing module, visit the python docs.
We create several helpers in our applet script to manage our workload. One helper you may have seen before is run_cmd; we use this function to manage or subprocess calls:
defrun_cmd(cmd_arr):"""Run shell command. Helper function to simplify the pool.map() call in our parallelization. Raises OSError if command specified (index 0 in cmd_arr) isn't valid """ proc = subprocess.Popen( cmd_arr, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate() exit_code = proc.returncode proc_tuple = (stdout, stderr, exit_code)return proc_tuple
Before we can split our workload, we need to know what regions are present in our BAM input file. We handle this initial parsing in the parse_sam_header_for_region function:
defparse_sam_header_for_region(bamfile_path):"""Helper function to match SN regions contained in SAM header Returns: regions (list[string]): list of regions in bam header """ header_cmd = ['samtools','view','-H', bamfile_path]print('parsing SAM headers:', " ".join(header_cmd)) headers_str = subprocess.check_output(header_cmd).decode("utf-8") rgx = re.compile(r'SN:(\S+)\s') regions = rgx.findall(headers_str)return regions
Once our workload is split and we’ve started processing, we wait and review the status of each Pool worker. Then, we merge and output our results.
Note: The run_cmd function returns a tuple containing the stdout, stderr, and exit code of the subprocess call. We parse these outputs from our workers to determine whether the run failed or passed.
defverify_pool_status(proc_tuples):""" Helper to verify worker succeeded. As failed commands are detected we write the stderr from that command to the job_error.json file. This file will be printed to the platform job log on App failure. """ all_succeed =True err_msgs = []for proc in proc_tuples:if proc[2]!=0: all_succeed =False err_msgs.append(proc[1])if err_msgs:raise dxpy.exceptions.AppInternalError(b"\n".join(err_msgs))
Last updated
Copyright 2024 DNAnexus
This site uses cookies to deliver its service and to analyse traffic. By browsing this site, you accept the privacy policy.