Optimisation of Spark applications
Apache Spark <https://www.adaltas.com/en/tag/apache-spark/> is an in-memory
data processing tool widely used in companies to deal with Big Data issues.
Running a Spark application in production requires user-defined resources.
This article presents several Spark concepts to optimize the use of the
engine, both in the writing of the code and in the selection of execution
parameters. These concepts will be illustrated through a use case with a
focus on best practices for allocating resources to a Spark application
in a Hadoop YARN <https://www.adaltas.com/en/tag/apache-yarn/> cluster.
Cluster: terminologies and modes
Deploying a Spark application in a YARN cluster requires an understanding
of the “master-slave” model as well as the operation of several components:
the Cluster Manager, the Spark Driver, the Spark Executors and the Edge Node.
The “master-slave” model defines two types of entities: the master, which
controls and centralizes communications, and the slaves, which carry out the
work assigned to them. This model is often applied in the implementation of
clusters and in parallel processing. It is also the model used by Spark
applications.
The *Cluster Manager* maintains the physical machines on which the Driver
and its Executors are going to run and allocates the requested resources to
the users. Spark supports 4 Cluster Managers: Apache YARN, Mesos,
Standalone and, recently, Kubernetes. We will focus on YARN.
The *Spark Driver* is the entity that manages the execution of the Spark
application (the master). Each application is associated with one Driver. Its
role is to interpret the application’s code, transform it into a sequence
of tasks, and keep track of the state of the Executors and their tasks.
The *Spark Executors* are the entities responsible for performing the tasks
assigned to them by the Driver (the slaves). They will read these tasks,
execute them and return their states (Success/Fail) and results. The
Executors are linked to only one application at a time.
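The division of labour between the Driver and the Executors can be pictured with a toy model. The sketch below is not Spark code: it uses a Python thread pool as a stand-in for Executors, and the names `driver` and `run_task` are hypothetical. It only illustrates the flow described above, where the Driver cuts the work into tasks and each Executor returns a state (Success/Fail) and a result.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(partition):
    """Executor side (toy): run one task and report its state and result."""
    try:
        return ("Success", sum(partition))
    except TypeError:
        # A non-numeric value makes the task fail instead of crashing the job.
        return ("Fail", None)


def driver(data, num_partitions, num_executors):
    """Driver side (toy): split the job into tasks, dispatch, collect states."""
    # One task per partition; the round-robin split is purely illustrative.
    partitions = [data[i::num_partitions] for i in range(num_partitions)]
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        reports = list(pool.map(run_task, partitions))
    states = [state for state, _ in reports]
    total = sum(result for state, result in reports if state == "Success")
    return states, total


states, total = driver(list(range(10)), num_partitions=4, num_executors=2)
print(states, total)  # ['Success', 'Success', 'Success', 'Success'] 45
```

In a real cluster the Executors are separate JVM processes on other machines, and the Driver also handles scheduling and retries, which this sketch leaves out.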
The *Edge Node* is a physical/virtual machine where users will connect to
instantiate their Spark applications. It serves as an interface between the
cluster and the outside world. It is a comfort zone where components are
pre-installed and, most importantly, pre-configured.
There are different ways to deploy a Spark application:
- The *Cluster* mode: This is the most common mode. The user sends a JAR
file or a Python script to the Cluster Manager, which instantiates a
Driver and Executors on the different nodes of the cluster. The Cluster
Manager is responsible for all processes related to the Spark application.
We will use this mode for our example: it facilitates the allocation of
resources and releases them as soon as the application is finished.
- The *Client* mode: Almost identical to *Cluster* mode, except that the
Driver is instantiated on the machine where the job is submitted, i.e.
outside the cluster. It is often used for program development because the
logs are displayed directly in the current terminal, and the Driver
instance is tied to the user’s session. This mode is not recommended in
production because the Edge Node can quickly run out of resources and
is a SPOF (Single Point Of Failure).
- The *Local* mode: the Driver and Executors run on the machine on which
the user is logged in. It is only recommended for the purpose of testing an
application in a local environment or for executing unit tests.
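As a rough illustration, the three modes map onto different `spark-submit` options. The sketch below only assembles the command line and does not launch anything. The helper name `spark_submit_command` and the script name `my_app.py` are hypothetical, while `--master`, `--deploy-mode`, `yarn` and `local[*]` are standard `spark-submit` values.

```python
def spark_submit_command(mode, app):
    """Return the spark-submit argument list for one of the three modes."""
    if mode == "cluster":
        # Driver and Executors both run inside the YARN cluster.
        target = ["--master", "yarn", "--deploy-mode", "cluster"]
    elif mode == "client":
        # Driver runs on the submitting machine (typically the Edge Node).
        target = ["--master", "yarn", "--deploy-mode", "client"]
    elif mode == "local":
        # Everything runs on the local machine, using all available cores.
        target = ["--master", "local[*]"]
    else:
        raise ValueError(f"unknown mode: {mode!r}")
    return ["spark-submit", *target, app]


for mode in ("cluster", "client", "local"):
    print(" ".join(spark_submit_command(mode, "my_app.py")))
```

Returning a list rather than a single string keeps the arguments safe to pass to `subprocess.run` without shell quoting issues.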
The number of Executors and their respective resources are provided
directly in the spark-submit command, or via the configuration properties
injected at the creation of the SparkSession object. Once the Executors are
created, they will communicate with the Driver, which will distribute the
tasks among them.
A Spark application works as follows: data is stored in memory, and the
CPUs are responsible for performing the tasks of an application. The
application is therefore constrained by the resources used, including
memory and CPUs, which are defined for the Driver and Executors.
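The two ways of declaring Executor resources mentioned earlier (command-line options or configuration properties) carry the same information. The sketch below shows that correspondence. The class `ExecutorResources` and the values used (4 Executors, 4 cores, 8g) are hypothetical examples, not recommendations; the flag and property names are the standard Spark ones.

```python
from dataclasses import dataclass


@dataclass
class ExecutorResources:
    instances: int  # number of Executors
    cores: int      # cores per Executor
    memory: str     # heap per Executor, e.g. "8g"

    def as_submit_flags(self):
        """Options to append to a spark-submit command on YARN."""
        return [
            "--num-executors", str(self.instances),
            "--executor-cores", str(self.cores),
            "--executor-memory", self.memory,
        ]

    def as_conf(self):
        """Equivalent configuration properties for the SparkSession."""
        return {
            "spark.executor.instances": str(self.instances),
            "spark.executor.cores": str(self.cores),
            "spark.executor.memory": self.memory,
        }


resources = ExecutorResources(instances=4, cores=4, memory="8g")
print(" ".join(resources.as_submit_flags()))
print(resources.as_conf())
```

With PySpark installed, each key/value pair of `as_conf()` could be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`.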
Spark applications can generally be divided into two types:
- *Memory-intensive*: Applications involving massive joins or HashMap
processing. These operations are expensive in terms of memory.
- *CPU-intensive*: All applications involving sorting operations or
searching for particular data. These types of jobs become intensive
depending on the frequency of these operations.
Some applications are both memory-intensive and CPU-intensive: some Machine
Learning models, for example, run a large number of computationally
intensive iterations and store the intermediate results in memory.
The Executor memory is divided into two main parts, one for storage and one
for execution. Thanks to the *Unified Memory Manager* mechanism, storage
memory and execution memory share the same space, allowing one to occupy
the unused resources of the other.
- The first part (storage) holds cached data, for example when using
.cache() or broadcast().
- The second part (execution) holds the temporary results of *shuffle*,
*join*, aggregation and similar operations.
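To get a feel for the sizes involved, the sketch below estimates the two regions for a given Executor heap. It relies on values that are not stated in this article and are taken from Spark's documented defaults: a fixed reserve of 300 MB, `spark.memory.fraction` = 0.6 and `spark.memory.storageFraction` = 0.5. The function name `unified_memory_regions` is hypothetical, and the result is an approximation, not what Spark reports at runtime.

```python
RESERVED_MB = 300  # assumption: fixed reserve documented by Spark


def unified_memory_regions(executor_heap_mb,
                           memory_fraction=0.6,    # spark.memory.fraction
                           storage_fraction=0.5):  # spark.memory.storageFraction
    """Estimate the storage and execution regions of an Executor heap (MB)."""
    usable = executor_heap_mb - RESERVED_MB
    if usable <= 0:
        raise ValueError("heap must be larger than the reserved memory")
    unified = usable * memory_fraction    # shared by storage and execution
    storage = unified * storage_fraction  # initial share; the boundary is soft
    execution = unified - storage
    return {
        "unified_mb": unified,
        "storage_mb": storage,
        "execution_mb": execution,
        "remaining_mb": usable - unified,  # user data structures, metadata
    }


regions = unified_memory_regions(4096)  # hypothetical 4 GB Executor heap
for name, size in regions.items():
    print(f"{name}: {size:.1f}")
```

The storage/execution split is only a starting point: as described above, either side can borrow the space the other is not using.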
Memory allocation to Executors is closely related to CPU allocation: one
core performs a task on one partition, so if an Executor has 4 cores, it
must have the capacity to store all 4 partitions as well as intermediate
results, metadata, etc. Thus, the user has to set the amount of memory and
cores allocated to each Executor according to the