Chapter 1. Secondary Sort: Introduction
A secondary sort problem relates to sorting values associated with a key in the reduce phase. This technique is sometimes called value-to-key conversion. Secondary sorting enables us to sort the values (in ascending or descending order) passed to each reducer. I will provide concrete examples of how to achieve secondary sorting in ascending or descending order.
The goal of this chapter is to implement the Secondary Sort design pattern in MapReduce/Hadoop and Spark. In software design and programming, a design pattern is a reusable algorithm that is used to solve a commonly occurring problem. Typically, a design pattern is not presented in a specific programming language but instead can be implemented in many programming languages.
The MapReduce framework automatically sorts the keys generated by mappers. This means that, before the reducers start, all intermediate key-value pairs generated by mappers must be sorted by key (and not by value). Values passed to each reducer are not sorted at all; they can be in any order. What if you also want to sort a reducer's values? MapReduce/Hadoop and Spark do not sort values for a reducer. So, for those applications (such as time series data) in which you want to sort your reducer data, the Secondary Sort design pattern enables you to do so.
First we’ll focus on the MapReduce/Hadoop solution. Let’s look at the MapReduce paradigm and then unpack the concept of the secondary sort:
map(key1, value1) → list(key2, value2)
reduce(key2, list(value2)) → list(key3, value3)
First, the map() function receives a key-value pair input, (key1, value1). Then it outputs any number of key-value pairs, (key2, value2). Next, the reduce() function receives as input another key-value pair, (key2, list(value2)), and outputs any number of (key3, value3) pairs.
Now consider the following key-value pair, (key2, list(value2)), as an input for a reducer:

list(value2) = (V1, V2, ..., Vn)

where there is no ordering between reducer values (V1, V2, ..., Vn).
The goal of the Secondary Sort pattern is to give some ordering to the values received by a reducer. Once we apply the pattern to our MapReduce paradigm, we will have:
SORT(V1, V2, ..., Vn) = (S1, S2, ..., Sn)
list(value2) = (S1, S2, ..., Sn)
where:
- S1 < S2 < ... < Sn (ascending order), or
- S1 > S2 > ... > Sn (descending order)
Here is an example of a secondary sorting problem: consider the temperature data from a scientific experiment. A dump of the temperature data might look something like the following (columns are year, month, day, and daily temperature, respectively):
2012, 01, 01, 5
2012, 01, 02, 45
2012, 01, 03, 35
2012, 01, 04, 10
...
2001, 11, 01, 46
2001, 11, 02, 47
2001, 11, 03, 48
2001, 11, 04, 40
...
2005, 08, 20, 50
2005, 08, 21, 52
2005, 08, 22, 38
2005, 08, 23, 70
Suppose we want to output the temperature for every year-month with the values sorted in ascending order. Essentially, we want the reducer values iterator to be sorted. Therefore, we want to generate something like this output (the first column is year-month and the second column is the sorted temperatures):
2012-01: 5, 10, 35, 45, ...
2001-11: 40, 46, 47, 48, ...
2005-08: 38, 50, 52, 70, ...
Solutions to the Secondary Sort Problem
There are at least two possible approaches for sorting the reducer values. These solutions may be applied to both the MapReduce/Hadoop and Spark frameworks:
- The first approach involves having the reducer read and buffer all of the values for a given key (in an array data structure, for example), then doing an in-reducer sort on the values. This approach will not scale: since the reducer will be receiving all values for a given key, it might run out of memory (java.lang.OutOfMemoryError). On the other hand, this approach can work well if the number of values is small enough that it will not cause an out-of-memory error.
- The second approach involves using the MapReduce framework for sorting the reducer values (this does not require in-reducer sorting of values passed to the reducer). This approach consists of "creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives." For the details on this approach, see Java Code Geeks. This option is scalable and will not generate out-of-memory errors. Here, we basically offload the sorting to the MapReduce framework (sorting is a paramount feature of the MapReduce/Hadoop framework).
This is a summary of the second approach:
- Use the Value-to-Key Conversion design pattern: form a composite intermediate key, (K, V1), where V1 is the secondary key. Here, K is called a natural key. To inject a value (i.e., V1) into a reducer key, simply create a composite key (for details, see the DateTemperaturePair class). In our example, V1 is the temperature data.
- Let the MapReduce execution framework do the sorting (rather than sorting in memory, let the framework sort by using the cluster nodes).
- Preserve state across multiple key-value pairs to handle processing; you can achieve this by having proper mapper output partitioners (for example, we partition the mapper's output by the natural key).
Implementation Details
To implement the secondary sort feature, we need additional plug-in Java classes. We have to tell the MapReduce/Hadoop framework:
- How to sort reducer keys
- How to partition keys passed to reducers (custom partitioner)
- How to group data that has arrived at each reducer
Sort order of intermediate keys
To accomplish secondary sorting, we need to take control of the sort order of intermediate keys and control the order in which reducers process keys. First, we inject a value (temperature data) into the composite key, and then we take control of the sort order of intermediate keys. The relationships between the natural key, composite key, and key-value pairs are depicted in Figure 1-1.
The main question is what value we should add to the natural key to accomplish the secondary sort. The answer is the temperature data field (because we want the reducers' values to be sorted by temperature). So, we have to indicate how DateTemperaturePair objects should be sorted using the compareTo() method. We need to define a proper data structure for holding our key and value, while also providing the sort order of intermediate keys. In Hadoop, for custom data types (such as DateTemperaturePair) to be persisted, they have to implement the Writable interface; and if we are going to compare custom data types, then they have to implement an additional interface called WritableComparable (see Example 1-1).
Example 1-1. DateTemperaturePair class
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
...

public class DateTemperaturePair
    implements Writable, WritableComparable<DateTemperaturePair> {

    private Text yearMonth = new Text();                  // natural key
    private Text day = new Text();
    private IntWritable temperature = new IntWritable();  // secondary key

    ...

    /**
     * This comparator controls the sort order of the keys.
     */
    @Override
    public int compareTo(DateTemperaturePair pair) {
        int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(pair.getTemperature());
        }
        //return compareValue;     // sort ascending
        return -1 * compareValue;  // sort descending
    }
    ...
}
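The elided parts of Example 1-1 would also need the Writable serialization methods and the getters used by the partitioner and comparators. The following is a minimal sketch of what they might look like, assuming the three fields declared above (it also requires java.io.DataInput, java.io.DataOutput, and java.io.IOException imports); this is an illustration, not the book's verbatim code:

@Override
public void write(DataOutput out) throws IOException {
    // serialize the fields in a fixed order
    yearMonth.write(out);
    day.write(out);
    temperature.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
    // deserialize the fields in the same order they were written
    yearMonth.readFields(in);
    day.readFields(in);
    temperature.readFields(in);
}

public Text getYearMonth() {
    return yearMonth;
}

public IntWritable getTemperature() {
    return temperature;
}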
Custom partitioner
In a nutshell, the partitioner decides which mapper's output goes to which reducer based on the mapper's output key. For this, we need two plug-in classes: a custom partitioner to control which reducer processes which keys, and a custom Comparator to sort reducer values. The custom partitioner ensures that all data with the same key (the natural key, not including the composite key with the temperature value) is sent to the same reducer. The custom Comparator does sorting so that the natural key (year-month) groups the data once it arrives at the reducer.
Example 1-2. DateTemperaturePartitioner class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class DateTemperaturePartitioner
    extends Partitioner<DateTemperaturePair, Text> {

    @Override
    public int getPartition(DateTemperaturePair pair,
                            Text text,
                            int numberOfPartitions) {
        // make sure that partitions are non-negative
        return Math.abs(pair.getYearMonth().hashCode() % numberOfPartitions);
    }
}
Hadoop provides a plug-in architecture for injecting the custom partitioner code into the framework. This is how we do so inside the driver class (which submits the MapReduce job to Hadoop):
import org.apache.hadoop.mapreduce.Job;
...
Job job = ...;
...
job.setPartitionerClass(DateTemperaturePartitioner.class);
Grouping comparator
In Example 1-3, we define the comparator (the DateTemperatureGroupingComparator class) that controls which keys are grouped together for a single call to the Reducer.reduce() function.
Example 1-3. DateTemperatureGroupingComparator class
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DateTemperatureGroupingComparator
    extends WritableComparator {

    public DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }

    /**
     * This comparator controls which keys are grouped
     * together into a single call to the reduce() method.
     */
    @Override
    public int compare(WritableComparable wc1, WritableComparable wc2) {
        DateTemperaturePair pair = (DateTemperaturePair) wc1;
        DateTemperaturePair pair2 = (DateTemperaturePair) wc2;
        return pair.getYearMonth().compareTo(pair2.getYearMonth());
    }
}
Hadoop provides a plug-in architecture for injecting the grouping comparator code into the framework. This is how we do so inside the driver class (which submits the MapReduce job to Hadoop):
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
Data Flow Using Plug-in Classes
To help you understand the map() and reduce() functions and the custom plug-in classes, Figure 1-2 illustrates the data flow for a portion of input.
The mappers create (K, V) pairs, where K is a composite key of (year, month, temperature) and V is temperature. The (year, month) part of the composite key is the natural key. The partitioner plug-in class ensures that all records with the same natural key are sent to the same reducer, and the grouping comparator plug-in class ensures that temperatures arrive at the reducers sorted. The Secondary Sort design pattern uses the MapReduce framework to sort the reducers' values rather than collecting them all and sorting them in memory, which enables us to "scale out" no matter how many reducer values we want to sort.
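For example, using the sample data above, the mappers turn the records 2012,01,01,5 and 2012,01,02,45 into the composite keys (2012-01, 5) and (2012-01, 45), both carrying the natural key 2012-01. The partitioner hashes only 2012-01, so both pairs go to the same reducer; the key sort defined by compareTo() orders them by temperature; and the grouping comparator compares only 2012-01, so both pairs are presented to a single reduce() call with their values already in temperature order.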
MapReduce/Hadoop Solution to Secondary Sort
This section provides a complete MapReduce implementation of the secondary sort problem using the Hadoop framework.
Input
The input will be a set of files, where each record (line) will have the following format:
Format:
<year><,><month><,><day><,><temperature>

Example:
2012, 01, 01, 35
2011, 12, 23, -4
Expected Output
The expected output will have the following format:
Format:
<year><-><month>: <temperature1><,><temperature2><,> ...
where temperature1 <= temperature2 <= ...

Example:
2012-01: 5, 10, 35, 45, ...
2001-11: 40, 46, 47, 48, ...
2005-08: 38, 50, 52, 70, ...
map() Function
The map() function parses and tokenizes the input and then injects the value (temperature) into the reducer key, as shown in Example 1-4.
Example 1-4. map() for secondary sorting
/**
 * @param key is generated by Hadoop (ignored here)
 * @param value has this format: "YYYY,MM,DD,temperature"
 */
map(key, value) {
    String[] tokens = value.split(",");
    // YYYY = tokens[0]
    // MM = tokens[1]
    // DD = tokens[2]
    // temperature = tokens[3]
    String yearMonth = tokens[0] + tokens[1];
    String day = tokens[2];
    int temperature = Integer.parseInt(tokens[3]);
    // prepare reducer key
    DateTemperaturePair reducerKey = new DateTemperaturePair();
    reducerKey.setYearMonth(yearMonth);
    reducerKey.setDay(day);
    reducerKey.setTemperature(temperature); // inject value into key
    // send it to reducer
    emit(reducerKey, temperature);
}
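Example 1-4 is pseudocode. A concrete Hadoop Mapper following the same outline might look roughly like the sketch below; the setter signatures on DateTemperaturePair, the "-" separator in the natural key, and the choice of IntWritable as the map output value type are assumptions made for illustration and may differ from the book's repository:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondarySortMapper
    extends Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {

    private final DateTemperaturePair reducerKey = new DateTemperaturePair();
    private final IntWritable temperatureValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value format: "YYYY,MM,DD,temperature"
        String[] tokens = value.toString().trim().split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];  // natural key, e.g., 2012-01
        int temperature = Integer.parseInt(tokens[3].trim());

        reducerKey.setYearMonth(yearMonth);
        reducerKey.setDay(tokens[2]);
        reducerKey.setTemperature(temperature);          // inject the value into the key
        temperatureValue.set(temperature);

        context.write(reducerKey, temperatureValue);
    }
}

In a real job, the driver would also have to register these map output types, for example with job.setMapOutputKeyClass(DateTemperaturePair.class) and job.setMapOutputValueClass(IntWritable.class).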
reduce() Function
The reducer's primary function is to concatenate the values (which are already sorted through the Secondary Sort design pattern) and emit them as output. The reduce() function is given in Example 1-5.
Example 1-5. reduce() for secondary sorting
/**
 * @param key is a DateTemperaturePair object
 * @param value is a list of temperatures
 */
reduce(key, value) {
    StringBuilder sortedTemperatureList = new StringBuilder();
    for (Integer temperature : value) {
        sortedTemperatureList.append(temperature);
        sortedTemperatureList.append(",");
    }
    emit(key, sortedTemperatureList);
}
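Likewise, a concrete Reducer matching Example 1-5 might look like the following sketch; the exact types and output formatting are assumptions chosen to match the expected output shown earlier, and the output key/value classes registered in the driver would have to agree with these types:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SecondarySortReducer
    extends Reducer<DateTemperaturePair, IntWritable, Text, Text> {

    @Override
    protected void reduce(DateTemperaturePair key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        // thanks to the secondary sort, the temperatures arrive already sorted
        StringBuilder sortedTemperatureList = new StringBuilder();
        for (IntWritable temperature : values) {
            sortedTemperatureList.append(temperature.get());
            sortedTemperatureList.append(",");
        }
        context.write(key.getYearMonth(), new Text(sortedTemperatureList.toString()));
    }
}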
Hadoop Implementation Classes
The classes shown in Table 1-1 are used to solve the problem.
Class name | Class description |
---|---|
SecondarySortDriver | The driver class; defines input/output and registers plug-in classes |
SecondarySortMapper | Defines the map() function |
SecondarySortReducer | Defines the reduce() function |
DateTemperatureGroupingComparator | Defines how keys will be grouped together |
DateTemperaturePair | Defines paired date and temperature as a Java object |
DateTemperaturePartitioner | Defines custom partitioner |
How is the value injected into the key? The first comparator (the DateTemperaturePair.compareTo() method) controls the sort order of the keys, while the second comparator (the DateTemperatureGroupingComparator.compare() method) controls which keys are grouped together into a single call to the reduce() method. The combination of these two comparators allows you to set up jobs that act like you've defined an order for the values.
The SecondarySortDriver is the driver class, which registers the custom plug-in classes (DateTemperaturePartitioner and DateTemperatureGroupingComparator) with the MapReduce/Hadoop framework. This driver class is presented in Example 1-6.
Example 1-6. SecondarySortDriver class
public class SecondarySortDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf);
        job.setJarByClass(SecondarySortDriver.class);
        job.setJobName("SecondarySortDriver");

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setOutputKeyClass(DateTemperaturePair.class);
        job.setOutputValueClass(NullWritable.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        job.setPartitionerClass(DateTemperaturePartitioner.class);
        job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

        boolean status = job.waitForCompletion(true);
        theLogger.info("run(): status=" + status);
        return status ? 0 : 1;
    }

    /**
     * The main driver for the secondary sort MapReduce program.
     * Invoke this method to submit the MapReduce job.
     * @throws Exception when there are communication
     *         problems with the job tracker.
     */
    public static void main(String[] args) throws Exception {
        // Make sure there are exactly 2 parameters
        if (args.length != 2) {
            throw new IllegalArgumentException("Usage: SecondarySortDriver" +
                                               " <input-path> <output-path>");
        }

        //String inputPath = args[0];
        //String outputPath = args[1];
        int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
        System.exit(returnStatus);
    }

}
Sample Run of Hadoop Implementation
Input
# cat sample_input.txt
2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,07,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,22,80
2013,01,23,90
2013,01,24,70
2013,01,20,-10
HDFS input
# hadoop fs -mkdir /secondary_sort
# hadoop fs -mkdir /secondary_sort/input
# hadoop fs -mkdir /secondary_sort/output
# hadoop fs -put sample_input.txt /secondary_sort/input/
# hadoop fs -ls /secondary_sort/input/
Found 1 items
-rw-r--r--   1 ... 128 ... /secondary_sort/input/sample_input.txt
The script
# cat run.sh
export JAVA_HOME=/usr/java/jdk7
export BOOK_HOME=/home/mp/data-algorithms-book
export APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
INPUT=/secondary_sort/input
OUTPUT=/secondary_sort/output
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT
PROG=org.dataalgorithms.chap01.mapreduce.SecondarySortDriver
$HADOOP_HOME/bin/hadoop jar $APP_JAR $PROG $INPUT $OUTPUT
Log of sample run
# ./run.sh
...
Deleted hdfs://localhost:9000/secondary_sort/output
13/02/27 19:39:54 INFO input.FileInputFormat: Total input paths to process : 1
...
13/02/27 19:39:54 INFO mapred.JobClient: Running job: job_201302271939_0001
13/02/27 19:39:55 INFO mapred.JobClient:  map 0% reduce 0%
13/02/27 19:40:10 INFO mapred.JobClient:  map 100% reduce 0%
13/02/27 19:40:22 INFO mapred.JobClient:  map 100% reduce 10%
...
13/02/27 19:41:10 INFO mapred.JobClient:  map 100% reduce 90%
13/02/27 19:41:16 INFO mapred.JobClient:  map 100% reduce 100%
13/02/27 19:41:21 INFO mapred.JobClient: Job complete: job_201302271939_0001
...
13/02/27 19:41:21 INFO mapred.JobClient:   Map-Reduce Framework
...
13/02/27 19:41:21 INFO mapred.JobClient:     Reduce input records=14
13/02/27 19:41:21 INFO mapred.JobClient:     Reduce input groups=4
13/02/27 19:41:21 INFO mapred.JobClient:     Combine output records=0
13/02/27 19:41:21 INFO mapred.JobClient:     Reduce output records=4
13/02/27 19:41:21 INFO mapred.JobClient:     Map output records=14
13/02/27 19:41:21 INFO SecondarySortDriver: run(): status=true
13/02/27 19:41:21 INFO SecondarySortDriver: returnStatus=0
How to Sort in Ascending or Descending Order
You can easily control the sorting order of the values (ascending or descending) by using the DateTemperaturePair.compareTo() method as follows:
public int compareTo(DateTemperaturePair pair) {
    int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
    if (compareValue == 0) {
        compareValue = temperature.compareTo(pair.getTemperature());
    }
    //return compareValue;     // sort ascending
    return -1 * compareValue;  // sort descending
}
Spark Solution to Secondary Sort
To solve a secondary sorting problem in Spark, we have at least two options:
- Option #1: Read and buffer all of the values for a given key in an Array or List data structure and then do an in-reducer sort on the values. This solution works if you have a small set of values (which will fit in memory) per reducer key.
- Option #2: Use the Spark framework for sorting the reducer values (this option does not require in-reducer sorting of values passed to the reducer). This approach involves "creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives." This option always scales (because you are not limited by the memory of a commodity server).
Time Series as Input
To demonstrate secondary sorting, let’s use time series data:
name  time  value
x     2     9
y     2     5
x     1     3
y     1     7
y     3     1
x     3     6
z     1     4
z     2     8
z     3     7
z     4     0
p     2     6
p     4     7
p     1     9
p     6     0
p     7     3
Expected Output
Our expected output is as follows. Note that the values of reducers are grouped by name and sorted by time:
name   t1  t2  t3  t4  t5 ...
x => [3, 9, 6]
y => [7, 5, 1]
z => [4, 8, 7, 0]
p => [9, 6, 7, 0, 3]
Option 1: Secondary Sorting in Memory
Since Spark has a very powerful and high-level API, I will present the entire solution in a single Java class. The Spark API is built upon the basic abstraction concept of the RDD (resilient distributed dataset). To fully utilize Spark's API, we have to understand RDDs. An RDD<T> (i.e., an RDD of type T) object represents an immutable, partitioned collection of elements (of type T) that can be operated on in parallel. The RDD<T> class contains the basic MapReduce operations available on all RDDs, such as map(), filter(), and persist(), while the JavaPairRDD<K,V> class contains MapReduce operations such as mapToPair(), flatMapToPair(), and groupByKey(). In addition, Spark's PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as reduce(), groupByKey(), and join(). (For details on RDDs, see Spark's API and Appendix B of this book.) Therefore, JavaRDD<T> is a list of objects of type T, and JavaPairRDD<K,V> is a list of objects of type Tuple2<K,V> (where each tuple represents a key-value pair).
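To make the distinction concrete, here is a tiny, self-contained sketch (not part of the time series solution; the class and variable names are illustrative only) showing how a JavaRDD<String> is turned into a JavaPairRDD<String, Integer> whose elements are Tuple2 objects:

import java.util.Arrays;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

public class RDDvsPairRDD {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext("local", "RDDvsPairRDD");

        // JavaRDD<T>: a distributed collection of values of type T
        JavaRDD<String> words = ctx.parallelize(Arrays.asList("alpha", "beta", "gamma"));

        // JavaPairRDD<K,V>: a distributed collection of Tuple2<K,V> key-value pairs
        JavaPairRDD<String, Integer> wordLengths = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String w) {
                    return new Tuple2<String, Integer>(w, w.length());
                }
            });

        System.out.println(wordLengths.collect()); // [(alpha,5), (beta,4), (gamma,5)]
        ctx.close();
    }
}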
The Spark-based algorithm is listed next. Although there are 10 steps, most of them are trivial and some are provided for debugging purposes only:
1. We import the required Java/Spark classes. The main Java classes for MapReduce are given in the org.apache.spark.api.java package. This package includes the following classes and interfaces:
   - JavaRDDLike (interface)
   - JavaDoubleRDD
   - JavaPairRDD
   - JavaRDD
   - JavaSparkContext
   - StorageLevels
2. We pass input data as arguments and validate.
3. We connect to the Spark master by creating a JavaSparkContext object, which is used to create new RDDs.
4. Using the context object (created in step 3), we create an RDD for the input file; the resulting RDD will be a JavaRDD<String>. Each element of this RDD will be a record of time series data: <name><,><time><,><value>.
5. Next we want to create key-value pairs from a JavaRDD<String>, where the key is the name and the value is a pair of (time, value). The resulting RDD will be a JavaPairRDD<String, Tuple2<Integer, Integer>>.
6. To validate step 5, we collect all values from the JavaPairRDD<> and print them.
7. We group JavaPairRDD<> elements by the key (name). To accomplish this, we use the groupByKey() method. The result will be the RDD:
   JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>>
   Note that the resulting list (Iterable<Tuple2<Integer, Integer>>) is unsorted. In general, Spark's reduceByKey() is preferred over groupByKey() for performance reasons, but here we have no option other than groupByKey() (since reduceByKey() does not allow us to sort the values in place for a given key).
8. To validate step 7, we collect all values from the JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> and print them.
9. We sort the reducer's values to get the final output. We accomplish this with mapValues() and a custom sorting function. We just sort the values (the key remains the same).
10. To validate the final result, we collect all values from the sorted JavaPairRDD<> and print them.
A solution for option #1 is implemented by a single driver class, SecondarySort (see Example 1-7). All steps, 1–10, are listed inside the class definition, which will be presented in the following sections. Typically, a Spark application consists of a driver program that runs the user's main() function and executes various parallel operations on a cluster. Parallel operations are achieved through the extensive use of RDDs. For further details on RDDs, see Appendix B.
Example 1-7. SecondarySort class overall structure
// Step 1: import required Java/Spark classes
public class SecondarySort {
    public static void main(String[] args) throws Exception {
        // Step 2: read input parameters and validate them
        // Step 3: connect to the Spark master by creating a JavaSparkContext
        //         object (ctx)
        // Step 4: use ctx to create JavaRDD<String>
        // Step 5: create key-value pairs from JavaRDD<String>, where
        //         key is the {name} and value is a pair of (time, value)
        // Step 6: validate step 5-collect all values from JavaPairRDD<>
        //         and print them
        // Step 7: group JavaPairRDD<> elements by the key ({name})
        // Step 8: validate step 7-collect all values from JavaPairRDD<>
        //         and print them
        // Step 9: sort the reducer's values; this will give us the final output
        // Step 10: validate step 9-collect all values from JavaPairRDD<>
        //          and print them

        // done
        ctx.close();
        System.exit(0);
    }
}
Step 1: Import required classes
As shown in Example 1-8, the main Spark package for the Java API is org.apache.spark.api.java, which includes the JavaRDD, JavaPairRDD, and JavaSparkContext classes. JavaSparkContext is a factory class for creating new RDDs (such as JavaRDD and JavaPairRDD objects).
Example 1-8. Step 1: Import required classes
// Step 1: import required Java/Spark classes
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Collections;
import java.util.Comparator;
Step 2: Read input parameters
This step, demonstrated in Example 1-9, reads and validates the input parameters; the input is the path of an HDFS file (Spark may read data from HDFS and other persistent stores, such as a Linux filesystem), which might look like /dir1/dir2/myfile.txt.
Example 1-9. Step 2: Read input parameters
// Step 2: read input parameters and validate them
if (args.length < 1) {
    System.err.println("Usage: SecondarySort <file>");
    System.exit(1);
}
String inputPath = args[0];
System.out.println("args[0]: <file>=" + args[0]);
Step 3: Connect to the Spark master
To work with RDDs, first you need to create a JavaSparkContext object (as shown in Example 1-10), which is a factory for creating JavaRDD and JavaPairRDD objects. It is also possible to create a JavaSparkContext object by injecting a SparkConf object into the JavaSparkContext constructor. This approach is useful when you read your cluster configurations from an XML file. In a nutshell, the JavaSparkContext object has the following responsibilities:
- Initializes the application driver.
- Registers the application driver to the cluster manager. (If you are using the Spark cluster, then this will be the Spark master; if you are using YARN, then it will be YARN's resource manager.)
- Obtains a list of executors for executing your application driver.
Example 1-10. Step 3: Connect to the Spark master
// Step 3: connect to the Spark master by creating a JavaSparkContext object
final JavaSparkContext ctx = new JavaSparkContext();
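As noted above, the context can also be built from a SparkConf object, which is handy when the application name and master URL come from configuration rather than from spark-submit. A minimal sketch (the application name and master URL are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf()
    .setAppName("SecondarySort")             // application name (placeholder)
    .setMaster("spark://myserver100:7077");  // Spark master URL (placeholder)
JavaSparkContext ctx = new JavaSparkContext(sparkConf);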
Step 4: Use the JavaSparkContext to create a JavaRDD
This step, illustrated in Example 1-11, reads an HDFS file and creates a JavaRDD<String> (which represents a set of records where each record is a String object). By definition, Spark's RDDs are immutable (i.e., they cannot be altered or modified). Note that Spark's RDDs are the basic abstraction for parallel execution. Note also that you may use textFile() to read HDFS or non-HDFS files.
Example 1-11. Step 4: Create JavaRDD
// Step 4: use ctx to create JavaRDD<String>
// input record format: <name><,><time><,><value>
JavaRDD<String> lines = ctx.textFile(inputPath, 1);
Step 5: Create key-value pairs from the JavaRDD
This step, shown in Example 1-12, implements a mapper. Each record (from the JavaRDD<String> and consisting of <name><,><time><,><value>) is converted to a key-value pair, where the key is a name and the value is a Tuple2(time, value).
Example 1-12. Step 5: Create key-value pairs from JavaRDD
// Step 5: create key-value pairs from JavaRDD<String>, where
// key is the {name} and value is a pair of (time, value).
// The resulting RDD will be a JavaPairRDD<String, Tuple2<Integer, Integer>>.
// Convert each record into Tuple2(name, time, value).
// PairFunction<T, K, V>
//    T => Tuple2(K, V) where T is input (as String),
//    K = String
//    V = Tuple2<Integer, Integer>
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs =
    lines.mapToPair(new PairFunction<
                        String,                   // T
                        String,                   // K
                        Tuple2<Integer, Integer>  // V
                    >() {
        public Tuple2<String, Tuple2<Integer, Integer>> call(String s) {
            String[] tokens = s.split(","); // x,2,5
            System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
            Integer time = new Integer(tokens[1]);
            Integer value = new Integer(tokens[2]);
            Tuple2<Integer, Integer> timevalue = new Tuple2<Integer, Integer>(time, value);
            return new Tuple2<String, Tuple2<Integer, Integer>>(tokens[0], timevalue);
        }
    });
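If your cluster runs Java 8, the same transformation can be written more compactly with a lambda expression. The following sketch is functionally equivalent to Example 1-12 (minus the debugging println); it assumes Java 8 is available and is not part of the book's original listing:

JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = lines.mapToPair(s -> {
    String[] tokens = s.split(",");   // e.g., x,2,5
    Integer time = Integer.valueOf(tokens[1]);
    Integer value = Integer.valueOf(tokens[2]);
    return new Tuple2<String, Tuple2<Integer, Integer>>(tokens[0],
               new Tuple2<Integer, Integer>(time, value));
});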
Step 6: Validate step 5
To debug and validate your steps in Spark (as shown in Example 1-13), you may use JavaRDD.collect() and JavaPairRDD.collect(). Note that collect() is used for debugging and educational purposes (but avoid using collect() for debugging purposes in production clusters; doing so will impact performance). Also, you may use JavaRDD.saveAsTextFile() for debugging as well as for creating your desired outputs.
Example 1-13. Step 6: Validate step 5
// Step 6: validate step 5-collect all values from JavaPairRDD<>
//         and print them
List<Tuple2<String, Tuple2<Integer, Integer>>> output = pairs.collect();
for (Tuple2 t : output) {
    Tuple2<Integer, Integer> timevalue = (Tuple2<Integer, Integer>) t._2;
    System.out.println(t._1 + "," + timevalue._1 + "," + timevalue._1);
}
Step 7: Group JavaPairRDD elements by the key (name)
We implement the reducer operation using groupByKey(). As you can see in Example 1-14, it is much easier to implement the reducer through Spark than through MapReduce/Hadoop. Note that in Spark, in general, reduceByKey() is more efficient than groupByKey(). Here, however, we cannot use reduceByKey().
Example 1-14. Step 7: Group JavaPairRDD elements
// Step 7: group JavaPairRDD<> elements by the key ({name})
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups =
    pairs.groupByKey();
Step 8: Validate step 7
This step, shown in Example 1-15, validates the previous step by using the collect() function, which gets all values from the groups RDD.
Example 1-15. Step 8: Validate step 7
// Step 8: validate step 7-collect all values from JavaPairRDD<>
//         and print them
System.out.println("===DEBUG1===");
List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> output2 =
    groups.collect();
for (Tuple2<String, Iterable<Tuple2<Integer, Integer>>> t : output2) {
    Iterable<Tuple2<Integer, Integer>> list = t._2;
    System.out.println(t._1);
    for (Tuple2<Integer, Integer> t2 : list) {
        System.out.println(t2._1 + "," + t2._2);
    }
    System.out.println("=====");
}
The following shows the output of this step. As you can see, the reducer values are not sorted:
y
2,5
1,7
3,1
=====
x
2,9
1,3
3,6
=====
z
1,4
2,8
3,7
4,0
=====
p
2,6
4,7
6,0
7,3
1,9
=====
Step 9: Sort the reducer’s values in memory
This step, shown in Example 1-16, uses another powerful Spark method, mapValues(), to just sort the values generated by reducers. The mapValues() method enables us to convert (K, V1) into (K, V2), where V2 is a sorted V1. One important note about Spark's RDD is that it is immutable and cannot be altered/updated by any means. For example, in this step, to sort our values, we have to copy them into another list first. Immutability applies to the RDD itself and its elements.
Example 1-16. Step 9: sort the reducer’s values in memory
// Step 9: sort the reducer's values; this will give us the final output.
// Option #1: worked
// mapValues[U](f: (V) => U): JavaPairRDD[K, U]
// Pass each value in the key-value pair RDD through a map function
// without changing the keys;
// this also retains the original RDD's partitioning.
JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> sorted =
    groups.mapValues(
        new Function<Iterable<Tuple2<Integer, Integer>>,  // input
                     Iterable<Tuple2<Integer, Integer>>   // output
                    >() {
            public Iterable<Tuple2<Integer, Integer>> call(
                    Iterable<Tuple2<Integer, Integer>> s) {
                // copy the Iterable into a List so that it can be sorted
                // (ArrayList has no constructor that accepts an Iterable)
                List<Tuple2<Integer, Integer>> newList =
                    new ArrayList<Tuple2<Integer, Integer>>();
                for (Tuple2<Integer, Integer> element : s) {
                    newList.add(element);
                }
                Collections.sort(newList, new TupleComparator());
                return newList;
            }
        });
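The TupleComparator used in Example 1-16 is not shown in this excerpt. A minimal version, assuming we want the values ordered by their time component (the first element of each Tuple2) in ascending order, could look like the following sketch:

import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;

public class TupleComparator
    implements Comparator<Tuple2<Integer, Integer>>, Serializable {

    @Override
    public int compare(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) {
        // order (time, value) pairs by time, ascending
        return t1._1.compareTo(t2._1);
    }
}

Implementing Serializable is not strictly required here (the comparator is created inside call() on the executors), but it is a safe default for helper classes used with Spark.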
Step 10: output final result
The collect() method collects all of the RDD's elements into a java.util.List object. Then we iterate through the List to get all the final elements (see Example 1-17).
Example 1-17. Step 10: Output final result
// Step 10: validate step 9-collect all values from JavaPairRDD<>
//          and print them
System.out.println("===DEBUG2=");
List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> output3 =
    sorted.collect();
for (Tuple2<String, Iterable<Tuple2<Integer, Integer>>> t : output3) {
    Iterable<Tuple2<Integer, Integer>> list = t._2;
    System.out.println(t._1);
    for (Tuple2<Integer, Integer> t2 : list) {
        System.out.println(t2._1 + "," + t2._2);
    }
    System.out.println("=====");
}
Spark Sample Run
As far as Spark/Hadoop is concerned, you can run a Spark application in three different modes:[1]
- Standalone mode: This is the default setup. You start the Spark master on a master node and a "worker" on every slave node, and submit your Spark application to the Spark master.
- YARN client mode: In this mode, you do not start a Spark master or worker nodes. Instead, you submit the Spark application to YARN, which runs the Spark driver in the client process that submits the application.
- YARN cluster mode: In this mode, you do not start a Spark master or worker nodes. Instead, you submit the Spark application to YARN, which runs the Spark driver in the ApplicationMaster in YARN.
Next, we will cover how to submit the secondary sort application in the standalone and YARN cluster modes.
Running Spark in standalone mode
The following subsections provide the input, script, and log output of a sample run of our secondary sort application in Spark’s standalone mode.
HDFS input
# hadoop fs -cat /mp/timeseries.txt
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,2,6
p,4,7
p,1,9
p,6,0
p,7,3
The script
# cat run_secondarysorting.sh
#!/bin/bash
export JAVA_HOME=/usr/java/jdk7
export SPARK_HOME=/home/hadoop/spark-1.1.0
export SPARK_MASTER=spark://myserver100:7077
BOOK_HOME=/home/mp/data-algorithms-book
APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
INPUT=/home/hadoop/testspark/timeseries.txt
# Run on a Spark standalone cluster
prog=org.dataalgorithms.chap01.spark.SparkSecondarySort
$SPARK_HOME/bin/spark-submit \
  --class $prog \
  --master $SPARK_MASTER \
  --executor-memory 2G \
  --total-executor-cores 20 \
  $APP_JAR \
  $INPUT
# ./run_secondarysorting.sh
args[
0]
: <file>=
/mp/timeseries.txt ...===
DEBUG STEP5
===
... x,2,2 y,2,2 x,1,1 y,1,1 y,3,3 x,3,3 z,1,1 z,2,2 z,3,3 z,4,4 p,2,2 p,4,4 p,1,1 p,6,6 p,7,7===
DEBUG STEP7
===
14/06/04 08:42:54 INFO spark.SparkContext: Starting job: collect at SecondarySort.java:96 14/06/04 08:42:54 INFO scheduler.DAGScheduler: Registering RDD 2(
mapToPair at SecondarySort.java:75)
... 14/06/04 08:42:55 INFO scheduler.DAGScheduler: Stage 1(
collect at SecondarySort.java:96)
finished in 0.273 s 14/06/04 08:42:55 INFO spark.SparkContext: Job finished: collect at SecondarySort.java:96, took 1.587001929 s z 1,4 2,8 3,7 4,0=====
p 2,6 4,7 1,9 6,0 7,3=====
x 2,9 1,3 3,6=====
y 2,5 1,7 3,1=====
===
DEBUG STEP9
===
14/06/04 08:42:55 INFO spark.SparkContext: Starting job: collect at SecondarySort.java:158 ... 14/06/04 08:42:55 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 14/06/04 08:42:55 INFO spark.SparkContext: Job finished: collect at SecondarySort.java:158, took 0.074271723 s z 1,4 2,8 3,7 4,0=====
p 1,9 2,6 4,7 6,0 7,3=====
x 1,3 2,9 3,6=====
y 1,7 2,5 3,1=====
Typically, you save the final result to HDFS. You can accomplish this by adding the following line of code after creating your “sorted” RDD:
sorted.saveAsTextFile("/mp/output");
Then you may view the output as follows:
# hadoop fs -ls /mp/output/
Found 2 items
-rw-r--r--   3 hadoop root,hadoop     0 2014-06-04 10:49 /mp/output/_SUCCESS
-rw-r--r--   3 hadoop root,hadoop   125 2014-06-04 10:49 /mp/output/part-00000

# hadoop fs -cat /mp/output/part-00000
(z,[(1,4),(2,8),(3,7),(4,0)])
(p,[(1,9),(2,6),(4,7),(6,0),(7,3)])
(x,[(1,3),(2,9),(3,6)])
(y,[(1,7),(2,5),(3,1)])
Running Spark in YARN cluster mode
The script to submit our Spark application in YARN cluster mode is as follows:
# cat run_secondarysorting_yarn.sh
#!/bin/bash
export JAVA_HOME=/usr/java/jdk7
export HADOOP_HOME=/usr/local/hadoop-2.5.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=/home/hadoop/spark-1.1.0
BOOK_HOME=/home/mp/data-algorithms-book
APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
INPUT=/mp/timeseries.txt
prog=org.dataalgorithms.chap01.spark.SparkSecondarySort
$SPARK_HOME/bin/spark-submit \
  --class $prog \
  --master yarn-cluster \
  --executor-memory 2G \
  --num-executors 10 \
  $APP_JAR \
  $INPUT
Option #2: Secondary Sorting Using the Spark Framework
In the solution for option #1, we sorted reducer values in memory (using Java's Collections.sort() method), which might not scale if the reducer values do not fit in a commodity server's memory. Next we will implement option #2 for the MapReduce/Hadoop framework. We cannot achieve this in the current Spark (Spark 1.1.0) framework, because currently Spark's shuffle is based on a hash, which is different from MapReduce's sort-based shuffle. So, you would have to implement sorting explicitly using an RDD operator. If we had a partitioner by a natural key (name) that preserved the order of the RDD, that would be a viable solution; for example, if we sorted by (name, time), we would get:
(p,1),(1,9)
(p,4),(4,7)
(p,6),(6,0)
(p,7),(7,3)
(x,1),(1,3)
(x,2),(2,9)
(x,3),(3,6)
(y,1),(1,7)
(y,2),(2,5)
(y,3),(3,1)
(z,1),(1,4)
(z,2),(2,8)
(z,3),(3,7)
(z,4),(4,0)
There is a partitioner (represented as an abstract class, org.apache.spark.Partitioner), but it does not preserve the order of the original RDD elements. Therefore, option #2 cannot be implemented by the current version of Spark (1.1.0).
Further Reading on Secondary Sorting
To support secondary sorting in Spark, you may extend the JavaPairRDD class and add additional methods such as groupByKeyAndSortValues(). For further work on this topic, you may refer to the following:
- Support sorting of values in addition to keys (i.e., secondary sort)
- https://github.com/tresata/spark-sorted
Chapter 2 provides a detailed implementation of the Secondary Sort design pattern using the MapReduce and Spark frameworks.
[1] For details, see the Spark documentation.