Chapter 4. Introduction to Pig Latin

It is time to dig into Pig Latin. This chapter provides you with the basics of Pig Latin, enough to write your first useful scripts. More advanced features of Pig Latin are covered in Chapter 5.

Preliminary Matters

Pig Latin is a data flow language. Each processing step results in a new dataset, or relation. In input = load 'data', input is the name of the relation that results from loading the dataset data. A relation name is referred to as an alias. Relation names look like variables, but they are not. Once made, an assignment is permanent. It is possible to reuse relation names; for example, this is legitimate:

A = load 'NYSE_dividends' (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);

However, it is not recommended. It looks here as if you are reassigning A, but really you are creating new relations called A, and losing track of the old relations called A. Pig is smart enough to keep up, but it still is not a good practice. It leads to confusion when trying to read your programs (which A am I referring to?) and when reading error messages.

In addition to relation names, Pig Latin also has field names. They name a field (or column) in a relation. In the previous snippet of Pig Latin, dividends and symbol are examples of field names. These are somewhat like variables in that they will contain a different value for each record as it passes through the pipeline, but you cannot assign values to them.

Both relation and field names must start with an alphabetic character, and then they can have zero or more alphabetic, numeric, or _ (underscore) characters. All characters in the name must be ASCII.

Case Sensitivity

Keywords in Pig Latin are not case-sensitive; for example, LOAD is equivalent to load. But relation and field names are, so A = load 'foo'; is not equivalent to a = load 'foo';. UDF names are also case-sensitive; thus, COUNT is not the same UDF as count.

Comments

Pig Latin has two types of comment operators: SQL-style single-line comments (--) and Java-style multiline comments (/* */). For example:

A = load 'foo'; --this is a single-line comment
/*
 * This is a multiline comment.
 */
B = load /* a comment in the middle */'bar';

Input and Output

Before you can do anything interesting, you need to be able to add inputs and outputs to your data flows.

load

The first step in any data flow is to specify your input. In Pig Latin this is done with the load statement. By default, load looks for your data on HDFS in a tab-delimited file using the default load function PigStorage. divs = load '/data/examples/NYSE_dividends'; will look for a file called NYSE_dividends in the directory /data/examples. You can also specify relative pathnames. By default, your Pig jobs will run in your home directory on HDFS, /users/yourlogin. Unless you change directories, all relative paths will be evaluated from there. You can also specify a full URL for the path: for example, 'hdfs://nn.acme.com/data/examples/NYSE_dividends' to read the file from the HDFS instance that has nn.acme.com as a NameNode.

In practice, most of your data will not be in tab-separated text files. You also might be loading data from storage systems other than HDFS. Pig allows you to specify the function for loading your data with the using clause. For example, if you wanted to load your data from HBase, you would use the loader for HBase:

divs = load 'NYSE_dividends' using HBaseStorage();

If you do not specify a load function, the built-in function PigStorage will be used. You can also pass arguments to your load function via the using clause. For example, if you are reading comma-separated text data, PigStorage takes an argument to indicate which character to use as a separator:

divs = load 'NYSE_dividends' using PigStorage(',');

The load statement also can have an as clause, which allows you to specify the schema of the data you are loading (the syntax and semantics of declaring schemas in Pig Latin is discussed in “Schemas”):

divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);

When specifying a file to read from HDFS, you can specify a directory. In this case, Pig will find all files under the directory you specify and use them as input for that load statement. So, if you had a directory input with two datafiles today and yesterday under it, and you specified input as your file to load, Pig would read both today and yesterday as input. If the directory you specify contains other directories, files in those directories will be included as well.

PigStorage and TextLoader, the two built-in Pig load functions that operate on HDFS files, support globs.1 With globs, you can read multiple files that are not under the same directory or read some but not all files under a directory. Table 4-1 describes globs that are valid in Hadoop 0.20. Be aware that glob meaning is determined by HDFS, not Pig, so the globs that will work for you depend on your version of HDFS. Also, if you are issuing Pig Latin commands from a Unix shell command line, you will need to escape many of the glob characters to prevent your shell from expanding them.

Table 4-1. Globs in Hadoop 0.20
GlobMeaning
?Matches any single character.
*Matches zero or more characters.
[abc]Matches a single character from the character set (a,b,c).
[a-z]Matches a single character from the character range (a..z), inclusive. The first character must be lexicographically less than or equal to the second character.
[^abc]Matches a single character that is not in the character set (a,b,c). The ^ character must appear immediately to the right of the opening bracket.
[^a-z]Matches a single character that is not from the character range (a..z), inclusive. The ^ character must appear immediately to the right of the opening bracket.
\cRemoves (escapes) any special meaning of the character c.
{ab,cd}Matches a string from the string set {ab,cd}.

store

After you have finished processing your data, you will want to write it out somewhere. Pig provides the store statement for this purpose. In many ways it is the mirror image of the load statement. By default, Pig stores your data on HDFS in a tab-delimited file using PigStorage:2

store processed into '/data/examples/processed';

Pig will write the results of your processing into a directory processed in the directory /data/examples. You can specify relative pathnames as well as a full URL for the path, such as 'hdfs://nn.acme.com/data/examples/processed'.

If you do not specify a store function, PigStorage will be used. You can specify a different store function with a using clause:

store processed into 'processed' using
      HBaseStorage();

You can also pass arguments to your store function. For example, if you want to store your data as comma-separated text data, PigStorage takes an argument to indicate which character to use as a separator:

store processed into 'processed' using PigStorage(',');

As noted in “Running Pig”, when writing to a filesystem, processed will be a directory with part files rather than a single file. But how many part files will be created? That depends on the parallelism of the last job before the store. If it has reduces, it will be determined by the parallel level set for that job. See “parallel” for information on how this is determined. If it is a map-only job, it will be determined by the number of maps. See “Map Parallelism”.

dump

In most cases you will want to store your data somewhere when you are done processing it. But occasionally you will want to see it on the screen. This is particularly useful during debugging and prototyping sessions. It can also be useful for quick ad hoc jobs. dump directs the output of your script to your screen:

dump processed;

Unlike store, whose execution is deferred until the whole Pig script has been parsed, Pig will run the dump command immediately, without considering the Pig statements after it. It is possible that dump will generate a less optimal plan for the whole Pig script. Unless you are debugging, you should use store rather than dump.

dump outputs each record on a separate line, and fields are separated by commas. Nulls are indicated by missing values. If the output contains complex data, Pig will surround it with special marks. Maps are surrounded by [] (brackets), tuples by () (parentheses), and bags by {} (braces). Because each record in the output is a tuple, it is surrounded by ().

