Chapter 1. What Is Pig?
Pig provides an engine for executing data flows in parallel on Apache Hadoop.
It includes a language, Pig Latin, for expressing
these data flows. Pig Latin includes operators for many of the traditional data operations (join, sort, filter, etc.), as well as providing the ability for users to develop their own functions for reading, processing, and writing data.
Pig is an Apache open source project. This means users are free to download it as source or binary, use it for themselves, contribute to it, and—under the terms of the Apache License—use it in their products and change it as they see fit.
Pig Latin, a Parallel Data Flow Language
Pig Latin is a data flow language. This means it allows users to describe how data from one or more inputs should be read, processed, and then stored to one or more outputs in parallel. These data flows can be simple linear flows, or complex workflows that include points where multiple inputs are joined and where data is split into multiple streams to be processed by different operators. To be mathematically precise, a Pig Latin script describes a directed acyclic graph (DAG), where the edges are data flows and the nodes are operators that process the data.
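For instance, here is a minimal sketch of a non-linear flow, using made-up file and field names, in which one input is split into two streams that later meet again in a join:
-- a small data flow that forms a DAG rather than a straight line
wlogs = load 'weblogs' as (user, action, url);
split wlogs into clicks if action == 'click', views if action == 'view';
-- the two streams are processed separately and then joined back together
joined = join clicks by user, views by user;
store joined into 'clicks_and_views';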
This means that Pig Latin looks different from many of the programming languages you may have seen. There are no if statements or for loops in Pig Latin. This is because traditional procedural and object-oriented programming languages describe control flow, and data flow is a side effect of the program. Pig Latin instead focuses on data flow. (For information on how to integrate the data flow described by a Pig Latin script with control flow, see Chapter 8.)
Comparing Query and Data Flow Languages
After a cursory look, people often say that Pig Latin is a procedural version of SQL. Although there are certainly similarities, there are more differences. SQL is a query language. Its focus is to allow users to form queries. It lets users describe what question they want answered, but not how they want it answered. In Pig Latin, on the other hand, the user describes exactly how to process the input data.
Another major difference is that SQL is oriented around answering one question. When users want to do several data operations together, they must either write separate queries, storing the intermediate data into temporary tables, or use subqueries inside the query to do the earlier steps of the processing. However, many SQL users find subqueries confusing and difficult to form properly. Also, using subqueries creates an inside-out design where the first step in the data pipeline is the innermost query.
Pig, however, is designed with a long series of data operations in mind, so there is no need to write the data pipeline in an inverted set of subqueries or to worry about storing data in temporary tables. This is illustrated in Examples 1-1 and 1-2.
Consider a case where a user wants to group one table on a key and then join it with a second table. Because joins happen before grouping in a SQL query, this must be expressed either as a subquery or as two queries with the results stored in a temporary table. Example 1-1 will use a temporary table, as that is more readable.
Example 1-1. Group then join in SQL
CREATE TEMP TABLE t1 AS
SELECT customer, sum(purchase) AS total_purchases
FROM transactions
GROUP BY customer;

SELECT customer, total_purchases, zipcode
FROM t1, customer_profile
WHERE t1.customer = customer_profile.customer;
In Pig Latin, on the other hand, this looks like Example 1-2.[1]
Example 1-2. Group then join in Pig Latin
-- Load the transactions file, group it by customer, and sum their total purchases
txns = load 'transactions' as (customer, purchase);
grouped = group txns by customer;
total = foreach grouped generate group, SUM(txns.purchase) as tp;

-- Load the customer_profile file
profile = load 'customer_profile' as (customer, zipcode);

-- Join the grouped and summed transactions and customer_profile data
answer = join total by group, profile by customer;

-- Write the results to the screen
dump answer;
Furthermore, SQL and Pig were designed to live in different environments. SQL is designed for the RDBMS environment, where data is normalized and schemas and proper constraints are enforced (that is, there are no nulls in places they do not belong, etc.). Pig is designed for the Hadoop data-processing environment, where schemas are sometimes unknown or inconsistent. Data may not be properly constrained, and it is rarely normalized. As a result of these differences, Pig does not require data to be loaded into tables first. It can operate on data as soon as it is copied into HDFS.
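As a minimal, hypothetical sketch (the file names are made up), Pig will happily operate on a delimited file that has just been copied into HDFS even though no schema has been declared for it; fields can simply be referenced by position:
-- no schema declared; fields are referenced positionally as $0, $1, ...
raw = load 'weblogs';
clean = filter raw by $0 is not null;
store clean into 'cleaned_weblogs';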
An analogy with human languages and cultures might help. My wife and I (Alan) have been to France together a couple of times. I speak very little French. But because English is the language of commerce (and probably because Americans and the British like to vacation in France), there is enough English spoken in France for me to get by. My wife, on the other hand, speaks French. She has friends there to visit. She can talk to people we meet. She can explore the parts of France that are not on the common tourist itinerary. Her experience of France is much deeper than mine because she can speak the native language.
SQL is the English of data processing. It has the nice feature that everyone and every tool knows it, which means the barrier to adoption is very low. Our goal is to make Pig Latin the native language of parallel data-processing systems such as Hadoop. It may take some learning, but it will allow users to utilize the power of Hadoop much more fully.
Pig on Hadoop
Pig runs on Hadoop. It makes use of the Hadoop Distributed File System (HDFS) and Hadoop’s resource management system (YARN, as of Hadoop 2). HDFS is a distributed filesystem that stores files across all of the nodes in a Hadoop cluster. It handles breaking the files into large blocks and distributing them across different machines, including making multiple copies of each block so that if any one machine fails no data is lost. It presents a POSIX-like interface to users. By default, Pig reads input files from HDFS, uses HDFS to store intermediate data between MapReduce jobs, and writes its output to HDFS. As you will see in Chapter 10, it can also read input from and write output to sources other than HDFS.
YARN stands for “Yet Another Resource Negotiator.” It is the resource management layer of Hadoop 2. It consists of a per-cluster ResourceManager, a per-node NodeManager, and an API to write distributed applications that run on the cluster. While in Hadoop 1 the only available engine for executing operations was MapReduce, in Hadoop 2 the field has expanded. Other applications include Tez and Spark. Pig can currently run on MapReduce and Tez, with work ongoing to make it run on Spark.
MapReduce is a simple but powerful parallel data-processing paradigm. Every job in MapReduce consists of three main phases: map, shuffle, and reduce. In the map phase, the application has the opportunity to operate on each record in the input separately. Many maps are started at once, so that while the input may be gigabytes or terabytes in size, given enough machines the map phase can usually be completed in under one minute.
Part of the specification of a MapReduce job is the key on which data will be collected. For example, if you were processing web server logs for a website that required users to log in, you might choose the user ID to be your key so that you could see everything done by each user on your website. In the shuffle phase, which happens after the map phase, data is collected together by the key the user has chosen and distributed to different machines for the reduce phase. Every record for a given key will go to the same reducer.
In the reduce phase, the application is presented with each key, together with all of the records containing that key. Again this is done in parallel on many machines. After processing each group, the reducer can write its output. See the next section for a walkthrough of a simple MapReduce program.
Tez is an alternative to MapReduce for processing data on Hadoop. It executes a collection of connected tasks and moves data between those tasks. More formally, it executes directed acyclic graphs (DAGs) of tasks. Tez is more general and flexible than MapReduce. It provides better performance and lower latency, which will speed up most MapReduce applications. It has been adopted by high-level developer tools such as Pig, Hive, and Cascading as an internal execution engine. However, Tez is a low-level execution engine and is not meant to be used by end users directly. You can think of Tez as assembly language on Hadoop. You can get the best performance if you write your program in assembly language. However, how often do you actually do this? In most scenarios you write your program in a high-level language such as C++ or Java and let those tools compile your program into assembly language. The same applies to Tez and Pig. You write your script in Pig Latin, and Pig converts it into Tez tasks and executes it efficiently. The complexity of the Tez API is hidden inside Pig.
Since Tez is not user-facing, in the following discussion we will only focus on MapReduce and how it differs from Pig.
MapReduce’s “Hello World”
Consider a simple MapReduce application that counts the number of times each
word appears in a given text. This is the hello world
program of MapReduce. In this example the map phase will read each line in the text, one at a time.
It will then split out each word into a separate string, and, for each
word, it will output the word and a 1 to indicate it has seen the word
one time. The shuffle phase will use the word as the key, hashing the
records to reducers. The reduce phase will then sum up the number of times each
word was seen and write that together with the word as output. Let’s
consider the case of the nursery rhyme “Mary Had a Little Lamb.” Our
input will be:
Mary had a little lamb
its fleece was white as snow
and everywhere that Mary went
the lamb was sure to go
Let’s assume that each line is sent to a different map task. In reality, each map is assigned much more data than this, but this simple example will be easier to follow. The data flow through MapReduce is shown in Figure 1-1.
Once the map phase is complete, the shuffle
phase will collect all records with the same word onto the same reducer.
For this example we assume that there are two reducers: all words that
start with A–L
are sent to the first
reducer, and words starting with M–Z
are sent to the second reducer. The reducers will then output the summed
counts for each word.
When Pig is running MapReduce as the execution engine, it compiles the Pig Latin scripts that users write into a series of one or more MapReduce jobs that it then executes. See Example 1-3 for a Pig Latin script that will do a word count of “Mary Had a Little Lamb.”
Example 1-3. Pig counts Mary and her lamb
-- Load input from the file named Mary, and call the single
-- field in the record 'line'.
input = load 'mary' as (line);

-- TOKENIZE splits the line into a field for each word.
-- flatten will take the collection of records returned by
-- TOKENIZE and produce a separate record for each one, calling the single
-- field in the record word.
words = foreach input generate flatten(TOKENIZE(line)) as word;

-- Now group them together by each word.
grpd = group words by word;

-- Count them.
cntd = foreach grpd generate group, COUNT(words);

-- Print out the results.
dump cntd;
There is no need to be concerned with map, shuffle, and reduce phases when using Pig. It will manage decomposing the operators in your script into the appropriate MapReduce phases.
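If you are curious how Pig maps a script onto MapReduce, its explain operator prints the plan Pig has built, including how the operators are grouped into jobs. Applied to the final relation of Example 1-3, for instance:
explain cntd;  -- prints the logical, physical, and MapReduce plans for cntd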
How Pig Differs from MapReduce
Earlier, we made the claim that a goal of the Pig team is to make Pig Latin the native language of parallel data-processing environments such as Hadoop. But does MapReduce really not provide enough? Why is Pig necessary?
Pig offers users several advantages over using MapReduce directly. Pig Latin provides all of the standard data-processing operations, such as join, filter, group by, order by, union, etc. MapReduce provides the group by operation directly (in the shuffle and reduce phases), and it provides the order by operation indirectly through the way it implements the grouping. Filtering and projection can be implemented trivially in the map phase. But other operators, particularly join, are not provided and must instead be written by the user.
Pig furnishes some complex, nontrivial implementations of these standard data operations. For example, because the number of records per key in a dataset is rarely evenly distributed, the data sent to the reducers is often skewed. That is, one reducer may get 10 or more times as much data as the other reducers. Pig has join and order by operators that will handle this case and (in some cases) rebalance the reducers. But these took the Pig team months to write, and rewriting them in MapReduce would be time consuming.
In MapReduce, the data processing inside the map and reduce phases is opaque to the system. This means that MapReduce has no opportunity to optimize or check the user’s code. Pig, on the other hand, can analyze a Pig Latin script and understand the data flow that the user is describing. That means it can do early error checking (did the user try to add a string field to an integer field?) and optimizations (can these two grouping operations be combined?).
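As a minimal sketch of the kind of check this enables (the file and field names here are hypothetical), declaring types in a load statement lets Pig reject a script that misuses a field when it builds the plan, before any MapReduce job is launched:
users = load 'users' as (name:chararray, age:int);
-- adding a chararray to an int makes no sense; with the types declared above,
-- Pig can flag this while building the plan rather than partway through a job
oops = foreach users generate name + age;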
MapReduce does not have a type system. This is intentional, and it gives users the flexibility to use their own data types and serialization frameworks. But the downside is that this further limits the system’s ability to check users’ code for errors both before and during runtime.
All of these points mean that Pig Latin is much lower-cost to write and maintain than Java code for MapReduce. Consider the following (very unscientific) experiment, where we wrote the same operation in Pig Latin and MapReduce. Given one file with user data and one with click data for a website, the Pig Latin script in Example 1-4 will find the five pages most visited by users between the ages of 18 and 25.
Example 1-4. Finding the top five URLs
Users = load 'users' as (name, age);
Fltrd = filter Users by age >= 18 and age <= 25;
Pages = load 'pages' as (user, url);
Jnd = join Fltrd by name, Pages by user;
Grpd = group Jnd by url;
Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;
Srtd = order Smmd by clicks desc;
Top5 = limit Srtd 5;
store Top5 into 'top5sites';
The first line of this program loads the file users and declares that this data has two fields: name and age. It assigns the name of Users to the input. The second line applies a filter to Users that passes through records with an age between 18 and 25, inclusive. All other records are discarded. Now the data has only records of users in the age range we are interested in. The results of this filter are named Fltrd.
The second load statement loads pages and names it Pages. It declares its schema to have two fields, user and url.
The line Jnd = join joins together Fltrd and Pages using Fltrd.name and Pages.user as the key. After this join we have found all the URLs each user has visited.
The line Grpd = group collects records together by URL, so for each value of url, such as pignews.com/frontpage, there will be one record with a collection of all records that have that value in the url field. The next line then counts how many records are collected together for each URL. So after this line we now know, for each URL, how many times it was visited by users aged 18–25.
The next thing to do is to sort this from most visits to least. The line Srtd = order sorts on the count value from the previous line and places it in desc (descending) order. Thus, the largest value will be first. Finally, we need only the top five pages, so the last line limits the sorted results to only five records. The results of this are then stored back to HDFS in the file top5sites.
In Pig Latin this comes to nine lines of code and took about 15 minutes to write and debug. The same code in MapReduce (omitted here for brevity) came out to about 170 lines of code and took four hours to get working. The Pig Latin will similarly be easier to maintain, as future developers can easily understand and modify this code.
There is, of course, a cost to all this. It is possible to develop algorithms in MapReduce that cannot be done easily in Pig. And the developer gives up a level of control. A good engineer can always, given enough time, write code that will outperform a generic system. So, for less common algorithms or extremely performance-sensitive ones, MapReduce is still the right choice. Basically, this is the same situation as choosing to code in Java versus a scripting language such as Python. Java has more power, but due to its lower-level nature, it requires more development time than scripting languages. Developers will need to choose the right tool for each job.
What Is Pig Useful For?
In our experience, Pig Latin use cases tend to fall into three separate categories: traditional extract-transform-load (ETL) data pipelines, research on raw data, and iterative processing.
The largest use case is data pipelines. A common example is web companies bringing in logs from their web servers, cleansing the data, and precomputing common aggregates before loading it into their data warehouses. In this case, the data is loaded onto the Hadoop cluster, and then Pig is used to clean out records from bots and records with corrupt data. It is also used to join web event data against user databases so that user cookies can be connected with known user information.
Another example of data pipelines is using Pig to build behavior prediction models. Pig is used to scan through all the user interactions with a website and split the users into various segments. Then, for each segment, a mathematical model is produced that predicts how members of that segment will respond to types of advertisements or news articles. In this way the website can show ads that are more likely to get clicked on, or offer news stories that are more likely to engage users and keep them coming back to the site.
Traditionally, ad hoc queries are done in languages such as SQL that make it easy to quickly form a question for the data to answer. However, for research on raw data, some users prefer Pig Latin. Because Pig can operate in situations where the schema is unknown, incomplete, or inconsistent, and because it can easily manage nested data, researchers who want to work on data before it has been cleaned and loaded into the warehouse often prefer Pig. Researchers who work with large datasets frequently use scripting languages such as Perl or Python to do their processing. Users with these backgrounds often prefer the data flow paradigm of Pig over the declarative query paradigm of SQL.
Users building iterative processing models are also starting to use Pig. Consider a news website that keeps a graph of all news stories on the Web that it is tracking. In this graph each news story is a node, and edges indicate relationships between the stories. For example, all stories about an upcoming election are linked together. Every five minutes a new set of stories comes in, and the data-processing engine must integrate them into the graph. Some of these stories are new, some are updates of existing stories, and some supersede existing stories. Some data-processing steps need to operate on this entire graph of stories. For example, a process that builds a behavioral targeting model needs to join user data against the entire graph of stories. Rerunning the entire join every five minutes is not feasible because it cannot be completed in five minutes with a reasonable amount of hardware. But the model builders do not want to update these models only on a daily basis, as that means an entire day of missed serving opportunities.
To cope with this problem, it is possible to first do a join against the entire graph on a regular basis—for example, daily. Then, as new data comes in every five minutes, a join can be done with just the new incoming data, and these results can be combined with the results of the join against the whole graph. This combination step takes some care, as the five-minute data contains the equivalent of inserts, updates, and deletes on the entire graph. It is possible and reasonably convenient to express this combination in Pig Latin.
One point that is implicit in everything we have said so far is that Pig (like MapReduce) is oriented around the batch processing of data. If you need to process gigabytes or terabytes of data, Pig is a good choice. But it expects to read all the records of a file and write all of its output sequentially. For workloads that require writing single or small groups of records, or looking up many different records in random order, Pig (like MapReduce) is not a good choice. See “NoSQL Databases” for a discussion of applications that are good for these use cases.
The Pig Philosophy
Early on, people who came to the Pig project as potential contributors did not always understand what the project was about. They were not sure how to best contribute or which contributions would be accepted and which would not. So, the Pig team produced a statement of the project’s philosophy that summarizes what Pig aspires to be:
- Pigs eat anything
Pig can operate on data whether it has metadata or not. It can operate on data that is relational, nested, or unstructured. And it can easily be extended to operate on data beyond files, including key/value stores, databases, etc.
- Pigs live anywhere
Pig is intended to be a language for parallel data processing. It is not tied to one particular parallel framework. It has been implemented first on Hadoop, but we do not intend that to be only on Hadoop.
- Pigs are domestic animals
Pig is designed to be easily controlled and modified by its users.
Pig allows integration of user code wherever possible, so it currently supports user-defined field transformation functions, user-defined aggregates, and user-defined conditionals. These functions can be written in Java or in scripting languages that can compile down to Java (e.g., Jython). Pig supports user-provided load and store functions. It supports external executables via its stream command and MapReduce JARs via its mapreduce command. It allows users to provide a custom partitioner for their jobs in some circumstances and to set the level of reduce parallelism for their jobs (see the sketch after this list).
Pig has an optimizer that rearranges some operations in Pig Latin scripts to give better performance, combines MapReduce jobs together, etc. However, users can easily turn this optimizer off to prevent it from making changes that do not make sense in their situation.
- Pigs fly
Pig processes data quickly. We want to consistently improve performance, and not implement features in ways that weigh Pig down so it can’t fly.
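As a brief, hypothetical sketch of the control described under "Pigs are domestic animals" (the script and file names are made up), a user can pipe records through an external executable and explicitly set the reduce parallelism:
users = load 'users' as (name, age);
-- run every record through an external script supplied by the user
cleaned = stream users through `cleanup.pl`;
-- explicitly request 10 reducers for this grouping operation
grpd = group cleaned by $0 parallel 10;
store grpd into 'grouped_users';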
Pig’s History
Pig started out as a research project in Yahoo! Research, where Yahoo! scientists
designed it and produced an initial implementation. As explained in a
paper presented at SIGMOD in 2008,[2] the researchers felt that the MapReduce paradigm presented
by Hadoop is too low-level and rigid, and leads to a great deal of
custom user code that is hard to maintain and reuse.
At the same
time they observed that many MapReduce users were not comfortable with
declarative languages such as SQL. Thus, they set out to produce a new language, Pig Latin, designed to fit in a sweet spot between the declarative style of SQL and the low-level, procedural style of MapReduce.
Yahoo! Hadoop users started to adopt Pig, so a team of development engineers was assembled to take the research prototype and build it into a production-quality product. About this same time, in fall 2007, Pig was open sourced via the Apache Incubator. The first Pig release came a year later, in September 2008. Later that same year, Pig graduated from the Incubator and became a subproject of Apache Hadoop.
Early in 2009 other companies started to use Pig for their data processing. Amazon also added Pig as part of its Elastic MapReduce service. By the end of 2009 about half of Hadoop jobs at Yahoo! were Pig jobs. In 2010, Pig adoption continued to grow, and Pig graduated from a Hadoop subproject, becoming its own top-level Apache project.
[1] Lines that start with -- are comments. See "Comments".
[2] Christopher Olston et al., "Pig Latin: A Not-So-Foreign Language for Data Processing," available at http://portal.acm.org/citation.cfm?id=1376726.