Spark Apps

Learn to develop Spark apps for use in Apollo.

DNAnexus is working to phase out outdated terminology and change scripts using those terms to remove inappropriate language. The terms "master" and "slave" will be replaced with "driver" and "clusterWorker" in Spark documentation. DNAnexus will also eventually replace the older terms in the codebase. For now, variable names and scripts containing the older terms will still be used in the actual code.

Overview

A license is required to access Spark functionality on the DNAnexus Platform. Contact DNAnexus Sales for more information.

Spark apps are like regular apps except that they are executed on a Spark cluster (instead of on a single machine). When a Spark app is launched, the system instantiates a Spark cluster specifically for that execution. The cluster contains the Apache Spark software -- a modern, scalable framework for parallel processing of big data.

dx-toolkit Versions

Make sure your dx-toolkit has been updated to the latest release to take advantage of the latest features and improvements. Download the latest version at Downloads.
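
For example, you can check which version is installed from a terminal and, if your dx-toolkit was installed with pip, upgrade it in place (the pip package name is dxpy):

# Check the currently installed dx-toolkit version
dx --version

# Upgrade a pip-based installation of dx-toolkit (package name: dxpy)
pip3 install --upgrade dxpy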

Bash Apps

Spark-based DNAnexus apps should be Bash apps at the top level. If you want to develop PySpark applications, then you can include Python code as resources to your app and submit them to Spark in the Bash script.
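
As an illustration, a PySpark script shipped under the app's resources/ directory is extracted into the execution environment's filesystem and can then be submitted from the Bash entry point. The layout below is a sketch; the /opt/pyspark_app.py path simply mirrors the examples used later in this document:

# Illustrative app layout (paths are examples, not requirements):
#   dxapp.json
#   src/bash_app.sh               <- top-level Bash entry point
#   resources/opt/pyspark_app.py  <- PySpark code, available as /opt/pyspark_app.py at runtime

# In src/bash_app.sh, submit the PySpark script to the Spark cluster:
$SPARK_HOME/bin/spark-submit /opt/pyspark_app.py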

App Specifications

The following settings must be included in the app specification dxapp.json.

Cluster Specifications

The cluster specification describes the makeup of the Spark cluster. You can configure things like cluster type, the Spark version and the number of nodes. The cluster specification is required for Spark apps -- it should be added to the dxapp.json with the following syntax:

{
  "...": "...",
  "systemRequirements": {
    "*": {
      "...": "...",
      "clusterSpec": {
        "type": "dxspark",                                  # Type of the cluster e.g dxspark , apachespark
        "version": "3.2.3",                                 # Cluster version to use
        "initialInstanceCount": "<num_cluster_nodes>",      # Total number of nodes in the cluster (including 1 master)
        "ports": "9500, 9700-9750",                         # ( Optional ) Ports (or port range) to be opened between the nodes of the cluster 
        "bootstrapScript": "path/to/script.sh"              # ( Optional ) Bootstrap Script that can run on all nodes of 
                                                            # the cluster before application code.
      }
    }
  }
}

The cluster specification is a mapping with the following key-values:

  • type string the cluster type, "dxspark" for an Apollo Spark cluster or "apachespark" for generic Spark.

  • version string the requested version for dxspark or apachespark clusters. Supported values are 2.4.4 and 3.2.3.

  • initialInstanceCount integer the number of nodes in the cluster, including the driver node. It should be at least 1.

    If the value is 1, the cluster is a single-node Spark cluster in which the driver and clusterWorker run on the same node.

  • ports string (Optional) a comma-separated list of ports (or port ranges) to be opened between the nodes of the cluster.

  • bootstrapScript string (Optional) the path to the bootstrap script. The bootstrap script runs on all nodes of the cluster before the application code. It is recommended that the script be located in the same place as the application code.

Types of Clusters

The following Spark cluster types are currently supported.

dxspark

This is the Apollo flavor of Spark, which is fully integrated with the rest of the Platform. For example, when you create a database, a representative database object is created in the same project in which your app was executed. Those databases can be shared by sharing the project, and the permission levels are reflected in the SQL permissions.

apachespark

This is generic Apache Spark that you can use to create a Spark cluster and run Spark jobs. You can specify a custom location to store your database files, e.g. your own Amazon S3 bucket. No Platform database objects are created and no storage is provided, but you can use this cluster type to perform Spark analysis jobs and store the results back to the Platform via your output spec.

Only the following combinations of runSpec and clusterSpec are supported:

Spark Cluster Management Software

Spark apps use Spark Standalone for cluster management and HDFS to share data between the nodes. The following diagram depicts a typical Spark cluster application.

A Spark cluster application consists of the following components:

  1. Spark driver (formerly master): the most important component of the Spark cluster. It runs all of the important services required to keep the cluster functioning, such as the Spark master service, the Spark driver, the HDFS NameNode, and an HDFS DataNode. Application code runs on this node.

  2. One or more Spark clusterWorkers (formerly slaves): clusterWorker nodes run the Spark clusterWorker service. Spark executor processes are started on these nodes to process the data.

All the nodes of the cluster are identical in terms of hardware and software configuration.

  • Nodes have the same instance type.

  • Nodes have the same set of software installed. Packages listed in assetDepends and bundleDepends, as well as files under resources/, are available on all nodes of the cluster.

Application code that runs the main logic of the app runs only on the driver node. ClusterWorker nodes do not run any application code directly. In a Spark cluster, the application code running on the driver submits a Spark job; the Spark driver breaks the job down into multiple tasks and assigns clusterWorkers to execute them.

Output Specification

For apps that create databases, you may want to set the new database name as an app output, so the output specification will include the following:

{
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "...": "..."
}

Network and Project Access

dxspark apps should have network access to be able to connect to the DNAnexus Hive Metastore. To be able to read databases from the parent project, app(let)s should request VIEW access to the project. To create new databases or write into existing databases in the parent project, app(let)s should request UPLOAD access to the project.

Example dxapp.json:

{
  "access": {
    "project": "VIEW",
    "allProjects": "VIEW",
    "network": [ "*" ]
  },
  "...": "..."
}

Ports

The cluster requires a number of ports to be open so that its nodes can communicate with each other. The system automatically opens the ports required by Spark if the cluster type is dxspark or apachespark. Additional ports to be opened can be specified in the clusterSpec.

"systemRequirements": {
 "*": {
   "instanceType": "mem1_ssd1_x4",
   "clusterSpec": {
     "type": "dxspark",
     "version": "3.2.3",
     "initialInstanceCount": 3,
     "ports": "9500, 9700-9750",
     "bootStrapScript":"src/startup.sh"
   }
 }

A comma-separated list of ports (or port ranges) can be specified. For example, with the above configuration, port 9500 and the ports in the range 9700-9750 are opened between the nodes. The Platform uses certain ports for its internal functions, and those ports should not be used.

Bootstrapping

Application code in cluster applications runs only on the driver; clusterWorker nodes do not run any application code. There may be cases where you need to run some code on all of the nodes, for example to start services or download data. Bootstrap scripts are used for this purpose.

The following is the sequence of executions on driver and clusterWorker nodes.

The bootstrap script runs on all nodes and can be used to start any services that the cluster depends on. The following is an example of a bootstrap script.

#!/usr/bin/env bash

set -e -o pipefail

. /cluster/dx-cluster.environment
. /home/dnanexus/environment

run_on_master()
{
    set +e -x
    # TODO add code that runs on master
    set -e +x
}

run_on_slave()
{
    set +e -x
    # TODO add code that runs on slave
    set -e +x
}

if [ -z "$DX_CLUSTER_MASTER_IP" ]; then
 echo "Bootstrapping master node"
 run_on_master
else
 echo "Bootstrapping slave node"
 run_on_slave
fi

Pre-Bootstrapping

In some cases when running a Spark application, you may want to perform some actions prior to the Spark cluster being bootstrapped since Spark automatically starts on successful installation. An example of this could be mounting a remote volume (a fuse mount for example).

In order to create a pre-bootstrap script for your app or applet, you must create a file under the resources directory as follows: /resources/cluster/hooks/prebootstrap.sh. (The resources folder is the app folder that gets extracted over the instance's root directory, so the /cluster/ folder will be in the root directory of the app instance.)

This pre-bootstrap shell script will be run during the installation of the dxspark package. If the pre-bootstrap script returns a non-zero exit code, the setup of the app will fail and the instance will terminate. In the case of a multi-node cluster configuration, if setup on a child node fails, it will cause the child node to terminate and another to be spun up to take its place, resulting in the pre-bootstrap script being tried again.
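
As an illustration, a minimal prebootstrap.sh might mount a remote volume before the Spark services start. This is only a sketch: the bucket name and mount point are hypothetical, and it assumes the s3fs FUSE client is already installed on the instance.

#!/usr/bin/env bash
# resources/cluster/hooks/prebootstrap.sh
# Sketch: mount a remote volume before dxspark is bootstrapped.
# Assumes s3fs is installed; bucket and mount point are hypothetical.
set -e -o pipefail

mkdir -p /mnt/reference_data
s3fs my-example-bucket /mnt/reference_data

# A non-zero exit code here fails the app setup and terminates the instance.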

If you wish to display the results of the pre-bootstrap script, you can find the log at $PRE_BOOTSTRAP_LOG (the default value is /cluster/prebootstrap.log). Below is an example of how to display it from within your app's startup script:

if [[ -f $PRE_BOOTSTRAP_LOG ]]; then 
    cat $PRE_BOOTSTRAP_LOG 
fi

Default Instance Configurations

Multi-node Spark Cluster

Based on the instance type, the following Spark configurations are automatically set by default.

  • spark.executor.memory is set to 70% of totalMemoryMB

  • spark.executor.cores is set to numCores

  • spark.driver.memory is set to 70% of totalMemoryMB

Single-node Spark Cluster

  • spark.driver.memory is set to 1024mb

  • spark.executor.memory is set to 70% of (totalMemoryMB - 1024mb)

  • spark.executor.cores is set to numCores
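
As a quick illustration of how these defaults are derived, consider a hypothetical instance with 16000 MB of total memory and 4 cores (the numbers are illustrative only):

# Hypothetical instance: totalMemoryMB=16000, numCores=4
totalMemoryMB=16000
numCores=4

# Multi-node cluster defaults
echo "spark.executor.memory=$(( totalMemoryMB * 70 / 100 ))m"            # 11200m
echo "spark.executor.cores=${numCores}"                                  # 4
echo "spark.driver.memory=$(( totalMemoryMB * 70 / 100 ))m"              # 11200m

# Single-node cluster defaults
echo "spark.driver.memory=1024m"
echo "spark.executor.memory=$(( (totalMemoryMB - 1024) * 70 / 100 ))m"   # 10483m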

Cluster Life Cycle

The job lifecycle reflects the state of the driver node. A Spark job will fail if the driver node fails. If the clusterWorker node fails, a new node will be provisioned. Spark application failures can also cause the job to fail.

What to Include in Your Bash Script

The following things should be included in the Bash app's main script, described in the order in which they might be required.

Distributing Input Data

Unlike regular jobs, input data is not automatically available across the entire execution environment when it is downloaded. Instead, it is available only on the driver node. If the data has to be processed by Spark, it must be distributed to the Spark workers. To distribute data to all nodes, you can put it into HDFS using the following lines.

$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to/dir
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file

DX Spark Submit

The dx-spark-submit utility is the easiest and recommended way to submit your application to the Spark cluster, override Spark configs, and set up logging.

For more information, see the dx-spark-submit documentation. If you are using dx-spark-submit, you can skip ahead to the PySpark section.
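
As a minimal sketch (the exact options are described in the dx-spark-submit documentation and are not reproduced here), the basic invocation takes the application script in the same way as spark-submit:

# Minimal sketch only; see the dx-spark-submit documentation for supported options
dx-spark-submit /opt/pyspark_app.py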

Submitting Applications Without dx-spark-submit

If you choose not to use the dx-spark-submit utility, you need to perform the traditional Spark steps.

To submit a PySpark application to the Spark cluster, include the following line.

$SPARK_HOME/bin/spark-submit /opt/pyspark_app.py ...

This is the minimum requirement for submitting applications. More advanced options are described below.

Configuring Runtime Memory and Cores

spark_executor_memory=... # Could be parameterized as an app input
spark_executor_cores=...  # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
  --executor-memory=$spark_executor_memory \
  --executor-cores=$spark_executor_cores \
  /opt/pyspark_app.py ...

Custom Spark Properties

To pass custom Spark properties to the Spark application, you can use

$SPARK_HOME/bin/spark-submit \
  --conf $spark_property=$spark_property_value \
  /opt/pyspark_app.py ...

or

$SPARK_HOME/bin/spark-submit \
  --properties-file=/path/to/properties/file \
  /opt/pyspark_app.py ...

Sharing Environment Variables

$SPARK_HOME/bin/spark-submit \
  --conf spark.executorEnv.PYTHONPATH=$PYTHONPATH \
  /opt/pyspark_app.py ...

Custom Log Levels

$SPARK_HOME/bin/spark-submit \
  --driver-java-options -Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_master_log_level)" \
  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_workers_log_level)" \
  /opt/pyspark_app.py ...

The available log levels are WARN, INFO, DEBUG, and TRACE. By default, the log level is WARN, but at times it may be desirable to see more detailed logs for debugging.

Collecting Cluster Logs

To enable the collection of Spark cluster logs for debugging after the fact, you must include the following in the output specification

{
  "outputSpec": [
    {
      "name": "cluster_runtime_logs_tarball",
      "label": "Cluster runtime logs tarball",
      "class": "file",
      "patterns": [ "*.tar.gz" ],
      "optional": true,
      "help": "The tarball of the cluster runtime logs"
    },
    "..."
  ],
  "...": "..."
}

and the following in the Bash script after the spark-submit call.

/cluster/log_collector.sh /home/dnanexus/out/cluster_runtime_logs_tarball

The event logs in the tarball could be passed to the Spark history server for debugging after the fact.

PySpark

The following things should be included in the submitted PySpark script, described in the order in which they might be required.

Spark Session and Hive Support

Inside the submitted PySpark script (pyspark_app.py in the previous examples), a Spark session must be instantiated, and it must have Hive support.

import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

Additional options may be specified, but they are outside the scope of this documentation.

Creating Databases

To create DNAnexus databases, you can use the following line.

spark.sql("CREATE DATABASE clinical_data LOCATION 'dnax://'")

Note the location 'dnax://' - this is the custom scheme that must be used for all DNAnexus databases in order to integrate with the DNAnexus Platform.

Monitoring the Spark UI

While the Spark job is running, you can monitor it interactively through the Spark UI. The Spark UI is enabled by default and runs on port 8081.

Using httpsApp

A license is required to use this feature. Contact DNAnexus Sales for more information.

The httpsApp feature enables the app to open up HTTPS access to web servers running on ports 443, 8080, and 8081. You can use this feature to easily expose the Spark UI on the cluster, which runs on port 8081.

The following needs to be added to the app's dxapp.json:

  "httpsApp": {
      "shared_access":"NONE",
      "ports": [
        8081
      ]
  },

Once the app starts running within the Spark context, you can access the Spark UI at https://job-xxxx.dnanexus.cloud:8081/jobs/

Using Spark History Server

It is still possible to construct the UI of an application through Spark's history server, provided that the application's event logs exist. When log collection is enabled, event logs are also collected. The Spark history server is included in the Spark binaries.

  1. Download Spark from https://spark.apache.org/downloads.html and install it.

  2. Download the Spark app logs and extract them. In tmp/clusterLogs/eventlogs/ you will find an event log file. Copy it to a directory such as /userName/sparkUIEventLogs/.

  3. Create a properties file, for example /userName/histProperties.txt, containing the single line spark.history.fs.logDirectory=file:/userName/sparkUIEventLogs.

  4. Run the Spark history server: $SPARK_HOME/sbin/start-history-server.sh --properties-file /userName/histProperties.txt. Then go to localhost:18080 in your browser to view the jobs, stages, and executor usage. You can add more files to /userName/sparkUIEventLogs/ and the history server will pick them up. See the consolidated sketch below.
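
As a convenience, steps 2 through 4 can be scripted roughly as follows; the tarball name and paths mirror the example above and are illustrative only:

# Extract the downloaded cluster logs tarball (name is illustrative) and copy the event log
tar -xzf cluster_runtime_logs_tarball.tar.gz
mkdir -p /userName/sparkUIEventLogs
cp tmp/clusterLogs/eventlogs/* /userName/sparkUIEventLogs/

# Point the history server at the event log directory
echo "spark.history.fs.logDirectory=file:/userName/sparkUIEventLogs" > /userName/histProperties.txt

# Start the history server, then browse to localhost:18080
$SPARK_HOME/sbin/start-history-server.sh --properties-file /userName/histProperties.txt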

For more details refer to: https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact

Spark App Example

The dxapp.json:

{
  "...": "...",
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "runSpec": {
    "file": "src/bash_app.sh",
    "systemRequirements": {
      "*": {
        "...": "...",
        "clusterSpec": {
          "type": "dxspark",
          "version": "3.2.3",
          "initialInstanceCount": "<num_cluster_nodes>"
        }
      }
    }
  },
  "access": {
    "project": "CONTRIBUTE",
    "allProjects": "VIEW",
    "network": [ "*" ]
  }
}

The bash_app.sh:

$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to/dir
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file

spark_executor_memory=... # Could be parameterized as an app input
spark_executor_cores=...  # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
  --executor-memory=$spark_executor_memory \
  --executor-cores=$spark_executor_cores \
  /opt/pyspark_app.py ...

The pyspark_app.py:

import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("CREATE DATABASE clinical_data LOCATION 'dnax://'")