Be careful; you might be inundated with the outputs of dump if your dataset is large.

Relational Operations

Relational operators are the main tools Pig Latin provides to operate on your data. They allow you to transform it by sorting, grouping, joining, projecting, and filtering. This section covers the basic relational operators. More advanced features of these operators, as well as advanced relational operators, are covered in “Advanced Relational Operations”. What is covered here will be enough to get you started on programming in Pig Latin.

foreach

foreach takes a set of expressions and applies them to every record in the data pipeline—hence the name foreach. From these expressions it generates new records to send down the pipeline to the next operator. For those familiar with database terminology, it is Pig’s projection operator. For example, the following code loads an entire record, but then removes all but the user and id fields from the record:

A = load 'input' as (user:chararray, id:long, address:chararray, phone:chararray,
      preferences:map[]);
B = foreach A generate user, id;

Expressions in foreach

foreach supports a list of expressions. The simplest are constants and field references. The syntax for constants has already been discussed in “Types”. Field references can be by name (as shown in the preceding example) or by position. Positional references are preceded by a $ (dollar sign) and start from zero:

prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
             volume, adj_close);
gain   = foreach prices generate close - open;
gain2  = foreach prices generate $6 - $3;

Here, the relations gain and gain2 will contain the same values. Positional-style references are useful in situations where the schema is unknown or undeclared.

In addition to using names and positions, you can refer to all fields using * (an asterisk), which produces a tuple that contains all the fields. You can also refer to ranges of fields using .. (two periods). This is particularly useful when you have many fields and do not want to repeat them all in your foreach command:

prices    = load 'NYSE_daily' as (exchange, symbol, date, open,
                high, low, close, volume, adj_close);
beginning = foreach prices generate ..open; -- produces exchange, symbol, 
                                            -- date, open
middle    = foreach prices generate open..close; -- produces open, high, 
                                                 -- low, close
end       = foreach prices generate volume..; -- produces volume, adj_close

Standard arithmetic operators for integers and floating-point numbers are supported: + for addition, - for subtraction, * for multiplication, and / for division. These operators return values of their own type, so 5/2 is 2, whereas 5.0/2.0 is 2.5. In addition, for integers the modulo operator % is supported. The unary negative operator (-) is also supported for both integers and floating-point numbers. Pig Latin obeys the standard mathematical precedence rules. For information on what happens when arithmetic operators are applied across different types (for example, 5/2.0), see “Casts”.

Null values are viral for all arithmetic operators. That is, x + null = null for all values of x.

Pig also provides a binary condition operator, often referred to as bincond. It begins with a Boolean test, followed by a ?, then the value to return if the test is true, then a :, and finally the value to return if the test is false. If the test returns null, bincond returns null. Both value arguments of the bincond must return the same type:

2 == 2 ? 1 : 4 --returns 1 
2 == 3 ? 1 : 4 --returns 4 
null == 2 ? 1 : 4 -- returns null
2 == 2 ? 1 : 'fred' -- type error; both values must be of the same type

Note that when you use the bincond operator, you will need to put the entire bincond expression inside parentheses:

daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
                date:chararray, open:float, high:float, low:float, close:float,
                volume:int, adj_close:float);
updown = foreach daily generate (close>open?'up':'down');

Pig will return a syntax error if you omit the parentheses.

To extract data from complex types, use the projection operators. For maps this is # (the pound sign or hash), followed by the name of the key as a string:

bball = load 'baseball' as (name:chararray, team:chararray,
          position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';

Keep in mind that the value associated with a key may be of any type. If you reference a key that does not exist in the map, the result is a null. Tuple projection is done with ., the dot operator. As with top-level records, the field can be referenced by name (if you have a schema for the tuple) or by position:

A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;

Referencing a field name that does not exist in the tuple will produce an error. Referencing a nonexistent positional field in the tuple will return null. Bag projection is not as straightforward as map and tuple projection. Bags do not guarantee that their tuples are stored in any order, so allowing a projection of the tuple inside the bag would not be meaningful. Instead, when you project fields in a bag, you are creating a new bag with only those fields:

A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;

This will produce a new bag whose tuples have only the field x in them. You can project multiple fields in a bag by surrounding the fields with parentheses and separating them by commas:

A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);

This seemingly pedantic distinction that b.x is a bag and not a scalar value has consequences. Consider the following Pig Latin, which will not work:

A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all records for a given value of x
C = foreach B generate SUM(A.y + A.z);

It is clear what the programmer is trying to do here. But because A.y and A.z are bags and the addition operator is not defined on bags, this will produce an error.3 The correct way to do this calculation in Pig Latin is:

A = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B = group A1 by x;
C = foreach B generate SUM(A1.yz);

UDFs in foreach

User-defined functions (UDFs) can be invoked in foreach statements. These are called evaluation functions, or eval funcs. Because they are part of a foreach statement, these UDFs take one record at a time and produce one output. Keep in mind that either the input or the output can be a bag, so this one record can contain a bag of records:

-- udf_in_foreach.pig
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
--make sure all strings are uppercase
upped = foreach divs generate UPPER(symbol) as symbol, dividends;
grpd  = group upped by symbol;   --output a bag upped for each value of symbol
--take a bag of integers, and produce one result for each group
sums  = foreach grpd generate group, SUM(upped.dividends);

In addition, eval funcs can take * as an argument, which passes the entire record to the function. They can also be invoked with no arguments at all.

For a complete list of UDFs that are provided with Pig, see Appendix A. For a discussion of how to invoke UDFs not distributed as part of Pig, see “User-Defined Functions”.

Generating complex data

It is sometimes useful in a foreach statement to build a complex field from simple fields. For example, you might want to combine two simple types together into a tuple. The same notation used for complex constants (see “Complex Types”) can be used to build complex types from scalar fields: [] for maps, () for tuples, and {} for bags. There are some differences though:

  • Instead of using # to separate the map key and value, use a comma:

    divs  = load 'NYSE_dividends' as
        (exchange:chararray, symbol, date, dividends:double);
    maps = foreach divs generate [exchange, dividends];
    describe maps;
    
    maps: {map[double]}

    The reason for using the comma as a delimiter is because # is also used in map projection. There could be ambiguity if it were used as the key/value delimiter. Consider the expression [a#b#c]. Pig has no idea if this means a map of key a#b and value c, or a map of key a and value b#c. With the comma delimiter, [a#b, c] creates a map of the key a#b and the value c. We don’t have such a problem in map constant.

  • You cannot create a single-field tuple—Pig will treat parentheses as an arithmetic operator:

    divs  = load 'NYSE_dividends' as
        (exchange:chararray, symbol, date, dividends:double);
    tuples = foreach divs generate (exchange), (exchange, dividends) as t;
    describe tuples;
    
    tuples: {exchange: chararray,t: (exchange: chararray,dividends: double)}

    (exchange) will not create a tuple in this example.

  • If the bag consists of single-item tuples, the parentheses around the item can be omitted:

    divs  = load 'NYSE_dividends' as
        (exchange:chararray, symbol:chararray, date, dividends);
    bags = foreach divs generate {exchange, symbol};
    describe bags;
    
    bags: 

