This is the second post in a two-part series on Text Normalization using Spark. In this blog post, we will go through the Apache Spark jargon (jobs, stages, and executors) using the Text Normalization application and the Spark History Server UI. To get a better understanding of the use case, please refer to our Text Normalization with Spark – Part 1 blog post.
Spark History Server UI
Web UI (aka Application UI, webUI, or Spark UI) is the web interface of a running Spark application, used to monitor and inspect Spark job executions in a web browser. Web UI comes with the following tabs:
- Storage (RDD size and memory use)
Understanding Text Normalization Application
In the previous blog post, we submitted the Text Normalization application to a YARN cluster using the spark-submit command. We can monitor that application in the web UI and inspect how Spark constructs jobs and stages for it.
Jobs: In Apache Spark execution terminology, operations that physically move data to produce some results are called “jobs”. A job is associated with a chain of RDD dependencies organized in a Directed Acyclic Graph (DAG). Jobs are triggered in two ways:
- Jobs are triggered by user API calls (so-called “Action” APIs such as “count” to count records).
- Jobs are triggered implicitly behind the scenes (for example, sortByKey runs a job to assess the distribution of the data).
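The first trigger can be illustrated with a minimal sketch, assuming a local master and a tiny in-memory dataset rather than the cluster and files used in this post. The object name `LazyJobDemo` and the accumulator name are hypothetical. An accumulator inside `map` shows that the transformation does no work until the `count` action submits a job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyJobDemo {
  // Returns the accumulator total before and after the action runs.
  def run(): (Long, Long) = {
    // Assumption: local mode with 2 threads, purely for illustration.
    val conf = new SparkConf().setMaster("local[2]").setAppName("LazyJobDemo")
    val sc = new SparkContext(conf)
    try {
      val seen = sc.longAccumulator("recordsSeen")
      // Transformation only: nothing executes yet, so no job is submitted.
      val upper = sc.parallelize(Seq("spark", "jobs", "stages")).map { w =>
        seen.add(1)
        w.toUpperCase
      }
      val before = seen.sum
      // Action: this call submits a job, which finally runs the map function.
      upper.count()
      val after = seen.sum
      (before, after)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Each action called this way should appear as a separate entry on the Jobs tab of the web UI.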
In total, 3 jobs were created for our example “NormalizedMostPopularWords”.
Event Timeline: The sequence of events here is fairly straightforward. Shortly after all executors have registered, the application runs 3 jobs one after another. When all jobs have finished and the application exits, the executors are removed with it.
Job 0: This job is triggered by the “collect” action. It collects the contents of the stopWords RDD, which holds the entire “stopwords_en.txt” file, and converts them into a Set.
val stopWords = sc.textFile(args(1))
val stopWordSet = stopWords.collect.toSet
Job 1: This job is triggered by sortByKey(). Although sortByKey() is a transformation, it launches a job in the cluster because it runs a job to assess the distribution of the data.
// Flip (word, count) tuples to (count, word) and then sort by key (the counts)
val wordCountsSorted = wordCounts.map( x => (x._2, x._1) ).sortByKey()
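The implicit job behind sortByKey() can be observed with a small sketch. It assumes a local master and made-up (word, count) pairs, and the names `SortByKeySamplingDemo` and `recordsTouched` are hypothetical. No action is called, yet the upstream `map` still runs, because sortByKey() samples the keys when the RDD has more than one partition.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SortByKeySamplingDemo {
  // Returns how many records the upstream map processed although no action was called.
  def run(): Long = {
    // Assumption: local mode, illustrative data only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SortByKeySamplingDemo")
    val sc = new SparkContext(conf)
    try {
      val touched = sc.longAccumulator("recordsTouched")
      val flipped = sc
        .parallelize(Seq(("spark", 3), ("job", 1), ("stage", 2), ("task", 5)), 2)
        .map { case (word, count) =>
          touched.add(1)
          (count, word) // flip to (count, word), as in the post
        }
      // No action here. With more than one partition, sortByKey samples the
      // keys to choose range boundaries, and that sampling runs as a job.
      val sorted = flipped.sortByKey()
      touched.sum
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

A non-zero result indicates that a job already ran while the sorted RDD was being defined, which matches Job 1 in the job summary.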
Job 2: This job is triggered by the saveAsTextFile() action. It stores the final output in HDFS.
// Save the file into HDFS.
Stages: Jobs are decomposed into “stages” at the points where a shuffle is required. The more stages a job has, the more data is shuffled, which indicates that the job is running less efficiently. To reduce the number of stages, it can help to partition the data explicitly using partitionBy(). In total, 5 stages across 3 jobs were created for our example “NormalizedMostPopularWords”.
Why are there additional stages (1 and 4) for Jobs 1 and 2? As already indicated, the number of stages depends on where Spark needs to shuffle data. Let us consider Job 1 and Job 2 to understand the additional stages.
Job 1 DAG: This job is triggered by the sortByKey() operation. The job DAG shows the operations that run before sortByKey(). The reduceByKey() operation requires a shuffle, which is why a separate stage, “Stage 2”, is created. reduceByKey() is a wide transformation, meaning that it shuffles data across partitions. It merges the values of each key using an associative reduce function.
Note: Stage 2 contains both the reduceByKey() and sortByKey() operations and, as indicated in the job summary, the saveAsTextFile() action triggered Job 2.
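As a rough sketch of the partitionBy() idea, assuming a local master and toy data (the object name `PartitionByDemo` is hypothetical): once a pair RDD has been partitioned with a HashPartitioner, a following reduceByKey() can reuse that partitioning instead of adding a shuffle of its own.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

object PartitionByDemo {
  // Returns true if reduceByKey needed its own shuffle on the pre-partitioned RDD.
  def run(): Boolean = {
    // Assumption: local mode, illustrative data only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("PartitionByDemo")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)), 2)
      // One explicit shuffle up front; cache so later jobs reuse the partitioned data.
      val partitioned = pairs.partitionBy(new HashPartitioner(2)).cache()
      // reduceByKey picks up the existing partitioner of its parent RDD.
      val counts = partitioned.reduceByKey(_ + _)
      counts.count()
      // Inspect the lineage: is the direct dependency a shuffle?
      counts.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The partitionBy() call itself still shuffles once, so this mainly pays off when several key-based operations reuse the same partitioned RDD.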
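The behaviour of reduceByKey() can be sketched on a few in-memory words, assuming a local master (the object name `ReduceByKeyDemo` and the sample words are hypothetical). The values for each key are merged with an associative function, and `toDebugString` prints the lineage, where the change in indentation marks the shuffle boundary between stages.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyDemo {
  def run(): Map[String, Int] = {
    // Assumption: local mode, illustrative data only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("ReduceByKeyDemo")
    val sc = new SparkContext(conf)
    try {
      val words = sc.parallelize(Seq("spark", "stage", "spark", "task", "stage", "spark"), 2)
      // Pair each word with 1, then merge the values per key with addition.
      val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
      // Print the lineage of the RDD, including the ShuffledRDD created by reduceByKey.
      println(counts.toDebugString)
      counts.collectAsMap().toMap
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```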
Can you guess whether Stage 2 will be further divided into other stages in Job 2?
The answer is: yes.
Job 2 DAG: This job is triggered by the saveAsTextFile() action. The job DAG shows the operations that run before saveAsTextFile(). Stage 2 of Job 1 contains both reduceByKey() and sortByKey(), and both operations can shuffle data, so in Job 2 it is broken down into Stage 4 and Stage 5. There are three stages in this job, but Stage 3 is skipped. The reason is given in the “What does “Skipped Stages” mean in Spark?” section below.
Why is there no “Stage 3” in the Stage Summary UI? The answer is in the Job 2 DAG. Note the “Skipped Stage: 1” at the top; that is “Stage 3”.
What does “Skipped Stages” mean in Spark? Typically, it means that the output of the stage is already available from an earlier job, so the stage does not need to be re-executed. This is consistent with the DAG, which shows that the next stage requires shuffling (reduceByKey). Whenever there is a shuffle, Spark automatically keeps the generated shuffle output so that later jobs can reuse it.
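A skipped stage can be reproduced with a minimal sketch, assuming a local master and toy data (the names `SkippedStageDemo` and `recordsMapped` are hypothetical). Two actions run on the same shuffled RDD; the accumulator in the map step does not grow during the second job, because the map stage is skipped and its shuffle output is reused.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SkippedStageDemo {
  // Returns the number of mapped records after the first and the second job.
  def run(): (Long, Long) = {
    // Assumption: local mode, illustrative data only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SkippedStageDemo")
    val sc = new SparkContext(conf)
    try {
      val mapped = sc.longAccumulator("recordsMapped")
      val counts = sc
        .parallelize(Seq("a", "b", "a", "c"), 2)
        .map { w => mapped.add(1); (w, 1) }
        .reduceByKey(_ + _)
      // First job: runs the map stage and then the reduce stage.
      counts.count()
      val afterFirst = mapped.sum
      // Second job: the map stage shows up as skipped, only the reduce stage runs.
      counts.count()
      val afterSecond = mapped.sum
      (afterFirst, afterSecond)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Equal values for both measurements indicate that the map function was not executed again for the second job.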
Tasks within Stages
Task: A task represents a unit of work on one partition of a distributed dataset. Once the stages are determined, Spark generates tasks from them. The number of tasks depends on how the input is partitioned. For example, if you have three different files on three different nodes, the first stage will generate 3 tasks, one per partition. In Stage 5, two tasks are executed on two different executors.
Event Timeline: Each bar represents a single task within the stage. From this timeline view, we can gather several insights about this stage. First, the partitions are fairly well distributed across the machines. Second, the majority of the task execution time consists of raw computation rather than network or I/O overhead, which is not surprising because we are shuffling very little data. Third, the level of parallelism can be increased if we allocate more cores to the executors.
This page indicates the number of executors used for our example “NormalizedMostPopularWords”. The default number of executors when running on a YARN cluster is 2.
Now we understand how Spark constructs the DAG, jobs, stages, and tasks for the Text Normalization application. Understanding these Spark internals will help us optimize our application so that it runs efficiently in the cluster.
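The one-task-per-partition rule can be checked with a short sketch, assuming a local master and an in-memory range split into 3 partitions in place of three input files (the object name `TasksPerPartitionDemo` is hypothetical). Each partition is processed by its own task, so the stage that computes this RDD runs 3 tasks.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TasksPerPartitionDemo {
  // Returns the partition count and the number of records each task processed.
  def run(): (Int, Array[Int]) = {
    // Assumption: local mode, illustrative data only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("TasksPerPartitionDemo")
    val sc = new SparkContext(conf)
    try {
      // Ask for 3 partitions explicitly, standing in for three input files.
      val rdd = sc.parallelize(1 to 9, 3)
      // mapPartitions is called once per partition, that is, once per task.
      val sizes = rdd.mapPartitions(it => Iterator(it.size)).collect()
      (rdd.getNumPartitions, sizes)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (partitions, sizes) = run()
    println(s"partitions=$partitions sizes=${sizes.mkString(",")}")
  }
}
```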
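If the default is not enough, the executor count and cores per executor can be requested through configuration. The sketch below only builds a SparkConf; the values 4 and 2 are arbitrary assumptions, not the settings used for this example, and the object name `ExecutorConfigDemo` is hypothetical. The same properties can also be passed to spark-submit with the --num-executors and --executor-cores options.

```scala
import org.apache.spark.SparkConf

object ExecutorConfigDemo {
  // Builds a configuration that requests more executors than the YARN default.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("NormalizedMostPopularWords")
      // Assumption: 4 executors with 2 cores each, chosen only for illustration.
      .set("spark.executor.instances", "4")
      .set("spark.executor.cores", "2")

  def main(args: Array[String]): Unit =
    build().getAll.foreach { case (k, v) => println(s"$k=$v") }
}
```

With dynamic allocation enabled, the number of executors actually running may differ from the requested value.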
- The code examples and input files are available on GitHub.
GitHub Location: https://github.com/treselle-systems/text_normalization_using_spark
- Please refer to Text Normalization with Spark – Part 1 for the first blog post.