Chapter 4. Left Outer Join
This chapter shows you how to implement a left outer join in the MapReduce environment. I provide three distinct implementations in MapReduce/Hadoop and Spark:
- MapReduce/Hadoop solution using the classic map() and reduce() functions
- Spark solution without using the built-in JavaPairRDD.leftOuterJoin()
- Spark solution using the built-in JavaPairRDD.leftOuterJoin()
Left Outer Join Example
Consider a company such as Amazon, which has over 200 million users and can process hundreds of millions of transactions per day. To understand the concept of a left outer join, assume we have two types of data: users and transactions. The users data consists of users’ location information (say, location_id), and the transactions data includes user identity information (say, user_id) but no direct information about a user’s location. Given users and transactions defined as

    users(user_id, location_id)
    transactions(transaction_id, product_id, user_id, quantity, amount)

our goal is to find the number of unique locations in which each product has been sold.
But what exactly is a left outer join? Let T1 (a left table) and T2 (a right table) be two relations defined as follows (where t1 is attributes of T1 and t2 is attributes of T2):
T1 = (K, t1)
T2 = (K, t2)
The result of a left outer join for relations T1 and T2 on the join key K contains all records of the left table (T1), even if the join condition does not find any matching record in the right table (T2). If the ON clause with key K matches zero records in T2 (for a given record in T1), the join still returns a row in the result for that record, but with NULL in each column from T2. A left outer join returns all the values from an inner join plus all values in the left table that do not match those of the right table. Formally, we can express this as:

    LeftOuterJoin(T1, T2, K) = {(k, t1, t2) where k ∊ T1.K and k ∊ T2.K}
                               ∪
                               {(k, t1, null) where k ∊ T1.K and k ∉ T2.K}
In SQL, we can express a left outer join as (where K is the column on which T1 and T2 are joined):
    SELECT field_1, field_2, ...
       FROM T1 LEFT OUTER JOIN T2
       ON T1.K = T2.K;
A left outer join is visually expressed in Figure 4-1 (the colored part is included and the white part is excluded).
Consider the values in Tables 4-1 and 4-2 for our users and transactions (note that these values are just examples to demonstrate the concept of a left outer join in the MapReduce environment).
Table 4-1. Users

| user_id | location_id |
|---------|-------------|
| u1      | UT          |
| u2      | GA          |
| u3      | CA          |
| u4      | CA          |
| u5      | GA          |
Table 4-2. Transactions

| transaction_id | product_id | user_id | quantity | amount |
|----------------|------------|---------|----------|--------|
| t1             | p3         | u1      | 1        | 300    |
| t2             | p1         | u2      | 1        | 100    |
| t3             | p1         | u1      | 1        | 100    |
| t4             | p2         | u2      | 1        | 10     |
| t5             | p4         | u4      | 1        | 9      |
| t6             | p1         | u1      | 1        | 100    |
| t7             | p4         | u1      | 1        | 9      |
| t8             | p4         | u5      | 2        | 40     |
Example Queries
Here are some example SQL queries relating to our left outer join:
- Query 1: find all products sold (and their associated locations):

      mysql> SELECT product_id, location_id
          -> FROM transactions LEFT OUTER JOIN users
          -> ON transactions.user_id = users.user_id;
      +------------+-------------+
      | product_id | location_id |
      +------------+-------------+
      | p3         | UT          |
      | p1         | GA          |
      | p1         | UT          |
      | p2         | GA          |
      | p4         | CA          |
      | p1         | UT          |
      | p4         | UT          |
      | p4         | GA          |
      +------------+-------------+
      8 rows in set (0.00 sec)
- Query 2: find all products sold (and their associated location counts):

      mysql> SELECT product_id, count(location_id)
          -> FROM transactions LEFT OUTER JOIN users
          -> ON transactions.user_id = users.user_id
          -> group by product_id;
      +------------+--------------------+
      | product_id | count(location_id) |
      +------------+--------------------+
      | p1         |                  3 |
      | p2         |                  1 |
      | p3         |                  1 |
      | p4         |                  3 |
      +------------+--------------------+
      4 rows in set (0.00 sec)
- Query 3: find all products sold (and their unique location counts):

      mysql> SELECT product_id, count(distinct location_id)
          -> FROM transactions LEFT OUTER JOIN users
          -> ON transactions.user_id = users.user_id
          -> group by product_id;
      +------------+-----------------------------+
      | product_id | count(distinct location_id) |
      +------------+-----------------------------+
      | p1         |                           2 |
      | p2         |                           1 |
      | p3         |                           1 |
      | p4         |                           3 |
      +------------+-----------------------------+
      4 rows in set (0.00 sec)
Implementation of Left Outer Join in MapReduce
Our desired output is provided in the preceding section by SQL query 3, which finds all distinct (unique) locations in which each product has been sold given all transactions. We present our solution for the left outer join problem in two phases:
- MapReduce phase 1: find all products sold (and their associated locations). We accomplish this using SQL query 1 from the previous section.
- MapReduce phase 2: find all products sold (and their associated unique location counts). We accomplish this using SQL query 3 from the previous section.
MapReduce Phase 1: Finding Product Locations
This phase will perform the left outer join operation with a MapReduce job, which will utilize two mappers (one for users and the other for transactions) and whose reducer will emit a key-value pair with the key being product_id and the value being location_id. Using multiple mappers is enabled by the MultipleInputs class (note that if we had a single mapper, we would have used Job.setMapperClass() instead):
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    ...
    Job job = new Job(...);
    ...
    Path transactions = <hdfs-path-to-transactions-data>;
    Path users = <hdfs-path-to-users-data>;
    MultipleInputs.addInputPath(job, transactions, TextInputFormat.class,
                                TransactionMapper.class);
    MultipleInputs.addInputPath(job, users, TextInputFormat.class,
                                UserMapper.class);
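As a side note, on Hadoop 2.x the Job constructor used above is deprecated in favor of the Job.getInstance() factory method. The following is only a minimal sketch of the same multiple-input wiring with that API; the input paths are placeholders, and the mapper class names are the ones listed later in Table 4-3:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    Configuration conf = new Configuration();
    // Job.getInstance() replaces the deprecated new Job(...) constructor
    Job job = Job.getInstance(conf, "left outer join: phase 1");

    Path transactions = new Path("/left_join/input/transactions"); // placeholder path
    Path users = new Path("/left_join/input/users");               // placeholder path

    // one mapper per input type, exactly as in the snippet above
    MultipleInputs.addInputPath(job, transactions, TextInputFormat.class,
                                LeftJoinTransactionMapper.class);
    MultipleInputs.addInputPath(job, users, TextInputFormat.class,
                                LeftJoinUserMapper.class);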
Figures 4-2 and 4-3 illustrate the working MapReduce flow of the Left Outer Join algorithm, consisting of the two MapReduce jobs (phases 1 and 2).
The core pieces of the left outer join data flow are as follows:
- Transaction mapper: The transaction map() reads (transaction_id, product_id, user_id, quantity, amount) and emits a key-value pair composed of (user_id, product_id).
- User mapper: The user map() reads (user_id, location_id) and emits a key-value pair composed of (user_id, location_id).
The reducer for phase 1 gets both the user’s location_id and product_id and emits (product_id, location_id). Now, the question is how the reducer will distinguish location_id from product_id. In Hadoop, the order of reducer values is undefined, so the reducer for a specific key (user_id) has no clue how to process the values. To remedy this problem, we modify the transaction and user mappers and the reducer (which we will call version 2):
- Transaction mapper (version 2): As shown in Example 4-1, the transaction map() reads (transaction_id, product_id, user_id, quantity, amount) and emits the key pair (user_id, 2) and the value pair ("P", product_id). By adding a “2” to the reducer key, we guarantee that the product_id(s) arrive at the end. This is accomplished through the secondary sorting technique described in Chapters 1 and 2 (a sketch of the partitioner and grouping comparator that enforce this ordering follows Example 4-3). We added “P” to the value to identify products. In Hadoop, to implement Pair(String, String), we will use the PairOfStrings class.1
Example 4-1. Transaction mapper (version 2)

    /**
     * @param key is framework generated, ignored here
     * @param value is the
     *    transaction_id<TAB>product_id<TAB>user_id<TAB>quantity<TAB>amount
     */
    map(key, value) {
       String[] tokens = StringUtil.split(value, "\t");
       String productID = tokens[1];
       String userID = tokens[2];
       outputKey = Pair(userID, 2);
       outputValue = Pair("P", productID);
       emit(outputKey, outputValue);
    }
- User mapper (version 2): As shown in Example 4-2, the user map() reads (user_id, location_id) and emits the key pair (user_id, 1) and the value pair ("L", location_id). By adding a “1” to the reducer key, we guarantee that the location_id(s) arrive first. This is accomplished through the secondary sorting technique described in Chapters 1 and 2. We added “L” to the value to identify locations.
Example 4-2. User mapper (version 2)

    /**
     * @param key is framework generated, ignored here
     * @param value is the user_id<TAB>location_id
     */
    map(key, value) {
       String[] tokens = StringUtil.split(value, "\t");
       String userID = tokens[0];
       String locationID = tokens[1];
       outputKey = Pair(userID, 1);   // make sure location shows up before products
       outputValue = Pair("L", locationID);
       emit(outputKey, outputValue);
    }
As shown in Example 4-3, the reducer for phase 1 (version 2) gets both the ("L", location_id) and ("P", product_id) pairs and emits a key-value pair of (product_id, location_id). Note that since 1 < 2, the user’s location_id arrives first.
Example 4-3. The reducer for phase 1 (version 2)

    /**
     * @param key is user_id
     * @param values is List<Pair<left, right>>, where
     *    values = List<{
     *       Pair<"L", locationID>,
     *       Pair<"P", productID1>,
     *       Pair<"P", productID2>,
     *       ...
     *    }
     * NOTE that the Pair<"L", locationID> arrives
     * before all product pairs. The first value is the location;
     * if it's not, then we don't have a user record, so we'll
     * set the locationID as "undefined".
     */
    reduce(key, values) {
       locationID = "undefined";
       for (Pair<left, right> value : values) {
          // the following if-stmt will be true
          // once, at the first iteration
          if (value.left.equals("L")) {
             locationID = value.right;
             continue;
          }

          // here we have a product: value.left.equals("P")
          productID = value.right;
          emit(productID, locationID);
       }
    }
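The “secondary sorting technique” referenced above boils down to two small plumbing classes: a partitioner that routes records by the natural key (user_id) only, and a grouping comparator that groups reducer input by the natural key only, while the full composite key (user_id, "1"/"2") controls the sort order so the location arrives before the products. What follows is a minimal sketch of that idea, not the book’s exact implementation; it assumes the composite key is a PairOfStrings and that PairOfStrings exposes getLeftElement()/getRightElement(), and it reuses the class names from Table 4-3:

    // sketch only -- each class would live in its own .java file
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Partitioner;
    import edu.umd.cloud9.io.pair.PairOfStrings;

    // Partition by the natural key (user_id) alone, so a user's
    // ("L", ...) and ("P", ...) records all reach the same reducer.
    public class SecondarySortPartitioner
       extends Partitioner<PairOfStrings, PairOfStrings> {
       @Override
       public int getPartition(PairOfStrings key, PairOfStrings value,
                               int numPartitions) {
          return (key.getLeftElement().hashCode() & Integer.MAX_VALUE) % numPartitions;
       }
    }

    // Group reducer input by the natural key (user_id) only, so one reduce()
    // call sees the ("L", location) pair followed by all ("P", product) pairs;
    // the "1" < "2" ordering of the composite key puts the location first.
    public class SecondarySortGroupComparator extends WritableComparator {
       public SecondarySortGroupComparator() {
          super(PairOfStrings.class, true);
       }
       @Override
       public int compare(WritableComparable w1, WritableComparable w2) {
          PairOfStrings p1 = (PairOfStrings) w1;
          PairOfStrings p2 = (PairOfStrings) w2;
          return p1.getLeftElement().compareTo(p2.getLeftElement());
       }
    }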
MapReduce Phase 2: Counting Unique Locations
This phase will use the output of phase 1 (which is a sequence of (product_id, location_id) pairs) and generate pairs of (product_id, number_of_unique_locations). The mapper for this phase is an identity mapper (Example 4-4), and the reducer counts the number of unique locations per product by using a Set data structure (Example 4-5).
Example 4-4. Mapper phase 2: counting unique locations

    /**
     * @param key is product_id
     * @param value is location_id
     */
    map(key, value) {
       emit(key, value);
    }
Example 4-5. Reducer phase 2: counting unique locations

    /**
     * @param key is product_id
     * @param values is List<location_id>
     */
    reduce(key, values) {
       Set<String> set = new HashSet<String>();
       for (String locationID : values) {
          set.add(locationID);
       }

       int uniqueLocationsCount = set.size();
       emit(key, uniqueLocationsCount);
    }
Implementation Classes in Hadoop
The classes shown in Table 4-3 implement both phases of the Left Outer Join design pattern using the MapReduce/Hadoop framework.
Table 4-3. Implementation classes in Hadoop

| Phase   | Class name                   | Class description                   |
|---------|------------------------------|-------------------------------------|
| Phase 1 | LeftJoinDriver               | Driver to submit job for phase 1    |
| Phase 1 | LeftJoinReducer              | Left join reducer                   |
| Phase 1 | LeftJoinTransactionMapper    | Left join transaction mapper        |
| Phase 1 | LeftJoinUserMapper           | Left join user mapper               |
| Phase 1 | SecondarySortPartitioner     | How to partition natural keys       |
| Phase 1 | SecondarySortGroupComparator | How to group by natural key         |
| Phase 2 | LocationCountDriver          | Driver to submit job for phase 2    |
| Phase 2 | LocationCountMapper          | Defines map() for location count    |
| Phase 2 | LocationCountReducer         | Defines reduce() for location count |
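To tie Table 4-3 together, here is a rough sketch of the registration calls a driver such as LeftJoinDriver might make (in addition to the MultipleInputs calls shown earlier). The class names come from Table 4-3; the output path and the assumption that the reducer emits plain Text keys and values are illustrative, and the book’s actual driver may differ in detail:

    // sketch of the phase 1 driver wiring (inside LeftJoinDriver)
    job.setMapOutputKeyClass(PairOfStrings.class);    // composite key: (user_id, "1"/"2")
    job.setMapOutputValueClass(PairOfStrings.class);  // tagged value: ("L", location) or ("P", product)
    job.setPartitionerClass(SecondarySortPartitioner.class);
    job.setGroupingComparatorClass(SecondarySortGroupComparator.class);
    job.setReducerClass(LeftJoinReducer.class);
    job.setOutputKeyClass(Text.class);    // product_id
    job.setOutputValueClass(Text.class);  // location_id
    FileOutputFormat.setOutputPath(job, new Path("/left_join/output")); // placeholder path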
Sample Run
Running phase 1
    # ./run_phase1_left_join.sh
    ...
    13/12/29 21:17:48 INFO input.FileInputFormat: Total input paths to process : 1
    ...
    13/12/29 21:17:48 INFO input.FileInputFormat: Total input paths to process : 1
    13/12/29 21:17:49 INFO mapred.JobClient: Running job: job_201312291929_0004
    13/12/29 21:17:50 INFO mapred.JobClient:  map 0% reduce 0%
    ...
    13/12/29 21:18:41 INFO mapred.JobClient:  map 100% reduce 100%
    13/12/29 21:18:41 INFO mapred.JobClient: Job complete: job_201312291929_0004
    ...
    13/12/29 21:18:41 INFO mapred.JobClient:   Map-Reduce Framework
    13/12/29 21:18:41 INFO mapred.JobClient:     Map input records=13
    ...
    13/12/29 21:18:41 INFO mapred.JobClient:     Reduce input records=13
    13/12/29 21:18:41 INFO mapred.JobClient:     Reduce input groups=5
    13/12/29 21:18:41 INFO mapred.JobClient:     Combine output records=0
    13/12/29 21:18:41 INFO mapred.JobClient:     Reduce output records=8
    13/12/29 21:18:41 INFO mapred.JobClient:     Map output records=13
Output of phase 1 (input for phase 2)
    # hadoop fs -text /left_join/zbook/output/part*
    p4  GA
    p3  UT
    p1  UT
    p1  UT
    p4  UT
    p1  GA
    p2  GA
    p4  CA
Running phase 2
    # ./run_phase2_location_count.sh
    ...
    13/12/29 21:19:28 INFO input.FileInputFormat: Total input paths to process : 10
    13/12/29 21:19:28 INFO mapred.JobClient: Running job: job_201312291929_0005
    13/12/29 21:19:29 INFO mapred.JobClient:  map 0% reduce 0%
    ...
    13/12/29 21:20:24 INFO mapred.JobClient:  map 100% reduce 100%
    13/12/29 21:20:25 INFO mapred.JobClient: Job complete: job_201312291929_0005
    ...
    13/12/29 21:20:25 INFO mapred.JobClient:   Map-Reduce Framework
    13/12/29 21:20:25 INFO mapred.JobClient:     Map input records=8
    ...
    13/12/29 21:20:25 INFO mapred.JobClient:     Reduce input records=8
    13/12/29 21:20:25 INFO mapred.JobClient:     Reduce input groups=4
    13/12/29 21:20:25 INFO mapred.JobClient:     Combine output records=0
    13/12/29 21:20:25 INFO mapred.JobClient:     Reduce output records=4
    13/12/29 21:20:25 INFO mapred.JobClient:     Map output records=8
Output of phase 2
    # hadoop fs -cat /left_join/zbook/output2/part*
    p1  2
    p2  1
    p3  1
    p4  3
Spark Implementation of Left Outer Join
Since Spark provides a higher-level Java API than the MapReduce/Hadoop API, I will present the whole solution in a single Java class (called LeftOuterJoin), which will include a series of map(), groupByKey(), and reduce() functions. In the MapReduce/Hadoop implementation we used the MultipleInputs class to process two different types of input with two different mappers. As you’ve learned, Spark provides a much richer API for mappers and reducers: without needing special plug-in classes, you can have many different types of mappers (by using the map(), flatMap(), and flatMapToPair() functions). In Spark, instead of using Hadoop’s MultipleInputs class, we will use the JavaRDD.union() function to return the union of two JavaRDDs (a users RDD and a transactions RDD), which will be merged to create a new RDD. The JavaRDD.union() function is defined as:
    JavaRDD<T> union(JavaRDD<T> other)
    JavaPairRDD<T> union(JavaPairRDD<T> other)

    Description: Return the union of this JavaRDD and another one.
    Any identical elements will appear multiple times
    (use .distinct() to eliminate them).
You can only apply the union() function to JavaRDDs of the same type (T). Therefore, we will create the same RDD type for users and transactions. This is how we do so:
    JavaPairRDD<String, Tuple2<String, String>> usersRDD = users.map(...);
    JavaPairRDD<String, Tuple2<String, String>> transactionsRDD = transactions.map(...);
    // here we perform a union() on usersRDD and transactionsRDD
    JavaPairRDD<String, Tuple2<String, String>> allRDD = transactionsRDD.union(usersRDD);
The union() workflow is illustrated in Figure 4-4.
To refresh your memory, here is the data for users and transactions (this data will be used as input files for our sample run at the end of this chapter):
    # hadoop fs -cat /data/leftouterjoins/users.txt
    u1 UT
    u2 GA
    u3 CA
    u4 CA
    u5 GA

    # hadoop fs -cat /data/leftouterjoins/transactions.txt
    t1 p3 u1 3 330
    t2 p1 u2 1 400
    t3 p1 u1 3 600
    t4 p2 u2 10 1000
    t5 p4 u4 9 90
    t6 p1 u1 4 120
    t7 p4 u1 8 160
    t8 p4 u5 2 40
Let’s see how the algorithm works. For the users and transactions data, we generate:
    users        => (userID, T2("L", location))
    transactions => (userID, T2("P", product))

(Here, T2 stands for Tuple2.)
Next, we create a union of these two sets of data:
    all = transactions.union(users);
        = {
            (userID1, T2("L", location)),
            (userID1, T2("P", P11)),
            (userID1, T2("P", P12)),
            ...
            (userID1, T2("P", P1n)),
            ...
          }

where Pij is a product ID.
The next step is to group the data by userID. This will generate:

    {
      (userID1, List<T2("L", L1), T2("P", P11), T2("P", P12), T2("P", P13), ...>),
      (userID2, List<T2("L", L2), T2("P", P21), T2("P", P22), T2("P", P23), ...>),
      ...
    }

where Li is a locationID, and Pij is a product ID.
Spark Program
First, I will provide a high-level solution in 11 steps (shown in Example 4-6), and then we will dissect each step with a proper working Spark code example.
Example 4-6. LeftOuterJoin high-level solution

    // Step 1: import required classes and interfaces
    public class LeftOuterJoin {
       public static void main(String[] args) throws Exception {
          // Step 2: read input parameters
          // Step 3: create a JavaSparkContext object
          // Step 4: create a JavaRDD for users
          // Step 5: create a JavaRDD for transactions
          // Step 6: create a union of the RDDs created in step 4 and step 5
          // Step 7: create a JavaPairRDD(userID, List<T2>) by calling groupByKey()
          // Step 8: create a productLocationsRDD as JavaPairRDD<String,String>
          // Step 9: find all locations for a product;
          //         result will be JavaPairRDD<String, List<String>>
          // Step 10: finalize output by changing "value" from List<String>
          //          to Tuple2<Set<String>, Integer>, where you have a unique
          //          set of locations and their count
          // Step 11: print the final result RDD
          System.exit(0);
       }
    }
Step 1: Import required classes
We first import the required classes and interfaces from the JAR files provided with the binary distribution of the Spark framework. Spark provides two Java packages (org.apache.spark.api.java and org.apache.spark.api.java.function) for creating and manipulating RDDs. Example 4-7 demonstrates this step.
Example 4-7. Step 1: import required classes and interfaces

    // Step 1: import required classes and interfaces
    import scala.Tuple2;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;

    import java.util.Set;
    import java.util.HashSet;
    import java.util.Arrays;
    import java.util.List;
    import java.util.ArrayList;
    import java.util.Collections;
Step 2: Read input parameters
As shown in Example 4-8, next we read two input parameters: the users data and the transactions data, both provided as HDFS text files.
Example 4-8. Step 2: read input parameters

    // Step 2: read input parameters
    if (args.length < 2) {
       System.err.println("Usage: LeftOuterJoin <users> <transactions>");
       System.exit(1);
    }
    String usersInputFile = args[0];        // HDFS text file
    String transactionsInputFile = args[1]; // HDFS text file
    System.out.println("users=" + usersInputFile);
    System.out.println("transactions=" + transactionsInputFile);
The output of this step is:
    users=/data/leftouterjoins/users.txt
    transactions=/data/leftouterjoins/transactions.txt
Step 3: Create a JavaSparkContext object
As shown in Example 4-9, next we create a JavaSparkContext object. This object is used to create the first RDD.
Example 4-9. Step 3: create a JavaSparkContext object

    // Step 3: create a JavaSparkContext object
    JavaSparkContext ctx = new JavaSparkContext();
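A note on this constructor: when the class is launched through spark-submit (as in the sample runs later in this chapter), the no-argument JavaSparkContext picks up the master URL and application name from the submit command. If you prefer to set the application name in code, a SparkConf can be passed in instead — a minimal sketch, where the application name is arbitrary:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    SparkConf conf = new SparkConf().setAppName("LeftOuterJoin"); // name is arbitrary
    JavaSparkContext ctx = new JavaSparkContext(conf);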
Step 4: Create a JavaRDD for users
In this step, shown in Example 4-10, we create a users JavaRDD<String>, where each RDD element is a single record of the text file (representing a userID and a locationID). Next, we use the JavaRDD<String>.mapToPair() function to create a new JavaPairRDD<String,Tuple2<String,String>>, where the key is a userID and the value is a Tuple2("L", location). Later we will create Tuple2("P", product) pairs for our transactions data. The tags "L" and "P" identify locations and products, respectively.
Example 4-10. Step 4: create a JavaRDD for users

    // Step 4: create a JavaRDD for users
    JavaRDD<String> users = ctx.textFile(usersInputFile, 1);
    // <K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
    // Return a new RDD by applying a function to all elements of this RDD.
    // PairFunction<T, K, V> where T => Tuple2<K, V>
    JavaPairRDD<String, Tuple2<String, String>> usersRDD =
       users.mapToPair(new PairFunction<
                             String,                 // T
                             String,                 // K
                             Tuple2<String, String>  // V
                           >() {
          public Tuple2<String, Tuple2<String, String>> call(String s) {
             String[] userRecord = s.split("\t");
             Tuple2<String, String> location =
                new Tuple2<String, String>("L", userRecord[1]);
             return new Tuple2<String, Tuple2<String, String>>(userRecord[0], location);
          }
       });
Step 5: Create a JavaRDD for transactions
In this step, shown in Example 4-11, we create a transactions JavaRDD<String>, where each RDD element is a single record of the text file (representing a transaction record). Next, we use the JavaRDD<String>.mapToPair() function to create a new JavaPairRDD<String,Tuple2<String,String>>, where the key is a userID and the value is a Tuple2("P", product). In the previous step, we created Tuple2("L", location) pairs for users. The tags "L" and "P" identify locations and products, respectively.
Example 4-11. Step 5: create a JavaRDD for transactions

    // Step 5: create a JavaRDD for transactions
    JavaRDD<String> transactions = ctx.textFile(transactionsInputFile, 1);

    // mapToPair
    // <K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
    // Return a new RDD by applying a function to all elements of this RDD.
    // PairFunction<T, K, V>
    //    T => Tuple2<K, V>
    JavaPairRDD<String, Tuple2<String, String>> transactionsRDD =
       transactions.mapToPair(new PairFunction<
                                    String,                 // T
                                    String,                 // K
                                    Tuple2<String, String>  // V
                                  >() {
          public Tuple2<String, Tuple2<String, String>> call(String s) {
             String[] transactionRecord = s.split("\t");
             Tuple2<String, String> product =
                new Tuple2<String, String>("P", transactionRecord[1]);
             return new Tuple2<String, Tuple2<String, String>>(transactionRecord[2],
                                                               product);
          }
       });
Step 6: Create a union of the RDDs created in steps 4 and 5
This step, shown in Example 4-12, creates a union of two instances of JavaPairRDD<String,Tuple2<String,String>>. The JavaPairRDD.union() method requires both RDDs to have exactly the same type.
Example 4-12. Step 6: create a union of RDDs

    // Step 6: create a union of the RDDs created in step 4 and step 5
    JavaPairRDD<String, Tuple2<String, String>> allRDD =
       transactionsRDD.union(usersRDD);
Example 4-13 shows a semantically equivalent implementation of this step; we’ve simply changed the order of the union parameters.
Example 4-13. Step 6: Create a union of RDDs (alternative implementation)
    // Here we perform a union() on usersRDD and transactionsRDD.
    JavaPairRDD<String, Tuple2<String, String>> allRDD =
       usersRDD.union(transactionsRDD);
The result of the union of the two JavaPairRDDs is the following key-value pairs:

    {
      (userID, Tuple2("L", location)),
      ...
      (userID, Tuple2("P", product)),
      ...
    }
Step 7: Create a JavaPairRDD(userID, List(T2)) by calling groupByKey()
Next, we group the data created in step 6 by userID. As you can see in Example 4-14, this step is accomplished with JavaPairRDD.groupByKey().
Example 4-14. Step 7: create a JavaPairRDD

    // Step 7: create a JavaPairRDD (userID, List<T2>) by calling groupByKey()
    // group allRDD by userID
    JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupedRDD =
       allRDD.groupByKey();
    // now the groupedRDD entries will be as follows:
    //    <userIDi, List[T2("L", location),
    //                   T2("P", Pi1),
    //                   T2("P", Pi2),
    //                   T2("P", Pi3), ...
    //                  ]
    //    >
The result of this step is:
    (userID1, List[T2("L", location1), T2("P", P11), T2("P", P12), T2("P", P13), ...]),
    (userID2, List[T2("L", location2), T2("P", P21), T2("P", P22), T2("P", P23), ...]),
    ...

where Pij is a productID.
Step 8: Create a productLocationsRDD as a JavaPairRDD(String,String)
In this step, the userIDs are dropped from the RDD elements. For a given RDD element:
    (userID, List[T2("L", location), T2("P", p1), T2("P", p2), T2("P", p3), ...])

we create a JavaPairRDD<String,String> as:

    (p1, location)
    (p2, location)
    (p3, location)
    ...
This step is accomplished by the JavaPairRDD.flatMapToPair() function, which we implement as a PairFlatMapFunction.call() method. PairFlatMapFunction works as follows:

    PairFlatMapFunction<T, K, V>
    T => Iterable<Tuple2<K, V>>

where in our example T is the input and we create (K, V) pairs as output:

    T = Tuple2<String, Iterable<Tuple2<String, String>>>
    K = String
    V = String

Example 4-15 shows the complete implementation of the PairFlatMapFunction.call() method.
Example 4-15. Step 8: create a productLocationsRDD

    // Step 8: create a productLocationsRDD as JavaPairRDD<String,String>
    // PairFlatMapFunction<T, K, V>
    //    T => Iterable<Tuple2<K, V>>
    JavaPairRDD<String, String> productLocationsRDD =
       groupedRDD.flatMapToPair(new PairFlatMapFunction<
             Tuple2<String, Iterable<Tuple2<String, String>>>,  // T
             String,                                             // K
             String>() {                                         // V
          public Iterable<Tuple2<String, String>>
                 call(Tuple2<String, Iterable<Tuple2<String, String>>> s) {
             // String userID = s._1; // NOT needed
             Iterable<Tuple2<String, String>> pairs = s._2;
             String location = "UNKNOWN";
             List<String> products = new ArrayList<String>();
             for (Tuple2<String, String> t2 : pairs) {
                if (t2._1.equals("L")) {
                   location = t2._2;
                }
                else {
                   // t2._1.equals("P")
                   products.add(t2._2);
                }
             }

             // now emit (K, V) pairs
             List<Tuple2<String, String>> kvList =
                new ArrayList<Tuple2<String, String>>();
             for (String product : products) {
                kvList.add(new Tuple2<String, String>(product, location));
             }
             return kvList;
          }
       });
Step 9: Find all locations for a product
In this step, the RDD pairs of (product, location) are grouped by product. We use JavaPairRDD.groupByKey() to accomplish this, as shown in Example 4-16. This step also does some basic debugging by calling the JavaPairRDD.collect() function.
Example 4-16. Step 9: find all locations for a product

    // Step 9: find all locations for a product;
    // result will be JavaPairRDD<String, Iterable<String>>
    JavaPairRDD<String, Iterable<String>> productByLocations =
       productLocationsRDD.groupByKey();

    // debug3
    List<Tuple2<String, Iterable<String>>> debug3 = productByLocations.collect();
    System.out.println("--- debug3 begin ---");
    for (Tuple2<String, Iterable<String>> t2 : debug3) {
       System.out.println("debug3 t2._1=" + t2._1);
       System.out.println("debug3 t2._2=" + t2._2);
    }
    System.out.println("--- debug3 end ---");
Step 10: Finalize output by changing value
Step 9 produced a JavaPairRDD<String, Iterable<String>> object, where the key is the product (as a string) and the value is a collection of locations, which might contain duplicates. To remove duplicate elements from a value, we use the JavaPairRDD.mapValues() function. We implement this function by converting the collection of locations into a Set<String>. Note that the keys are not altered. Mapping values is implemented by Function<T, R>.call(), where T is the input (an Iterable<String>) and R is the output (a Tuple2<Set<String>, Integer>). See Example 4-17.
Example 4-17. Step 10: finalize output

    // Step 10: finalize output by changing "value" from List<String>
    // to Tuple2<Set<String>, Integer>, where you have a unique
    // set of locations and their count
    JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations =
       productByLocations.mapValues(
          new Function<Iterable<String>,              // input
                       Tuple2<Set<String>, Integer>   // output
                      >() {
             public Tuple2<Set<String>, Integer> call(Iterable<String> s) {
                Set<String> uniqueLocations = new HashSet<String>();
                for (String location : s) {
                   uniqueLocations.add(location);
                }
                return new Tuple2<Set<String>, Integer>(uniqueLocations,
                                                        uniqueLocations.size());
             }
          });
Step 11: Print the final result RDD
The final step, shown in Example 4-18, emits the results using the JavaPairRDD.collect() method.
Example 4-18. Step 11: print the final result RDD

    // Step 11: print the final result RDD
    // debug4
    System.out.println("=== Unique Locations and Counts ===");
    List<Tuple2<String, Tuple2<Set<String>, Integer>>> debug4 =
       productByUniqueLocations.collect();
    System.out.println("--- debug4 begin ---");
    for (Tuple2<String, Tuple2<Set<String>, Integer>> t2 : debug4) {
       System.out.println("debug4 t2._1=" + t2._1);
       System.out.println("debug4 t2._2=" + t2._2);
    }
    System.out.println("--- debug4 end ---");
Running the Spark Solution
The shell script
    # cat run_left_outer_join.sh
    #!/bin/bash
    export JAVA_HOME=/usr/java/jdk7
    export SPARK_HOME=/usr/local/spark-1.1.0
    export SPARK_MASTER=spark://myserver100:7077
    export BOOK_HOME=/home/data-algorithms-book
    export APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
    USERS=/data/leftouterjoins/users.txt
    TRANSACTIONS=/data/leftouterjoins/transactions.txt
    # Run on a Spark cluster
    prog=org.dataalgorithms.chap04.spark.SparkLeftOuterJoin
    $SPARK_HOME/bin/spark-submit \
      --class $prog \
      --master $SPARK_MASTER \
      --executor-memory 2G \
      --total-executor-cores 20 \
      $APP_JAR $USERS $TRANSACTIONS
Running the shell script
The log output from a sample run is shown here; it has been trimmed and formatted to fit the page:
    # ./run_left_outer_join.sh
    users=/data/leftouterjoins/users.txt
    transactions=/data/leftouterjoins/transactions.txt
    ...
    14/06/03 17:52:01 INFO scheduler.DAGScheduler: Stage 0
      (collect at LeftOuterJoin2.java:112) finished in 0.163 s
    14/06/03 17:52:01 INFO spark.SparkContext: Job finished:
      collect at LeftOuterJoin2.java:112, took 6.365762312 s
    --- debug3 begin ---
    debug3 t2._1=p2
    debug3 t2._2=[GA]
    debug3 t2._1=p4
    debug3 t2._2=[GA, UT, CA]
    debug3 t2._1=p1
    debug3 t2._2=[GA, UT, UT]
    debug3 t2._1=p3
    debug3 t2._2=[UT]
    --- debug3 end ---
    === Unique Locations and Counts ===
    14/06/03 17:52:01 INFO spark.SparkContext: Starting job:
      collect at LeftOuterJoin2.java:137
    14/06/03 17:52:01 INFO spark.MapOutputTrackerMaster: Size of output
      statuses for shuffle 1 is 156 bytes
    ...
    14/06/03 17:52:01 INFO scheduler.DAGScheduler: Stage 3
      (collect at LeftOuterJoin2.java:137) finished in 0.058 s
    14/06/03 17:52:01 INFO spark.SparkContext: Job finished:
      collect at LeftOuterJoin2.java:137, took 0.081830132 s
    --- debug4 begin ---
    debug4 t2._1=p2
    debug4 t2._2=([GA],1)
    debug4 t2._1=p4
    debug4 t2._2=([UT, GA, CA],3)
    debug4 t2._1=p1
    debug4 t2._2=([UT, GA],2)
    debug4 t2._1=p3
    debug4 t2._2=([UT],1)
    --- debug4 end ---
    ...
    14/06/03 17:52:02 INFO scheduler.DAGScheduler: Stage 6
      (saveAsTextFile at LeftOuterJoin2.java:144) finished in 1.060 s
    14/06/03 17:52:02 INFO spark.SparkContext: Job finished:
      saveAsTextFile at LeftOuterJoin2.java:144, took 1.169724354 s
Running Spark on YARN
In this section, we cover how to submit Spark’s ApplicationMaster to Hadoop YARN’s ResourceManager and instruct Spark to run the left outer join program. Further, we will instruct our Spark program to save the final result to an HDFS file. We do so by adding the following line after creating the productByUniqueLocations RDD (/left/output is an HDFS output directory):

    productByUniqueLocations.saveAsTextFile("/left/output");
Script to run Spark on YARN
The script to run Spark on YARN is as follows:
    # cat leftjoin.sh
    export SPARK_HOME=/usr/local/spark-1.0.0
    export SPARK_JAR=spark-assembly-1.0.0-hadoop2.3.0.jar
    export SPARK_ASSEMBLY_JAR=$SPARK_HOME/assembly/target/scala-2.10/$SPARK_JAR
    export JAVA_HOME=/usr/java/jdk6
    export HADOOP_HOME=/usr/local/hadoop-2.3.0
    export HADOOP_CONF_DIR=$HADOOP_HOME/conf
    export YARN_CONF_DIR=$HADOOP_HOME/conf
    export BOOK_HOME=/mp/data-algorithms-book
    export APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
    # Submit Spark's ApplicationMaster to YARN's ResourceManager,
    # and instruct Spark to run the LeftOuterJoin2 example
    prog=org.dataalgorithms.chap04.spark.SparkLeftOuterJoin
    SPARK_JAR=$SPARK_ASSEMBLY_JAR \
      $SPARK_HOME/bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar $APP_JAR \
      --class $prog \
      --args yarn-standalone \
      --args /left/users.txt \
      --args /left/transactions.txt \
      --num-workers 3 \
      --master-memory 4g \
      --worker-memory 2g \
      --worker-cores 1
For details on Spark parameters (such as num-workers and worker-memory) and environment variables, refer to the Spark Summit slides. Most of these parameters depend on the number of worker nodes, the number of cores per cluster node, and the amount of RAM available. Finding the optimal settings will require some trial and error and experimentation with different sizes of input data.
Running the script
The log output from a sample run is shown here. It has been trimmed and formatted to fit the page:
    # ./leftjoin.sh
    14/05/28 16:49:31 INFO RMProxy: Connecting to ..
      ResourceManager at myserver100:8032
    14/05/28 16:49:31 INFO Client: Got Cluster metric info from
      ApplicationsManager (ASM), number of NodeManagers: 13
    ...
    14/05/28 16:49:33 INFO Client: Command for starting ..
      the Spark ApplicationMaster:
      $JAVA_HOME/bin/java -server -Xmx4096m -Djava.io.tmpdir=$PWD/tmp
        org.apache.spark.deploy.yarn.ApplicationMaster
        --class LeftOuterJoin2
        --jar /usr/local/spark/tmp/data_algorithms_book.jar
        --args 'yarn-standalone'
        --args '/left/users.txt'
        --args '/left/transactions.txt'
        --worker-memory 2048
        --worker-cores 1
        --num-workers 3
        1> <LOG_DIR>/stdout
        2> <LOG_DIR>/stderr
    14/05/28 16:49:33 INFO Client: Submitting application to ASM
    ...
    yarnAppState: FINISHED
    distributedFinalState: SUCCEEDED
    appTrackingUrl: http://myserver100:50030/proxy/application_1401319796895_0008/A
    appUser: hadoop
Checking the expected output
Here we examine the generated HDFS output:
    # hadoop fs -ls /left/output
    Found 3 items
    -rw-r--r-- 2 hadoop supergroup  0 2014-05-28 16:49 /left/output/_SUCCESS
    -rw-r--r-- 2 hadoop supergroup 36 2014-05-28 16:49 /left/output/part-00000
    -rw-r--r-- 2 hadoop supergroup 32 2014-05-28 16:49 /left/output/part-00001

    # hadoop fs -cat /left/output/part*
    (p2,([GA],1))
    (p4,([UT, GA, CA],3))
    (p1,([UT, GA],2))
    (p3,([UT],1))
Spark Implementation with leftOuterJoin()
This section implements a left outer join by using Spark’s built-in JavaPairRDD.leftOuterJoin() method (note that MapReduce/Hadoop does not offer higher-level API functionality such as a leftOuterJoin() method):
    import scala.Tuple2;
    import com.google.common.base.Optional;
    import org.apache.spark.api.java.JavaPairRDD;

    JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
    // Perform a left outer join of this and other. For each
    // element (k, v) in this, the resulting RDD will either
    // contain all pairs (k, (v, Some(w))) for w in other, or
    // the pair (k, (v, None)) if no elements in other have key k.
Using Spark’s JavaPairRDD.leftOuterJoin() method helps us avoid:

- Using the JavaPairRDD.union() operation between users and transactions, which is costly
- Introducing custom flags such as "L" for location and "P" for products
- Using extra RDD transformations to separate the custom flags from each other

Using the JavaPairRDD.leftOuterJoin() method enables us to produce the result efficiently. transactionsRDD is the left table and usersRDD is the right table:
    JavaPairRDD<String, String> usersRDD = ...;        // (K=userID, V=location)
    JavaPairRDD<String, String> transactionsRDD = ...; // (K=userID, V=product)

    // perform left outer join by built-in leftOuterJoin()
    JavaPairRDD<String, Tuple2<String, Optional<String>>> joined =
       transactionsRDD.leftOuterJoin(usersRDD);

Now, the joined RDD contains:
    (u4,(p4,Optional.of(CA)))
    (u5,(p4,Optional.of(GA)))
    (u2,(p1,Optional.of(GA)))
    (u2,(p2,Optional.of(GA)))
    (u1,(p3,Optional.of(UT)))
    (u1,(p1,Optional.of(UT)))
    (u1,(p1,Optional.of(UT)))
    (u1,(p4,Optional.of(UT)))
Since we are interested only in the products and unique locations, in the next step we ignore the userIDs (the keys). We accomplish this with another JavaPairRDD.mapToPair() function. After ignoring userIDs, we generate:
    (p4, CA)
    (p4, GA)
    (p1, GA)
    (p2, GA)
    (p3, UT)
    (p1, UT)
    (p1, UT)
    (p4, UT)
which has the desired information to generate the list of products and unique locations.
Spark Program
In Example 4-19, I present the high-level steps showing how to use Spark’s built-in JavaPairRDD.leftOuterJoin() method. Each step is discussed in detail after the example.
Example 4-19. High-level steps

    // Step 1: import required classes and interfaces
    public class SparkLeftOuterJoin {
       public static void main(String[] args) throws Exception {
          // Step 2: read input parameters
          // Step 3: create Spark's context object
          // Step 4: create RDD for users data
          // Step 5: create (userID,location) pairs for users (the right table)
          // Step 6: create RDD for transactions data
          // Step 7: create (userID,product) pairs for transactions (the left table)
          // Step 8: use Spark's built-in JavaPairRDD.leftOuterJoin() method
          // Step 9: create (product, location) pairs
          // Step 10: group (product, location) pairs by key
          // Step 11: create final output (product, Set<location>) pairs by key
          System.exit(0);
       }
    }
Step 1: Import required classes and interfaces
We begin with the necessary imports (Example 4-20). Optional2 represents an immutable object that may contain a non-null reference to another object (useful for the left outer join, since the joined values may contain nulls if a key is in the left table but not in the right table). The factory class JavaSparkContext is used to create new RDDs.
Example 4-20. Step 1: import required classes and interfaces

    // Step 1: import required classes and interfaces
    import scala.Tuple2;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import com.google.common.base.Optional;
    import java.util.Set;
    import java.util.HashSet;
Step 2: Read input parameters
This step, shown in Example 4-21, reads the locations of the HDFS input files containing our users and transactions data. These input files will be used to create the left table (transactionsRDD) and the right table (usersRDD).
Example 4-21. Step 2: read input parameters

    // Step 2: read input parameters
    if (args.length < 2) {
       System.err.println("Usage: LeftOuterJoin <users> <transactions>");
       System.exit(1);
    }

    String usersInputFile = args[0];
    String transactionsInputFile = args[1];
    System.out.println("users=" + usersInputFile);
    System.out.println("transactions=" + transactionsInputFile);
Step 3: Create Spark’s context object
This step, shown in Example 4-22, creates a JavaSparkContext object, which will be used to create new RDDs.
Example 4-22. Step 3: create Spark’s context object

    // Step 3: create Spark's context object
    JavaSparkContext ctx = new JavaSparkContext();
Step 4: Create an RDD for the users data
This step, demonstrated in Example 4-23, reads the users data into a JavaRDD<String>. From it, step 5 builds usersRDD, a set of (userID, location) pairs that represents the “right” table for the left outer join operation.
Example 4-23. Step 4: create RDD for users data

    // Step 4: create RDD for users data
    JavaRDD<String> users = ctx.textFile(usersInputFile, 1);
Step 5: Create usersRDD (the right table)
As you can see in Example 4-24, this step creates the right table, usersRDD, which contains (userID, location) pairs built from the users input data.
Example 4-24. Step 5: create (userID,location) pairs for users

    // Step 5: create (userID,location) pairs for users
    // <K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
    // Return a new RDD by applying a function to all elements of this RDD.
    // PairFunction<T, K, V>
    //    T => Tuple2<K, V>
    JavaPairRDD<String, String> usersRDD =
       //                                T       K       V
       users.mapToPair(new PairFunction<String, String, String>() {
          public Tuple2<String, String> call(String s) {
             String[] userRecord = s.split("\t");
             String userID = userRecord[0];
             String location = userRecord[1];
             return new Tuple2<String, String>(userID, location);
          }
       });
Step 6: Create an RDD for the transactions data
This step, shown in Example 4-25, reads the transactions data into a JavaRDD<String>. From it, step 7 builds transactionsRDD, a set of (userID, product) pairs that represents the left table for the left outer join operation.
Example 4-25. Step 6: create RDD for transactions data

    // Step 6: create RDD for the transactions data
    JavaRDD<String> transactions = ctx.textFile(transactionsInputFile, 1);
Step 7: Create transactionsRDD (the left table)
This step, shown in Example 4-26, creates the left table, transactionsRDD, which contains (userID, product) pairs built from the transactions input data.
Example 4-26. Step 7: create (userID,product) pairs for transactions

    // Step 7: create (userID,product) pairs for transactions
    // PairFunction<T, K, V>
    //    T => Tuple2<K, V>
    // sample transaction input: t1 p3 u1 3 330
    JavaPairRDD<String, String> transactionsRDD =
       //                                       T       K       V
       transactions.mapToPair(new PairFunction<String, String, String>() {
          public Tuple2<String, String> call(String s) {
             String[] transactionRecord = s.split("\t");
             String userID = transactionRecord[2];
             String product = transactionRecord[1];
             return new Tuple2<String, String>(userID, product);
          }
       });
Step 8: Use Spark’s built-in JavaPairRDD.leftOuterJoin() method
This is the core step, performing the left outer join operation using Spark’s JavaPairRDD.leftOuterJoin() method (see Example 4-27).
Example 4-27. Step 8: use Spark’s built-in JavaPairRDD.leftOuterJoin() method

    // Step 8: use Spark's built-in JavaPairRDD.leftOuterJoin() method.
    // JavaPairRDD<K,Tuple2<V,Optional<W>>> leftOuterJoin(JavaPairRDD<K,W> other)
    // Perform a left outer join of this and other. For each element (k, v) in this,
    // the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in
    // other, or the pair (k, (v, None)) if no elements in other have key k.
    //
    // Here we perform a transactionsRDD.leftOuterJoin(usersRDD).
    JavaPairRDD<String, Tuple2<String, Optional<String>>> joined =
       transactionsRDD.leftOuterJoin(usersRDD);
    joined.saveAsTextFile("/output/1");
This step creates the following output (the result of the left outer join operation):
    # hadoop fs -cat /output/1/part*
    (u4,(p4,Optional.of(CA)))
    (u5,(p4,Optional.of(GA)))
    (u2,(p1,Optional.of(GA)))
    (u2,(p2,Optional.of(GA)))
    (u1,(p3,Optional.of(UT)))
    (u1,(p1,Optional.of(UT)))
    (u1,(p1,Optional.of(UT)))
    (u1,(p4,Optional.of(UT)))
Step 9: Create (product, location) pairs
This step builds another JavaPairRDD, which contains (product, location) pairs. Note in Example 4-28 that we completely ignore the userIDs, since we are interested only in products and their unique user locations.
Example 4-28. Step 9: create (product, location) pairs

    // Step 9: create (product, location) pairs
    JavaPairRDD<String, String> products =
       joined.mapToPair(new PairFunction<
             Tuple2<String, Tuple2<String, Optional<String>>>,  // T
             String,                                             // K
             String>                                             // V
          () {
          public Tuple2<String, String> call(Tuple2<String,
                                             Tuple2<String, Optional<String>>> t) {
             Tuple2<String, Optional<String>> value = t._2;
             return new Tuple2<String, String>(value._1, value._2.get());
          }
       });
    products.saveAsTextFile("/output/2");
This step creates the following output:
    # hadoop fs -cat /output/2/part*
    (p4,CA)
    (p4,GA)
    (p1,GA)
    (p2,GA)
    (p3,UT)
    (p1,UT)
    (p1,UT)
    (p4,UT)
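One caution about value._2.get() in Example 4-28: it assumes every transaction’s userID has a matching user record. In a true left outer join, a missing user yields Optional.absent(), and get() would then throw an exception. A hedged variant that substitutes a placeholder location (here "UNKNOWN", an arbitrary marker) could look like this, replacing the body of call() above:

    // sketch: inside the PairFunction.call() of Example 4-28
    Tuple2<String, Optional<String>> value = t._2;
    String product = value._1;
    // Guava's Optional: check isPresent() before calling get()
    String location = value._2.isPresent() ? value._2.get() : "UNKNOWN";
    return new Tuple2<String, String>(product, location);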
Step 10: Group (K=product, V=location) pairs by key
This step groups the (K=product, V=location) pairs by key. The result will be (K, V2), where V2 is a list of locations (which will include duplicate locations).
Example 4-29. Step 10: group (K=product, V=location) pairs by key

    // Step 10: group (K=product, V=location) pairs by key
    JavaPairRDD<String, Iterable<String>> productByLocations = products.groupByKey();
    productByLocations.saveAsTextFile("/output/3");
This step creates the following output:
    # hadoop fs -cat /output/3/p*
    (p1,[GA, UT, UT])
    (p2,[GA])
    (p3,[UT])
    (p4,[CA, GA, UT])
Step 11: Create final output (K=product, V=Set(location))
This final step, shown in Example 4-30, removes duplicate locations and creates (K, V2), where V2 is a Tuple2<Set<location>, size>.
Example 4-30. Step 11: create final output (K=product, V=Set<location>) pairs by key

    // Step 11: create final output (K=product, V=Set<location>) pairs by K
    JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations =
       productByLocations.mapValues(
          new Function<Iterable<String>,              // input
                       Tuple2<Set<String>, Integer>   // output
                      >() {
             public Tuple2<Set<String>, Integer> call(Iterable<String> s) {
                Set<String> uniqueLocations = new HashSet<String>();
                for (String location : s) {
                   uniqueLocations.add(location);
                }
                return new Tuple2<Set<String>, Integer>(uniqueLocations,
                                                        uniqueLocations.size());
             }
          });

    productByUniqueLocations.saveAsTextFile("/output/4");
This step creates the following final output:
    # hadoop fs -cat /output/4/p*
    (p1,([UT, GA],2))
    (p2,([GA],1))
    (p3,([UT],1))
    (p4,([UT, GA, CA],3))
Combining steps 10 and 11
It is possible to combine steps 10 and 11 into a single Spark operation. We accomplish this with Spark’s combineByKey(), the most general of the per-key aggregation functions, which enables us to combine values that share a key in a flexible manner. What is the main difference between reduceByKey() and combineByKey()? reduceByKey() reduces values of type V into V (the same data type—for example, adding or multiplying integer values). combineByKey(), however, can combine/transform values of type V into another type, C—for example, we may want to combine/transform integer (V) values into a set of integers (Set<Integer>). In a nutshell, combineByKey() allows us to return values that are not the same type as our input data. To use combineByKey() we need to provide a number of functions. The simplest form of the combineByKey() signature is shown here:
    public <C> JavaPairRDD<K,C> combineByKey(
       Function<V,C> createCombiner,
       Function2<C,V,C> mergeValue,
       Function2<C,C,C> mergeCombiners
    )

    Description: Generic function to combine the elements for each key using
    a custom set of aggregation functions. Turns a JavaPairRDD[(K, V)] into a
    result of type JavaPairRDD[(K, C)], for a "combined type" C. Note that V
    and C can be different -- for example, one might group an RDD of type
    (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
    functions:
    - createCombiner, which turns a V into a C (e.g., creates a one-element list)
    - mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
    - mergeCombiners, to combine two Cs into a single one.
Our goal is to create a Set<String> for each key (each key has a list of values, and each value is a string). To accomplish this, we need to implement three basic functions, as shown in Example 4-31.
Example 4-31. Basic functions to be used by combineByKey()

    Function<String, Set<String>> createCombiner =
       new Function<String, Set<String>>() {
          @Override
          public Set<String> call(String x) {
             Set<String> set = new HashSet<String>();
             set.add(x);
             return set;
          }
       };
    Function2<Set<String>, String, Set<String>> mergeValue =
       new Function2<Set<String>, String, Set<String>>() {
          @Override
          public Set<String> call(Set<String> set, String x) {
             set.add(x);
             return set;
          }
       };
    Function2<Set<String>, Set<String>, Set<String>> mergeCombiners =
       new Function2<Set<String>, Set<String>, Set<String>>() {
          @Override
          public Set<String> call(Set<String> a, Set<String> b) {
             a.addAll(b);
             return a;
          }
       };
After implementing these three basic functions, we are ready to combine steps 10 and 11 by using combineByKey()
. Before we do so, let’s identify the input and output in Table 4-4.
| Input       | products (K: String, V: String)                    |
|-------------|----------------------------------------------------|
| Output      | productUniqueLocations (K: String, V: Set<String>) |
| Transformer | combineByKey()                                      |
Example 4-32 shows you how to use the combineByKey() transformer.
Example 4-32. Using combineByKey()

    JavaPairRDD<String, Set<String>> productUniqueLocations =
       products.combineByKey(createCombiner, mergeValue, mergeCombiners);
    // emit the final output
    Map<String, Set<String>> productMap = productUniqueLocations.collectAsMap();
    for (Entry<String, Set<String>> entry : productMap.entrySet()) {
       System.out.println(entry.getKey() + ":" + entry.getValue());
    }
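Note that combineByKey() as used here yields a Set<String> per product rather than the Tuple2<Set<String>, Integer> produced in step 11. If the location count is also needed, one more mapValues() pass can attach it — a minimal sketch in the same Java 7 anonymous-class style as the rest of the chapter:

    // sketch: derive (unique locations, count) from the combineByKey() result
    JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations =
       productUniqueLocations.mapValues(
          new Function<Set<String>, Tuple2<Set<String>, Integer>>() {
             public Tuple2<Set<String>, Integer> call(Set<String> locations) {
                // attach the size of the unique-location set to each product
                return new Tuple2<Set<String>, Integer>(locations, locations.size());
             }
          });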
Sample Run on YARN
Input (right table)

    # hadoop fs -cat /data/leftouterjoins/users.txt
    u1 UT
    u2 GA
    u3 CA
    u4 CA
    u5 GA

Input (left table)

    # hadoop fs -cat /data/leftouterjoins/transactions.txt
    t1 p3 u1 3 330
    t2 p1 u2 1 400
    t3 p1 u1 3 600
    t4 p2 u2 10 1000
    t5 p4 u4 9 90
    t6 p1 u1 4 120
    t7 p4 u1 8 160
    t8 p4 u5 2 40

Script

    # cat ./run_left_outer_join_spark.sh
    #!/bin/bash
    export JAVA_HOME=/usr/java/jdk7
    export SPARK_HOME=/usr/local/spark-1.0.0
    export HADOOP_HOME=/usr/local/hadoop-2.5.0
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
    BOOK_HOME=/mp/data-algorithms-book
    APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
    USERS=/data/leftouterjoins/users.txt
    TRANSACTIONS=/data/leftouterjoins/transactions.txt
    prog=org.dataalgorithms.chap04.spark.SparkLeftOuterJoin
    $SPARK_HOME/bin/spark-submit --class $prog \
      --master yarn-cluster \
      --num-executors 3 \
      --driver-memory 1g \
      --executor-memory 1g \
      --executor-cores 10 \
      $APP_JAR $USERS $TRANSACTIONS

Generated HDFS output

    # hadoop fs -cat /output/1/p*
    (u5,(p4,Optional.of(GA)))
    (u2,(p1,Optional.of(GA)))
    (u2,(p2,Optional.of(GA)))
    (u1,(p3,Optional.of(UT)))
    (u1,(p1,Optional.of(UT)))
    (u1,(p1,Optional.of(UT)))
    (u1,(p4,Optional.of(UT)))

    # hadoop fs -cat /output/2/p*
    (p4,CA)
    (p4,GA)
    (p1,GA)
    (p2,GA)
    (p3,UT)
    (p1,UT)
    (p1,UT)
    (p4,UT)

    # hadoop fs -cat /output/3/p*
    (p1,[GA, UT, UT])
    (p2,[GA])
    (p3,[UT])
    (p4,[CA, GA, UT])

    # hadoop fs -cat /output/4/p*
    (p1,([UT, GA],2))
    (p2,([GA],1))
    (p3,([UT],1))
    (p4,([UT, GA, CA],3))
This chapter implemented the Left Outer Join design pattern, which is often used in analyzing business transactions in a distributed programming environment. The next chapter introduces another design pattern, Order Inversion, which will be implemented in the MapReduce paradigm.
1 edu.umd.cloud9.io.pair.PairOfStrings (which implements WritableComparable<PairOfStrings>)
2 com.google.common.base.Optional<T> is an abstract class.