You can also use the built-in UDFs TOMAP, TOTUPLE, and TOBAG to generate complex fields from simple fields.

Naming fields in foreach

The result of each foreach statement is a new tuple, usually with a different schema than the tuple that was input to foreach. Pig can almost always infer the data types of the fields in this schema from the foreach statement. But it cannot always infer the names of those fields. For fields that are simple projections with no other operators applied, Pig keeps the same names as before:

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
           date:chararray, dividends:float);
sym  = foreach divs generate symbol;
describe sym;

sym: {symbol: chararray}

Once any expression beyond simple projection is applied, however, Pig does not assign a name to the field. If you do not explicitly assign a name, the field will be nameless and will be addressable only via a positional parameter; for example, $0. You can assign a name with the as clause:

divs     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, 
                dividends * 100.0; 
describe in_cents;

in_cents: {dividend: double,double}

The second field is unnamed since we didn’t assign a name to it.

After a flatten, which can produce multiple values, you can assign a name to each value:

crawl = load 'webcrawl' as (url, pageid);
extracted = foreach crawl generate flatten(REGEX_EXTRACT_ALL(url,
    '(http|https)://(.*?)/(.*)')) as (protocol,
    host, path);

Notice that in a foreach the as is attached to each expression. This is different from in load, where it is attached to the entire statement. The reason for this will become clear when we discuss flatten.4

In a few cases Pig cannot infer the data type of an expression in a foreach statement. Consider the following example:

extracted = foreach crawl generate flatten(REGEX_EXTRACT_ALL(url,
    '(http|https)://(.*?)/(.*)')) as (protocol:chararray, host:chararray, 
    path:chararray);

The UDF REGEX_EXTRACT_ALL only declares its return type as a tuple; it does not declare the types of the fields of that tuple. However, we know it will return a chararray. In this case you have the option to declare a type along with the name, as shown in this example. The syntax is the same as for declaring names and types in load. Note that until version 0.16, Pig does not insert a cast operator. It assumes you are correct in declaring the type. If you are not, this could result in an error.5

The following script will result in a compilation error:

in_cents = foreach divs generate (symbol, dividends)
    as (symbol:chararray, dividends:double);

On the left side, (symbol, dividends) will create a tuple. On the right side, (symbol:chararray, dividends:double) in the as statement declares two individual fields. They don’t match. We need to change it to:

in_cents = foreach divs generate (symbol, dividends)
    as (t:(symbol:chararray, dividends:double));

This produces a tuple schema:

in_cents: {t: (symbol: chararray,dividends: double)}

CASE expressions

Starting with Pig 0.12, you can use CASE for multicondition branches. (If you are using Pig 0.11 or earlier, a semantically equivalent chain of bincond operators can be constructed. CASE was added because it is much simpler.) Here’s an example:

processed = FOREACH loaded GENERATE (
  CASE i
     WHEN 0 THEN 'one'
     WHEN 1 THEN 'two'
     ELSE 'three'
  END
);

Depending on the value of i, foreach will generate a chararray value of 'one', 'two', or 'three'.

Both the conditions (in this example 0 and 1) and the condition values (in this example 'one', etc.) can be expressions. If no value matches the condition and ELSE is missing, the result will be null. This syntax cannot be used to test for null values, since null == null is null, not true. In the following example, if gender is 'M' the result is 1, if gender is 'F' the result is 2, and if gender is something else, such as 'other', the result is null:

processed = FOREACH loaded GENERATE (
  CASE UPPER(gender)
     WHEN UPPER('m') THEN 1
     WHEN UPPER('f') THEN 2
  END
) as gendercode;

filter

The filter statement allows you to select which records will be retained in your data pipeline. A filter contains a predicate. If that predicate evaluates to true for a given record, that record will be passed down the pipeline. Otherwise, it will not.

Predicates can contain the equality operators you expect, including == to test equality and !=, >, >=, <, and <=. These comparators can be used on any scalar data type. == and != can be applied to maps and tuples. To use these with two tuples, both tuples must have either the same schema or no schema. None of the equality operators can be applied to bags.

Starting with Pig 0.12, you can test whether the value of a scalar field is within a set of values using the IN operator:

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
    date:chararray, dividends:float);
cme_ctb_cht = filter divs by symbol in ('CME', 'CTB', 'CHT');

Pig Latin follows the operator precedence rules that are standard in most programming languages, where arithmetic operators have precedence over equality operators. So, x + y == a + b is equivalent to (x + y) == (a + b).

For chararrays, users can test to see whether the chararray matches a regular expression:

-- filter_matches.pig
divs         = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                  date:chararray, dividends:float);
startswithcm = filter divs by symbol matches 'CM.*';
Note

Pig uses Java’s regular expression format. This format requires the entire chararray to match, not just a portion as in Perl-style regular expressions. For example, if you are looking for all fields that contain the string fred, you must say '.*fred.*' and not 'fred'. The latter will match only the chararray fred.

You can find chararrays that do not match a regular expression by preceding the test with not:

-- filter_not_matches.pig
divs            = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                    date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches 'CM.*';

You can combine multiple predicates into one by using the Boolean operators and and or, and you can reverse the outcome of any predicate by using the Boolean not operator. As is standard, the precedence of Boolean operators, from highest to lowest, is not, and, or. Thus, a and b or not c is equivalent to (a and b) or (not c).

Pig will short-circuit Boolean operations when possible. If the first (left) predicate of an and is false, the second (right) predicate will not be evaluated. So, in 1 == 2 and udf(x), the UDF will never be invoked. Similarly, if the first predicate of an or is true, the second predicate will not be evaluated. 1 == 1 or udf(x) will never invoke the UDF.

For Boolean operators, nulls follow the SQL ternary logic. Thus, x == null results in a value of null, not true (even when x is null also) or false. Filters pass through only those values that are true. Consider the following example:

