# Spark Apps

{% hint style="info" %}
To reflect more inclusive language, DNAnexus has updated its terminology. The terms `master` and `slave` are being replaced with `driver` and `clusterWorker` in Spark documentation articles. The codebase updates are in progress. Some variable names and scripts in the code still use the older terms.
{% endhint %}

## Overview

{% hint style="info" %}
A license is required to access Spark functionality on the DNAnexus Platform. [Contact DNAnexus Sales](mailto:sales@dnanexus.com) for more information.
{% endhint %}

Spark apps are like regular apps except that they are executed on a Spark cluster (instead of on a single machine). When a Spark app is launched, the system instantiates a Spark cluster specifically for that execution. The cluster contains the Apache Spark software - a modern, scalable framework for parallel processing of big data.

## dx-toolkit Versions

Make sure your dx-toolkit has been updated to the latest release to take advantage of the latest features and improvements. Download the latest version at [Downloads](https://documentation.dnanexus.com/downloads).

## Bash Apps

Spark-based DNAnexus apps should be Bash apps at the top level. If you want to develop PySpark applications, you can include the Python code in your app's resources and submit it to Spark from the Bash script.

## App Specifications

The following settings must be included in the app specification [`dxapp.json`](https://documentation.dnanexus.com/developer/apps/app-metadata).

### Cluster Specifications

The cluster specification describes the makeup of the Spark cluster. You can configure settings such as the cluster type, the Spark version, and the number of nodes. The cluster specification is required for Spark apps; add it to the `dxapp.json` with the following syntax:

```python
{
  "...": "...",
  "systemRequirements": {
    "*": {
      "...": "...",
      "clusterSpec": {
        "type": "dxspark",                                  # Type of the cluster e.g dxspark , apachespark
        "version": "3.5.2",                                 # Cluster version to use
        "initialInstanceCount": "<num_cluster_nodes>",      # Total number of nodes in the cluster (including 1 master)
        "ports": "9500, 9700-9750",                         # ( Optional ) Ports (or port range) to be opened between the nodes of the cluster
        "bootstrapScript": "path/to/script.sh"              # ( Optional ) Bootstrap Script that can run on all nodes of
                                                            # the cluster before application code.
      }
    }
  }
}
```

The cluster specification is a mapping with the following key-values:

* `type` **string** the cluster type: `dxspark` for an Apollo Spark cluster or `apachespark` for generic Spark.
* `version` **string** the requested version for `dxspark` or `apachespark` clusters. Supported values are `[2.4.4, 3.2.3, 3.5.2]`.
* `initialInstanceCount` **integer** the number of nodes in the cluster, including the driver node. It must be at least `1`. If the value is `1`, the cluster is a single-node Spark cluster where both the `driver` and the `clusterWorker` run on the same node.
* `ports` **string** (optional) a comma-separated list of ports (or port ranges) to be opened between the nodes of the cluster.
* `bootstrapScript` **string** (optional) the path to the bootstrap script. The bootstrap script runs on all nodes of the cluster before application code. It is recommended that the script be located in the same place as the application code.

### Types of Clusters

The following Spark cluster types are supported.

#### `dxspark`

This is the Apollo flavor of Spark, which is fully integrated with the rest of the Platform. For example, when you create a database, a representative database object is created in the same project where your app was executed. Those databases can be shared by sharing the project, and the permission levels are reflected in the [SQL permissions](https://documentation.dnanexus.com/user/spark#database-access).

#### `apachespark`

This is generic Apache Spark that you can use to create a Spark cluster and run Spark jobs. You can specify a custom location to store your database files, for example your own Amazon S3 bucket. The Platform does not create any database objects or provide storage, but you can use this cluster type to perform Spark analysis jobs and store results back to the Platform in your output spec.
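
For example, with an `apachespark` cluster you could point a database at your own bucket. The following is a minimal sketch; the bucket URI is hypothetical, and it assumes the cluster has been configured with credentials for that bucket (for example via `spark.hadoop.fs.s3a.*` properties):

```python
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Hypothetical bucket and path; substitute your own storage location.
spark.sql("CREATE DATABASE my_analysis LOCATION 's3a://my-own-bucket/spark-databases/my_analysis'")
```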

Only the following combinations of `runspec` and `clusterspec` are supported:

| runspec.distribution | runspec.release | runspec.version | Python version | runspec.interpreter | clusterspec.type       | clusterspec.version |
| -------------------- | --------------- | --------------- | -------------- | ------------------- | ---------------------- | ------------------- |
| Ubuntu               | 14.04           | 0               | 2.7.x          | bash                | apachespark or dxspark | 2.4.4               |
| Ubuntu               | 14.04           | 1               | 3.5.x          | bash                | apachespark or dxspark | 2.4.4               |
| Ubuntu               | 16.04           | 0               | 2.7.x          | bash                | apachespark or dxspark | 2.4.4               |
| Ubuntu               | 16.04           | 1               | 3.5.x          | bash                | apachespark or dxspark | 2.4.4               |
| Ubuntu               | 20.04           | 0               | 3.8.10         | bash                | apachespark or dxspark | 3.2.0               |
| Ubuntu               | 20.04           | 0               | 3.8.10         | bash                | apachespark or dxspark | 3.2.3               |
| Ubuntu               | 20.04           | 0               | 3.8.10         | bash                | apachespark or dxspark | 3.5.2               |

### Spark Cluster Management Software

Spark apps use [Spark Standalone](https://spark.apache.org/docs/latest/spark-standalone.html) for cluster management and [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) to share data between the nodes. The following diagram depicts a typical Spark cluster application.

![Spark-arch diagram.](https://1612471957-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_EsL_ie8XyZlLe_yf9%2Fuploads%2Fgit-blob-06eae57638a8b4640bc3daff9f4cbe75cc3e600a%2Fspark-arch-0.png?alt=media)

A Spark cluster application consists of the following components:

1. `Driver` (formerly `Spark Master`): the driver is the most important component of the Spark cluster. It runs all the key services required to keep the cluster functioning, such as the `spark master service`, `spark driver`, `hdfs namenode`, and `hdfs datanode`. Application code runs on this node.
2. `One or more ClusterWorkers` (formerly `Spark Slaves`): ClusterWorker nodes run the Spark ClusterWorker service. Spark executor processes are started on these nodes to process the data.

All the nodes of the cluster are identical in terms of hardware and software configurations.

* Nodes have the same instance type.
* Nodes have the same set of software installed. Packages listed in `assetDepends`, `bundleDepends` and files under `resources/` are available on all nodes of the cluster.

Application code that runs the main logic of the app runs only on the driver node. ClusterWorker nodes do not run any application code directly. In a Spark cluster, application code running on the driver submits a Spark job; the Spark driver breaks the job down into multiple tasks and assigns them to ClusterWorkers for execution.
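
As a sketch of this division of labor, the following PySpark snippet (with illustrative data) runs on the driver, while the row-level work it triggers is executed as tasks on the ClusterWorker executors:

```python
import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

# This code runs on the driver node and only describes the computation.
df = spark.range(0, 1000000)                            # a DataFrame with one million rows
squared = df.selectExpr("id", "id * id AS id_squared")

# Calling an action submits a Spark job; the driver splits it into tasks
# that are scheduled on the executors running on ClusterWorker nodes.
print(squared.count())
```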

### Output Specification

For apps that create databases, you may want to set the new database name as an app output, in which case the output specification includes the following:

```python
{
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "...": "..."
}
```

### Network and Project Access

`dxspark` apps should have network access to be able to connect to the DNAnexus Hive Metastore. To be able to read databases from the parent project, app(let)s should request VIEW access to the project. To create new databases or write into existing databases in the parent project, app(let)s should request UPLOAD access to the project.

Example `dxapp.json`:

```python
{
  "access": {
    "project": "VIEW",
    "allProjects": "VIEW",
    "network": [ "*" ]
  },
  "...": "..."
}
```

### Ports

The cluster requires specific ports to be opened to communicate between the nodes. The system automatically includes ports required by Spark if the cluster type is `dxspark` or `apachespark`. Additional ports to be opened can be specified in `clusterSpec`.

```python
"systemRequirements": {
 "*": {
   "instanceType": "mem1_ssd1_x4",
   "clusterSpec": {
     "type": "dxspark",
     "version": "3.5.2",
     "initialInstanceCount": 3,
     "ports": "9500, 9700-9750",
     "bootStrapScript":"src/startup.sh"
   }
 }
```

A comma-separated list of ports (or port ranges) can be specified. For example, with the configuration above, port 9500 and ports in the range 9700-9750 are opened between the nodes. The Platform uses certain ports for internal functions, and those ports should not be used.

### Bootstrapping

Application code in cluster applications runs only on the driver; ClusterWorker nodes do not run any application code. There may be cases where you need to run code on all nodes, for example to start services or download data. Bootstrap scripts are used for this purpose.

The following is the sequence of execution on the `driver` and `clusterWorker` nodes.

![](https://1612471957-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-L_EsL_ie8XyZlLe_yf9%2Fuploads%2Fgit-blob-86a2d334706f5849bc9e5abe858fc9b5fd79d2aa%2Fspark_cluster_flow.png?alt=media)

The bootstrap script runs on all nodes and can be used to start any services that the cluster would depend on. The following is an example of a bootstrap script.

```shell
#!/usr/bin/env bash

set -e -o pipefail

. /cluster/dx-cluster.environment
. /home/dnanexus/environment

run_on_master()
{
    set +e -x
    # TODO add code that runs on master
    set -e +x
}

run_on_slave()
{
    set +e -x
    # TODO add code that runs on slave
    set -e +x
}

if [ -z "$DX_CLUSTER_MASTER_IP" ]; then
 echo "Bootstrapping master node"
 run_on_master
else
 echo "Bootstrapping slave node"
 run_on_slave
fi
```

### Pre-Bootstrapping

When running a Spark application, you might need to perform certain actions before the Spark cluster bootstraps, as Spark automatically starts on successful installation. Common pre-bootstrap tasks include mounting remote volumes (such as a FUSE mount) or initializing environment-specific configurations.

To create a pre-bootstrap script for your app or applet, create a file in the resources directory at `/resources/cluster/hooks/prebootstrap.sh`. The `resources` folder of the app is extracted over the root directory of the instance, so the `/cluster/` folder ends up in the root directory of the app instance.

This pre-bootstrap shell script runs during the installation of the `dxspark` package. If the pre-bootstrap script returns a non-zero exit code, the setup of the app fails and the instance terminates. In the case of a multi-node cluster configuration, if setup on a child node fails, it causes the child node to terminate and another to be spun up to take its place, resulting in the pre-bootstrap script being tried again.
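
As a minimal sketch, a pre-bootstrap script that prepares a mount point might look like the following; the mount command and paths are hypothetical placeholders, so substitute whatever your app actually needs to do before Spark starts:

```shell
#!/usr/bin/env bash
# resources/cluster/hooks/prebootstrap.sh
# Hypothetical example: prepare a mount point before the Spark cluster starts.

set -e -o pipefail

mkdir -p /mnt/remote-data

# Replace with your actual initialization, e.g. a FUSE mount of a remote volume:
# mount-remote-volume /mnt/remote-data

echo "pre-bootstrap completed"
```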

If you wish to display the results of the pre-bootstrap script, you can find the log at `$PRE_BOOTSTRAP_LOG` (default value `/cluster/prebootstrap.log`). Below is an example of how to display it from within your app's startup script:

```shell
if [[ -f $PRE_BOOTSTRAP_LOG ]]; then
    cat $PRE_BOOTSTRAP_LOG
fi
```

### Default Instance Configurations

#### Multi-node Spark Cluster

Based on the instance type, the following Spark configurations are automatically set by default.

* `spark.executor.memory` is set to 70% of totalMemoryMB
* `spark.executor.cores` is set to `numCores`
* `spark.driver.memory` is set to 70% of totalMemoryMB

#### Single-node Spark Cluster

* `spark.driver.memory` is set to 1024 MB
* `spark.executor.memory` is set to 70% of (totalMemoryMB - 1024 MB)
* `spark.executor.cores` is set to `numCores`
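
As a rough worked example covering both configurations (the memory figure is made up for illustration and does not correspond to a specific instance type), a node with `totalMemoryMB` = 16,000 and 4 cores would default to approximately:

```
Multi-node cluster:
  spark.executor.memory ≈ 11200m   (70% of 16,000 MB)
  spark.executor.cores  = 4
  spark.driver.memory   ≈ 11200m   (70% of 16,000 MB)

Single-node cluster:
  spark.driver.memory   = 1024m
  spark.executor.memory ≈ 10483m   (70% of (16,000 MB - 1,024 MB))
  spark.executor.cores  = 4
```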

### Cluster Life Cycle

The job lifecycle reflects the state of the driver node: a Spark job fails if the driver node fails. If a `clusterWorker` node fails, a new node is provisioned to replace it. Spark application failures can also cause the job to fail.

## What to Include in Your Bash Script

The following things should be included in the Bash app's main script, described in the order in which they might be required.

### Distributing Input Data

Unlike in regular jobs, downloaded input data is not automatically available across the entire execution environment; it is available only on the driver node. If the data has to be processed by Spark, it must be distributed to the Spark workers. To distribute data to all nodes, you can put it into HDFS using the following lines.

```shell
$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to/dir
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file
```

### DX Spark Submit

The dx-spark-submit utility is the recommended way to submit your app to the Spark cluster, override Spark configs, and set up logging.
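
At its simplest, the invocation mirrors `spark-submit`. The single-argument form below is an assumption shown for illustration; see the linked page for the actual options:

```shell
# Hypothetical minimal invocation; consult the dx-spark-submit documentation for the full option set.
dx-spark-submit /opt/pyspark_app.py
```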

See more information about [dx-spark-submit](https://documentation.dnanexus.com/developer/apps/developing-spark-apps/dx-spark-submit-utility). If you're using dx-spark-submit, you can skip ahead to the PySpark section below.

### Submitting Applications Without dx-spark-submit

If you choose not to use the dx-spark-submit utility, you need to perform the traditional Spark steps.

To submit a PySpark application to the Spark cluster, include the following line.

```shell
$SPARK_HOME/bin/spark-submit /opt/pyspark_app.py ...
```

This is the minimum requirement for submitting applications. More advanced options are described below.

#### Configuring Runtime Memory and Cores

```shell
spark_executor_memory=... # Could be parameterized as an app input
spark_executor_cores=...  # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
  --executor-memory=$spark_executor_memory \
  --executor-cores=$spark_executor_cores \
  /opt/pyspark_app.py ...
```

#### Custom Spark Properties

To pass custom Spark properties to the Spark application, you can run

```shell
$SPARK_HOME/bin/spark-submit \
  --conf $spark_property=$spark_property_value \
  /opt/pyspark_app.py ...
```

or

```shell
$SPARK_HOME/bin/spark-submit \
  --properties-file=/path/to/properties/file \
  /opt/pyspark_app.py ...
```

#### Sharing Environment Variables

```shell
$SPARK_HOME/bin/spark-submit \
  --conf spark.executorEnv.PYTHONPATH=$PYTHONPATH \
  /opt/pyspark_app.py ...
```

#### Custom Log Levels

```shell
$SPARK_HOME/bin/spark-submit \
  --driver-java-options -Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_master_log_level)" \
  --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:"$($DX_CLUSTER_UTILS/get_log_properties.sh $cluster_workers_log_level)" \
  /opt/pyspark_app.py ...
```

The available log levels are `{"WARN", "INFO", "DEBUG", "TRACE"}`. By default, the log level is `WARN`, but at times it may be desirable to see more detailed logs for debugging.

#### Collecting Cluster Logs

To enable the collection of Spark cluster logs for debugging after the fact, you must include the following in the output specification:

```python
{
  "outputSpec": [
    {
      "name": "cluster_runtime_logs_tarball",
      "label": "Cluster runtime logs tarball",
      "class": "file",
      "patterns": [ "*.tar.gz" ],
      "optional": true,
      "help": "The tarball of the cluster runtime logs"
    },
    "..."
  ],
  "...": "..."
}
```

and the following in the Bash script *after* the `spark-submit` call.

```shell
/cluster/log_collector.sh /home/dnanexus/out/cluster_runtime_logs_tarball
```

The event logs in the tarball could be passed to the Spark history server for debugging after the fact.

## PySpark

The following things should be included in the submitted PySpark script, described in the order in which they might be required.

### Spark Session and Hive Support

Inside the submitted PySpark script (`pyspark_app.py` in the previous examples), a Spark session must be instantiated, and it must have Hive support.

```python
import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
```

Additional options may be specified, but they are outside the scope of this documentation.

### Creating Databases

To create DNAnexus databases, you can use the following line.

```python
spark.sql("CREATE DATABASE clinical_data LOCATION 'dnax://'")
```

The location `'dnax://'` is the custom scheme required for all DNAnexus databases to integrate with the DNAnexus Platform.
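
Once the database exists, you can write tables into it with standard Spark SQL or DataFrame calls. The following is a minimal sketch with illustrative table and column names:

```python
# Illustrative example: write a small DataFrame into the newly created database.
df = spark.createDataFrame(
    [("sample_001", 42.0), ("sample_002", 37.5)],
    ["sample_id", "measurement"],
)
df.write.mode("overwrite").saveAsTable("clinical_data.measurements")

spark.sql("SELECT * FROM clinical_data.measurements").show()
```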

## Monitoring the Spark UI

While the Spark job is running, you can interactively monitor the Spark UI. The Spark UI is enabled by default and runs on port 8081.

### Using `httpsApp`

{% hint style="info" %}
A license is required to use this feature. [Contact DNAnexus Sales](mailto:sales@dnanexus.com) for more information.
{% endhint %}

The [`httpsApp`](https://documentation.dnanexus.com/developer/apps/https-applications) feature enables the app to open up HTTPS access to web servers running at ports 443, 8080, and 8081. You can use this feature to expose the Spark UI on the cluster, which runs on port 8081.

The following needs to be added to the app's `dxapp.json`:

```python
  "httpsApp": {
      "shared_access":"NONE",
      "ports": [
        8081
      ]
  },
```

Once the app starts running within the Spark context, you can access the Spark UI at `https://job-xxxx.dnanexus.cloud:8081/jobs/`.

### Using Spark History Server

It is still possible to reconstruct the UI of an application through Spark's history server, provided the application's event logs exist. When log collection is enabled, event logs are also collected. The Spark history server is included in the Spark binaries.

1. Download Spark from the [Spark downloads page](https://spark.apache.org/downloads.html) and install it.
2. Download the Spark app logs and extract them. In `tmp/clusterLogs/eventlogs/` you will find a file; copy it, for example, to `/userName/sparkUIEventLogs/`.
3. Create a file, for this example `/userName/histProperties.txt`, containing the single line `spark.history.fs.logDirectory=file:/userName/sparkUIEventLogs`.
4. Run the Spark history server using `$SPARK_HOME/sbin/start-history-server.sh --properties-file /userName/histProperties.txt`.
5. Go to <http://localhost:18080> in your browser and check out the jobs, stages, and executor usage.

You can add more files to `/userName/sparkUIEventLogs/`, and the history server picks them up.
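
Put together, the steps above might look like the following shell session (paths and file names are illustrative):

```shell
# 1. Collect the extracted event logs into a local directory.
mkdir -p /userName/sparkUIEventLogs
cp tmp/clusterLogs/eventlogs/* /userName/sparkUIEventLogs/

# 2. Point the history server at that directory.
echo "spark.history.fs.logDirectory=file:/userName/sparkUIEventLogs" > /userName/histProperties.txt

# 3. Start the history server, then browse to http://localhost:18080
$SPARK_HOME/sbin/start-history-server.sh --properties-file /userName/histProperties.txt
```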

For more details, refer to Spark documentation on [Viewing After the Fact](https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact).

## Spark App Example

The `dxapp.json`:

```json
{
  "...": "...",
  "outputSpec": [
    {
      "name": "database",
      "label": "Output database",
      "class": "string",
      "patterns": { "class": "database" },
      "help": "The output database"
    },
    "..."
  ],
  "runSpec": {
    "file": "src/bash_app.sh",
    "systemRequirements": {
      "*": {
        "...": "...",
        "clusterSpec": {
          "type": "dxspark",
          "version": "3.5.2",
          "initialInstanceCount": "<num_cluster_nodes>"
      }
    }
  }
  },
  "access": {
    "project": "CONTRIBUTE",
    "allProjects": "VIEW",
    "network": [ "*" ]
  },
}
```

The `bash_app.sh`:

```shell
$HADOOP_HOME/bin/hadoop fs -mkdir -p /hdfs/path/to/dir
$HADOOP_HOME/bin/hadoop fs -put /local/path/to/file /hdfs/path/to/file

spark_executor_memory=... # Could be parameterized as an app input
spark_executor_cores=...  # Could be parameterized as an app input
$SPARK_HOME/bin/spark-submit \
  --executor-memory=$spark_executor_memory \
  --executor-cores=$spark_executor_cores \
  /opt/pyspark_app.py ...
```

The `pyspark_app.py`:

```python
import pyspark

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("CREATE DATABASE clinical_data LOCATION 'dnax://'")
```

