Pig Latin is a dataflow language. Unlike general-purpose programming
languages, it does not include control flow constructs such as if and for. For many data-processing applications,
the operators Pig provides are sufficient. But there are classes of problems
that either require the data flow to be repeated an indefinite number of
times or need to branch based on the results of an operator. Iterative processing, where a calculation needs to be repeated
until the margin of error is within an acceptable limit, is one example. In such cases it
is not possible to know, before processing begins, how many times the data flow
will need to be run.
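To make the shape of such a problem concrete, here is a minimal sketch in plain Python, with no Pig involved. The function names and the square-root calculation are our own illustration, not part of Pig; the point is only that the loop count depends on the data and is discovered at run time.

```python
def iterate_until_converged(step, state, tolerance, max_iterations=100):
    """Repeat step() until two successive results differ by less than tolerance."""
    for iteration in range(1, max_iterations + 1):
        new_state = step(state)
        if abs(new_state - state) < tolerance:
            # The margin of error is acceptable, so stop here.
            return new_state, iteration
        state = new_state
    raise RuntimeError("did not converge within %d iterations" % max_iterations)


def newton_sqrt2_step(x):
    # One refinement step toward the square root of 2 (illustrative calculation).
    return 0.5 * (x + 2.0 / x)


root, iterations = iterate_until_converged(newton_sqrt2_step, 1.0, 1e-9)
print("converged to %r after %d iterations" % (root, iterations))
```

The control flow here (a loop, a test, an early exit) is exactly what Pig Latin on its own cannot express.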
Blending data flow and control flow in one language is difficult to do in a way that is useful and intuitive. Building a general-purpose language and all the associated tools, such as IDEs and debuggers, is a considerable undertaking; also, there is no lack of such languages already. If we turned Pig Latin into a general-purpose language, it would require users to learn a much bigger language to process their data. For these reasons, we decided to embed Pig in existing scripting languages. This avoids the need to invent a new language while still providing users with the features they need to process their data.[21]
As with UDFs, we chose to use Python for the initial release of embedded Pig in version 0.9. The embedding interface is a Java class, so a Jython interpreter is used to run the Python scripts that embed Pig. This means Python 2.5 features can be used but Python 3 features cannot. In the future we hope to extend the system to other scripting languages that can access Java objects, such as JavaScript[22] and JRuby. Of course, since the Pig infrastructure is all in Java, it is possible to use this same interface to embed Pig into Java programs.
This embedding is done in a JDBC-like style, where
your Python script first compiles a Pig Latin script, then binds variables
from Python to it, and finally runs it. It is also possible to do filesystem
operations, register JARs, and perform other utility operations through the
interface. The top-level class for this interface is org.apache.pig.scripting.Pig.
Throughout this chapter we will use an example of
calculating page rank from a web crawl. You can find this
example under examples/ch9 in the
example code. This code iterates over a set of URLs and links to produce a
page rank for each URL.[23] The input to this example is the webcrawl data set found in the examples. Each
record in this input contains a URL, a starting rank of 1, and a bag with a tuple for each link found at
that URL:

    http://pig.apache.org/privacypolicy.html 1 {(http://www.google.com/privacy.html)}
    http://www.google.com/privacypolicy.html 1 {(http://www.google.com/faq.html)}
    http://desktop.google.com/copyrights.html 1 {}

Even though control flow is done via a Python
script, it can still be run using Pig’s bin/pig script. bin/pig looks for the #! line and
calls the appropriate interpreter. This allows you to use these scripts with
systems that expect to invoke a Pig Latin script. It also allows Pig to
include UDFs from this file automatically and to give correct line numbers
for error messages.
In order to use the Pig class and related objects, the code must first import them into the Python
script:

    from org.apache.pig.scripting import *

Calling the static method Pig.compile causes
Pig to do an initial compilation of the code. Because we have not bound
the variables yet, this check cannot completely verify the script. Type
checking and other semantic checking are not done at this phase; only the
syntax is checked. compile returns a Pig object that can be bound to a set of
variables:

    # pagerank.py
    P = Pig.compile("""
    previous_pagerank = load '$docs_in' as (url:chararray, pagerank:float,
                        links:{link:(url:chararray)});
    outbound_pagerank = foreach previous_pagerank generate
                        pagerank / COUNT(links) as pagerank,
                        flatten(links) as to_url;
    cogrpd            = cogroup outbound_pagerank by to_url,
                        previous_pagerank by url;
    new_pagerank      = foreach cogrpd generate group as url,
                        (1 - $d) + $d * SUM (outbound_pagerank.pagerank)
                            as pagerank,
                        flatten(previous_pagerank.links) as links,
                        flatten(previous_pagerank.pagerank) AS previous_pagerank;
    store new_pagerank into '$docs_out';
    nonulls           = filter new_pagerank by previous_pagerank is not null and
                        pagerank is not null;
    pagerank_diff     = foreach nonulls generate ABS (previous_pagerank - pagerank);
    grpall            = group pagerank_diff all;
    max_diff          = foreach grpall generate MAX (pagerank_diff);
    store max_diff into '$max_diff';
    """)

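For readers who want to check the arithmetic of a single pass by hand, the following is a rough plain-Python sketch of what one run of this data flow computes. It is not how Pig executes the script. The function name and the in-memory dictionary are our own, and treating a page with no inbound links as having no new rank (None) is an assumption about how the nulls removed by the nonulls filter arise.

```python
def pagerank_step(pages, d):
    """pages maps url -> (rank, [linked urls]); returns (new_pages, max_diff)."""
    # outbound_pagerank: each page hands rank / number_of_links to every link.
    inbound = {}
    for url, (rank, links) in pages.items():
        for to_url in links:
            inbound.setdefault(to_url, []).append(rank / len(links))

    new_pages = {}
    diffs = []
    # Only crawled pages are kept, as in the flatten of previous_pagerank.
    for url, (old_rank, links) in pages.items():
        shares = inbound.get(url)
        # Assumption: no inbound shares means no new rank, mirroring a null.
        new_rank = None if shares is None else (1 - d) + d * sum(shares)
        new_pages[url] = (new_rank, links)
        if new_rank is not None:
            diffs.append(abs(old_rank - new_rank))

    # max_diff: the largest change in rank seen in this pass.
    max_diff = max(diffs) if diffs else None
    return new_pages, max_diff


# A tiny hypothetical crawl: a links to b and c, b links to c, c links to a.
pages = {
    "a": (1.0, ["b", "c"]),
    "b": (1.0, ["c"]),
    "c": (1.0, ["a"]),
}
new_pages, max_diff = pagerank_step(pages, 0.5)
print(new_pages, max_diff)
```

With d set to 0.5, page b receives half of a's rank, so its new rank is 0.5 + 0.5 * 0.5 = 0.75, and the largest change in this pass is 0.25.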
The only pieces of this Pig Latin script that we
have not seen before are the four parameters, marked in the script as
$d, $docs_in, $docs_out, and $max_diff. The syntax for these parameters is the same as for
parameter substitution. However, Pig expects these to be supplied by the
control flow script when bind is called.
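As an analogy only, Python's standard string.Template class happens to use the same $name syntax, so it can illustrate what supplying values for these parameters amounts to. This is not how Pig implements bind, and the two-line Pig Latin snippet is a made-up placeholder.

```python
from string import Template

# A made-up two-line script with two parameters, for illustration only.
script = Template("""
data = load '$docs_in';
store data into '$docs_out';
""")

params = {"docs_in": "data/webcrawl", "docs_out": "out/pagerank_data_1"}
bound_text = script.substitute(params)
print(bound_text)

# Leaving a parameter out is an error rather than a silent blank.
try:
    script.substitute({"docs_in": "data/webcrawl"})
    missing_detected = False
except KeyError:
    missing_detected = True
print("missing parameter detected:", missing_detected)
```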
There are three other compilation methods in
addition to the one shown in this example.
compile(String name, String script) takes a name in addition to the Pig Latin to be
compiled. This name can be used in other Pig Latin code blocks to import
this block:

    P1 = Pig.compile("initial", """
    A = load 'input';
    ...
    """)

    P2 = Pig.compile("""
    import initial;
    B = load 'more_input';
    ...
    """)

There are two compilation methods called compileFromFile. These take the same arguments as
compile, but they expect the script argument to refer to a
file containing the script, rather than the script itself.
Once your script has been compiled successfully, the next
step is to bind variables in the control flow to variables in Pig Latin.
In our example script this is done by providing a map to the
bind call. The keys are the names of the variables in Pig
Latin. The values in the following example are literal string values that
are updated as the script progresses. They also could be references to
Python variables:

    # pagerank.py
    params = { 'd': '0.5', 'docs_in': 'data/webcrawl' }

    for i in range(10):
        out = "out/pagerank_data_" + str(i + 1)
        max_diff = "out/max_diff_" + str(i + 1)
        params["docs_out"] = out
        params["max_diff"] = max_diff
        # Remove output left over from any previous run.
        Pig.fs("rmr " + out)
        Pig.fs("rmr " + max_diff)
        bound = P.bind(params)
        stats = bound.runSingle()
        if not stats.isSuccessful():
            raise RuntimeError('failed')
        # Read the single max_diff value back into Python.
        mdv = float(str(stats.result("max_diff").iterator().next().get(0)))
        print "max_diff_value = " + str(mdv)
        if mdv < 0.01:
            print "done at iteration " + str(i)
            break
        # The output of this pass is the input of the next.
        params["docs_in"] = out

For the initial run, the Pig Latin $d will take on the value 0.5,
$docs_in the path data/webcrawl, $docs_out the path
out/pagerank_data_1, and $max_diff the path out/max_diff_1.
bind returns a BoundScript object. This object can be run,
explained, described, or illustrated. As is shown in this script, a single
Pig object can be bound multiple times. The compile
is necessary only once, before the first pass; each pass then binds
different values to the same compiled script.
In our example, bind is given a
mapping of the variables to bind. If all of your Python variables and Pig
Latin variables have the same name, you can call bind with no
arguments. This will cause bind to look in the Python context
for variables of the same name as the parameters in Pig and use them. If
it cannot find appropriate variables, it will throw an error. We could
change our example script to look like this:

    # pagerankbindnoarg.py
    d = 0.5
    docs_in = 'data/webcrawl'

    for i in range(10):
        docs_out = "out/pagerank_data_" + str(i + 1)
        max_diff = "out/max_diff_" + str(i + 1)
        Pig.fs("rmr " + docs_out)
        Pig.fs("rmr " + max_diff)
        # No arguments: bind looks up d, docs_in, docs_out, and max_diff by name.
        bound = P.bind()
        stats = bound.runSingle()
        if not stats.isSuccessful():
            raise RuntimeError('failed')
        mdv = float(str(stats.result("max_diff").iterator().next().get(0)))
        print "max_diff_value = " + str(mdv)
        if mdv < 0.01:
            print "done at iteration " + str(i)
            break
        docs_in = docs_out

Our example page rank script binds its compiled Pig Latin to different variables multiple times in order to iterate over the data. Each of these jobs is run separately, as is required by the iterative nature of calculating page rank. However, sometimes you want to run a set of jobs together; for example, consider calculating census data from countries all over the world. You want to run the same Pig Latin for each country, but you do not want to run them separately. There is no point in having a massively parallel system such as Hadoop if you are going to run jobs one at a time. You want to tell Pig to take your script and run it against input from all the countries at the same time.
There is a form of bind that
provides this capability. Instead of taking a map of parameters, it
takes a list of maps of parameters. It still returns a single
BoundScript object, but when run is
called on this object, all of the separate instantiations of the script
will be run together:

    #!/usr/bin/python
    from org.apache.pig.scripting import *

    pig = Pig.compile("""
    input = load '$country' using CensusLoader();
    ...
    store output into '$country_out';
    """)

    params = [{'country': 'Afghanistan', 'country_out': 'af.out'},
              ...
              {'country': 'Zimbabwe', 'country_out': 'zw.out'}]

    bound = pig.bind(params)
    stats = bound.run()

Once we have our BoundScript object,
we can call runSingle to run it. This tells Pig to run a
single Pig Latin script. This is appropriate when you have bound your
script to just one set of variables.
runSingle returns a PigStats object.
This object allows you to get your results and examine what happened in
your script, including status, error codes and messages if there was an
error, and statistics about the run itself. Table 9-1 summarizes the more important methods available for
PigStats.
Table 9-1. PigStats methods
Method | Returns |
---|---|
result(String alias) | Given an alias, returns an OutputStats object that describes the output stored from that alias. You can get a results iterator from OutputStats. |
isSuccessful() | Returns true if all went well, and false otherwise. |
getReturnCode() | Gets the return code from running Pig. See Table 2-1 for return code details. |
getErrorMessage() | Returns the error message if the run failed. This will try to pick the most relevant error message that was returned, most likely the last. |
getAllErrorMessages() | Returns a list of all of the error messages if the run failed. |
getOutputLocations() | Returns a list of location strings that were stored in the script. For example, if you wrote output to a file on HDFS, this will return the filename. |
getOutputNames() | Returns a list of aliases that were stored in the script. |
getRecordWritten() | Returns the total number of records written by the script. |
getBytesWritten() | Returns the total number of bytes written by the script. |
getNumberRecords(String location) | Given an output location, returns the number of records written to that location. |
getNumberBytes(String location) | Given an output location, returns the number of bytes written to that location. |
getDuration() | Wall clock time it took the script to run. |
getNumberJobs() | Number of MapReduce jobs run by this script. |
As seen in the example, the OutputStats object returned by
result() can be used to get an iterator on the result set.
With this you can iterate through the tuples of your data, processing them
in your Python script. Standard Tuple methods such
as get() can be used to inspect the contents of each record.
See Interacting with Pig values for a discussion of working with
Tuples. Based on the results read in the iterator,
your Python script can decide whether to cease iteration and declare
success, raise an error, or continue with another iteration.
Warning
For this iterator to work, the store function you
use to store results from the alias must also be a
load function. Pig attempts to use the same class to load the results as
was used to store them. The default PigStorage works well for
this.
If you bound your Pig object to a
list of maps of parameters, rather than call runSingle, you
should call run. This will cause Pig to start a thread for
each binding and run it. All these jobs will be submitted to Hadoop at
the same time, making use of Hadoop’s parallelism. run
returns a list of PigStats objects. The
PigStats objects are guaranteed to be in the same
order in the list as in the maps of bound variables passed to
bind. Thus the results of the first binding map are in the
first position of the PigStats list, and so on.
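The ordering guarantee is easy to picture with a small plain-Python sketch. It is written for standard Python 3 rather than Jython, does not touch Pig, and is not Pig's implementation; run_binding is a hypothetical stand-in for running one bound script. It simply shows one worker thread per binding, with results returned in the order of the input list rather than in order of completion.

```python
from concurrent.futures import ThreadPoolExecutor


def run_binding(params):
    # Hypothetical stand-in for running one instantiation of the script.
    return "ran %s -> %s" % (params["country"], params["country_out"])


bindings = [
    {"country": "Afghanistan", "country_out": "af.out"},
    {"country": "Zimbabwe", "country_out": "zw.out"},
]

# One thread per binding; map() yields results in the order of the inputs.
with ThreadPoolExecutor(max_workers=len(bindings)) as pool:
    results = list(pool.map(run_binding, bindings))

for params, result in zip(bindings, results):
    print(params["country"], "=>", result)
```

Because position i in the results corresponds to position i in the bindings, the two lists can be walked together, as the final loop does.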
In addition to the compile, bind,
and run methods presented so far, there are also utility
methods provided by Pig and BoundScript.
Filesystem operations can be done by calling the static
method Pig.fs. The string passed to it should be a
valid string for use in the Grunt shell (see Chapter 3).
The return code from running the shell command will be returned.
You can use register, define, and set in your compiled Pig Latin
statements as you do in nonembedded Pig Latin. However, you might wish to
register a JAR, define a function alias, or set a value that
you want to be effective through all your Pig Latin code blocks. In these
cases you can use the static methods of Pig
described in Table 9-2. The registers, defines, and
sets done by these methods will affect all Pig Latin
code compiled after they are called:

    # register etc. will not affect this block.
    p1 = Pig.compile("...")

    Pig.registerJar("acme.jar")
    Pig.registerUDF("acme_python.py", "acme")
    Pig.define("d_to_e", "com.acme.financial.CurrencyConverter('dollar', 'euro')")
    Pig.set("default_parallel", "100")

    # register etc. will affect p2 and p3
    p2 = Pig.compile("...")
    p3 = Pig.compile("...")

Table 9-2. Pig utility methods
Once a script has been bound and a
BoundScript returned, in addition to running the
script you can also call describe, explain, or
illustrate. These do exactly what they would if they were in
a nonembedded Pig Latin script. However, they do not return the resulting
output to your script; instead, it is dumped to the standard out. (These
operators are intended for use in debugging rather than for returning data
directly to your script.)
[21] In some of the documentation, wiki pages, and issues on JIRA, embedded Pig is referred to as Turing Complete Pig. This was what the project was called when it first started, even though we did not make Pig itself Turing complete.
[22] There is already an experimental version of JavaScript in 0.9.
[23] The example code was graciously provided by Julien Le Dem.