A = load 'input.txt' as (x:int, y:int);
B1 = filter A by x == 2;
C1 = store B1 into 'output1';
B2 = filter A by x != 2;
C2 = store B2 into 'output2';

If the input file had three rows, (2,1), (null,1), (4,1), C1 would store (2,1), C2 would store (4,1), and (null,1) would appear in neither C1 nor C2. The way to look for null values is to use the is null operator, which returns true whenever the value is null. To find values that are not null, use is not null.

Likewise, null neither matches nor fails to match any regular expression value.

Just as there are UDFs to be used in evaluation expressions, there are UDFs specifically for filtering records, called filter funcs. These are eval funcs that return a Boolean value and can be invoked in the filter statement. Filter funcs can also be used in foreach statements.

group

The group statement collects together records with the same key. It is the first operator we have looked at that shares its syntax with SQL, but it is important to understand that the grouping operator in Pig Latin is fundamentally different from the one in SQL. In SQL the group by clause creates a group that must feed directly into one or more aggregate functions. In Pig Latin there is no direct connection between group and aggregate functions. Instead, group does exactly what it says: collects all records with the same value for the provided key together into a bag. You can then pass this to an aggregate function if you want, or do other things with it:

-- count.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
cnt   = foreach grpd generate group, COUNT(daily);

That example groups records by the key stock and then counts them. It is just as legitimate to group them and then store them for processing at a later time:

-- group.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily by stock;
store grpd into 'by_group';

The records coming out of the group by statement have two fields: the key and the bag of collected records. The key field is named group.6 The bag is named for the alias that was grouped, so in the previous examples it will be named daily and have the same schema as the relation daily. If the relation daily has no schema, the bag daily will have no schema. For each record in the group, the entire record (including the key) is in the bag. Changing the last line of the previous script from store grpd... to describe grpd; will produce:

grpd: {group: bytearray,daily: {exchange: bytearray,stock: bytearray}}

You can also group on multiple keys, but the keys must be surrounded by parentheses. The resulting records still have two fields. In this case, the group field is a tuple with a field for each key:

--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd  = group daily by (exchange, stock);
avg   = foreach grpd generate group, AVG(daily.dividends);
describe grpd;

grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,
stock: bytearray,date: bytearray,dividends: bytearray}}

You can also use all to group together all of the records in your pipeline:

--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily all;
cnt   = foreach grpd generate COUNT(daily);

The record coming out of group all has the chararray literal all as a key. Usually this does not matter because you will pass the bag directly to an aggregate function such as COUNT. But if you plan to store the record or use it for another purpose, you might want to project out the artificial key first.

group is the first operator we have looked at that usually will force a reduce phase.7 Grouping means collecting all records where the key has the same value. If the pipeline is in a map phase, this will force it to shuffle and then reduce. If the pipeline is already in a reduce phase, this will force it to pass through the map, shuffle, and reduce phases.

Because grouping collects all records with the same value for the key, you often get skewed results. That is, just because you have specified that your job will have 100 reducers, there is no reason to expect that the number of values per key will be distributed evenly. The values might have a Gaussian or power law distribution.8 For example, suppose you have an index of web pages and you group by the base URL. Certain values, such as yahoo.com, are going to have far more entries than most, which means that some reducers will get far more data than others. Because your MapReduce job is not finished (and any subsequent ones cannot start) until all your reducers have finished, this skew will significantly slow your processing. In some cases it will also be impossible for one reducer to manage that much data.

Pig has a number of ways that it tries to manage this skew to balance out the load across your reducers. The one that applies to grouping is Hadoop’s combiner. This does not remove all skew, but it places a bound on it. And because for most jobs the number of mappers will be at most in the tens of thousands, even if the reducers get a skewed number of records, the absolute number of records per reducer will be small enough that the reducers can handle them quickly.

Unfortunately, not all calculations can be done using the combiner. Calculations that can be decomposed into any number of steps, such as sum, are called distributive. These fit nicely into the combiner. Calculations that can be decomposed into an initial step, any number of intermediate steps, and a final step are referred to as algebraic. Count is an example of such a function, where the initial step is a count and the intermediate and final steps are sums. Distributive is a special case of algebraic, where the initial, intermediate, and final steps are all the same. Session analysis, where you want to track a user’s actions on a website, is an example of a calculation that is not algebraic. You must have all the records sorted by timestamp before you can start analyzing the user’s interaction with the site.

Pig’s operators and built-in UDFs use the combiner whenever possible, because of its skew-reducing features and because early aggregation greatly reduces the amount of data shipped over the network and written to disk, thus speeding performance significantly. UDFs can indicate when they can work with the combiner by implementing the Algebraic interface. For information on how to make your UDFs use the combiner, see “The Algebraic Interface”.

For information on how to determine the level of parallelism when executing your group operation, see “parallel”. Also, keep in mind that when using group all, you are necessarily serializing your pipeline. That is, this step and any steps after it until you split out the single bag now containing all of your records will not be done in parallel.

Finally, group handles nulls in the same way that SQL handles them: by collecting all records with a null key into the same group. Note that this is in direct contradiction to the way expressions handle nulls (remember that neither null == null nor null != null is true) and to the way join (see “join”) handles nulls.

order by

The order statement sorts your data for you, producing a total order for your output data. Total order means that not only is the data sorted in each partition of your data, but it is also guaranteed that all records in partition n are less than all records in partition n - 1 for all n. When your data is stored on HDFS, where each partition is a part file, this means that cat will output your data in order.

The syntax of order is similar to group. You indicate a key or set of keys by which you wish to order your data. One glaring difference is that there are no parentheses around the keys when multiple keys are indicated in order:

--order.pig
daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float, close:float,
            volume:int, adj_close:float);
bydate  = order daily by date;

--order2key.pig
daily          = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
                    date:chararray, open:float, high:float, low:float,
                    close:float, volume:int, adj_close:float);
bydatensymbol  = order daily by date, symbol;

It is also possible to reverse the order of the sort by appending desc to a key in the sort. In order statements with multiple keys, desc applies only to the key it immediately follows. Other keys will still be sorted in ascending order:

--orderdesc.pig
daily    = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float, close:float,
            volume:int, adj_close:float);
byclose  = order daily by close desc, open;
dump byclose; -- open still sorted in ascending order

Data is sorted based on the types of the indicated fields: numeric values are sorted numerically; chararray fields are sorted lexically; and bytearray fields are sorted lexically, using byte values rather than character values. Sorting by map, tuple, or bag fields produces errors. For all data types, nulls are taken to be smaller than all possible values for that type, and thus will always appear first (or last when desc is used).

