Chapter 4. Introduction to Pig Latin
It is time to dig into Pig Latin. This chapter provides you with the basics of Pig Latin, enough to write your first useful scripts. More advanced features of Pig Latin are covered in Chapter 5.
Preliminary Matters
Pig Latin is a data flow language. Each processing step results in a new dataset, or relation. In input = load 'data', input is the name of the relation that results from loading the dataset data. A relation name is referred to as an alias. Relation names look like variables, but they are not. Once made, an assignment is permanent. It is possible to reuse relation names; for example, this is legitimate:
A = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);
However, it is not recommended. It looks here as
if you are reassigning A
, but really
you are creating new relations called A
, and losing track of the old relations called
A
. Pig is smart enough to keep up, but
it still is not a good practice. It leads to confusion when trying to read
your programs (which A
am I referring
to?) and when reading error messages.
In addition to relation names, Pig Latin also has
field names. They name a field (or column) in a relation. In
the previous snippet of Pig Latin, dividends
and symbol
are examples of field names. These are
somewhat like variables in that they will contain a different value for
each record as it passes through the pipeline, but you cannot assign
values to them.
Both relation and field names must start with an alphabetic
character, and then they can have zero or more alphabetic, numeric, or
_
(underscore) characters. All
characters in the name must be ASCII.
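For example (the names here are made up purely for illustration):

daily_2009 = load 'NYSE_daily';   -- valid: a letter, then letters, digits, or underscores
-- 2009_daily = load 'NYSE_daily';  -- invalid: names cannot start with a digit
-- daily-2009 = load 'NYSE_daily';  -- invalid: hyphens are not allowed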
Input and Output
Before you can do anything interesting, you need to be able to add inputs and outputs to your data flows.
load
The first step in any data flow is to specify your input. In Pig Latin this is done with the load statement. By default, load looks for your data on HDFS in a tab-delimited file using the default load function PigStorage. The statement

divs = load '/data/examples/NYSE_dividends';

will look for a file called NYSE_dividends in the directory /data/examples. You can also specify relative pathnames. By default, your Pig jobs will run in your home directory on HDFS, /users/yourlogin. Unless you change directories, all relative paths will be evaluated from there. You can also specify a full URL for the path: for example, 'hdfs://nn.acme.com/data/examples/NYSE_dividends' to read the file from the HDFS instance that has nn.acme.com as a NameNode.
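To make the path options concrete, here is a small sketch; the paths and NameNode host are the ones used above, and the directory layout is assumed:

-- relative path, resolved against your HDFS home directory (e.g., /users/yourlogin)
divs1 = load 'NYSE_dividends';
-- absolute path
divs2 = load '/data/examples/NYSE_dividends';
-- full URL naming the NameNode explicitly
divs3 = load 'hdfs://nn.acme.com/data/examples/NYSE_dividends';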
In practice, most of your data will not be in
tab-separated text files. You also might be loading data from storage
systems other than HDFS. Pig allows you to specify the function for
loading your data with the using
clause. For example, if you wanted to load your data from HBase,
you would use the loader for HBase:
divs = load 'NYSE_dividends' using HBaseStorage();
If you do not specify a load function, the
built-in function PigStorage
will be used. You can also
pass arguments to your load function via the using
clause.
For example, if you are reading comma-separated text data,
PigStorage
takes an argument to indicate which character to
use as a separator:
divs = load 'NYSE_dividends' using PigStorage(',');
The load
statement also can
have an as
clause, which allows you to
specify the schema of the data you are loading (the syntax and
semantics of declaring schemas in Pig Latin is discussed in “Schemas”):
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
When specifying a file
to read
from HDFS, you can specify a directory. In this case, Pig will find all
files under the directory you specify and use them as input for that
load
statement. So, if you had a directory input with two datafiles today and yesterday under it, and you specified
input as your file to load, Pig
would read both today and yesterday as input. If the directory you
specify contains other directories, files in those directories will be
included as well.
PigStorage
and
TextLoader
, the two built-in Pig load functions that operate on HDFS files,
support globs.1 With globs, you can read multiple files that are not under
the same directory or read some but not all files under a directory.
Table 4-1 describes globs that are valid in Hadoop
0.20. Be aware that glob meaning is determined by HDFS, not Pig, so the globs that will work for you depend on
your version of HDFS. Also, if you are issuing Pig Latin commands
from a Unix shell command line, you will need to escape many of the
glob characters to prevent your shell from expanding
them.
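As a sketch of how globs look in practice (the filenames here are hypothetical), you might read a subset of daily files like this:

-- read only the December 2009 files under /data/examples
some_days = load '/data/examples/NYSE_daily_2009-12-*';
-- read the same month from two different years
two_years = load '/data/examples/NYSE_daily_{2008,2009}-12-*';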
store
After you have finished processing your data, you will want to write it out
somewhere. Pig provides the store
statement for this purpose. In many ways it is the mirror image of the
load
statement. By default, Pig
stores your data on HDFS in a tab-delimited file using
PigStorage
:2
store processed into '/data/examples/processed';
Pig will write the results of your processing
into a directory processed in the
directory /data/examples. You can
specify relative pathnames as well as a full URL for the path, such as
'hdfs://nn.acme.com/data/examples/processed'
.
If you do not specify a store function,
PigStorage
will be used. You can specify a different store
function with a using
clause:
store processed into 'processed' using HBaseStorage();
You can also pass arguments to your store
function. For example, if you want to store your data as comma-separated
text data, PigStorage
takes an
argument to indicate which character to use as a separator:
store processed into 'processed' using PigStorage(',');
As noted in “Running Pig”, when
writing to a filesystem, processed
will be a directory with part files rather than a single file. But how
many part files will be created? That depends on the parallelism of the
last job before the store
. If it has reduces, it will be
determined by the parallel
level set
for that job. See “parallel” for information on how
this is determined. If it is a map-only job, it will be determined by
the number of maps. See “Map Parallelism”.
dump
In most cases you will want to store your data somewhere when you are done processing it.
But occasionally you will want to see it on the screen. This is
particularly useful during debugging and prototyping sessions. It can
also be useful for quick ad hoc jobs. dump
directs the output of your script to your
screen:
dump processed;
Unlike store
, whose execution
is deferred until the whole Pig script has been parsed, Pig will run the
dump
command immediately, without
considering the Pig statements after it. It is possible that dump
will generate a less optimal plan for the
whole Pig script. Unless you are debugging, you should use store
rather than dump
.
dump
outputs
each record on a separate line, and fields are separated by commas.
Nulls are indicated by missing values. If the output contains complex
data, Pig will surround it with special marks. Maps are surrounded by []
(brackets),
tuples by ()
(parentheses),
and bags by {}
(braces). Because each record in the output is a tuple, it is
surrounded by ()
.
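As a purely hypothetical illustration, a record containing a chararray, a map, a tuple, and a bag might be printed by dump as:

(NYSE,[close#4.2],(CSCO,2009-01-06),{(0.05),(0.1)})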
Be careful; you might be inundated with the outputs of dump
if your dataset is large.
Relational Operations
Relational operators are the main tools Pig Latin provides to operate on your data. They allow you to transform it by sorting, grouping, joining, projecting, and filtering. This section covers the basic relational operators. More advanced features of these operators, as well as advanced relational operators, are covered in “Advanced Relational Operations”. What is covered here will be enough to get you started on programming in Pig Latin.
foreach
foreach
takes
a set of expressions and applies them to every record in the
data pipeline—hence the name foreach
.
From these expressions it generates new records to send down the
pipeline to the next operator. For those familiar with database
terminology, it is Pig’s projection operator. For example, the following
code loads an entire record, but then removes all but the user
and id
fields from the record:
A = load 'input' as (user:chararray, id:long, address:chararray, phone:chararray,
        preferences:map[]);
B = foreach A generate user, id;
Expressions in foreach
foreach
supports a list of expressions. The simplest are constants and
field references. The syntax for constants has already been discussed
in “Types”. Field references can be by name (as shown in the preceding
example) or by position. Positional references are preceded by a
$
(dollar sign) and start from zero:
prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
             volume, adj_close);
gain   = foreach prices generate close - open;
gain2  = foreach prices generate $6 - $3;
Here, the relations gain
and gain2
will contain the same values.
Positional-style references are useful in situations where the schema
is unknown or undeclared.
In addition to using names and positions, you
can refer to all fields using *
(an
asterisk), which produces a tuple that contains all the fields. You
can also refer to ranges of fields using ..
(two periods). This is particularly
useful when you have many fields and do not want to repeat them all in
your foreach
command:
prices    = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
                volume, adj_close);
beginning = foreach prices generate ..open;      -- produces exchange, symbol, date, open
middle    = foreach prices generate open..close; -- produces open, high, low, close
end       = foreach prices generate volume..;    -- produces volume, adj_close
Standard arithmetic operators for integers and floating-point
numbers are supported: +
for
addition, -
for subtraction, *
for
multiplication, and /
for division. These operators return values of their own type, so
5/2
is 2
, whereas 5.0/2.0
is 2.5
. In addition, for integers the modulo
operator %
is supported. The unary
negative operator (-
) is also
supported for both integers and floating-point numbers. Pig Latin
obeys the standard mathematical precedence rules. For information on
what happens when arithmetic operators are applied across different
types (for example, 5/2.0
), see
“Casts”.
Null values are viral for all arithmetic operators. That is, x + null = null
for all values of x
.
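Here is a minimal sketch of these rules; daily is assumed to be loaded with the NYSE_daily schema used elsewhere in this chapter:

calcs = foreach daily generate
            5 / 2,             -- integer division: 2
            5.0 / 2.0,         -- floating-point division: 2.5
            volume % 7,        -- modulo, integers only
            -(close - open);   -- unary negation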
Pig also provides a binary condition operator, often referred to as
bincond. It begins with a Boolean test, followed
by a ?
, then the value to return if
the test is true, then a :
, and
finally the value to return if the test is false. If the test returns
null
, bincond returns null
. Both value arguments of the bincond
must return the same type:
2 == 2 ? 1 : 4      -- returns 1
2 == 3 ? 1 : 4      -- returns 4
null == 2 ? 1 : 4   -- returns null
2 == 2 ? 1 : 'fred' -- type error; both values must be of the same type
Note that when you use the bincond operator, you will need to put the entire bincond expression inside parentheses:
daily  = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray,
             open:float, high:float, low:float, close:float, volume:int,
             adj_close:float);
updown = foreach daily generate (close > open ? 'up' : 'down');
Pig will return a syntax error if you omit the parentheses.
To extract data from complex types, use the
projection operators. For maps this is #
(the pound sign
or hash), followed by the name of the key as a string:
bball = load 'baseball' as (name:chararray, team:chararray,
            position:bag{t:(p:chararray)}, bat:map[]);
avg   = foreach bball generate bat#'batting_average';
Keep in mind that the value associated with a
key may be of any type. If you reference a key that does
not exist in the map, the result is a null
. Tuple projection is done with .
, the dot
operator. As with top-level records, the field can be referenced by
name (if you have a schema for the tuple) or by position:
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;
Referencing a field name that does not exist
in the tuple will produce an error. Referencing a nonexistent
positional field in the tuple will return null
. Bag projection is not as straightforward as map and tuple projection.
Bags do not guarantee that their tuples are stored in any order, so
allowing a projection of the tuple inside the bag would not be
meaningful. Instead, when you project fields in a bag, you are
creating a new bag with only those fields:
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;
This will produce a new bag whose tuples have
only the field x
in them. You can
project multiple fields in a bag by surrounding the fields with
parentheses and separating them by commas:
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);
This seemingly pedantic distinction that
b.x
is a bag and not a scalar value
has consequences. Consider the following Pig Latin, which will not
work:
A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all records for a given value of x
C = foreach B generate SUM(A.y + A.z);
It is clear what the programmer is trying to
do here. But because A.y
and
A.z
are bags and the addition
operator is not defined on bags, this will produce an error.3 The correct way to do this calculation in Pig Latin is:
A  = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B  = group A1 by x;
C  = foreach B generate SUM(A1.yz);
UDFs in foreach
User-defined functions (UDFs) can be invoked in foreach
statements. These are
called evaluation functions, or eval
funcs. Because they are part of a foreach
statement, these UDFs take one record at a time and produce one
output. Keep in mind that either the input or the output can be a bag,
so this one record can contain a bag of records:
-- udf_in_foreach.pig
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
--make sure all strings are uppercase
upped = foreach divs generate UPPER(symbol) as symbol, dividends;
grpd  = group upped by symbol; --output a bag upped for each value of symbol
--take a bag of integers, and produce one result for each group
sums  = foreach grpd generate group, SUM(upped.dividends);
In addition, eval funcs can take *
as an argument, which passes the entire
record to the function. They can also be invoked with no arguments at
all.
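For instance (MyUdf here is a hypothetical eval func; RANDOM is a Pig built-in that takes no arguments):

whole  = foreach divs generate MyUdf(*);   -- pass the entire record to the UDF
randno = foreach divs generate RANDOM();   -- invoke a UDF with no arguments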
For a complete list of UDFs that are provided with Pig, see Appendix A. For a discussion of how to invoke UDFs not distributed as part of Pig, see “User-Defined Functions”.
Generating complex data
It is sometimes useful in a foreach
statement to build a
complex field from simple fields. For example, you might want to
combine two simple types together into a tuple. The same notation used
for complex constants (see “Complex Types”) can be
used to build complex types from scalar fields: []
for
maps, ()
for tuples, and {}
for bags. There are some differences
though:
Instead of using # to separate the map key and value, use a comma:

divs = load 'NYSE_dividends' as (exchange:chararray, symbol, date, dividends:double);
maps = foreach divs generate [exchange, dividends];
describe maps;

maps: {map[double]}

The reason for using the comma as a delimiter is that # is also used in map projection, so there could be ambiguity if it were used as the key/value delimiter as well. Consider the expression [a#b#c]. Pig has no idea whether this means a map with key a#b and value c, or a map with key a and value b#c. With the comma delimiter, [a#b, c] creates a map with the key a#b and the value c. Map constants do not have this problem.

You cannot create a single-field tuple—Pig will treat the parentheses as an arithmetic operator:
divs   = load 'NYSE_dividends' as (exchange:chararray, symbol, date, dividends:double);
tuples = foreach divs generate (exchange), (exchange, dividends) as t;
describe tuples;

tuples: {exchange: chararray, t: (exchange: chararray, dividends: double)}
(exchange) will not create a tuple in this example.

If the bag consists of single-item tuples, the parentheses around the item can be omitted:
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date, dividends);
bags = foreach divs generate {exchange, symbol};
describe bags;

bags:
You can also use the built-in UDFs TOMAP
, TOTUPLE
, and TOBAG
to generate complex fields from simple
fields.
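A small sketch of those built-ins, reusing the NYSE_dividends schema from the examples above:

divs    = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
              date:chararray, dividends:double);
crammed = foreach divs generate TOMAP(symbol, dividends)   as m,
                                TOTUPLE(symbol, dividends) as t,
                                TOBAG(exchange, symbol)    as b;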
Naming fields in foreach
The result of each foreach
statement is a new tuple, usually with a different schema than
the tuple that was input to foreach
. Pig can almost
always infer the data types of the fields in this schema from the
foreach
statement. But it cannot always infer the names
of those fields. For fields that are simple projections with no other
operators applied, Pig keeps the same names as before:
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
           date:chararray, dividends:float);
sym  = foreach divs generate symbol;
describe sym;

sym: {symbol: chararray}
Once any expression beyond simple projection
is applied, however, Pig does not assign a name to the field. If you
do not explicitly assign a name, the field will be nameless and will
be addressable only via a positional parameter; for example, $0
. You can assign a name with the
as
clause:
divs     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
               date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0;
describe in_cents;

in_cents: {dividend: double, double}
The second field is unnamed since we didn’t assign a name to it.
After a flatten
, which can produce multiple values, you can assign a name to
each value:
crawl     = load 'webcrawl' as (url, pageid);
extracted = foreach crawl generate
                flatten(REGEX_EXTRACT_ALL(url, '(http|https)://(.*?)/(.*)'))
                as (protocol, host, path);
Notice that in a foreach
the
as
is attached to each expression. This is different from in
load
, where it is attached to the entire statement. The
reason for this will become clear when we discuss
flatten
.4
In a few cases Pig cannot infer the data type of an expression
in a foreach
statement. Consider
the following example:
extracted = foreach crawl generate
                flatten(REGEX_EXTRACT_ALL(url, '(http|https)://(.*?)/(.*)'))
                as (protocol:chararray, host:chararray, path:chararray);
The UDF REGEX_EXTRACT_ALL
only declares its return type as a tuple; it does not declare
the types of the fields of that tuple. However, we know it will return
a chararray. In this case you have the option to declare a type along
with the name, as shown in this example. The syntax is the same as for
declaring names and types in load
. Note that until
version 0.16, Pig does not insert a cast operator. It assumes you are
correct in declaring the type. If you are not, this could result in an
error.5
The following script will result in a compilation error:
in_cents = foreach divs generate (symbol, dividends)
               as (symbol:chararray, dividends:double);
On the left side, (symbol,
dividends)
will create a tuple. On the right side, (symbol:chararray, dividends:double)
in the
as
statement declares two
individual fields. They don’t match. We need to change it to:
in_cents = foreach divs generate (symbol, dividends)
               as (t:(symbol:chararray, dividends:double));

in_cents: {t: (symbol: chararray, dividends: double)}
CASE expressions
Starting with Pig 0.12, you can use CASE
for multicondition branches. (If you are using Pig 0.11 or earlier, a
semantically equivalent chain of bincond operators can be constructed.
CASE
was added because it is much simpler.) Here’s an
example:
processed = FOREACH loaded GENERATE (
    CASE i WHEN 0 THEN 'one' WHEN 1 THEN 'two' ELSE 'three' END
);
Depending on the value of i
,
foreach
will generate a chararray value of 'one'
, 'two'
, or 'three'
.
Both the conditions (in this example 0
and 1
)
and the condition values (in this example 'one'
, etc.) can be expressions. If no value
matches the condition and ELSE
is missing, the result
will be null
. This syntax cannot be
used to test for null values, since null ==
null
is null
, not
true
. In the following example, if
gender is 'M'
the result is
1
, if gender is 'F'
the result is 2
, and if gender is something else, such as
'other'
, the result is null
:
processed = FOREACH loaded GENERATE (
    CASE UPPER(gender)
        WHEN UPPER('m') THEN 1
        WHEN UPPER('f') THEN 2
    END
) as gendercode;
filter
The filter
statement allows you to select which records will be retained in
your data pipeline. A filter
contains a predicate. If that
predicate evaluates to true for a given record, that record will be
passed down the pipeline. Otherwise, it will not.
Predicates can contain the equality operators you expect, including ==
to test
equality and !=
, >
, >=
, <
, and <=
. These comparators can be used on any
scalar data type. ==
and !=
can be applied to maps and tuples. To use
these with two tuples, both tuples must have either the same schema or
no schema. None of the equality operators can be applied to bags.
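For example, a minimal sketch combining a few comparators (the threshold is arbitrary):

divs    = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
              date:chararray, dividends:float);
bigdivs = filter divs by dividends >= 0.1 and symbol != 'CME';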
Starting with Pig 0.12, you can test whether the value of a scalar field is within a set
of values using the IN
operator:
divs        = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                  date:chararray, dividends:float);
cme_ctb_cht = filter divs by symbol in ('CME', 'CTB', 'CHT');
Pig Latin follows the operator precedence rules that are standard in most programming
languages, where arithmetic operators have precedence over equality
operators. So, x + y == a + b
is
equivalent to (x + y) == (a +
b)
.
For chararrays, users can test to see whether the chararray matches a regular expression:
-- filter_matches.pig
divs         = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                   date:chararray, dividends:float);
startswithcm = filter divs by symbol matches 'CM.*';
Note
Pig uses Java’s regular
expression format. This format requires the entire chararray to match, not just
a portion as in Perl-style regular expressions. For example, if you
are looking for all fields that contain the string
fred
, you must say '.*fred.*'
and not 'fred'
. The latter will match only the
chararray fred
.
You can find chararrays that do not match a
regular expression by preceding the test with not
:
-- filter_not_matches.pig
divs            = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                      date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches 'CM.*';
You can combine multiple predicates into one by
using the Boolean operators and
and or
, and you can reverse the
outcome of any predicate by using the Boolean not
operator. As is standard, the precedence
of Boolean operators, from highest to lowest, is not
, and
,
or
. Thus, a
and b or not c
is equivalent to (a
and b) or (not c)
.
Pig will short-circuit Boolean operations when
possible. If the first (left) predicate of an and
is false, the second (right) predicate
will not be evaluated. So, in 1 == 2 and
udf(x)
, the UDF will never be invoked. Similarly, if the first
predicate of an or
is true, the
second predicate will not be evaluated. 1 == 1
or udf(x)
will never invoke the UDF.
For Boolean operators, nulls follow the SQL ternary logic. Thus, x ==
null
results in a value of null
, not true
(even when
x
is
null
also) or false
. Filters pass through only those values
that are true
. Consider the following
example:
A  = load 'input.txt' as (x:int, y:int);
B1 = filter A by x == 2;
C1 = store B1 into 'output1';
B2 = filter A by x != 2;
C2 = store B2 into 'output2';
If the input file had three rows, (2,1)
, (null,1)
, (4,1)
, C1
would store (2,1)
, C2
would store (4,1)
, and (null,1)
would appear in neither C1
nor C2
.
The way to look for null values is to use the is null
operator, which returns true
whenever the value is null
. To find
values that are not null, use is not
null
.
Likewise, null
neither matches nor fails to match any
regular expression value.
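A short sketch of both forms, again using the NYSE_dividends schema:

divs     = load 'NYSE_dividends' as (exchange, symbol, date, dividends:float);
withdivs = filter divs by dividends is not null;
nodivs   = filter divs by dividends is null;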
Just as there are UDFs to be used in evaluation
expressions, there are UDFs specifically for filtering records, called
filter funcs. These are eval funcs that return a Boolean value and can be invoked
in the filter
statement. Filter funcs can also be
used in foreach
statements.
group
The group
statement collects together records with the same key. It is the first operator we have looked at that
shares its syntax with SQL, but it is important to understand that the
grouping operator in Pig Latin is fundamentally different from the one
in SQL. In SQL the group by
clause creates a group that must feed directly into one or more
aggregate functions. In Pig Latin there is no direct connection between
group
and aggregate functions. Instead, group
does exactly what it says: collects all records with the same value for
the provided key together into a bag. You can then pass this to an
aggregate function if you want, or do other things with it:
-- count.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
cnt   = foreach grpd generate group, COUNT(daily);
That example groups records by the key stock
and then counts them. It is just as
legitimate to group them and then store them for processing at a later
time:
-- group.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
store grpd into 'by_group';
The records coming out of the group by
statement have two fields: the key
and the bag of collected records. The key field is named group
.6 The bag is named for the alias that was grouped, so in the
previous examples it will be named daily
and have the same schema as the relation
daily
. If the relation daily
has no schema, the bag daily
will have no schema. For each record in
the group, the entire record (including the key) is in the bag. Changing
the last line of the previous script from store grpd...
to describe grpd;
will produce:
grpd: {group: bytearray, daily: {exchange: bytearray, stock: bytearray}}
You can also group on multiple keys, but the
keys must be surrounded by parentheses. The resulting records still have
two fields. In this case, the group
field is a tuple with a field for each key:
--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd  = group daily by (exchange, stock);
avg   = foreach grpd generate group, AVG(daily.dividends);
describe grpd;

grpd: {group: (exchange: bytearray, stock: bytearray), daily: {exchange: bytearray,
    stock: bytearray, date: bytearray, dividends: bytearray}}
You can also use all
to group together all of the records in
your pipeline:
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily all;
cnt   = foreach grpd generate COUNT(daily);
The record coming out of group all
has the chararray literal all
as a
key. Usually this does not matter because you will pass the bag directly
to an aggregate function such as COUNT
. But if you plan to store the record or
use it for another purpose, you might want to project out the artificial
key first.
group
is the first operator we have
looked at that usually will force a reduce phase.7 Grouping means collecting all records where the key has
the same value. If the pipeline is in a map phase, this will force it to shuffle and then reduce.
If the pipeline is already in a reduce phase, this will force it to pass
through the map, shuffle, and reduce phases.
Because grouping collects
all records with the same value for the key, you
often get skewed results. That is, just because you have specified that
your job will have 100 reducers, there is no reason to expect that the
number of values per key will be distributed evenly. The values might
have a Gaussian or power law distribution.8 For example, suppose you have an index of web pages and
you group by the base URL. Certain values, such as yahoo.com
, are going to have far more entries
than most, which means that some reducers will get far more data than
others. Because your MapReduce job is not finished (and any subsequent
ones cannot start) until all your reducers have finished, this skew will
significantly slow your processing. In some cases it will also be
impossible for one reducer to manage that much data.
Pig has a number of ways that it tries to manage this skew to balance out the load across your reducers. The one that applies to grouping is Hadoop’s combiner. This does not remove all skew, but it places a bound on it. And because for most jobs the number of mappers will be at most in the tens of thousands, even if the reducers get a skewed number of records, the absolute number of records per reducer will be small enough that the reducers can handle them quickly.
Unfortunately, not all calculations can be done using the combiner. Calculations that can be decomposed into any number of steps, such as sum, are called distributive. These fit nicely into the combiner. Calculations that can be decomposed into an initial step, any number of intermediate steps, and a final step are referred to as algebraic. Count is an example of such a function, where the initial step is a count and the intermediate and final steps are sums. Distributive is a special case of algebraic, where the initial, intermediate, and final steps are all the same. Session analysis, where you want to track a user’s actions on a website, is an example of a calculation that is not algebraic. You must have all the records sorted by timestamp before you can start analyzing the user’s interaction with the site.
Pig’s operators and built-in UDFs use the
combiner whenever possible, because of its skew-reducing features and
because early aggregation greatly reduces the amount of data shipped
over the network and written to disk, thus speeding performance
significantly. UDFs can indicate when they can work with the combiner by
implementing the Algebraic
interface. For
information on how to make your UDFs use the combiner, see “The Algebraic Interface”.
For information on how to determine the level of
parallelism when executing your group
operation, see “parallel”. Also, keep in mind that when using group
all
, you are necessarily serializing your pipeline. That is, this
step and any steps after it until you split out the single bag now
containing all of your records will not be done in parallel.
Finally, group
handles nulls in the
same way that SQL handles them: by collecting all records
with a null
key into the same group.
Note that this is in direct contradiction to the way expressions handle
nulls (remember that neither null ==
null
nor null != null
is
true) and to the way join
(see “join”) handles nulls.
order by
The order statement sorts your data for you, producing a total order for your output data. Total order means that not only is the data sorted within each partition of your data, but it is also guaranteed that all records in partition n come after all records in partition n - 1 in the sort order, for all n. When your data is stored on HDFS, where each partition is a part file, this means that cat will output your data in order.
The syntax of order
is similar to
group
. You indicate a key or set of keys by which you wish to order your data. One glaring
difference is that there are no parentheses around the keys when
multiple keys are indicated in order
:
--order.pig
daily  = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray,
             open:float, high:float, low:float, close:float, volume:int,
             adj_close:float);
bydate = order daily by date;

--order2key.pig
daily         = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
                    date:chararray, open:float, high:float, low:float, close:float,
                    volume:int, adj_close:float);
bydatensymbol = order daily by date, symbol;
It is also possible to reverse the order of the
sort by appending desc
to a key in the sort. In
order
statements with multiple keys, desc
applies only to the key it immediately follows. Other keys will
still be sorted in ascending order:
--orderdesc.pig
daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray,
              open:float, high:float, low:float, close:float, volume:int,
              adj_close:float);
byclose = order daily by close desc, open;
dump byclose; -- open still sorted in ascending order
Data is sorted based on the types of the
indicated fields: numeric values are sorted numerically; chararray
fields are sorted lexically; and bytearray fields are sorted lexically, using byte values
rather than character values. Sorting by map, tuple, or bag fields
produces errors. For all data types, nulls are taken to be smaller than all possible values for that
type, and thus will always appear first (or last when desc
is used).
As discussed in the previous section, skew of
the values in data is very common. This affects order
just
as it does group
, causing some reducers to take
significantly longer than others. To address this, Pig balances the
output across reducers. It does this by first sampling the input of the
order
statement to get an estimate of the key distribution.
Based on this sample, it then builds a partitioner that produces a
balanced total order. For example, suppose you are ordering on a
chararray field with the values a, b, e, e, e,
e, e, e, m, q, r, z
, and you have three reducers. The
partitioner in this case would decide to partition your data such that
values a
–e
go to reducer 1, e
goes to reducer 2, and m
–z
go to
reducer 3. Notice that the value e
can be sent to either reducer 1 or 2. Some records with key e
will be sent to reducer 1 and some to 2.
This allows the partitioner to distribute the data evenly. In practice,
we rarely see variance in reducer time exceed 10% when using this
algorithm.
An important side effect of the way Pig
distributes records to minimize skew is that it breaks the MapReduce convention
that all instances of a given key are sent to the same partition. If you
have other processing that depends on this convention, do not use Pig’s
order
statement to sort data for it.
order
always causes your data
pipeline to go through a reduce phase. This is necessary to collect all
equal records together. Also, Pig adds an additional MapReduce job to
your pipeline to do the sampling. Because this sampling is very
lightweight (it reads only the first record of every block9), it generally takes less than 5% of the total job
time.
distinct
The distinct
statement is very simple. It removes duplicate records. It works only on entire records, not on
individual fields:
--distinct.pig
-- Find a distinct list of ticker symbols for each exchange.
-- This load will truncate the records, picking up just the first two fields.
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq  = distinct daily;
Because it needs to collect like records
together in order to determine whether they are duplicates, distinct
forces a reduce phase. It does make
use of the combiner to remove any duplicate records it can delete in the
map phase.
The use of distinct
shown here is
equivalent to select distinct x
in SQL. To learn how to do
the equivalent of select count(distinct x)
, see “Nested foreach”.
join
join
is one
of the workhorses of data processing, and it is likely to be in many of your
Pig Latin scripts. join
selects records from one input to
put together with records from another input. This is done by indicating
keys for each input. When those keys are equal,10 the two rows are joined. Records for which no match is
found are dropped:
--join.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by symbol, divs by symbol;
You can also join on multiple keys. In all cases you must have the same number of keys, and they must be of the same or compatible types (where compatible means that an implicit cast can be inserted; see “Casts”):
-- join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by (symbol, date), divs by (symbol, date);
Like foreach
, join
preserves the names of the fields of the inputs passed to it. It also
prepends the name of the relation the field came from, followed by ::
. Adding
describe jnd;
to the end of the previous example
produces:
jnd: {daily::exchange: bytearray, daily::symbol: bytearray, daily::date: bytearray,
      daily::open: bytearray, daily::high: bytearray, daily::low: bytearray,
      daily::close: bytearray, daily::volume: bytearray, daily::adj_close: bytearray,
      divs::exchange: bytearray, divs::symbol: bytearray, divs::date: bytearray,
      divs::dividends: bytearray}
The daily:: prefix needs to be used only when the field name is no longer unique in the record. In this example, you will need to use daily::date or divs::date if you wish to refer to one of the date fields after the join, but fields such as open and dividends do not need a prefix because there is no ambiguity.
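Continuing the previous example, a sketch of projecting out of jnd:

-- date must be qualified; open and dividends are unambiguous
dates = foreach jnd generate daily::date, divs::date, open, dividends;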
Pig also supports outer
joins. In outer joins, records that do not have a match on the other
side are included, with null values being filled in for the missing
fields. Outer joins can be left
, right
, or
full
. A left outer join means records from the left side
will be included even when they do not have a match on the right side.
Likewise, a right outer join means records from the right side will be
included even when they do not have a match on the left side. A full
outer join means records from both sides are taken even when they do not
have matches. Here’s an example of a left outer join:
--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by (symbol, date) left outer, divs by (symbol, date);
outer
is optional and can be
omitted. Unlike in some SQL implementations, however, full
is not optional. C = join A by x outer, B by u;
will
generate a syntax error, not a full outer join.
Outer joins are supported only when Pig knows the schema of the data on the side(s) for which it might need to fill in nulls. Thus, for left outer joins, it must know the schema of the right side; for right outer joins, it must know the schema of the left side; and for full outer joins, it must know both. This is because, without the schema, Pig will not know how many null values to fill in.11
As in SQL, null values for keys do not match anything—even null values from the other input. So, for inner joins, all records with null key values are dropped. For outer joins, they will be retained but will not match any records from the other input.
Pig can also do multiple joins in a single operation, as long as they are all being joined on the same key(s). This can be done only for inner joins:
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
Self joins are supported, though the data must be loaded twice:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends);
divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends);
jnd   = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and
                divs1::dividends < divs2::dividends;
If the preceding code were changed to the following, it would fail:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends);
jnd   = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and
                divs1::dividends < divs2::dividends;
It seems like this ought to work, since Pig could
split the divs1
dataset and send it
to join
twice. But the problem is
that field names would be ambiguous after the join, so the load
statement must be written twice. The next
best thing would be for Pig to figure out that these two
load
statements are loading the same input and then run the
load only once, but it does not do that currently.
Pig does these joins in MapReduce by using the map phase to annotate each record
with which input it came from. It then uses the join key as the shuffle
key. Thus, join
forces a new reduce phase. Once all of the
records with the same value for the key are collected together, Pig does
a cross product between the records from both inputs. To minimize memory
usage, it has MapReduce order the records coming into the reducer using
the input annotation it added in the map phase. So, all of the records
for the left input arrive first, and Pig caches these in memory. All of
the records for the right input then arrive. As each of these records
arrives, it is crossed with each record from the left side to produce an
output record. In a multiway join, the left n
- 1 inputs are held in memory, and the n
th is
streamed through. It is important to keep this in mind when writing
joins in your Pig queries. If you know that one of your inputs has more
records per value of the chosen key, placing that larger input on the
right side of your join will lower memory usage and possibly increase
your script’s performance.
limit
Sometimes you want to see only a limited number of results. limit allows you to do this:
--limit.pig
divs    = load 'NYSE_dividends';
first10 = limit divs 10;
The example here will return at most 10 lines
(if your input has less than 10 lines total, it will return them all).
Note that for all operators except order
, Pig does not
guarantee the order in which records are produced. Thus, because
NYSE_dividends has more than 10
records, the example script could return different results every time
it’s run. Putting an order
immediately
before the limit
will guarantee that the same results are
returned every time.
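A sketch of that pattern:

divs    = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
srtd    = order divs by symbol;
first10 = limit srtd 10;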
limit
causes
an additional reduce phase, since it needs to collect the records
together to count how many it is returning. It does optimize this phase
by limiting the output of each map and then applying the limit again in
the reducer. In the case where limit
is combined with
order
, the two are done together on the map and the reduce.
That is, on the map side, the records are sorted by MapReduce and the
limit is applied in the combiner. They are then sorted again by
MapReduce as part of the shuffle, and Pig applies the limit again in the
reducer.
Whenever possible, Pig terminates reading of the
input early once it has reached the number of records specified by
limit
. This can significantly reduce the time it takes to
run the script.
sample
sample
offers
a simple way to get a sample of your data. It reads through all of
your data but returns only a percentage of rows. What percentage it
returns is expressed as a double value, between 0 and 1. So, in the
following example, 0.1
indicates
10%:
--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;
Currently the sampling algorithm is very simple.
The sample A 0.1
is rewritten to filter A by random()
<= 0.1
. Obviously this is nondeterministic, so results of a
script with sample
will vary with every run. Also, the
percentage will not be an exact match, but close. There has been
discussion about adding more sophisticated sampling techniques, but it
has not been done yet.
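If you want that behavior to be explicit in your script, you can write the equivalent filter yourself; a sketch using the built-in RANDOM:

divs = load 'NYSE_dividends';
some = filter divs by RANDOM() <= 0.1;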
parallel
One of Pig’s core claims is that it provides a language for parallel data
processing. One of the tenets of Pig’s philosophy is that Pigs are
domestic animals (see “The Pig Philosophy”), so Pig prefers
that you tell it how parallel to be. To this end, it provides the
parallel
clause.
The parallel
clause can be attached
to any relational operator in Pig Latin. However, it controls only
reduce-side parallelism, so it makes sense only for operators that force
a reduce phase. These are:
group*, order, distinct, join*, limit, cogroup*, cross, and union. Operators marked with an asterisk have multiple implementations, some of which force a reduce and some of which do not. union forces a reduce only in Tez. For details on this and on operators not covered in this chapter, see Chapter 5. parallel also works in local modes, though it does not make Pig scripts run faster than a single reduce, as reduces run serially.12
Let’s take a look at an example:
--parallel.pig
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
              volume, adj_close);
bysymbl = group daily by symbol parallel 10;
In this example, parallel
will
cause the MapReduce job spawned by Pig to have 10 reducers. If you list
the files inside output, you will
see 10 part files.
However, the group all
statement aggregates all
inputs, and it is impossible to use more than one reducer. No matter how
many reduce tasks you specify in the parallel
clause, Pig
will only use one:
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
              volume, adj_close);
bysymbl = group daily all parallel 10;
You can also see this by listing the files in output.
parallel
clauses apply only to the statements to
which they are attached; they do not carry through the script. So, if
this group
were followed by an order
,
parallel
would need to be set for that order
separately. Most likely the group
will reduce your data
size significantly and you will want to change the parallelism:
--parallel.pig
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
              volume, adj_close);
bysymbl = group daily by symbol parallel 10;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc parallel 2;
If, however, you do not want to set parallel
separately for every reduce-invoking
operator in your script, you can set a script-wide value using the
set
command. In this script, all MapReduce jobs will be
done with 10 reducers:
--defaultparallel.pig
set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
              volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;
When you set a default_parallel
level, you can still add a
parallel
clause to any statement to override the default
value. Thus, you can set a default value as a base to use in most cases
and specifically add a parallel
clause only when you have
an operator that needs a different value.
All of this is rather static, however. What
happens if you run the same script across different inputs that have
different characteristics? Or what if your input data varies
significantly sometimes? You do not want to have to edit your script
each time. Using parameter substitution, you can write your parallel
clauses with variables, providing
values for those variables at runtime. See “Parameter Substitution”
for details.
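For instance, a sketch using a parameter named reducers (the parameter name is arbitrary):

daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
              volume, adj_close);
bysymbl = group daily by symbol parallel $reducers;

You would then invoke the script with something like pig -param reducers=10 parallel.pig.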
So far, we have assumed that you know what your
parallel
value should be. See “Select the Right Level of Parallelism” for information on how to determine
that.
Finally, what happens if you do not specify a
parallel
level? Pig will do a gross
estimate of what the parallelism should be set to. It looks at the
initial input size, assumes there will be no data size changes, and then
allocates a reducer for every 1 GB of data by default. This can be
customized by setting pig.exec.reducers.bytes.per.reducer
. The
maximum number of reducers to be used in the estimation is defined in
pig.exec.reducers.max
, which defaults
to 999
. It must be emphasized that
this is not a good algorithm. It is provided only to prevent mistakes
that result in scripts running very slowly and, in some extreme cases,
mistakes that cause MapReduce itself to have problems. This is a safety
net, not an optimizer.
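Both properties can be set like any other Pig property, for example in the script itself (the values here are illustrative only):

set pig.exec.reducers.bytes.per.reducer '500000000';
set pig.exec.reducers.max '100';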
User-Defined Functions
Much of the power of Pig lies in its ability to let users combine its operators with their own or others’ code via UDFs. Pig UDFs can be written in many programming languages, including Java, Jython, JavaScript, Ruby, Groovy (since Pig 0.11), and Python13 (since Pig 0.12).
Pig itself comes packaged with a large number of built-in UDFs. For a complete list and descriptions of the built-in UDFs, see “Built-in UDFs”.
PiggyBank is a collection of user-contributed UDFs that is packaged and released along with Pig. PiggyBank UDFs are not included in the Pig JAR, and thus you have to register them manually in your scripts. See “PiggyBank” for more information.
There are also third-party Pig UDF libraries, notably DataFu from LinkedIn and Elephant Bird from Twitter.
Of course, you can also write your own UDFs or use those written by other users. For details of how to write your own, see Chapter 9. Finally, you can use some static Java functions as UDFs as well.
Registering Java UDFs
When you use a UDF that is not already built into Pig, Pig needs to be able
to find the Java JAR that contains the UDF. If the JAR is in your Java
classpath, then Pig will automatically locate and load the JAR. If not, you have to tell Pig where to look for that
UDF. This is done via the register
command. For example, let’s say you want to use the Reverse
UDF provided in
PiggyBank (for information on where to find the PiggyBank
JAR, see “PiggyBank”):
--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate
org.apache.pig.piggybank.evaluation.string.Reverse(symbol);
This example tells Pig that it needs to include
code from your_path_to_piggybank
/piggybank.jar.
Note
If your UDF requires additional JARs beyond the one it is contained in (e.g., third-party libraries), Pig may ship those automatically if the UDF supports “auto-shipping” (see “Shipping JARs Automatically” and “Shipping JARs Automatically”). With auto-shipping, the UDF will tell Pig about the dependent JARs it requires. However, if your UDF implementation is old or does not support the new auto-shipping features, you will have to register those JARs manually even if they are present in your Java classpath. Pig cannot do the dependency analysis to determine what additional JARs a UDF requires.
In this example, we have to give Pig the full
package and class name of the UDF. This verbosity can be alleviated in
two ways. The first option is to use the define
command (see
“define and UDFs”). The second option is to include a set of
packages on the command line for Pig to search when looking for UDFs.
So, if instead of invoking Pig with pig register.pig we change our invocation to this:

pig -Dudf.import.list=org.apache.pig.piggybank.evaluation.string register.pig
We can change our script to:
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate Reverse(symbol);
Using yet another property, we can get rid of
the register
command as
well. If we add -Dpig.additional.jars=/usr/local/pig/piggybank/piggybank.jar
to our command line, this command is no longer necessary.14
You may wonder if you should add your JARs to the classpath as well. The quick answer is no. When you register the JARs, Pig also changes its own class loader to include them, so it will find the JARs in the frontend even though the classpath does not include them. However, if you do not register your JARs and you are expecting Pig to ship JARs automatically (using the auto-shipping feature discussed in the previous note), you need to add them in your classpath. Otherwise, Pig won’t know how to find them.
In many cases it is better to deal with
registration and definition issues explicitly in the script via the
register
and define
commands than to use these
properties. Otherwise, everyone who runs your script has to know how to
configure the command line. However, in some situations your scripts
will always use the same set of JARs and always look in the same places
for them. For instance, you might have a set of JARs used by everyone in
your company. In this case, placing these properties in a shared
properties file and using that with your Pig Latin scripts will make
sharing those UDFs easier and ensure that everyone is using the correct
versions of them.
The register
command can also take
an HDFS path as well as other protocols that Hadoop understands, such as
S3. You could say register
'hdfs:///user/jar/acme.jar';
or register 's3://mybucket/jar/acme.jar';
if your
JARs are stored in HDFS or S3.
register
accepts globs too, so if all of the JARs you need were stored in
one directory, you could include them all with register
'/usr/local/share/pig/udfs/*.jar'
.
Registering UDFs in Scripting Languages
register
is also used to locate resources for scripting UDFs that you use in your
Pig Latin scripts. In this case you do not register a JAR, but rather a
script that contains your UDF. The script must be referenced from your
current directory. Using the examples provided in the example code,
copying udfs/python/production.py
to the data directory looks like
this:
--batting_production.pig
register 'production.py' using jython as bballudfs;
players  = load 'baseball' as (name:chararray, team:chararray,
               pos:bag{t:(p:chararray)}, bat:map[]);
nonnull  = filter players by bat#'slugging_percentage' is not null and
               bat#'on_base_percentage' is not null;
calcprod = foreach nonnull generate name, bballudfs.production(
               (float)bat#'slugging_percentage', (float)bat#'on_base_percentage');
The important differences here are the
using jython
and as bballudfs
portions of the
register
statement. using jython
tells Pig
that this UDF is written in Jython, not Java, and it should use Jython to compile
the UDF.
as bballudfs defines a namespace that UDFs from this file are placed in. All UDFs from this file must now be invoked as bballudfs.udfname. Each script file you load should be given a separate namespace. This avoids naming collisions when you register two scripts with duplicate function names.
You can register UDFs written in JavaScript, JRuby, Groovy, and Python using similar syntax:
- JavaScript
register 'production.js' using javascript as bballudfs;
- JRuby
register 'production.rb' using jruby as bballudfs;
- Groovy
register 'production.groovy' using groovy as bballudfs;
- Python
register 'production.py' using streaming_python as bballudfs;
define and UDFs
As was alluded to earlier, define
can be
used to provide an alias so that you do not have to use full package
names for your Java UDFs. It can also be used to provide constructor
arguments to your UDFs. define
is used in defining
streaming commands too, but this section covers only its UDF-related
features. For information on using define
with streaming,
see “stream”.
The following provides an example of using define
to
provide an alias for
org.apache.pig.piggybank.evaluation.string.Reverse
:
--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);
Eval and filter functions can also take one or
more strings as constructor arguments. If you are using a UDF that takes
constructor arguments, define
is the place to provide those
arguments. For example, consider a method
CurrencyConverter
that takes two constructor
arguments, the first indicating which currency you are converting from
and the second which currency you are converting to:
--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate convert(dividends);
Calling Static Java Functions
Java has a rich collection of utilities and libraries. Because Pig is implemented in Java, some of these functions can be exposed to Pig users. Starting in version 0.8, Pig offers invoker methods that allow you to treat certain static Java functions as if they were Pig UDFs.
Any public static Java function that takes either no arguments or some
combination of int
, long
, float
,
double
, String
, or arrays thereof15 and returns an int
, long
,
float
, double
, or String
value
can be invoked in this way.
Because Pig Latin does not support overloading on return types, there is an invoker for each return type: InvokeForInt, InvokeForLong, InvokeForFloat, InvokeForDouble, and InvokeForString. You must pick the appropriate invoker for the type you wish to return. These methods take two constructor arguments. The first is the full package, class name, and method name. The second is a space-separated list of the parameters the Java function expects. Only the types of the parameters are given. If a parameter is an array, [] (square brackets) are appended to the type name. If the method takes no parameters, the second constructor argument is omitted.
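For instance, here is a quick sketch (not one of this chapter's examples) of invoking a no-argument static method: java.lang.System.currentTimeMillis takes no parameters and returns a long, so only one constructor argument is passed to the invoker:
-- sketch: no-argument static method, so the parameter list is omitted
define millis InvokeForLong('java.lang.System.currentTimeMillis');
divs    = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
stamped = foreach divs generate symbol, millis();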
For example, if you wanted to use Java’s
Integer
class to translate decimal values to hexadecimal values, you could
do:
--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
    volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);
If your method takes an array of types, Pig will
expect to pass it a bag where each tuple
has a single field of that type. So, if you had a Java method
called com.acme.Stats.stdev that took an array of doubles, you could use it like this:
define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id:int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);
Warning
Invokers do not use the Accumulator
or
Algebraic
interfaces, and are thus likely to be
much slower and to use much more memory than UDFs written specifically
for Pig. This means that before you pass an array argument to an
invoker method, you should think carefully about whether those
inefficiencies are acceptable. For more information on these
interfaces, see “The Accumulator Interface” and “The Algebraic Interface”.
Invoking Java functions in this way does have a small cost because reflection is used to find and invoke the methods.
Invoker functions throw a Java IllegalArgumentException
when
they are passed null input. You should place a filter before the
invocation to prevent this.
Calling Hive UDFs
Pig 0.15 added support for calling Apache Hive UDFs inside Pig.
There are three types of UDFs in Hive: standard UDFs, UDTFs (user-defined table-generating functions), and UDAFs (user-defined aggregate functions). Hive’s standard UDFs are similar to Pig’s standard UDFs in that both take one input and produce one output at a time.
A Hive standard UDF extends either org.apache.hadoop.hive.ql.exec.UDF
or org.apache.hadoop.hive.ql.udf.generic.GenericUDF
.
Hive’s UDTFs, on the other hand, are able to produce any number
of outputs per input. Hive UDTFs extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
.
Hive’s UDAFs are similar to Pig’s algebraic UDFs in the sense
that both aggregate the inputs and produce one output. Hive UDAFs extend
either org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
or org.apache.hadoop.hive.ql.exec.UDAF
.
For more information about Hive UDFs, please refer to the Hive
UDF documentation.
Hive UDFs are accessed via corresponding built-in Pig UDFs: for a standard Hive UDF, invoke HiveUDF; for a Hive UDTF, invoke HiveUDTF; and for a Hive UDAF, invoke HiveUDAF.
All three types of UDF share the same syntax. Here is one example of invoking a standard Hive UDF:
define lower HiveUDF('org.apache.hadoop.hive.ql.udf.generic.StringLower');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
    date:chararray, dividends:float);
lowersymbol = foreach divs generate exchange, lower(symbol), date, dividends;
You need to pass the name of the Hive UDF as a constructor
argument to HiveUDF
.
The name you pass can be the fully qualified class name, or the
abbreviation defined in the Hive FunctionRegistry
for built-in Hive UDFs. In the preceding example,
org.apache.hadoop.hive.ql.udf.generic.StringLower
is a Hive built-in UDF and has the abbreviation lower
, so we can rewrite the define
statement as follows:
define lower HiveUDF('lower');
Here is an example using a Hive UDTF:
define posexplode HiveUDTF('posexplode');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
    date:chararray, dividends:float);
grouped = group divs by symbol;
exploded = foreach grouped generate flatten(posexplode(divs));
For every input, HiveUDTF
creates a bag of the outputs. There is also one additional output
that includes the outputs generated in the close
method of
the Hive UDTF. This is because the Hive UDTF may choose to generate an
additional output when it is informed that it has seen all its
input.16 You will need to flatten the bag to get the same flattened
outputs as Hive.
Here is an example with HiveUDAF
:
define variance HiveUDAF('variance');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
    date:chararray, dividends:double);
grouped = group divs by symbol;
aggregated = foreach grouped generate variance(divs.dividends);
HiveUDAF
is implemented as an
algebraic UDF. Algebraic Pig UDFs aggregate their input both in the
combiner and in the reducer. However, the way Hive aggregates inputs is
very different from how Pig does it. Hive does not use the combiner;
rather, it handles aggregation itself in the map task. HiveUDAF
fills the gap by simulation. In the
combiner, HiveUDAF
invokes the
iterate
and terminatePartial
methods of the
Hive UDAF. In the reducer, HiveUDAF
invokes the Hive UDAF’s merge
and terminate
methods.
The other difference between Hive UDFs and Pig UDFs is in the
initialization stage. Hive uses the class
ObjectInspector
to communicate the input schema to a UDF on the frontend, while Pig uses
its Schema
class. Hive’s
ObjectInspector
carries a little more
information: whether an input field is a constant or not. Pig UDFs have
no way to tell and thus cannot pass this information to Hive UDFs. To
fill the gap, HiveUDF
provides an
optional second constructor argument carrying constant inputs. Here is
one example:
define in_file HiveUDF('in_file', '(null, \'symbol.txt\')');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
    date:chararray, dividends:float);
infile = foreach divs generate symbol, in_file(symbol, 'symbol.txt');
The second constructor argument, (null, 'symbol.txt'), tells Pig that the second argument to in_file is the string constant 'symbol.txt'. For the first field, which is not a constant, we simply put null. Note that the single quotes need to be escaped.
HiveUDF
understands double quotes as
well, so you can use double quotes instead of escaping:
define in_file HiveUDF('in_file', '(null, "symbol.txt")');
In the foreach
statement, you need to put the
'symbol.txt'
string in the position
of the second input field as well. This is because some Hive UDFs use
the constant inferred from ObjectInspector
while other Hive
UDFs use the actual input data. It is assumed both are the same
value.
Pig also simulates the runtime environment for Hive UDFs. The configuration object and counters inside Hive UDFs function properly within Pig.
Not surprisingly, there is some overhead when using Hive UDFs in Pig. This is caused by converting input and output objects between Pig data types and Hive data types, simulations, extra method calls, runtime environment preparations, etc. You should expect approximately a 10–20% slowdown compared to implementing the same functionality in a Pig UDF. But the convenience of invoking existing functionality rather than reimplementing it in many cases outweighs the performance losses. Finally, it is worth mentioning that as a result of this feature, an interesting third-party library now available in Pig is Hivemall, which is a large set of Hive UDFs for machine learning.17 With it, you can use many popular machine learning algorithms in Pig.
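As a rough sketch of what that looks like in practice, the same register-and-define pattern applies; note that the jar filename and UDF class name below are placeholders rather than real Hivemall artifact names, so consult the Hivemall documentation for the actual ones:
-- sketch only: 'hivemall.jar' and com.example.hivemall.PredictUDF are placeholders
register 'hivemall.jar';
define predict HiveUDF('com.example.hivemall.PredictUDF');
data = load 'features' as (id:int, features:chararray);
scored = foreach data generate id, predict(features);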
1 Any loader that uses FileInputFormat as its InputFormat will support globs. Most loaders that load data from HDFS use this InputFormat.
2 A single function can be both a load and a store function, as PigStorage is.
3 You might object and say that Pig could figure out what is intended here and do it, since SUM(A.y + A.z) could be decomposed to “foreach record in A, add y and z and then take the sum.” This is true. But when we change the group to a cogroup so that there are two bags A and B involved (see “cogroup”) and change the sum to SUM(A.y + B.z), because neither A nor B guarantees any ordering, this is not a well-defined operation. The rationale in designing the language was that it was better to be consistent and always say that bags could not be added rather than allowing it in some instances and not others.
4 We will discuss the details of flatten in “flatten”. Here all you need to know is that, in this example, flatten generates three fields: protocol, host, and path.
5 This will change in the future: Pig will insert a real cast operator to cast the field to the declared type. More detail can be found in PIG-2315.
6 Thus, the keyword group is overloaded in Pig Latin. This is unfortunate and confusing, but would be hard to change now.
7 As explained previously, we are using MapReduce as the backend throughout the book for examples, unless otherwise noted. The same notion applies to Tez or Spark, although it may not be called a “reduce phase.”
8 In our experience, the vast majority of data tracking human activity follows a power law distribution.
9 Actually, it reads all records but skips all but the first.
10 Actually, joins can be on any condition, not just equality, but Pig only supports joins on equality (called equi-joins). See “cross” for information on how to do non-equi-joins in Pig.
11 You may object that Pig could determine this by looking at other records in the join and inferring the correct number of fields. However, this does not work, for two reasons. First, when no schema is present, Pig does not enforce a semantic that every record has the same schema. So, assuming Pig can infer one record from another is not valid. Second, there might be no records in the join that match, and thus Pig might have no record to infer from.
12 Older versions of Pig ignore parallel in local mode due to a Hadoop bug. This bug was fixed in Pig 0.14. Note that parallel tasks are run serially in local mode.
13 The difference between Jython UDFs and Python UDFs is that the former use Jython and the latter use CPython as the engine. In fact, Python is the only non-JVM language in the list. Python UDFs are usually slower since they run in a separate process and thus need to serialize the parameters in and deserialize the result out. However, since Jython does not support all Python features and libraries, Python provides an alternative.
14 The delimiter in pig.additional.jars is a colon. On Windows, the colon is also used in drive partitions. To resolve this conflict, Pig 0.14 introduced pig.additional.jars.comma, which uses a comma as the delimiter instead.
15 For int, long, float, and double, invoker methods can call Java functions that take the scalar types but not the associated Java classes (so int but not Integer, etc.).
16 You may wonder what values other fields in the foreach will have for this additional output. Pig attaches an artificial all-null input to the foreach before generating the outputs. Thus, fields besides the HiveUDTF field will be null (unless they have a constant or a UDF that produces a nonnull result).
17 At the time of writing, Hivemall had just been accepted as an Apache incubator project.