Spark Apps

This document describes how to develop Spark apps for Apollo. It is intended for app developers familiar with developing apps on DNAnexus.

Overview

Spark apps are special apps that instantiate a fully managed, on-demand Spark/Hadoop cluster on the DNAnexus platform for big data processing and translational informatics. Spark apps are like regular apps in many ways. However, Spark apps have access to the data processing capabilities of Spark, which allow them to do distributed data processing in a natural, scalable way. Although Spark apps are implemented using the same framework as regular apps, there are some notable differences.

Minimum DNAnexus CLI version

Make sure your dx-toolkit has been updated to the latest release to take advantage of the latest features and improvements. Download the latest version at Downloads.

Bash app always

Spark-based DNAnexus apps should be Bash apps at the top level. If you want to develop a PySpark application, include the Python code in your app's resources and submit it to Spark from the Bash script.
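For example, a PySpark script placed under resources/ in the app source directory is unpacked into the root of the execution environment's filesystem, so resources/opt/pyspark_app.py becomes /opt/pyspark_app.py at runtime and can be submitted from the Bash entry point. The sketch below is illustrative; the script name and path are placeholders:

# Bash entry point of the app: submit the bundled PySpark script to the cluster.
# resources/opt/pyspark_app.py in the app source is available as /opt/pyspark_app.py at runtime.
main() {
    $SPARK_HOME/bin/spark-submit /opt/pyspark_app.py
}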

The app specification

The following things must be included in the app specification dxapp.json.

The cluster specification

The cluster specification describes the makeup of the Spark cluster. You can configure things like the cluster type, the Spark version (2.4.0), and the number of nodes. The cluster specification is required for Spark apps -- it should be added to the dxapp.json with the following syntax (the # comments are explanatory only and are not valid JSON):

{
  "...": "...",
  "systemRequirements": {
    "*": {
      "...": "...",
      "clusterSpec": {
        "type": "dxspark",                              # Type of the cluster, e.g. dxspark, apachespark
        "version": "2.4.0",                             # Cluster version to use
        "initialInstanceCount": "<num_cluster_nodes>",  # Total number of nodes in the cluster (including 1 master)
        "ports": "9500, 9700-9750",                     # (Optional) Ports (or port ranges) to be opened between the nodes of the cluster
        "bootstrapScript": "path/to/script.sh"          # (Optional) Bootstrap script that runs on all nodes of
                                                        # the cluster before the application code
      }
    }
  }
}

The cluster specification is a mapping with the following key-values:

  • type (string): the cluster type, "dxspark" for an Apollo Spark cluster or "apachespark" for generic Spark.

  • version (string): the Spark version, "2.4.0".

  • initialInstanceCount (integer): the number of nodes in the cluster, including the master node. It should be at least 1.

    If the value is 1, the cluster is a single-node Spark cluster in which the master and slave run on the same node.

  • ports (string, optional): comma-separated list of ports (or port ranges) to be opened between the nodes of the cluster.

  • bootstrapScript (string, optional): path to the bootstrap script. The bootstrap script runs on all nodes of the cluster

    before the application code. It is recommended that the script be located alongside the application code.

Types of Clusters

The following Spark cluster types are currently supported.

dxspark

This is the Apollo flavor of Spark, fully integrated with the rest of the platform. For example, when you create a database, a corresponding database object is created in the project where your app was executed. Those databases can be shared by sharing the project, and the project permission levels are reflected in the SQL permissions.
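As a quick check after such an app finishes, the new database object can be located in the project from the command line. This is a sketch assuming a dx-toolkit release with database object support:

# List database objects in the current project (sketch; requires a dx-toolkit release with database support).
dx find data --class database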

apachespark

This is generic Apache Spark that you can use to create a Spark cluster and run Spark jobs. You can specify a custom location to store your database files, e.g. your own Amazon S3 bucket. No platform database objects are created and no storage is provided, but you can use this cluster type to perform Spark analysis jobs and store the results back to the platform via your output spec.

Spark Cluster Management Software

Spark apps use Spark Standalone for cluster management and HDFS to share data between the nodes. The following diagram depicts a typical Spark cluster application.

[Figure: Spark cluster architecture (spark-arch)]

A Spark cluster application consists of the following components:

1. Spark Master: the master is the most important component of the Spark cluster. It runs all the services required to keep the cluster functioning, such as the Spark master service, the Spark driver, the HDFS namenode, and an HDFS datanode. Application code runs on this node.

2. One or more Spark Slaves: slave nodes run the Spark slave service. Spark executor processes are started on these nodes to process the data.

All the nodes of the cluster are identical in terms of hardware and software configuration.

  • Nodes have the same instance type.

  • Nodes have the same set of software installed. Packages listed in assetDepends and bundledDepends, as well as files under resources/,

    will all be available on all nodes of the cluster.

Application code that runs the main logic of the app runs only on the master node; slave nodes do not run any application code directly. In a Spark cluster, the application code running on the master submits a Spark job. The Spark master breaks the job down into multiple tasks and assigns them to the slaves for execution.

Output specification

For apps that create databases, you might want to have the new database name as an app output, so the output specification should include the following:

{
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "...": "..."
}
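If the database name is produced by the Bash entry point, one way to populate this output is with dx-jobutil-add-output. The line below is a minimal sketch; $database_name is a hypothetical variable holding the name of the database created by the job:

# Hypothetical: $database_name holds the name of the database created by the PySpark job.
dx-jobutil-add-output database "$database_name" --class=string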

Network and project access

dxspark apps should have network access to be able to connect to the DNAnexus Hive Metastore. Apps that create databases should have direct access to the parent project (minimally UPLOAD), and apps that read databases should have access to all projects (minimally VIEW).

Example dxapp.json:

{
  "access": {
    "project": "CONTRIBUTE",
    "allProjects": "VIEW",
    "network": [ "*" ]
  },
  "...": "..."
}

Ports

The cluster requires a number of ports to be open so that the nodes can communicate with each other. If you need any additional ports opened, they can be specified in the clusterSpec.

"systemRequirements": {
"*": {
"instanceType": "mem1_ssd1_x4",
"clusterSpec": {
"type": "dxspark",
"version": "2.4.0",
"initialInstanceCount": 3,
"ports": "9500, 9700-9750",
"bootStrapScript":"src/startup.sh"
}
}

A comma-separated list of ports (or port ranges) can be specified. For example, with the above configuration, port 9500 and the ports in the range 9700-9750 are opened between the nodes. The platform uses certain ports for its internal functions, and those ports should not be used.

Bootstrapping

Application code in cluster applications runs only on the master node; slave nodes do not run any application code. There may be cases where you need to run some code on all nodes, for example to start services or download data. Bootstrap scripts are used for this purpose.

The following is the sequence of executions on master and slave nodes.

[Figure: bootstrap sequence on master and slave nodes]

The bootstrap script runs on all nodes and can be used to start any services that the cluster depends on. The following is an example of a bootstrap script.

#!/usr/bin/env bash
set -e -o pipefail

. /cluster/dx-cluster.environment
. /home/dnanexus/environment

run_on_master()
{
  set +e -x
  # TODO add code that runs on master
  set -e +x
}

run_on_slave()
{
  set +e -x
  # TODO add code that runs on slave
  set -e +x
}

# DX_CLUSTER_MASTER_IP is only set on slave nodes (it holds the master's IP),
# so an empty value means this node is the master.
if [ -z "$DX_CLUSTER_MASTER_IP" ]; then
  echo "Bootstrapping master node"
  run_on_master
else
  echo "Bootstrapping slave node"
  run_on_slave
fi

Default instance configurations

Multi-node Spark cluster

Based on the instance type, the following Spark configurations are set automatically by default (a worked example follows the lists below).

  • spark.executor.memory is set to 70% of totalMemoryMB

  • spark.executor.cores is set to numCores

  • spark.driver.memory is set to 70% of totalMemoryMB

Single-node Spark cluster

  • spark.driver.memory is set to 1024mb

  • spark.executor.memory is set to 70% of (totalMemoryMB - 1024mb)

  • spark.executor.cores is set to numCores
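For example, on a hypothetical node with 16384 MB of total memory and 8 cores (the size is illustrative, not a specific DNAnexus instance type), the multi-node defaults work out as follows:

# Hypothetical node: 16384 MB total memory, 8 cores (multi-node cluster defaults).
total_memory_mb=16384
num_cores=8
echo "spark.executor.memory = $(( total_memory_mb * 70 / 100 ))m"  # 11468m (70% of total memory)
echo "spark.executor.cores  = $num_cores"                          # 8
echo "spark.driver.memory   = $(( total_memory_mb * 70 / 100 ))m"  # 11468m (70% of total memory)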

Cluster life cycle

The job life cycle remains the same for Spark apps; there are no new job statuses. A Spark job will fail if the master node fails. If a slave node fails, a new slave node is provisioned. Spark application failures can also cause the job to fail.

Bash script -- things to include

The following things should be included in the Bash app's main script, described in the order in which they might be required.

Distributing input data

Unlike regular jobs, input data for Spark jobs is not automatically available across the complete execution environment. In particular, input data must be distributed to the Spark workers before the Spark executors can process the data. To distribute data to all nodes, you can put it into HDFS using the following lines.

$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to/dir
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file
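To confirm that the data landed in HDFS and is visible to the cluster before submitting the Spark job, you can list the directory (the path is the same placeholder used above):

# List the HDFS directory to confirm the file was copied in.
$HADOOP_HOME/bin/hadoop fs -ls /hdfs/path/to/dir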

DX Spark Submit

The dx-spark-submit utility is the easiest and recommended way to submit your application to the Spark cluster, override Spark configs, and set up logging.

For more information on dx-spark-submit, click here. If you are using dx-spark-submit, you can skip ahead to the PySpark section.
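In its simplest form, the utility wraps a regular spark-submit of your bundled PySpark script. The invocation below is only a sketch; the exact flags for overriding Spark configs and log levels are described in the dx-spark-submit documentation linked above:

# Minimal sketch: submit the bundled PySpark script via dx-spark-submit.
# Config-override and logging options are documented in the dx-spark-submit reference.
dx-spark-submit /opt/pyspark_app.py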

Submitting applications (without dx-spark-submit)

If you choose not to use the dx-spark-submit utility, you need to perform the traditional Spark steps yourself.

To submit a PySpark application to the Spark cluster, include the following line.

$SPARK_HOME/bin/spark-submit /opt/pyspark_app.py ...

This is the minimum requirement for submitting applications. More advanced options are described below.

Configuring runtime memory and cores

spark_executor_memory=... # Could be parameterized as an app input
spark_executor_cores=... # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
--executor-memory=$spark_executor_memory \
--executor-cores=$spark_executor_cores \
/opt/pyspark_app.py ...

Custom Spark properties

To pass custom Spark properties to the Spark application, you can run

$SPARK_HOME/bin/spark-submit \
--conf $spark_property=$spark_property_value \
/opt/pyspark_app.py ...

or

$SPARK_HOME/bin/spark-submit \
--properties-file=/path/to/properties/file \
/opt/pyspark_app.py ...
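The properties file uses the same format as spark-defaults.conf: one property per line, with the name and value separated by whitespace. The sketch below writes an illustrative file and passes it to spark-submit; the path and property values are placeholders:

# Write an illustrative Spark properties file (spark-defaults.conf format).
cat > /tmp/spark-app.properties <<'EOF'
spark.executor.memory        4g
spark.sql.shuffle.partitions 64
EOF
$SPARK_HOME/bin/spark-submit \
--properties-file=/tmp/spark-app.properties \
/opt/pyspark_app.py ...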

Sharing environment variables

$SPARK_HOME/bin/spark-submit \
--conf spark.executorEnv.PYTHONPATH=$PYTHONPATH \
/opt/pyspark_app.py ...

Custom log levels

$SPARK_HOME/bin/spark-submit \
--driver-java-options -Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_master_log_level)" \
--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_workers_log_level)" \
/opt/pyspark_app.py ...

The available log levels are {"WARN", "INFO", "DEBUG", "TRACE"}. By default, the log level is WARN, but at times it may be desirable to see more detailed logs for debugging.
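For example, the log levels could be exposed as app inputs and passed through to the spark-submit call above; the variable names match that example, and the WARN fallback reflects the default described here:

# Illustrative: take log levels from app inputs, falling back to the default of WARN.
cluster_master_log_level="${cluster_master_log_level:-WARN}"
cluster_workers_log_level="${cluster_workers_log_level:-WARN}"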

Collecting cluster logs

To enable the collection of Spark cluster logs for debugging after the fact, you must include the following in the output specification

{
  "outputSpec": [
    {
      "name": "cluster_runtime_logs_tarball",
      "label": "Cluster runtime logs tarball",
      "class": "file",
      "patterns": [ "*.tar.gz" ],
      "optional": true,
      "help": "The tarball of the cluster runtime logs"
    },
    "..."
  ],
  "...": "..."
}

and the following in the Bash script after the spark-submit call.

/cluster/log_collector.sh /home/dnanexus/out/cluster_runtime_logs_tarball

The event logs in the tarball could be passed to the Spark history server for debugging after the fact. How to do so is outside the scope of this documentation.

PySpark

The following things should be included in the submitted PySpark script, described in the order in which they might be required.

Spark session and Hive support

Inside the submitted PySpark script (pyspark_app.py in the previous examples), a Spark session must be instantiated, and it must have Hive support.

import pyspark
spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

Additional options may be specified, but they are outside the scope of this documentation.

Creating databases

To create DNAnexus databases, you can use the following line.

spark.sql("CREATE DATABASE clinical_data LOCATION 'dnax://'")

Note the location 'dnax://' -- this is the custom scheme that must be used for all DNAnexus databases in order to integrate with the DNAnexus platform.

Monitoring the Spark UI

While the Spark job is running, it is possible to interactively monitor the Spark UI. The Spark UI is enabled by default and is running at port 8081.

Using httpsApp

The httpsApp feature enables the app to open HTTPS access to web servers running on ports 443, 8080, and 8081. You can use this feature to easily expose the Spark UI on the cluster, which runs on port 8081.

The following needs to be added to the app's dxapp.json:

"httpsApp": {
"shared_access":"NONE",
"ports": [
8081
]
},

Once the app starts running within the Spark context, you can access the Spark UI at https://job-xxxx.dnanexus.cloud:8081/jobs/

Spark App Example

The dxapp.json:

{
  "...": "...",
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "runSpec": {
    "file": "src/bash_app.sh"
  },
  "systemRequirements": {
    "*": {
      "...": "...",
      "clusterSpec": {
        "type": "dxspark",
        "version": "2.4.0",
        "initialInstanceCount": "<num_cluster_nodes>"
      }
    }
  },
  "access": {
    "project": "CONTRIBUTE",
    "allProjects": "VIEW",
    "network": [ "*" ]
  }
}

The bash_app.sh:

$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to/dir
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file
spark_executor_memory=... # Could be parameterized as an app input
spark_executor_cores=... # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
--executor-memory=$spark_executor_memory \
--executor-cores=$spark_executor_cores \
/opt/pyspark_app.py ...

The pyspark_app.py:

import pyspark
spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("CREATE DATABASE clinical_data LOCATION 'dnax://'")