As discussed in the previous section, skew of the values in data is very common. This affects order just as it does group, causing some reducers to take significantly longer than others. To address this, Pig balances the output across reducers. It does this by first sampling the input of the order statement to get an estimate of the key distribution. Based on this sample, it then builds a partitioner that produces a balanced total order. For example, suppose you are ordering on a chararray field with the values a, b, e, e, e, e, e, e, m, q, r, z, and you have three reducers. The partitioner in this case would decide to partition your data such that values ae go to reducer 1, e goes to reducer 2, and mz go to reducer 3. Notice that the value e can be sent to either reducer 1 or 2. Some records with key e will be sent to reducer 1 and some to 2. This allows the partitioner to distribute the data evenly. In practice, we rarely see variance in reducer time exceed 10% when using this algorithm.

An important side effect of the way Pig distributes records to minimize skew is that it breaks the MapReduce convention that all instances of a given key are sent to the same partition. If you have other processing that depends on this convention, do not use Pig’s order statement to sort data for it.

order always causes your data pipeline to go through a reduce phase. This is necessary to collect all equal records together. Also, Pig adds an additional MapReduce job to your pipeline to do the sampling. Because this sampling is very lightweight (it reads only the first record of every block9), it generally takes less than 5% of the total job time.

distinct

The distinct statement is very simple. It removes duplicate records. It works only on entire records, not on individual fields:

--distinct.pig
-- Find a distinct list of ticker symbols for each exchange.
-- This load will truncate the records, picking up just the first two fields.
daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq    = distinct daily;

Because it needs to collect like records together in order to determine whether they are duplicates, distinct forces a reduce phase. It does make use of the combiner to remove any duplicate records it can delete in the map phase.

The use of distinct shown here is equivalent to select distinct x in SQL. To learn how to do the equivalent of select count(distinct x), see “Nested foreach”.

join

join is one of the workhorses of data processing, and it is likely to be in many of your Pig Latin scripts. join selects records from one input to put together with records from another input. This is done by indicating keys for each input. When those keys are equal,10 the two rows are joined. Records for which no match is found are dropped:

--join.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by symbol, divs by symbol;

You can also join on multiple keys. In all cases you must have the same number of keys, and they must be of the same or compatible types (where compatible means that an implicit cast can be inserted; see “Casts”):

-- join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by (symbol, date), divs by (symbol, date);

Like foreach, join preserves the names of the fields of the inputs passed to it. It also prepends the name of the relation the field came from, followed by ::. Adding describe jnd; to the end of the previous example produces:

jnd: {daily::exchange: bytearray,daily::symbol: bytearray,daily::date: bytearray,
daily::open: bytearray,daily::high: bytearray,daily::low: bytearray,
daily::close: bytearray,daily::volume: bytearray,daily::adj_close: bytearray,
divs::exchange: bytearray,divs::symbol: bytearray,divs::date: bytearray,
divs::dividends: bytearray}

The daily:: prefix needs to be used only when the field name is no longer unique in the record. In this example, you will need to use daily::date or divs::date if you wish to refer to one of the date fields after the join, but fields such as open and divs do not need a prefix because there is no ambiguity.

Pig also supports outer joins. In outer joins, records that do not have a match on the other side are included, with null values being filled in for the missing fields. Outer joins can be left, right, or full. A left outer join means records from the left side will be included even when they do not have a match on the right side. Likewise, a right outer join means records from the right side will be included even when they do not have a match on the left side. A full outer join means records from both sides are taken even when they do not have matches. Here’s an example of a left outer join:

--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by (symbol, date) left outer, divs by (symbol, date);

outer is optional and can be omitted. Unlike in some SQL implementations, however, full is not optional. C = join A by x outer, B by u; will generate a syntax error, not a full outer join.

Outer joins are supported only when Pig knows the schema of the data on the side(s) for which it might need to fill in nulls. Thus, for left outer joins, it must know the schema of the right side; for right outer joins, it must know the schema of the left side; and for full outer joins, it must know both. This is because, without the schema, Pig will not know how many null values to fill in.11

As in SQL, null values for keys do not match anything—even null values from the other input. So, for inner joins, all records with null key values are dropped. For outer joins, they will be retained but will not match any records from the other input.

Pig can also do multiple joins in a single operation, as long as they are all being joined on the same key(s). This can be done only for inner joins:

A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;

Self joins are supported, though the data must be loaded twice:

--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends);
divs2     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends);
jnd       = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and
                divs1::dividends < divs2::dividends;

If the preceding code were changed to the following, it would fail:

--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends);
jnd       = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and
                divs1::dividends < divs2::dividends;

It seems like this ought to work, since Pig could split the divs1 dataset and send it to join twice. But the problem is that field names would be ambiguous after the join, so the load statement must be written twice. The next best thing would be for Pig to figure out that these two load statements are loading the same input and then run the load only once, but it does not do that currently.

Pig does these joins in MapReduce by using the map phase to annotate each record with which input it came from. It then uses the join key as the shuffle key. Thus, join forces a new reduce phase. Once all of the records with the same value for the key are collected together, Pig does a cross product between the records from both inputs. To minimize memory usage, it has MapReduce order the records coming into the reducer using the input annotation it added in the map phase. So, all of the records for the left input arrive first, and Pig caches these in memory. All of the records for the right input then arrive. As each of these records arrives, it is crossed with each record from the left side to produce an output record. In a multiway join, the left n - 1 inputs are held in memory, and the nth is streamed through. It is important to keep this in mind when writing joins in your Pig queries. If you know that one of your inputs has more records per value of the chosen key, placing that larger input on the right side of your join will lower memory usage and possibly increase your script’s performance.

limit

Sometimes you want to see only a limited number of results. limit allows you do this:

--limit.pig
divs    = load 'NYSE_dividends';
first10 = limit divs 10;

The example here will return at most 10 lines (if your input has less than 10 lines total, it will return them all). Note that for all operators except order, Pig does not guarantee the order in which records are produced. Thus, because NYSE_dividends has more than 10 records, the example script could return different results every time it’s run. Putting an order immediately before the limit will guarantee that the same results are returned every time.

limit causes an additional reduce phase, since it needs to collect the records together to count how many it is returning. It does optimize this phase by limiting the output of each map and then applying the limit again in the reducer. In the case where limit is combined with order, the two are done together on the map and the reduce. That is, on the map side, the records are sorted by MapReduce and the limit is applied in the combiner. They are then sorted again by MapReduce as part of the shuffle, and Pig applies the limit again in the reducer.

