This appendix gives a brief overview of Hadoop, focusing on elements that are of interest to Pig users. For a thorough discussion of Hadoop, see Hadoop: The Definitive Guide, by Tom White (O’Reilly). Hadoop’s two main components are MapReduce and HDFS.
MapReduce is the framework for running jobs in Hadoop. It provides a simple and powerful paradigm for parallelizing data processing.
The JobTracker is the central coordinator of jobs in MapReduce. It controls which jobs are being run, which resources they are assigned, etc. On each node in the cluster there is a TaskTracker that is responsible for running the map or reduce tasks assigned to it by the JobTracker.
MapReduce views its input as a collection of records. When reading from HDFS, a record is usually a single line of text. Each record has a key and a value. There is no requirement that data be sorted by key or that the keys must be unique. Similarly, MapReduce produces a set of records, each with a key and value.
MapReduce operates on data in jobs. Every job has one input and one output.[32] MapReduce breaks each job into a series of tasks. These tasks are of two primary types: map and reduce.
In the map phase, MapReduce gives the user an opportunity to operate on every record in the data set individually. This phase is commonly used to project out unwanted fields, transform fields, or apply filters. Certain types of joins and grouping can also be done in the map (e.g., joins where the data is already sorted or hash-based aggregation). There is no requirement that for every input record there should be one output record. Maps can choose to remove records or explode one record into multiple records.
Every MapReduce job specifies an
InputFormat
. This class is responsible for
determining how data is split across map tasks and for providing a
RecordReader
.
In order to specify how data is split across
tasks, an InputFormat
divides the input data into
a set of InputSplit
s. Each
InputSplit
is given to an individual map. In
addition to information on what to read, the
InputSplit
includes a list of nodes that should
be used to read the data. In this way, when the data resides on HDFS,
MapReduce is able to move the computation to the data.
The RecordReader
provided
by an InputFormat
reads input data and produces
key-value pairs to be passed into the map. This class controls how data
is decompressed (if necessary), and how it is converted to Java types that MapReduce can work with.
The combiner gives applications a chance to apply their reducer logic early on. As the map phase writes output, it is serialized and placed into an in-memory buffer. When this buffer fills, MapReduce will sort the buffer and then run the combiner if the application has provided an implementation for it. The resulting output is then written to local disk, to be picked up by the shuffle phase and sent to the reducers. MapReduce might choose not to run the combiner if it determines it will be more efficient not to.
After the shuffle, each reducer will have one input for each map. The reducer needs to merge these inputs in order to begin processing. It is not efficient to merge too many inputs simultaneously. Thus, if the number of inputs exceeds a certain value, the data will be merged and rewritten to disk before being given to the reducer. During this merge, the combiner will be applied in an attempt to reduce the size of the input data. See Hadoop’s documentation for a discussion of how and when this prereduce merge is triggered.
Because the combine phase will be run zero, one, or multiple times, the input and output keys and values of the combiner must be of the same type.
During the shuffle phase, MapReduce partitions data among the various reducers.
MapReduce uses a class called
Partitioner
to partition records to reducers
during the shuffle phase. An implementation of Partitioner
takes the key and value
of the record, as well as the total number of reduce tasks, and returns
the reduce task number that the record should go to. By default,
MapReduce uses HashPartitioner
, which calls
hashCode()
on the key and returns the result modulo of the number of
reduce tasks. MapReduce users can override this default to use their own
implementation of Partitioner
.
See
the Hadoop documentation for more details on Partitioner
s.
Data arriving on the reducer has been partitioned and sorted by the map, combine, and shuffle phases. By default, the data is sorted by the partition key. For example, if a user has a data set partitioned on user ID, in the reducer it will be sorted by user ID as well. Thus, MapReduce uses sorting to group like keys together. It is possible to specify additional sort keys beyond the partition key. So, for example, the user could choose to partition by user ID and also sort by timestamp. This feature is useful, as the user does not have to implement her own sorting on the reduce data.
The input to the reduce phase is each key from the shuffle plus all of the records associated with that key. Because all records with the same value for the key are now collected together, it is possible to do joins and aggregation operations such as counting. The MapReduce user explicitly controls parallelism in the reduce. MapReduce jobs that do not require a reduce phase can set the reduce count to zero. These are referred to as map-only jobs.
The reducer (or map in a map-only job) writes its output
via an OutputFormat
.
OutputFormat
is responsible for providing a
RecordWriter
, which takes the
key-value pairs produced by the task and stores them. This includes
serializing, possibly compressing, and writing them to HDFS, HBase, etc.
The OutputFormat
is also responsible for
providing the OutputCommitter
, which is used to
do post-output operations such as cleaning up after failure and
indicating to the storage medium that data is available (e.g., a
database commit).
Sometimes all or many of the tasks in a MapReduce job will need to access a single file or a set of files. For example, when joining a large file with a small file, one approach is to open the small file as a side file (that is, open it directly in your map task rather than specify it as an input to your MapReduce job), load it into memory, and do the join in the map phase. When thousands of map or reduce tasks attempt to open the same HDFS file simultaneously, this puts a large strain on the NameNode and the DataNodes storing that file. To avoid this situation, MapReduce provides the distributed cache. The distributed cache allows users to specify—as part of their MapReduce job—any HDFS files they want every task to have access to. These files are then copied onto the local disk of the task nodes as part of the task initiation. Map or reduce tasks can then read these as local files.
Part of the power of MapReduce is that it handles failure and retry for the user. If you have a MapReduce job that involves 10,000 map tasks (not an uncommon situation), the odds are reasonably high that at least one machine will fail during that job. Rather than trying to remove failure from the system, MapReduce is designed with the assumption that failure is common and must be coped with. When a given map or reduce task fails, MapReduce handles spawning a replacement task to do the work. Sometimes it does not even wait for tasks to fail. When a task is slow, it might spawn a duplicate to see if it can get the task done sooner. This is referred to as speculative execution. After a task fails a certain number of times (four by default), MapReduce gives up and declares the task and the job a failure.
The Hadoop Distributed File System (HDFS) stores files across all of the nodes in a Hadoop cluster. It handles breaking the files into large blocks and distributing them across different machines. It also makes multiple copies of each block so that if any one machine fails, no data is lost or unavailable. By default it makes three copies of each block, though this value is configurable. One copy is always written locally to the node where the write is executed. If your Hadoop cluster is spread across multiple racks, HDFS will write one copy of the block on the same rack as the machine where the write is happening, and one copy on a machine in a different rack. When a machine or disk dies or blocks are corrupted, HDFS will handle making another copy of the lost blocks to ensure that the proper number of replicas are maintained.
HDFS is designed specifically to support MapReduce. The block sizes are large, 64 MB by default. Many users set them higher, to 128 MB or even 256 MB. Storing data in large blocks works well for MapReduce’s batch model, where it is assumed that every job will read all of the records in a file. Modern disks are much faster at sequential read than seek. Thus for large data sets, if you require more than a few records, sequentially reading the entire data set outperforms random reads. The three-way duplication of data, beyond obviously providing fault tolerance, also serves MapReduce because it gives the JobTracker more options for locating map tasks on the same machine as one of the blocks.
HDFS presents a POSIX-like interface to users and provides standard filesystem features such as file ownership and permissions, security, and quotas.
The brain of HDFS is the NameNode. It is responsible for maintaining the master list of files in HDFS, and it handles the mapping of filenames to blocks, knowing where each block is stored, and making sure each block is replicated the appropriate number of times. DataNodes are machines that store HDFS data. They store each block in a separate file. Each DataNode is colocated with a TaskTracker to allow moving of the computation to data.
[32] It is possible to bend this rule, as Pig and many other applications do. For example, the one input can be a concatenation of multiple input files, and files can be opened on the side in tasks and written to or read from. But, conceptually, each job has one primary input and one primary output.
Get Programming Pig now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.