Spark Apps
Learn to develop Spark apps for use in Apollo.
DNAnexus is working to phase out outdated terminology and change scripts using those terms to remove inappropriate language. The terms "master" and "slave" will be replaced with "driver" and "clusterWorker" in Spark documentation. DNAnexus will also eventually replace the older terms in the codebase. For now, variable names and scripts containing the older terms will still be used in the actual code.
Overview
A license is required to access Spark functionality on the DNAnexus Platform. Contact DNAnexus Sales for more information.
Spark apps are like regular apps except that they are executed on a Spark cluster (instead of on a single machine). When a Spark app is launched, the system instantiates a Spark cluster specifically for that execution. The cluster contains the Apache Spark software -- a modern, scalable framework for parallel processing of big data.
dx-toolkit Versions
Make sure your dx-toolkit has been updated to the latest release to take advantage of the latest features and improvements. Download the latest version at Downloads.
Bash Apps
Spark-based DNAnexus apps should be Bash apps at the top level. If you want to develop PySpark applications, include your Python code in the app's resources and submit it to Spark from the Bash script.
App Specifications
The following settings must be included in the app specification, dxapp.json.
Cluster Specifications
The cluster specification describes the makeup of the Spark cluster. You can configure settings such as the cluster type, the Spark version, and the number of nodes. The cluster specification is required for Spark apps -- it should be added to the dxapp.json with the following syntax:
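A sketch of how this might look, assuming the clusterSpec is placed under the runSpec's systemRequirements entry; the instance type and bootstrap script path are placeholders, and the clusterSpec keys follow the descriptions below:

```json
"systemRequirements": {
  "*": {
    "instanceType": "mem1_ssd1_v2_x4",
    "clusterSpec": {
      "type": "dxspark",
      "version": "3.2.3",
      "initialInstanceCount": 3,
      "ports": "9500, 9700-9750",
      "bootstrapScript": "src/startup.sh"
    }
  }
}
```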
The cluster specification is a mapping with the following key-values:
type
string. The cluster type: "dxspark" for an Apollo Spark cluster or "apachespark" for generic Spark.
version
string. The requested version for dxspark or apachespark clusters. Supported values are [2.4.4, 3.2.3].
initialInstanceCount
integer. The number of nodes in the cluster, including the driver node. It must be at least 1. If the value is 1, the cluster is a single-node Spark cluster in which the driver and the clusterWorker run on the same node.
ports
string (optional). Comma-separated list of ports (or port ranges) to be opened between the nodes of the cluster.
bootstrapScript
string (optional). Path to the bootstrap script. The bootstrap script runs on all nodes of the cluster before the application code. It's recommended that the script be located in the same location as the application code.
Types of Clusters
The following Spark cluster types are currently supported.
dxspark
This is the Apollo flavor of Spark which is fully integrated with the rest of the Platform. For example, when you create a database there is a representative database object created in the same project where your app was executed. Those databases can be shared by sharing the project and the permission levels are reflected in the SQL permissions.
apachespark
This is generic Apache Spark that you can use to create a Spark cluster and run Spark jobs. You can specify a custom location to store your database files e.g. your own Amazon S3 bucket. There are no Platform database objects created or storage provided, but you can use this cluster type to perform Spark analysis jobs and store results back to the Platform in your output spec.
Only the following combinations of runSpec and clusterSpec are supported:
Spark Cluster Management Software
Spark apps use Spark Standalone for cluster management and HDFS to share data between the nodes. The following diagram depicts a typical Spark cluster application.
A Spark cluster application consists of the following components:
1. Driver: The driver (formerly "master") is the most important component of the Spark cluster. It runs all the services required to keep the cluster functioning, such as the Spark master service, the Spark driver, the HDFS namenode, and the HDFS datanode. Application code runs on this node.
2. One or more clusterWorkers: ClusterWorker (formerly "slave") nodes run the Spark worker service. Spark executor processes are started on these nodes to process the data.
All nodes of the cluster are identical in terms of hardware and software configuration:
Nodes have the same instance type.
Nodes have the same set of software installed. Packages listed in assetDepends and bundleDepends, and files under resources/, will all be available on all nodes of the cluster.
Application code that runs the main logic of the app runs only on the driver node. ClusterWorker nodes do not run any application code directly. In a Spark cluster, the application code running on the driver submits a Spark job. The Spark driver breaks the job down into multiple tasks and assigns clusterWorkers to execute them.
Output Specification
For apps that create databases, you may want to set the new database name as an app output, so the output specification will include the following:
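A minimal sketch of such an output entry, assuming the new database name is returned as a plain string output (the output name database is a placeholder):

```json
"outputSpec": [
  {
    "name": "database",
    "class": "string"
  }
]
```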
Network and Project Access
dxspark apps should have network access to be able to connect to the DNAnexus Hive Metastore. To be able to read databases from the parent project, app(let)s should request VIEW access to the project. To create new databases or write into existing databases in the parent project, app(let)s should request UPLOAD access to the project.
Example dxapp.json:
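A sketch of the corresponding access section, assuming unrestricted network access and UPLOAD project access as described above:

```json
"access": {
  "network": ["*"],
  "project": "UPLOAD"
}
```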
Ports
The cluster requires a number of ports to be opened in order for the nodes to communicate with each other. The system automatically includes the ports required by Spark if the cluster type is dxspark or apachespark. Additional ports to be opened can be specified in clusterSpec.
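For example, a clusterSpec fragment along these lines (other fields as described earlier) requests the additional ports discussed below:

```json
"clusterSpec": {
  "type": "dxspark",
  "version": "3.2.3",
  "initialInstanceCount": 2,
  "ports": "9500, 9700-9750"
}
```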
A comma-separated list of ports (or port ranges) can be specified. For example, with the above configuration, port 9500 and the ports in the range 9700-9750 are opened between the nodes. The Platform uses certain ports for its internal functions, and those ports should not be used.
Bootstrapping
Application code in cluster applications runs only on the driver. ClusterWorker nodes do not run any application code. There may be cases where you need to run some code on all nodes, for example to start services or to download data. Bootstrap scripts are used for this purpose.
The following is the sequence of executions on driver and clusterWorker nodes.
The bootstrap script will run on all nodes and can be used to start any services that the cluster depends on. The following is an example of a bootstrap script.
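A minimal sketch; the package and service names below are placeholders:

```bash
#!/bin/bash
# Runs on every node (driver and clusterWorkers) before the application code.
set -e -x

# Install an extra OS package the Spark job depends on (placeholder package name).
sudo apt-get install -y jq

# Start a local service required by the workers (placeholder command).
# my-local-service --daemon
```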
Pre-Bootstrapping
In some cases when running a Spark application, you may want to perform some actions prior to the Spark cluster being bootstrapped since Spark automatically starts on successful installation. An example of this could be mounting a remote volume (a fuse mount for example).
In order to create a pre-bootstrap script for your app or applet, you must create a file under the resources directory as follows: /resources/cluster/hooks/prebootstrap.sh (the resources folder is the app folder that gets extracted over the instance's root directory, so the /cluster/ folder will be in the root directory of the app instance).
This pre-bootstrap shell script will be run during the installation of the dxspark package. If the pre-bootstrap script returns a non-zero exit code, the setup of the app will fail and the instance will terminate. In the case of a multi-node cluster configuration, if setup on a child node fails, it will cause the child node to terminate and another to be spun up to take its place, resulting in the pre-bootstrap script being tried again.
If you wish to display the results of the pre-bootstrap script, you can find the log at $PRE_BOOTSTRAP_LOG (default value of /cluster/prebootstrap.log). Below is an example of how to display it from within your app's startup script:
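A sketch, assuming $PRE_BOOTSTRAP_LOG is set in the job environment as described above:

```bash
# Print the pre-bootstrap log (if present) into the job log for easier debugging.
if [[ -f "$PRE_BOOTSTRAP_LOG" ]]; then
    cat "$PRE_BOOTSTRAP_LOG"
fi
```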
Default Instance Configurations
Multi-node Spark Cluster
Based on the instance type, the following spark configurations are automatically set by default.
spark.executor.memory is set to 70% of totalMemoryMB
spark.executor.cores is set to numCores
spark.driver.memory is set to 70% of totalMemoryMB
Single-node Spark Cluster
spark.driver.memory is set to 1024 MB
spark.executor.memory is set to 70% of (totalMemoryMB - 1024 MB)
spark.executor.cores is set to numCores
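As a worked example with hypothetical instance values of totalMemoryMB = 16384 and numCores = 8: a multi-node cluster would default to spark.executor.memory ≈ 11469 MB and spark.executor.cores = 8, while a single-node cluster would default to spark.driver.memory = 1024 MB and spark.executor.memory ≈ 70% of (16384 - 1024) ≈ 10752 MB.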
Cluster Life Cycle
The job lifecycle reflects the state of the driver node. A Spark job will fail if the driver node fails. If a clusterWorker node fails, a new node is provisioned to replace it. Spark application failures can also cause the job to fail.
What to Include in Your Bash Script
The following things should be included in the Bash app's main script, described in the order in which they might be required.
Distributing Input Data
Unlike with regular jobs, downloaded input data is not automatically available across the entire execution environment; it is available only on the driver node. If the data has to be processed by Spark, it must be distributed to the Spark workers. To distribute data to all nodes, you can put it into HDFS using the following line.
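A sketch, where input.csv stands in for a file already downloaded to the driver's local disk:

```bash
# Copy the downloaded input from the driver's local disk into HDFS
# so that every Spark worker can read it.
hdfs dfs -put input.csv /
```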
DX Spark Submit
The dx-spark-submit utility is the easiest and recommended way to submit your application to the Spark cluster, override Spark configs, and set up logging.
For more information, see the dx-spark-submit documentation. If you're using dx-spark-submit, you can skip ahead to the PySpark section.
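A minimal invocation might look like the following sketch, assuming pyspark_app.py is the PySpark script shipped with your app (see the dx-spark-submit documentation for the full set of options):

```bash
dx-spark-submit pyspark_app.py
```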
Submitting Applications Without dx-spark-submit
If you choose not to use the dx-spark-submit utility, then you need to perform the traditional Spark steps.
To submit a PySpark application to the Spark cluster, include the following line.
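A minimal sketch, assuming $SPARK_HOME points at the cluster's Spark installation and pyspark_app.py is the PySpark script shipped in your app's resources:

```bash
$SPARK_HOME/bin/spark-submit pyspark_app.py
```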
This is the minimum requirement for submitting applications. More advanced options are described below.
Configuring Runtime Memory and Cores
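The defaults described above can be overridden at submission time. A sketch using standard spark-submit options; the memory and core values are placeholders:

```bash
$SPARK_HOME/bin/spark-submit \
    --driver-memory 4g \
    --executor-memory 8g \
    --executor-cores 4 \
    pyspark_app.py
```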
Custom Spark Properties
To pass custom Spark properties to the Spark application, you can do the following
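For example, properties can be passed on the spark-submit command line with --conf (the property and value here are placeholders):

```bash
$SPARK_HOME/bin/spark-submit \
    --conf spark.sql.shuffle.partitions=200 \
    pyspark_app.py
```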
or
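another option (a sketch, using the same placeholder property) is to set it when building the Spark session inside the PySpark script:

```python
import pyspark

# Set custom Spark properties programmatically on the session builder.
spark = (
    pyspark.sql.SparkSession.builder
    .config("spark.sql.shuffle.partitions", "200")
    .enableHiveSupport()
    .getOrCreate()
)
```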
Sharing Environment Variables
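One way to share an environment variable from the driver with the executor processes on the workers is Spark's standard spark.executorEnv.* property. A sketch; MY_VAR is a placeholder variable name:

```bash
$SPARK_HOME/bin/spark-submit \
    --conf spark.executorEnv.MY_VAR="$MY_VAR" \
    pyspark_app.py
```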
Custom Log Levels
The available log levels are {"WARN", "INFO", "DEBUG", "TRACE"}. By default, the log level is WARN, but at times it may be desirable to see more detailed logs for debugging.
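One way to raise the verbosity (a sketch using Spark's own API rather than any Platform-specific mechanism) is to set the level on the SparkContext inside the PySpark script:

```python
# Switch from the default WARN level to DEBUG for more detailed driver/executor logs.
spark.sparkContext.setLogLevel("DEBUG")
```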
Collecting Cluster Logs
To enable the collection of Spark cluster logs for debugging after the fact, you must include the following in the output specification, and the following in the Bash script after the spark-submit call.
The event logs in the tarball could be passed to the Spark history server for debugging after the fact.
PySpark
The following things should be included in the submitted PySpark script, described in the order in which they might be required.
Spark Session and Hive Support
Inside the submitted PySpark script (pyspark_app.py in the previous examples), a Spark session must be instantiated, and it must have Hive support.
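A minimal sketch; an application name or extra config options can be added to the builder:

```python
import pyspark

# A Spark session with Hive support is required to work with DNAnexus databases.
spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
```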
Additional options may be specified, but they are outside the scope of this documentation.
Creating Databases
To create DNAnexus databases, you can use the following line.
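A sketch, with my_database as a placeholder database name:

```python
spark.sql("CREATE DATABASE IF NOT EXISTS my_database LOCATION 'dnax://'")
```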
Note the location 'dnax://' - this is the custom scheme that must be used for all DNAnexus databases in order to integrate with the DNAnexus Platform.
Monitoring the Spark UI
While the Spark job is running, it is possible to interactively monitor the Spark UI. The Spark UI is enabled by default and is running at port 8081.
Using httpsApp
A license is required to use this feature. Contact DNAnexus Sales for more information.
The httpsApp feature enables the app to open HTTPS access to web servers running at ports 443, 8080, and 8081. You can use this feature to easily expose the Spark UI on the cluster, which runs at port 8081.
The following needs to be added to the app's dxapp.json:
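A sketch of the httpsApp stanza; the ports come from the list above, while the shared_access value shown is an assumption and should be adjusted to the access level you want to grant:

```json
"httpsApp": {
  "ports": [443, 8080, 8081],
  "shared_access": "VIEW"
}
```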
Once the app starts running within the Spark context, you can access the Spark UI at https://job-xxxx.dnanexus.cloud:8081/jobs/
Using Spark History Server
It is still possible to construct the UI of an application through Spark’s history server, provided that the application’s event logs exist. When log collection is enabled, event logs are also collected. Spark history server is included in Spark binaries.
1. Download Spark from https://spark.apache.org/downloads.html and install it.
2. Download the Spark app logs and extract them. In tmp/clusterLogs/eventlogs/ you will find an event log file. Copy it to a directory of your choice, for example /userName/sparkUIEventLogs/.
3. Create a file, for this example /userName/histProperties.txt, containing the single line spark.history.fs.logDirectory=file:/userName/sparkUIEventLogs.
4. Run the Spark history server: $SPARK_HOME/sbin/start-history-server.sh --properties-file /userName/histProperties.txt. Now go to localhost:18080 in your browser and examine the jobs, stages, and executor usage. You can add more files to /userName/sparkUIEventLogs/ and the history server will pick them up.
For more details refer to: https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact
Spark App Example
The dxapp.json:
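A sketch that pulls together the pieces described above; the app name, input/output names, instance type, and distribution fields are placeholders:

```json
{
  "name": "example_spark_app",
  "title": "Example Spark App",
  "inputSpec": [
    {
      "name": "input_file",
      "class": "file"
    }
  ],
  "outputSpec": [
    {
      "name": "database",
      "class": "string"
    }
  ],
  "runSpec": {
    "interpreter": "bash",
    "file": "src/bash_app.sh",
    "distribution": "Ubuntu",
    "release": "20.04",
    "systemRequirements": {
      "*": {
        "instanceType": "mem1_ssd1_v2_x4",
        "clusterSpec": {
          "type": "dxspark",
          "version": "3.2.3",
          "initialInstanceCount": 3
        }
      }
    }
  },
  "access": {
    "network": ["*"],
    "project": "UPLOAD"
  }
}
```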
The bash_app.sh:
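A sketch of what the driver-side Bash script might look like, tying together the steps above; the input name, file paths, and database name are placeholders:

```bash
#!/bin/bash
set -e -x

main() {
    # Download the job input onto the driver node.
    dx download "$input_file" -o input.csv

    # Distribute the input to the Spark workers via HDFS.
    hdfs dfs -put input.csv /

    # Submit the PySpark application to the cluster.
    $SPARK_HOME/bin/spark-submit pyspark_app.py /input.csv

    # Return the name of the database created by the PySpark script.
    dx-jobutil-add-output database "example_database" --class=string
}
```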
The pyspark_app.py:
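A sketch of the corresponding PySpark script; the database and table names are placeholders:

```python
import sys

import pyspark

# Create a Spark session with Hive support (required for DNAnexus databases).
spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

# Create a Platform-integrated database using the dnax:// scheme.
spark.sql("CREATE DATABASE IF NOT EXISTS example_database LOCATION 'dnax://'")
spark.sql("USE example_database")

# Read the input that the Bash script placed in HDFS and save it as a table.
df = spark.read.csv(sys.argv[1], header=True, inferSchema=True)
df.write.saveAsTable("example_table", mode="overwrite")
```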