Whenever possible, Pig terminates reading of the input early once it has reached the number of records specified by limit. This can significantly reduce the time it takes to run the script.

sample

sample offers a simple way to get a sample of your data. It reads through all of your data but returns only a percentage of rows. What percentage it returns is expressed as a double value, between 0 and 1. So, in the following example, 0.1 indicates 10%:

--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;

Currently the sampling algorithm is very simple. The sample A 0.1 is rewritten to filter A by random() <= 0.1. Obviously this is nondeterministic, so results of a script with sample will vary with every run. Also, the percentage will not be an exact match, but close. There has been discussion about adding more sophisticated sampling techniques, but it has not been done yet.

parallel

One of Pig’s core claims is that it provides a language for parallel data processing. One of the tenets of Pig’s philosophy is that Pigs are domestic animals (see “The Pig Philosophy”), so Pig prefers that you tell it how parallel to be. To this end, it provides the parallel clause.

The parallel clause can be attached to any relational operator in Pig Latin. However, it controls only reduce-side parallelism, so it makes sense only for operators that force a reduce phase. These are: group*, order, distinct, join*, limit, cogroup*, cross, and union. Operators marked with an asterisk have multiple implementations, some of which force a reduce and some of which do not. union forces a reduce only in Tez. For details on this and on operators not covered in this chapter, see Chapter 5. parallel also works in local modes, though it does not make Pig scripts run faster than a single reduce as reduces run serially.12

Let’s take a look at an example:

--parallel.pig
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
bysymbl = group daily by symbol parallel 10;

In this example, parallel will cause the MapReduce job spawned by Pig to have 10 reducers. If you list the files inside output, you will see 10 part files.

However, the group all statement aggregates all inputs, and it is impossible to use more than one reducer. No matter how many reduce tasks you specify in the parallel clause, Pig will only use one:

daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
bysymbl = group daily all parallel 10;

You can also see this by listing the files in output.

parallel clauses apply only to the statements to which they are attached; they do not carry through the script. So, if this group were followed by an order, parallel would need to be set for that order separately. Most likely the group will reduce your data size significantly and you will want to change the parallelism:

--parallel.pig
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
bysymbl = group daily by symbol parallel 10;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc parallel 2;

If, however, you do not want to set parallel separately for every reduce-invoking operator in your script, you can set a script-wide value using the set command. In this script, all MapReduce jobs will be done with 10 reducers:

--defaultparallel.pig
set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;

When you set a default_parallel level, you can still add a parallel clause to any statement to override the default value. Thus, you can set a default value as a base to use in most cases and specifically add a parallel clause only when you have an operator that needs a different value.

All of this is rather static, however. What happens if you run the same script across different inputs that have different characteristics? Or what if your input data varies significantly sometimes? You do not want to have to edit your script each time. Using parameter substitution, you can write your parallel clauses with variables, providing values for those variables at runtime. See “Parameter Substitution” for details.

So far, we have assumed that you know what your parallel value should be. See “Select the Right Level of Parallelism” for information on how to determine that.

Finally, what happens if you do not specify a parallel level? Pig will do a gross estimate of what the parallelism should be set to. It looks at the initial input size, assumes there will be no data size changes, and then allocates a reducer for every 1 GB of data by default. This can be customized by setting pig.exec.reducers.bytes.per.reducer. The maximum number of reducers to be used in the estimation is defined in pig.exec.reducers.max, which defaults to 999. It must be emphasized that this is not a good algorithm. It is provided only to prevent mistakes that result in scripts running very slowly and, in some extreme cases, mistakes that cause MapReduce itself to have problems. This is a safety net, not an optimizer.

User-Defined Functions

Much of the power of Pig lies in its ability to let users combine its operators with their own or others’ code via UDFs. Pig UDFs can be written in many programming languages, including Java, Jython, JavaScript, Ruby, Groovy (since Pig 0.11), and Python13 (since Pig 0.12).

Pig itself comes packaged with a large number of built-in UDFs. For a complete list and descriptions of the built-in UDFs, see “Built-in UDFs”.

PiggyBank is a collection of user-contributed UDFs that is packaged and released along with Pig. PiggyBank UDFs are not included in the Pig JAR, and thus you have to register them manually in your scripts. See “PiggyBank” for more information.

There are also third-party Pig UDF libraries, notably DataFu from LinkedIn and Elephant Bird from Twitter.

Of course, you can also write your own UDFs or use those written by other users. For details of how to write your own, see Chapter 9. Finally, you can use some static Java functions as UDFs as well.

Registering Java UDFs

When you use a UDF that is not already built into Pig, Pig needs to be able to find the Java JAR that contains the UDF. If the JAR is in your Java classpath, then Pig will automatically locate and load the JAR. If not, you have to tell Pig where to look for that UDF. This is done via the register command. For example, let’s say you want to use the Reverse UDF provided in PiggyBank (for information on where to find the PiggyBank JAR, see “PiggyBank”):

--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate
                org.apache.pig.piggybank.evaluation.string.Reverse(symbol);

This example tells Pig that it needs to include code from your_path_to_piggybank/piggybank.jar.

Note

If your UDF requires additional JARs beyond the one it is contained in (e.g., third-party libraries), Pig may ship those automatically if the UDF supports “auto-shipping” (see “Shipping JARs Automatically” and “Shipping JARs Automatically”). With auto-shipping, the UDF will tell Pig about the dependent JARs it requires. However, if your UDF implementation is old or does not support the new auto-shipping features, you will have to register those JARs manually even if they are present in your Java classpath. Pig cannot do the dependency analysis to determine what additional JARs a UDF requires.

In this example, we have to give Pig the full package and class name of the UDF. This verbosity can be alleviated in two ways. The first option is to use the define command (see “define and UDFs”). The second option is to include a set of packages on the command line for Pig to search when looking for UDFs. So, if instead of invoking Pig with pig register.pig we change our invocation to this:

pig -Dudf.import.list=org.apache.pig.piggybank.evaluation.string register.pig

We can change our script to:

register 'your_path_to_piggybank/piggybank.jar';
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate Reverse(symbol);

Using yet another property, we can get rid of the register command as well. If we add -Dpig.additional.jars=/usr/local/pig/piggybank/piggybank.jar to our command line, this command is no longer necessary.14

