Exploring and Querying Datasets

A license is required to access Spark functionality on the DNAnexus Platform. Contact DNAnexus Sales for more information.

Extracting Data From a Dataset With Spark

The dx commands extract_dataset and extract_assay germline can either return the data dictionary of a dataset or retrieve the underlying data that the data dictionary describes. They also provide options for returning dataset metadata, such as listing the names and titles of entities and fields, or listing all relevant assays in a dataset.

When retrieving data, the user may choose whether to use a private Spark resource. In most scenarios, retrieving data without direct Spark usage is sufficient and no additional compute resources are needed (see the example OpenBio notebooks). In this case, data is returned via the DNAnexus Thrift Server; although the server is highly available, it enforces a fixed timeout, which may prevent a high number of queries from executing. In scenarios where the data model has many relationships, a large volume of data is stored, and/or a large volume of data must be extracted and returned, it may be necessary to extract data using additional private compute resources, scaled so that the Thrift Server timeouts are avoided entirely. If the --sql flag is provided, the command instead returns a SQL statement (a string) to use when querying from a standalone Spark-enabled app(let), such as JupyterLab.
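
For example, the metadata listing options of dx extract_dataset can be run directly from a notebook to explore a dataset's structure before extracting data. A minimal Python sketch, where record-abc123 is a placeholder record ID:

import subprocess

# List the entities in the dataset (record-abc123 is a placeholder record ID)
subprocess.check_call(["dx", "extract_dataset", "record-abc123", "--list-entities"])

# List the fields of each entity, along with their titles
subprocess.check_call(["dx", "extract_dataset", "record-abc123", "--list-fields"])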

Initiating a Spark Session

The most common way to use Spark on the DNAnexus Platform is via a Spark-enabled JupyterLab notebook.

After creating a Jupyter notebook within a project, enter the commands shown below to initiate a Spark session.

Python:

import pyspark
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

R:

install.packages("sparklyr")
library(sparklyr)
port <- Sys.getenv("SPARK_MASTER_PORT")
master <- paste("spark://master:", port, sep = '')
sc <- spark_connect(master)
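
A quick way to confirm that the session is live before running queries (Python, using the spark and sc objects created above):

# Print the Spark version and the default parallelism available to this session
print(spark.version)
print(sc.defaultParallelism)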

Executing SQL Queries

Once you've initiated a Spark session, you can run SQL queries on the database within your notebook, with the results written to a Spark DataFrame:

Python:

retrieve_sql = 'select .... from .... '
df = spark.sql(retrieve_sql)

R:

library(DBI)
retrieve_sql <- 'select .... from .... '
df <- dbGetQuery(sc, retrieve_sql)
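
As a concrete illustration, assuming the Python session above, you can first list the databases visible to Spark and then preview a table; the database and table names below are placeholders:

# List the databases visible to this Spark session
spark.sql("SHOW DATABASES").show(truncate=False)

# Preview a few rows from a placeholder table
df = spark.sql("SELECT * FROM my_database.participant LIMIT 5")
df.show()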

Query to Extract Data From a Database Using extract_dataset

Python:

import subprocess
cmd = ["dx", "extract_dataset", dataset, "--fields", "entity1.field1, entity1.field2, entity2.field4", "-sql", "-o", "extracted_data.sql"]
subprocess.check_call(cmd)

Where dataset is the record-id or the path to the dataset or cohort, for example, record-abc123 or /mydirectory/mydataset.dataset.

R:

cmd <- paste("dx extract_dataset", dataset, " --fields", "entity1.field1, entity1.field2, entity2.field4", "--sql", "-o extracted_data.sql")
system(cmd)

Where dataset is the record-id or the path to the dataset or cohort.
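
If the --sql flag is omitted, dx extract_dataset returns the extracted data itself rather than a SQL statement. A minimal Python sketch, assuming the same placeholder fields and the default comma-delimited output:

import subprocess
import pandas as pd

# Without --sql, the command writes the extracted data to the output file
cmd = ["dx", "extract_dataset", dataset, "--fields", "entity1.field1,entity1.field2,entity2.field4", "-o", "extracted_data.csv"]
subprocess.check_call(cmd)

# Load the extracted data for downstream analysis
df = pd.read_csv("extracted_data.csv")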

Query to Filter and Extract Data From a Database Using extract_assay germline

Python:

import subprocess
cmd = ["dx", "extract_assay", "germline", dataset, "--retrieve-allele", "allele_filter.json", "--sql", "-o", "extract_allele.sql"]
subprocess.check_call(cmd)

R:

cmd <- paste("dx extract_assay", "germline", dataset, "--retrieve-allele", "allele_filter.json", "--sql", "-o extracted_allele.sql")
system(cmd)

In the examples above, dataset is the record-id or the path to the dataset or cohort, for example, record-abc123 or /mydirectory/mydataset.dataset. allele_filter.json is a file containing a JSON object that specifies filters for the --retrieve-allele option. For more information, refer to the example OpenBio notebooks.
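
As an illustration only, such a filter file could be written from a notebook as shown below. The key names used here (location, chromosome, starting_position, ending_position) are assumptions; consult the notebooks for the exact schema accepted by --retrieve-allele.

import json

# Illustrative filter restricting alleles to a genomic region.
# Key names are assumptions; see the OpenBio notebooks for the exact schema.
allele_filter = {
    "location": [
        {
            "chromosome": "chr1",
            "starting_position": "10000",
            "ending_position": "20000"
        }
    ]
}

with open("allele_filter.json", "w") as f:
    json.dump(allele_filter, f)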

Run SQL Query to Extract Data

Python:

with open("extracted_data.sql", "r") as file:
    retrieve_sql=""
    for line in file: 
        retrieve_sql += line.strip()
df = spark.sql(retrieve_sql.strip(";"))

R:

install.packages("tidyverse")
library(readr)
retrieve_sql <-read_file("extracted_data.sql")
retrieve_sql <- gsub("[;\n]", "", retrieve_sql)
df <- dbGetQuery(sc, retrieve_sql)

Best Practices

  • When querying large datasets, such as those containing genomic data, ensure that your Spark cluster is scaled appropriately, with enough nodes to parallelize the work across.

  • Initialize your Spark session only once per Jupyter session. If you initialize the Spark session in multiple notebooks within the same JupyterLab job (for example, by running notebook 1 and then notebook 2, or by running a single notebook from start to finish multiple times), the Spark session will be corrupted and you will need to restart the affected notebook's kernel. As a best practice, shut down the kernel of any notebook you are not using before running a second notebook in the same session.

  • If you would like to use a database outside your project's scope, you must refer to it by its unique database name (typically something like database_fjf3y28066y5jxj2b0gz4g85__metabric_data) rather than by its short database name (metabric_data in this case), as in the sketch below.
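
A minimal Python sketch of querying by the fully qualified database name; the table name some_table is a placeholder:

# Query a database outside the project's scope by its unique database name
df = spark.sql("SELECT * FROM database_fjf3y28066y5jxj2b0gz4g85__metabric_data.some_table LIMIT 5")
df.show()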
