In my previous blog, “Talend and Apache Spark: A Technical Primer”, I walked you through how Talend Spark jobs equate to Spark Submit. In this blog post, I want to continue evaluating Talend Spark confiurations with Apache Spark Submit. First, we are going to look at how you can map the options in the Apache Spark Configuration tab in the Talend Spark Job, to what you can pass as arguments to Spark Submit and discuss their usage. Let’s get started.
When running an Apache Spark job (like one of the Apache Spark examples offered by default on the Hadoop cluster used to verify that Spark is working as expected) in your environment you use the following commands:
export HADOOP_CONF_DIR=XXX ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client --executor-memory 5G --num-executors 10 /path/to/examples.jar 1000
The two commands highlighted above set the directory from where our Spark submit job will read the cluster configuration files. Then, we issue our Spark submit command that will run Spark on a YARN cluster in a client mode, using 10 executors and 5G of memory for each to run our Spark example job.
Now, let’s take a look at how this same Spark example job runs in Talend. When we run a Spark example job (like the one above) in Talend, all the Spark configuration information is entered in the following tab within the run tab:
This raises a few questions. How does information we enter in Talend map to what we enter on the terminal to run a Spark job? How do we know how many executors and memory we requested? What about troubleshooting? We will answer all of these questions in our blog.
Before we proceed any further, I want to introduce some Spark submit options that will be used throughout this blog. According to Apache Spark documentation, these are some of the most common options that you can pass to a Spark submit script:
–class: This is the main entry point for your Spark application.
–master: In this option you specify if your Spark Master is a standalone Spark, or you are going to be using Spark on YARN.
–deploy-mode: As we mentioned in my previous blog this goes to the two different YARN modes you have available and details how your Spark driver will be deployed.
–conf: In this option you will pass additional Spark Configurations that you want your job to utilize like as an example “spark.executor.userClassPathFirst=true”.
–application-jar: This refers to the path where you have placed your Spark compiled code that Apache Spark is going to execute.
–application-arguments: In this option, you pass any arguments that are specific to your Spark code.
Let’s now turn to how the above options are used within a Talend Spark job. You will notice that in Spark Configuration tab under the run tab the different options you can set are logically categorized in the following categories:
- Cluster Version
- Spark History
Let’s start with one of the first options that you have within the Talend job in the Cluster Version category. This is the “Spark Mode” option.
In this option, you can specify if your Spark Master in on YARN or if you are going to be using a Standalone Spark. This option maps to the “–deploy-mode” we described previously for the Spark submit options as well as the “–master” one. For example, if you select Spark Mode as “YARN Client” in Talend, then this will equate to specifying in Spark submit “–master yarn –deploy-mode client”. Now if Standalone mode is selected in that drop-down box, Talend will prompt you to enter the information for your Spark Master URL as you would do on the Spark-submit end. This will map to passing the following argument in spark submit that is “–master spark://127.0.0.1:7077”.
In Talend we have the “Configuration” category that asks for the following information:
In the first set of checkboxes, Talend asks us to enter information about the resource manager, resource manager, scheduler address, jobhistory address, and staging directory.
When using Spark submit all this information is injected to our Spark job, through the HADOOP_CONF_DIR. We can either set this as an environment variable before running our Spark submit script or by setting it as a permanent environment variable in /etc/environment or /etc/profile. As a note, all of these environment variables are also set in an environment shell script that is sourced by spark jobs when running through spark submit. The name of that file is spark-env.sh and it is always located under “/etc/spark/conf” directory on the Spark hosts. Here is an example of how this config file in the cluster looks like:
In the next checkbox, it asks you if you want to define the Hadoop home directory as it is needed sometimes from Spark jobs. In Spark submit jobs, this information is also passed in the same manner but the environment variable name is HADOOP_HOME. In a Talend Spark job, the checkboxes do what it is done by the “spark-env.sh” file for the Spark submit script, which sources those values at runtime of your Spark job.
Finishing the configuration category in the Spark Configuration within Talend, the last option you have defines the hostname or IP address of the Spark driver. This is a useful option when the system that the Spark Job runs from uses internal and external IP’s or there are issues with the hostname resolution that could cause issues when the Spark Master and Executors try to connect back to the Spark Driver.
By default, if this option is not specified then it will try to use the local hostname and resolve its IP address. As we mentioned in the previous Blog, Talend uses YARN — client mode currently so the Spark Driver always runs on the system that the Spark Job is started from. Now mapping this to the options provided by Spark submit, this would be specified by using the “–conf” one and then we would provide the following key/value pair “spark.driver.host=127.0.0.1”. This concludes mapping the options under the configuration sub-menu in Spark Configuration tab.
In the authentication category, we are given the option to select the authentication method that is used by our Hadoop cluster:
If we don’t check anything in the category, our job will assume that simple authentication is used by the cluster, and it will try to connect to our hadoop cluster using the username that we specify in there. In spark submit case, this information would be entered in the applications spark configuration that we are submitting.
Now if we go ahead and check the option to “Use Kerberos authentication”, it will prompt us to add the following information:
The first two fields are the service principal names that are used by the Resource Manager and Job History service. If the option to use a “keytab” is not checked, then when the job runs it will look for the ticket Kerberos cache on the system that it is running under as well as look under the cache that is specific for the user that started the job for valid Kerberos tickets to use.
If the “keytab” option is checked, then you will need specify the keytab to be used along with the principal name of the user that is issued for. This way when the job starts it will generate a Kerberos ticket based on that keytab for the principal that will be used by job. In the case of Spark submit, in your application you would pass in the Spark configuration that you set in the code that Kerberos is used for authentication. Before running though the spark-submit, you would run the kinit Kerberos command to generate a ticket if not using a keytab, or if a keytab is used then you can either run the kinit command with the flags needed to use a keytab for ticket generation or within your Spark application code you specify to login from keytab.
Let’s move on to the category of “Tuning” in Talend that provides the option of “Set tuning properties” which is always unchecked by default. When “Set tuning properties” is checked, we are automatically greeted with the following options:
So, let’s go and see how all those options match to Spark submit.
The first option here is to “Set application master tuning properties” that allows a user to set the amount of memory and number of cores that the YARN Application Master should utilize.
The purpose of the YARN Application Master instance is to do the negotiation of the resources from the Resource Manager and then communicate with the Node Managers to monitor the utilization of resources and execute containers. If this option is not set it will allocate to the YARN Application Master 512m and 1 core by default. When mapping this to how we will pass it as an option to Spark submit, we would use the “–conf” option, and then pass the following two key/value pairs to it “spark.yarn.am.memory=512m,spark.yarn.am.cores=1”.
We can also set a number of additional settings including the number of executors, the amount of memory on each executor, cores per executor, and also set the amount of overhead memory that can be allocated per executor in the next options.
The default values are 1g per executor memory, 1 core per executor, executor memory overhead by default will be 10 percent of the executor memory used with the minimum being 384m, and 2 executors will be requested. Mapping this back to how it would be passed in Spark submit as an option we have two different ways to execute. One of them is to use as we have in the example Spark submit command above “–executor-memory 5G –num-executors 10” or we can pass them using the “–conf” option and then use the following key/value pairs “spark.executor.instances=2, spark.executor.cores=1, spark.executor.memory=2, spark.yarn.executor.memoryOverhead=384m”.
The next option we see available to use it asks us about the YARN resources allocation:
The options that we here are Auto, Fixed, and Dynamic, but what do those mean? Spark provides us with the ability to select how we want executors to be allocated.
If it is left on Auto we notice that the option to set the amount of executors disappears that we mentioned above as it will use the default allocated by YARN which, as we mentioned, is 2 executors. Now if it is set to Fixed, then we will see that we are given the option to set the amount of executors we want our job to request for. Last option is Dynamic, which provides us with the ability to use the mechanism that Spark provides to dynamically adjust the executors allocated to our Spark job based on what it is needed at runtime. This means that our application while running would be able to ask for more executors as needed, and release them back to YARN when not used. We will see that when this option is selected it provides us with the following configuration:
We can now select how many executors we will initially ask for from YARN, and then we can specify the minimum executors the job can have and the maximum amount depending on the workload of our job when executed by Spark. In order to pass the dynamic option in Spark submit you will use the “–conf” option and then use the following key/value pairs “spark.dynamicAllocation.enabled=true, spark.shuffle.service.enabled=true”. Per Spark documentation (https://spark.apache.org/docs/1.6.1/job-scheduling.html#dynamic-resource-allocation target=”_blank”) those two properties are required in to use this feature.
Moving on in the tuning category within the Spark Configuration tab in Talend, the next checkbox is “Set Web UI port”. When selected, it gives you the option to specify a port with the default being “4040”. The purpose of this option is when your Spark Application is running, the Spark driver starts a Web UI that can be used to monitor your running Spark job and inspect the execution of the job. If this option is not selected it will go ahead and start with the default port mentioned above and keep increasing the port number until an open one is found. This option would be usually used if you know that port 4040 is not available on the system that you are running your Spark job from, and you want to specify a certain port to be used, instead of the Spark application trying to find an open port. As far as setting this option in spark submit you will use the “–conf” option and then use the following key/value pair of “spark.ui.port=4041”.
Now the next option we have available to select is “Broadcast Factory” and we notice that for this one we are given a few different options:
So, what does “Broadcast Factory” do? The responsibility of Broadcast in Spark applications is for broadcasting variables across the executors in your cluster. The reasoning behind it is so that the variable can be quickly and efficiently get distributed instead of having a single node doing everything. As we noticed we are offered with 3 options to select from in this case. The first option is Auto which when selected it will let default’s to be used. The second and third option allows you to select between using Torrent or HTTP as the broadcast factory. In spark submit you would pass this using the “–conf” option and then use the following key/value pair “spark.broadcast.factory=org.apache.spark.broadcast.TorrentBroadcastFactory” if you don’t want the default to be used which is usually the Torrent one.
The last option that we are offered in the Tuning category, is to customize the Spark serializer to be used:
The importance of Serialization in Spark as also described in the Spark documentation (https://spark.apache.org/docs/latest/tuning.html#data-serialization) is to serialize the data among executors to increase performance in a distributed environment. By default, if this option is not selected by default Talend will set the serialization to be used as the Kryo Serialization that is considered the most efficient one. When trying to use the same exact option in spark submit the “–conf” option will be used and then specify the following key/value pair “spark.serializer=org.apache.spark.serializer.KryoSerializer”. If this option is not specified in spark submit the default Java Serializer will be used, and if the Spark SQL Thrift Server is used then it will utilize by default the Kryo one.
Now let’s move to the last category, “Spark History”. When we enable Spark logging, we notice that we are provided with the following options:
When event Logging is enabled you are given the option to specify a directory in HDFS where the job history files can be read by the Spark History server, and specify the address of the History server. In Spark submit, in order to enable it, you will have to pass the following key/value pairs to the “–conf” option to enable and set it up which are “spark.eventLog.enabled=true,spark.eventLog.dir=hdfs://namenode_host:namenode_port/user/spark/applicationHistory,spark.yarn.historyServer.address=http://spark_history_server:history_port”.
Now that we are done with the different categories in the Spark Configuration tab, we will notice that we have three more options left that we can use. The first one is “Spark “scratch” directory”. This option specifies the scratch directory that it is going to be used on the local disk of your system where the Spark job is started while your application is running. Using Spark submit we would utilize the “–conf” and then pass “spark.local.dir=/tmp”. If we don’t specify anything then by default the /tmp directory will be used.
The next option is used for activating Spark checkpoints. This gives our Spark job the ability to recover from a specific point in time in case of failure. When activated we will notice that it gives the opportunity to specify a directory either in the local filesystem or in HDFS to save as the job progresses. If we were to enable it in Spark submit, this will have to be done as also pointed out in the Spark documentation within our Spark code. An example is provided within the Spark documentation in this page (https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#checkpointing).
The last option is the Advanced Properties. In this option we can add any Spark Properties that we want to pass in our application in a key/value pair. This is the same with what you will do when using Spark submit as in that case you would be passing them in the “–conf” option.
As a note, when you take a closer look at your Hadoop cluster and one of the Spark Gateway nodes, you will notice that a lot of the default selections mentioned above are already specified within a specific file named spark-defaults.conf that will be used when you run Spark submit. This file is located under “/etc/spark/conf”. If you go ahead and open the file you will see most of those properties mentioned here in there. You can still though override them as mentioned above, by passing them as options in your Spark submit. Here is an example:
Talend provides all the different options that you can use to configure your Spark application and makes it easy with the checkboxes and drop-down selections, to specify the options that you want to utilize and which defaults are going to be used. I invite you to go through all those different settings that you can use in Talend Spark Jobs, and experience how easy it is to configure them and optimize them for your environment.