You may wonder if you should add your JARs to the classpath as well. The quick answer is no. When you register the JARs, Pig also changes its own class loader to include them, so it will find the JARs in the frontend even though the classpath does not include them. However, if you do not register your JARs and you are expecting Pig to ship JARs automatically (using the auto-shipping feature discussed in the previous note), you need to add them in your classpath. Otherwise, Pig won’t know how to find them.

In many cases it is better to deal with registration and definition issues explicitly in the script via the register and define commands than to use these properties. Otherwise, everyone who runs your script has to know how to configure the command line. However, in some situations your scripts will always use the same set of JARs and always look in the same places for them. For instance, you might have a set of JARs used by everyone in your company. In this case, placing these properties in a shared properties file and using that with your Pig Latin scripts will make sharing those UDFs easier and ensure that everyone is using the correct versions of them.

The register command can also take an HDFS path as well as other protocols that Hadoop understands, such as S3. You could say register 'hdfs:///user/jar/acme.jar'; or register 's3://mybucket/jar/acme.jar'; if your JARs are stored in HDFS or S3.

register accepts globs too, so if all of the JARs you need were stored in one directory, you could include them all with register '/usr/local/share/pig/udfs/*.jar'.

Registering UDFs in Scripting Languages

register is also used to locate resources for scripting UDFs that you use in your Pig Latin scripts. In this case you do not register a JAR, but rather a script that contains your UDF. The script must be referenced from your current directory. Using the examples provided in the example code, copying udfs/python/production.py to the data directory looks like this:

--batting_production.pig
register 'production.py' using jython as bballudfs;
players  = load 'baseball' as (name:chararray, team:chararray,
                pos:bag{t:(p:chararray)}, bat:map[]);
nonnull  = filter players by bat#'slugging_percentage' is not null and
                bat#'on_base_percentage' is not null;
calcprod = foreach nonnull generate name, bballudfs.production(
                (float)bat#'slugging_percentage',
                (float)bat#'on_base_percentage');

The important differences here are the using jython and as bballudfs portions of the register statement. using jython tells Pig that this UDF is written in Jython, not Java, and it should use Jython to compile the UDF.

as bballudfs defines a namespace that UDFs from this file are placed in. All UDFs from this file must now be invoked as bballudfs.udfname. Each script file you load should be given a separate namespace. This avoids naming collisions when you register two scripts with duplicate function names.

You can register UDFs written in JavaScript, JRuby, Groovy, and Python using similar syntax:

JavaScript

register 'production.js' using javascript as bballudfs;

JRuby

register 'production.rb' using jruby as bballudfs;

Groovy

register 'production.groovy' using groovy as bballudfs;

Python

register 'production.py' using streaming_python as bballudfs;

define and UDFs

As was alluded to earlier, define can be used to provide an alias so that you do not have to use full package names for your Java UDFs. It can also be used to provide constructor arguments to your UDFs. define is used in defining streaming commands too, but this section covers only its UDF-related features. For information on using define with streaming, see “stream”.

The following provides an example of using define to provide an alias for org.apache.pig.piggybank.evaluation.string.Reverse:

--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);

Eval and filter functions can also take one or more strings as constructor arguments. If you are using a UDF that takes constructor arguments, define is the place to provide those arguments. For example, consider a method CurrencyConverter that takes two constructor arguments, the first indicating which currency you are converting from and the second which currency you are converting to:

--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate convert(dividends);

Calling Static Java Functions

Java has a rich collection of utilities and libraries. Because Pig is implemented in Java, some of these functions can be exposed to Pig users. Starting in version 0.8, Pig offers invoker methods that allow you to treat certain static Java functions as if they were Pig UDFs.

Any public static Java function that takes either no arguments or some combination of int, long, float, double, String, or arrays thereof15 and returns an int, long, float, double, or String value can be invoked in this way.

Because Pig Latin does not support overloading on return types, there is an invoker for each return type: InvokeForInt, InvokeForLong, InvokeForFloat, InvokeForDouble, and InvokeForString. You must pick the appropriate invoker for the type you wish to return. These methods take two constructor arguments. The first is the full package, class name, and method name. The second is a space-separated list of parameters the Java function expects. Only the types of the parameters are given. If the parameter is an array, [] (square brackets) are appended to the type name. If the method takes no parameters, the second constructor argument is omitted.

For example, if you wanted to use Java’s Integer class to translate decimal values to hexadecimal values, you could do:

--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs  = load 'NYSE_daily' as (exchange, symbol, date, open, high, low,
            close, volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);

If your method takes an array of types, Pig will expect to pass it a bag where each tuple has a single field of that type. So, if you had a Java method called com.yourcompany.Stats.stdev that took an array of doubles, you could use it like this:

define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id: int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);
Warning

Invokers do not use the Accumulator or Algebraic interfaces, and are thus likely to be much slower and to use much more memory than UDFs written specifically for Pig. This means that before you pass an array argument to an invoker method, you should think carefully about whether those inefficiencies are acceptable. For more information on these interfaces, see “The Accumulator Interface” and “The Algebraic Interface”.

Invoking Java functions in this way does have a small cost because reflection is used to find and invoke the methods.

Invoker functions throw a Java IllegalArgumentException when they are passed null input. You should place a filter before the invocation to prevent this.

Calling Hive UDFs

Pig 0.15 added support for calling Apache Hive UDFs inside Pig.

There are three types of UDFs in Hive: standard UDFs, UDTFs (user-defined table-generating functions), and UDAFs (user-defined aggregate functions). Hive’s standard UDFs are similar to Pig’s standard UDFs, which take one input and produce one output at a time.

A Hive standard UDF extends either org.apache.hadoop.hive.ql.exec.UDF or org.apache.hadoop.hive.ql.udf.generic.GenericUDF. Hive’s UDTFs, on the other hand, are able to produce any number of outputs per input. Hive UDTFs extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF. Hive’s UDAFs are similar to Pig’s algebraic UDFs in the sense that both aggregate the inputs and produce one output. Hive UDAFs extend either org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver or org.apache.hadoop.hive.ql.exec.UDAF . For more information about Hive UDFs, please refer to the Hive UDF documentation.

Hive UDFs are accessed via corresponding built-in Pig UDFs. For a standard Hive UDF, invoke HiveUDF, for a Hive UDTF invoke HiveUDTF, and for a Hive UDAF invoke HiveUDAF.

All three types of UDF share the same syntax. Here is one example of invoking a standard Hive UDF:

define lower HiveUDF('org.apache.hadoop.hive.ql.udf.generic.StringLower');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                    date:chararray, dividends:float);
lowersymbol = foreach divs generate exchange, lower(symbol), date, dividends;

