I first want to start by thanking all the readers of my previous two blogs on the topic of Talend and Apache Spark.
If you are new to this blog series and haven’t read my previous posts, you can start reading them here “Talend and Apache Spark: A Technical Primer” and part two here “Talend vs. Spark Submit Configuration: What’s the Difference?”.
The first two posts in my series about Apache Spark provided an overview of how Talend works with Spark, where the similarities lie between Talend and Spark Submit, and the configuration options available for Spark jobs in Talend.
In this blog, we are going to take a look at Apache Spark performance and tuning. This a common discussion among almost everyone that uses Apache Spark, even outside of Talend. When developing and running your first Spark jobs there are always the following questions that come to mind.
- How many executors should I allocate for my Spark job?
- How much memory does each executor need?
- How many cores should I be using?
- Why do some Spark jobs take hours to process 10GB worth of data and how do I fix that problem?
In this blog, I’m going to go through each of these questions and provide some answers and insight. Before we proceed any further with this topic, let’s introduce some key concepts that are going to be used throughout this blog:
Partition: A partition is a portion of a distributed data set. It is created by the default HDFS block size. Spark utilizes partitions to do parallel processing of data sets.
Tasks: Tasks are the units of work that can be run within an executor.
Core: A core is the processing unit within a CPU that determines the number of parallel tasks in Spark that can be run within an executor.
Executor: A process that is started on worker nodes that runs your job submission in memory or disk.
Application Master: Each YARN application spins up an application master process that has the responsibility to request resources from the resource manager. Once the resources are allocated the process then works with node managers to start the required containers within them.
To begin, let’s start with going over how you can tune your Apache Spark jobs inside Talend. As mentioned previously, in your Talend Spark Job, you’ll find the Spark Configuration tab where you can set tuning properties. This is always unchecked by default in Talend.
In this section, you are given the option to set the memory and cores that your application master and executors will use and how many executors your job will request. Now the main question that comes up once you start filling the values in this section is “How do I determine the number of cores or memory my application master or executors need to have good performance?” Let’s tackle that question.
How to choose the number of cores for your Spark job
At this point, there are a few factors that we need to consider before proceeding any further. These are:
- The size of our datasets
- The time frame in which our job needs to be completed
- The operations and actions that our job is doing
Now with those factors in mind, we can start configuring our job to maximize performance. Let’s first start with tuning our application master. For the application master, we can leave the default values since it only does orchestration of resources and no processing which means there is no need for high values of memory or cores.
Our next step is to configure the memory and cores for our executors. The main question here is how many executors, memory and cores should be used. To find that answer, let’s imagine we have a Hadoop cluster that has 6 worker nodes and each one of them has 32 cores and 120GB of memory. The first thought that probably comes to mind is that the more concurrent tasks that we can have per executor the better our performance will be. When researching this, we can see in performance tuning guides from Hadoop distros like Cloudera as an example in the following link, it has been shown that more than 5 cores per executor will lead to bad HDFS I/O. As a result, the optimal value of cores for performance is 5.
Next, let’s look at how many executors we want to launch. Based on the number of cores and nodes, we can easily determine this number. As we mentioned 5 cores is the optimal number to use per executor. Now, from the each of the 32 cores we have per node we must remove the ones that we cannot use for our jobs as they are needed by the Operating System and Hadoop daemons running on the node. The Hadoop cluster management tool already does that for us, making it easier to determine how many cores per node we have available to use for our Spark jobs.
After making that calculation, let’s assume we are left with 30 cores per node that can be used. Since we already determined that 5 cores is the optimal number per executor that means that we can run up to 6 executors per node. Easy!
Finally, let’s finish up by calculating the amount of memory that we can use. Based on the hardware specs above, we see that there is 120GB of memory per node, but as I mentioned when discussing cores, we cannot use all that memory for jobs as the Operating System needs to use some. Again, our Hadoop cluster management tool can determine how much of that memory can be used for jobs for us. If the Operating System and Hadoop daemons require 2GB of memory, then that leaves us with 118GB of memory to use for our Spark jobs. Since we have already determined that we can have 6 executors per node the math shows that we can use up to roughly 20GB of memory per executor. This though is not 100 percent true as we also should calculate in it, the memory overhead that each executor will have. In my previous blog, I mentioned that the default for the overhead is 384MB. Now if I were to remove that amount from the 20GB then I could roughly say that 19GB will be the highest I can give to an executor.
Dynamic vs. fixed allocation of cluster resources
The numbers above will work for either fixed or dynamic allocation of cluster resources in a Spark Job. The difference between the two of them is dynamic allocation. With dynamic allocation, you can specify the initial number of executors used, a minimum of executors that the job can use when there is not that much workload, and a maximum number when more processing power is needed. Although it would be nice to be able to use all the resources of the cluster for our job we need to share that processing power with other jobs that run on the cluster. As a result, based on what we have identified as our requirements when going over the factors that we defined earlier for considering for tuning our Talend Spark job, it will determine what percentage of those max values we can use.
With our job configured we can now proceed with actually running our job! Let’s say that we still notice though that our Spark job takes a lot of time to complete, even when it has been set up to the max settings defined above. We need to go back to our job and look at a few more settings to ensure they are used for maximum performance.
First, let’s assume that we are joining two tables in our Spark job. One of the factors we considered before starting to optimize our Spark jobs was the size of our datasets. Now, when we look at the size of the tables and we determine that one of them is 50GB and the other one is 100MB, we need to look and see if we are taking advantage within the Talend components of replicated joins.
A replicated join (or otherwise called Map-Side join) is widely used when you join a large table with a small table to broadcast the data from the smaller table to all executors. In this case, since the smaller dataset can fit in memory, we can use a replicated join to broadcast it to every executor and optimize the performance of our Spark job.
Since the table data needs to be combined at the executor level with the side data, by broadcasting the smaller data set to all executors we are avoiding the data of the larger table being sent over the network. A lot of the performance issues in Spark occur because of the shuffling of large amounts of data over the network. This is something that we can check easily within the Talend job — by enabling the option for “Use replicated join” in the tMap component as shown below. This will broadcast the data of the lookup table to all the executors.
The next step is to look and see our job has operations within it that are performing expensive re-computations.
In order to walk through about re-computations, let’s consider a simple example of loading a file with customer purchase data. From this data we want to capture a couple of metrics –
- the total number of customers
- the number of products purchased
In this case, if we don’t use a Spark cache, each operation above will load the data. This will affect our performance as it causes an expensive re-computation to happen. Since we know that this dataset will need to be used down the road in our job, it is best to use Spark Cache to cache it in memory for later use so that we don’t have to keep reloading it.
Within our Talend Spark jobs, this is done with the tCacheIn and tCacheOut components which are available in the Apache Spark palette in Talend and allow you to utilize the Spark caching mechanism offering different options that are available.
Also, you can select if you want to cache the data on disk only, and then you are also given the option for the cached data to be serialized for either memory, disk or both. Finally, you can also select for the cached data to be replicated on 2 other nodes. The most used option is memory without serialization as it is faster, but when we know that the cached RDD cannot fit in the memory and we don’t want to spill to disk, then serialization is selected as it reduces the space consumed by the dataset but it comes at an additional cost of overhead that affects performance. As a result, you should evaluate your options and select the one that best fits your needs.
If performance issues still remain after all that, we need to start looking at the Spark History Web Interface to see what is happening. As mentioned in my previous blog, in the Spark History section of the Spark Configuration in Talend we can enable Spark logging. Spark logging helps with troubleshooting issues with Spark jobs by keeping the logs after the job has finished and makes it available it through the Spark History Web Interface. Having Spark event logging enabled with our Spark jobs is a best practice and allows us to more easily troubleshoot performance issues.
With Spark event logging enabled, you can go to the Spark History Web Interface where we see that we have the following tabs when looking at the application number for our job:
In our Spark UI above we want to look at the stages tab, identify the one that is affecting the performance of our job, go to the details of it, and check to see if we are seeing something like the behavior below:
What we see is that only one is processing most of the data and the rest are idle, even after we have allocated 10 executors. Now, why does this happen? To answer that we need to identify the stage of the job where the problem exists. As an example, we notice that this is happening at the part of the Spark job that we are reading the data from a compressed file. Since archive files aren’t partitioned at read by default, an RDD with a single partition is going to be created for each archive file we read which will cause this behavior to be seen. Now if that compressed file is in an archive format that is split-able like BZIP2, and can be partitioned at read, then in the advanced settings of the tFileInputDelimited we can enable the property “Set Minimum Partitions” and then at the minimum set as many partitions as our executors as a starting point.
But in the case of an archive file like GZIP that cannot be repartitioned at read, we can explicitly repartition it using our tPartition component. This component, as shown below, allows us to repartition the file so that we can distribute the load equally among the executors.
Partitioning at read can also be used when reading from a database using our tJDBC components, using the following properties:
The re-partitioning can only be applied in certain situations as we can see above. If we determine that our dataset is skewed on the keys we are using for the join, then different methods need to be used. So how can we identify the data skew? Start by looing at the dataset by partition and see how our data is grouped among our keys that we are using for the join. Here is an example of how a skewed dataset by partition would look:
In this case, if we cannot repartition by a different key, we should look at other methods to improve our Talend Spark job. One technique that is widely used is called “salting”. With salting, you are adding a fake key to your actual key to even out the distribution of data per partition. This is something that can be done through our tmap component in Spark job as an example like this:
As we can see above, we are adding at the tmap level the fake key as a numeric random, and we are joining it along with our actual key with the lookup dataset that we have also already added the fake kye. Since the joining is happening based on our actual key plus the fake key we generated for the distribution, it will help to avoid skewed partitions that can affect our performance when joining datasets in Spark.
There are many different techniques that we can use to improve the performance and tune our Talend Spark Jobs. I hope the few that we went through in this blog were helpful. Now go forth and build more Spark jobs on Talend!