You need to pass the name of the Hive UDF as a constructor argument to HiveUDF. The name you pass can be the fully qualified class name, or the abbreviation defined in the Hive FunctionRegistry for built-in Hive UDFs. In the preceding example, org.apache.hadoop.hive.ql.udf.generic.StringLower is a Hive built-in UDF and has the abbreviation lower, so we can rewrite the define statement as follows:

define lower HiveUDF('lower');

Here is an example using a Hive UDTF:

define posexplode HiveUDTF('posexplode');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                    date:chararray, dividends:float);
grouped = group divs by symbol;
exploded = foreach grouped generate flatten(posexplode(divs));

For every input, HiveUDTF creates a bag of the outputs. There is also one additional output that includes the outputs generated in the close method of the Hive UDTF. This is because the Hive UDTF may choose to generate an additional output when it is informed that it has seen all its input.16 You will need to flatten the bag to get the same flattened outputs as Hive.

Here is an example with HiveUDAF:

define variance HiveUDAF('variance');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                    date:chararray, dividends:double);
grouped = group divs by symbol;
aggregated = foreach grouped generate variance(divs.dividends);

HiveUDAF is implemented as an algebraic UDF. Algebraic Pig UDFs aggregate their input both in the combiner and in the reducer. However, the way Hive aggregates inputs is very different from how Pig does it. Hive does not use the combiner; rather, it handles aggregation itself in the map task. HiveUDAF fills the gap by simulation. In the combiner, HiveUDAF invokes the iterate and terminatePartial methods of the Hive UDAF. In the reducer, HiveUDAF invokes the Hive UDAF’s merge and terminate methods.

The other difference between Hive UDFs and Pig UDFs is in the initialization stage. Hive uses the class ObjectInspector to communicate the input schema to a UDF on the frontend, while Pig uses its Schema class. Hive’s ObjectInspector carries a little more information: whether an input field is a constant or not. Pig UDFs have no way to tell and thus cannot pass this information to Hive UDFs. To fill the gap, HiveUDF provides an optional second constructor argument carrying constant inputs. Here is one example:

define in_file HiveUDF('in_file', '(null, \'symbol.txt\')');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                    date:chararray, dividends:float);
infile = foreach divs generate symbol, in_file(symbol, 'symbol.txt');

The second construct or argument, (null, 'symbol.txt'), tells Pig the second argument for in_file is a string constant, 'symbol.txt'. For the first field, which is not a constant, we simply put null. Note that the single quotes need to be escaped. HiveUDF understands double quotes as well, so you can use double quotes instead of escaping:

define in_file HiveUDF('in_file', '(null, "symbol.txt")');

In the foreach statement, you need to put the 'symbol.txt' string in the position of the second input field as well. This is because some Hive UDFs use the constant inferred from ObjectInspector while other Hive UDFs use the actual input data. It is assumed both are the same value.

Pig also simulates the runtime environment for Hive UDFs. The configuration object and counters inside Hive UDFs function properly within Pig.

Not surprisingly, there is some overhead when using Hive UDFs in Pig. This is caused by converting input and output objects between Pig data types and Hive data types, simulations, extra method calls, runtime environment preparations, etc. You should expect approximately a 10–20% slowdown compared to implementing the same functionality in a Pig UDF. But the convenience of invoking existing functionality rather than reimplementing it in many cases outweighs the performance losses. Finally, it is worth mentioning that as a result of this feature, an interesting third-party library now available in Pig is Hivemall, which is a large set of Hive UDFs for machine learning.17 With it, you can use many popular machine learning algorithms in Pig.

1 Any loader that uses FileInputFormat as its InputFormat will support globs. Most loaders that load data from HDFS use this InputFormat.

2 A single function can be both a load and a store function, as PigStorage is.

3 You might object and say that Pig could figure out what is intended here and do it, since SUM(A.y + A.z) could be decomposed to “foreach record in A, add y and z and then take the sum.” This is true. But when we change the group to a cogroup so that there are two bags A and B involved (see “cogroup”) and change the sum to SUM(A.y + B.z), because neither A nor B guarantees any ordering, this is not a well-defined operation. The rationale in designing the language was that it was better to be consistent and always say that bags could not be added rather than allowing it in some instances and not others.

4 We will discuss the details of flatten in “flatten”. Here all you need to know is that, in this example flatten generates three fields: protocol, host, and path.

5 This will change in the future: Pig will insert a real cast operator to cast the field to the declared type. More detail can be found in PIG-2315.

6 Thus, the keyword group is overloaded in Pig Latin. This is unfortunate and confusing, but would be hard to change now.

7 As explained previously, we are using MapReduce as the backend throughout the book for examples, unless otherwise noted. The same notion applies to Tez or Spark, although it may not be called a “reduce phase.”

8 In our experience, the vast majority of data tracking human activity follows a power law distribution.

9 Actually, it reads all records but skips all but the first.

10 Actually, joins can be on any condition, not just equality, but Pig only supports joins on equality (called equi-joins). See “cross” for information on how to do non-equi-joins in Pig.

11 You may object that Pig could determine this by looking at other records in the join and inferring the correct number of fields. However, this does not work, for two reasons. First, when no schema is present, Pig does not enforce a semantic that every record has the same schema. So, assuming Pig can infer one record from another is not valid. Second, there might be no records in the join that match, and thus Pig might have no record to infer from.

12 Older versions of Pig ignore parallel in local mode due to a Hadoop bug. This bug was fixed in Pig 0.14. Note that parallel tasks are run serially in local mode.

13 The difference between Jython UDFs and Python UDFs is that the former use Jython and the latter use CPython as the engine. In fact, Python is the only non-JVM language in the list. Python UDFs are usually slower since they run in a separate process and thus need to serialize the parameters in and deserialize the result out. However, since Jython does not support all Python features and libraries, Python provides an alternative.

14 The delimiter in pig.additional.jars is a colon. On Windows, the colon is also used in drive partitions. To resolve this conflict, Pig 0.14 introduced pig.additional.jars.comma, which uses a comma as the delimiter instead.

15 For int, long, float, and double, invoker methods can call Java functions that take the scalar types but not the associated Java classes (so int but not Integer, etc.).

16 You may wonder what values other fields in the foreach will have for this additional output. Pig attaches an artificial all null input to the foreach before generating the outputs. Thus, fields besides the HiveUDTF field will be null (unless they have a constant or a UDF that produces a nonnull result).

17 At the time of writing, Hivemall had just been accepted as an Apache incubator project.

Get Programming Pig, 2nd Edition now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.