Chapter 4. Client API: Advanced Features
Now that you understand the basic client API, we will discuss the advanced features that HBase offers to clients.
Filters
HBase filters are a powerful feature that can greatly enhance your effectiveness when working with data stored in tables. You will find predefined filters, already provided by HBase for your use, as well as a framework you can use to implement your own. You will now be introduced to both.
Introduction to Filters
The two prominent read functions for HBase are get()
and scan()
, both supporting either direct access
to data or the use of a start and end key, respectively. You can limit
the data retrieved by progressively adding more limiting selectors to
the query. These include column families, column qualifiers, timestamps
or ranges, as well as version number.
While this gives you control over what is
included, it is missing more fine-grained features, such as selection of
keys, or values, based on regular expressions. Both classes support
filters for exactly these reasons: what cannot be
solved with the provided API functionality to filter row or column keys,
or values, can be achieved with filters. The base interface is aptly named Filter
, and there is a list of concrete
classes supplied by HBase that you can use without doing any
programming.
You can, on the other hand, extend the
Filter
classes to implement your own
requirements. All the filters are actually applied on the server side,
also called predicate pushdown. This ensures the
most efficient selection of the data that needs to be transported back
to the client. You could implement most of the filter functionality in
your client code as well, but you would have to transfer much more
data—something you need to avoid at scale.
Figure 4-1 shows how the filters are configured on the client, then serialized over the network, and then applied on the server.
The filter hierarchy
The lowest level in the filter hierarchy is
the Filter
interface, and the
abstract FilterBase
class that
implements an empty shell, or skeleton, that is used by the actual
filter classes to avoid having the same boilerplate code in each of
them.
Most concrete filter classes are direct descendants of
FilterBase
, but a few use another,
intermediate ancestor class. They all work the same way: you define a new instance of the filter you want to apply
and hand it to the Get
or Scan
instances, using:
setFilter(filter)
While you initialize the filter instance
itself, you often have to supply parameters for whatever the filter is
designed for. There is a special subset of filters, based on CompareFilter, that
ask you for at least two specific parameters, since they are used by
the base class to perform its task. You will learn about the two
parameter types next so that you can use them in context.
Note
Filters have access to the entire row they are applied to. This means that they can decide the fate of a row based on any available information. This includes the row key, column qualifiers, actual value of a column, timestamps, and so on.
When referring to values, or comparisons, as we will discuss shortly, this can be applied to any of these details. Specific filter implementations are available that consider only one of those criteria each.
Comparison operators
CompareFilter-based filters add one more feature to the base FilterBase class, namely the compare() operation, which requires a user-supplied operator type that defines how the result of the comparison is interpreted. The values are listed in Table 4-1.
The comparison operators define what is included, or excluded, when the filter is applied. This allows you to select the data that you want as either a range, subset, or exact and single match.
Comparators
The second type that you need to provide to CompareFilter
-related classes is a
comparator, which is needed to compare various
values and keys in different ways. They are derived from WritableByteArrayComparable
, which
implements Writable
, and Comparable
. You do not have to go
into the details if you just want to use an implementation provided by
HBase and listed in Table 4-2. The
constructors usually take the control value, that is, the one to
compare each table value against.
Note
The last three comparators listed in Table 4-2—the BitComparator
, RegexStringComparator
, and SubstringComparator
—only
work with the EQUAL
and NOT_EQUAL
operators, as the compareTo()
of these comparators returns
0
for a match or 1
when there is no match. Using them in a
LESS
or GREATER
comparison will yield erroneous
results.
Each of the comparators usually has a
constructor that takes the comparison value. In other words, you need
to define a value you compare each cell against. Some of these
constructors take a byte[]
, a byte
array, to do the binary comparison, for example, while others take a
String
parameter—since the data
point compared against is assumed to be some sort of readable
text. Example 4-1 shows some of these in
action.
Comparison Filters
The first type of supplied filter implementations is the
comparison filters. They take the comparison operator and comparator
instance as described earlier. The constructor of each of them has the
same signature, inherited from CompareFilter
:
CompareFilter(CompareOp valueCompareOp, WritableByteArrayComparable valueComparator)
You need to supply this comparison operator and comparison class for the filters to do their work. Next you will see the actual filters implementing a specific comparison.
Note
Please keep in mind that the general contract of the HBase filter API means you are filtering out information—filtered data is omitted from the results returned to the client. The filter is not specifying what you want to have, but rather what you do not want to have returned when reading data.
In contrast, all filters based on CompareFilter
are doing the
opposite, in that they include the matching
values. In other words, be careful when choosing the comparison
operator, as it makes the difference in regard to what the server
returns. For example, instead of using LESS
to skip some information, you may need
to use GREATER_OR_EQUAL
to include
the desired data points.
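To make the difference concrete, here is a minimal sketch—assuming the row key "row-5" as the boundary and the usual test table—showing how the choice of operator flips which side of the boundary is returned:
// Returns only rows with a key strictly less than "row-5".
Filter before = new RowFilter(CompareFilter.CompareOp.LESS,
  new BinaryComparator(Bytes.toBytes("row-5")));

// Returns "row-5" itself plus every row sorting after it.
Filter fromBoundary = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
  new BinaryComparator(Bytes.toBytes("row-5")));

Scan scan = new Scan();
scan.setFilter(fromBoundary);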
RowFilter
This filter gives you the ability to filter data based on row keys.
Example 4-1 shows how the filter can use different comparator instances to get the desired results. It also uses various operators to include the row keys, while omitting others. Feel free to modify the code, changing the operators to see the possible results.
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-0"));

Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
  new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result res : scanner1) {
  System.out.println(res);
}
scanner1.close();

Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL,
  new RegexStringComparator(".*-.5"));
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result res : scanner2) {
  System.out.println(res);
}
scanner2.close();

Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL,
  new SubstringComparator("-5"));
scan.setFilter(filter3);
ResultScanner scanner3 = table.getScanner(scan);
for (Result res : scanner3) {
  System.out.println(res);
}
scanner3.close();
Here is the full printout of the example on the console:
Adding rows to table... Scanning table #1... keyvalues={row-1/colfam1:col-0/1301043190260/Put/vlen=7} keyvalues={row-10/colfam1:col-0/1301043190908/Put/vlen=8} keyvalues={row-100/colfam1:col-0/1301043195275/Put/vlen=9} keyvalues={row-11/colfam1:col-0/1301043190982/Put/vlen=8} keyvalues={row-12/colfam1:col-0/1301043191040/Put/vlen=8} keyvalues={row-13/colfam1:col-0/1301043191172/Put/vlen=8} keyvalues={row-14/colfam1:col-0/1301043191318/Put/vlen=8} keyvalues={row-15/colfam1:col-0/1301043191429/Put/vlen=8} keyvalues={row-16/colfam1:col-0/1301043191509/Put/vlen=8} keyvalues={row-17/colfam1:col-0/1301043191593/Put/vlen=8} keyvalues={row-18/colfam1:col-0/1301043191673/Put/vlen=8} keyvalues={row-19/colfam1:col-0/1301043191771/Put/vlen=8} keyvalues={row-2/colfam1:col-0/1301043190346/Put/vlen=7} keyvalues={row-20/colfam1:col-0/1301043191841/Put/vlen=8} keyvalues={row-21/colfam1:col-0/1301043191933/Put/vlen=8} keyvalues={row-22/colfam1:col-0/1301043191998/Put/vlen=8} Scanning table #2... keyvalues={row-15/colfam1:col-0/1301043191429/Put/vlen=8} keyvalues={row-25/colfam1:col-0/1301043192140/Put/vlen=8} keyvalues={row-35/colfam1:col-0/1301043192665/Put/vlen=8} keyvalues={row-45/colfam1:col-0/1301043193138/Put/vlen=8} keyvalues={row-55/colfam1:col-0/1301043193729/Put/vlen=8} keyvalues={row-65/colfam1:col-0/1301043194092/Put/vlen=8} keyvalues={row-75/colfam1:col-0/1301043194457/Put/vlen=8} keyvalues={row-85/colfam1:col-0/1301043194806/Put/vlen=8} keyvalues={row-95/colfam1:col-0/1301043195121/Put/vlen=8} Scanning table #3... keyvalues={row-5/colfam1:col-0/1301043190562/Put/vlen=7} keyvalues={row-50/colfam1:col-0/1301043193332/Put/vlen=8} keyvalues={row-51/colfam1:col-0/1301043193514/Put/vlen=8} keyvalues={row-52/colfam1:col-0/1301043193603/Put/vlen=8} keyvalues={row-53/colfam1:col-0/1301043193654/Put/vlen=8} keyvalues={row-54/colfam1:col-0/1301043193696/Put/vlen=8} keyvalues={row-55/colfam1:col-0/1301043193729/Put/vlen=8} keyvalues={row-56/colfam1:col-0/1301043193766/Put/vlen=8} keyvalues={row-57/colfam1:col-0/1301043193802/Put/vlen=8} keyvalues={row-58/colfam1:col-0/1301043193842/Put/vlen=8} keyvalues={row-59/colfam1:col-0/1301043193889/Put/vlen=8}
You can see how the first filter did a binary comparison on the row key, including all of those rows that have a key equal to or less than the given one. Note once again the lexicographical sorting and comparison, and how it filters the row keys.
The second filter does a regular expression match, while the third uses a substring match approach. The results show that the filters work as advertised.
FamilyFilter
This filter works very similarly to the RowFilter
, but applies the comparison to the
column families available in a row—as opposed to the row key. Using
the available combinations of operators and comparators you can filter
what is included in the retrieved data on a column family level. Example 4-2 shows how to use this.
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS,
  new BinaryComparator(Bytes.toBytes("colfam3")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  System.out.println(result);
}
scanner.close();

Get get1 = new Get(Bytes.toBytes("row-5"));
get1.setFilter(filter1);
Result result1 = table.get(get1);
System.out.println("Result of get(): " + result1);

Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
  new BinaryComparator(Bytes.toBytes("colfam3")));
Get get2 = new Get(Bytes.toBytes("row-5"));
get2.addFamily(Bytes.toBytes("colfam1"));
get2.setFilter(filter2);
Result result2 = table.get(get2);
System.out.println("Result of get(): " + result2);
The output—reformatted and abbreviated for the sake of readability—shows the filter in action. The input data has four column families, with two columns each, and 10 rows in total.
Adding rows to table... Scanning table... keyvalues={row-1/colfam1:col-0/1303721790522/Put/vlen=7, row-1/colfam1:col-1/1303721790574/Put/vlen=7, row-1/colfam2:col-0/1303721790522/Put/vlen=7, row-1/colfam2:col-1/1303721790574/Put/vlen=7} keyvalues={row-10/colfam1:col-0/1303721790785/Put/vlen=8, row-10/colfam1:col-1/1303721790792/Put/vlen=8, row-10/colfam2:col-0/1303721790785/Put/vlen=8, row-10/colfam2:col-1/1303721790792/Put/vlen=8} ... keyvalues={row-9/colfam1:col-0/1303721790778/Put/vlen=7, row-9/colfam1:col-1/1303721790781/Put/vlen=7, row-9/colfam2:col-0/1303721790778/Put/vlen=7, row-9/colfam2:col-1/1303721790781/Put/vlen=7} Result of get(): keyvalues={row-5/colfam1:col-0/1303721790652/Put/vlen=7, row-5/colfam1:col-1/1303721790664/Put/vlen=7, row-5/colfam2:col-0/1303721790652/Put/vlen=7, row-5/colfam2:col-1/1303721790664/Put/vlen=7} Result of get(): keyvalues=NONE
The last get()
shows that you can (inadvertently)
create an empty set by applying a filter for exactly one column
family, while specifying a different column family selector using addFamily()
.
QualifierFilter
Example 4-3 shows how the same logic is applied on the column qualifier level. This allows you to filter specific columns from the table.
Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
  new BinaryComparator(Bytes.toBytes("col-2")));

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  System.out.println(result);
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
System.out.println("Result of get(): " + result);
ValueFilter
This filter makes it possible to include only columns that
have a specific value. Combined with the RegexStringComparator
, for example, this can
filter using powerful expression syntax. Example 4-4 showcases this feature. Note, though,
that with certain comparators—as explained earlier—you can only employ
a subset of the operators. Here a substring match is performed and
this must be combined with an EQUAL
, or NOT_EQUAL
, operator.
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL,
  new SubstringComparator(".4"));

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
for (KeyValue kv : result.raw()) {
  System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
}
DependentColumnFilter
Here you have a more complex filter that does not simply filter out data based on directly available information. Rather, it lets you specify a dependent column—or reference column—that controls how other columns are filtered. It uses the timestamp of the reference column and includes all other columns that have the same timestamp. Here are the constructors provided:
DependentColumnFilter(byte[] family, byte[] qualifier)
DependentColumnFilter(byte[] family, byte[] qualifier, boolean dropDependentColumn)
DependentColumnFilter(byte[] family, byte[] qualifier, boolean dropDependentColumn,
  CompareOp valueCompareOp, WritableByteArrayComparable valueComparator)
Since it is based on CompareFilter, it also allows you to further filter the columns, in this case based on their values. Think of it as a combination of a ValueFilter and a filter selecting on a
reference timestamp. You can optionally hand in your own operator and
comparator pair to enable this feature. The class provides
constructors, though, that let you omit the operator and comparator
and disable the value filtering, including all columns by default,
that is, performing the timestamp filter based on the reference column
only.
Example 4-5 shows the filter in use. You
can see how the optional values can be handed in as well. The dropDependentColumn
parameter gives you
additional control over how the reference column is handled: it is
either included or dropped by the filter, setting this parameter to
false
or true
, respectively.
private static void filter(boolean drop,
    CompareFilter.CompareOp operator,
    WritableByteArrayComparable comparator) throws IOException {
  Filter filter;
  if (comparator != null) {
    filter = new DependentColumnFilter(Bytes.toBytes("colfam1"),
      Bytes.toBytes("col-5"), drop, operator, comparator);
  } else {
    filter = new DependentColumnFilter(Bytes.toBytes("colfam1"),
      Bytes.toBytes("col-5"), drop);
  }

  Scan scan = new Scan();
  scan.setFilter(filter);
  ResultScanner scanner = table.getScanner(scan);
  for (Result result : scanner) {
    for (KeyValue kv : result.raw()) {
      System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
    }
  }
  scanner.close();

  Get get = new Get(Bytes.toBytes("row-5"));
  get.setFilter(filter);
  Result result = table.get(get);
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}

public static void main(String[] args) throws IOException {
  filter(true, CompareFilter.CompareOp.NO_OP, null);
  filter(false, CompareFilter.CompareOp.NO_OP, null);
  filter(true, CompareFilter.CompareOp.EQUAL,
    new BinaryPrefixComparator(Bytes.toBytes("val-5")));
  filter(false, CompareFilter.CompareOp.EQUAL,
    new BinaryPrefixComparator(Bytes.toBytes("val-5")));
  filter(true, CompareFilter.CompareOp.EQUAL,
    new RegexStringComparator(".*\\.5"));
  filter(false, CompareFilter.CompareOp.EQUAL,
    new RegexStringComparator(".*\\.5"));
}
Warning
This filter is not
compatible with the batch feature of the scan operations, that is,
setting Scan.setBatch()
to a
number larger than zero. The filter needs to see the entire row to
do its work, and using batching will not carry the reference column
timestamp over and would result in erroneous results.
If you try to enable the batch mode nevertheless, you will get an error:
Exception org.apache.hadoop.hbase.filter.IncompatibleFilterException: Cannot set batch on a scan using a filter that returns true for filter.hasFilterRow
The example also proceeds slightly differently compared to the earlier filters, as it sets the version to the column number for a more reproducible result. The implicit timestamps that the servers use as the version could result in fluctuating results as you cannot guarantee them using the exact time, down to the millisecond.
The filter()
method used is called with
different parameter combinations, showing how using the built-in value
filter and the drop flag affects the returned data set.
Dedicated Filters
The second type of supplied filters is based directly on
FilterBase
and implement more
specific use cases. Many of these filters are only really applicable
when performing scan operations, since they filter out entire rows. For
get()
calls, this is often too
restrictive and would result in a
very harsh filter approach: include the whole row or nothing at
all.
SingleColumnValueFilter
You can use this filter when you have exactly one column that decides if an entire row should be returned or not. You need to first specify the column you want to track, and then some value to check against. The constructors offered are:
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value)
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareOp compareOp,
  WritableByteArrayComparable comparator)
The first one is a convenience function as
it simply creates a BinaryComparator
instance internally on your
behalf. The second takes the same parameters we used for the CompareFilter-based classes. Although the
SingleColumnValueFilter
does not
inherit from the CompareFilter
directly, it still uses the same parameter types.
The filter class also exposes a few auxiliary methods you can use to fine-tune its behavior:
boolean getFilterIfMissing()
void setFilterIfMissing(boolean filterIfMissing)
boolean getLatestVersionOnly()
void setLatestVersionOnly(boolean latestVersionOnly)
The first pair of methods controls what happens to rows
that do not have the column at all. By default, they are included in
the result, but you can use setFilterIfMissing(true)
to reverse that
behavior, that is, all rows that do not have the reference column are
dropped from the result.
Note
You must include the column you want to
filter by, in other words, the reference column, into the families
you query for—using addColumn()
,
for example. If you fail to do so, the column is considered missing
and the result is either empty, or contains all rows, based on the
getFilterIfMissing()
result.
By using setLatestVersionOnly(false)
—the default is
true
—you can change the default
behavior of the filter, which is only to check the newest version of
the reference column, to instead include previous versions in the
check as well. Example 4-6
combines these features to select a specific set of rows only.
SingleColumnValueFilter filter = new SingleColumnValueFilter(
  Bytes.toBytes("colfam1"),
  Bytes.toBytes("col-5"),
  CompareFilter.CompareOp.NOT_EQUAL,
  new SubstringComparator("val-5"));
filter.setFilterIfMissing(true);

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-6"));
get.setFilter(filter);
Result result = table.get(get);
System.out.println("Result of get: ");
for (KeyValue kv : result.raw()) {
  System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
}
SingleColumnValueExcludeFilter
The SingleColumnValueFilter
we just discussed is
extended in this class to provide slightly different semantics: the
reference column, as handed into the constructor, is omitted from the
result. In other words, you have the same features, constructors, and
methods to control how this filter works. The only difference is that
you will never get the column you are checking against as part of the
Result
instance(s) on the client
side.
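A minimal sketch of how this could be used—reusing the column family, qualifier, and comparator from the previous example—might look as follows; matching rows come back without the colfam1:col-5 reference column:
SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(
  Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"),
  CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator("val-5"));
filter.setFilterIfMissing(true);

Scan scan = new Scan();
scan.setFilter(filter);
// Matching rows are returned as usual, but without the reference
// column colfam1:col-5 in the Result instances.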
PrefixFilter
Given a prefix, specified when you instantiate the filter instance, all rows that match this prefix are returned to the client. The constructor is:
public PrefixFilter(byte[] prefix)
Example 4-7 has this applied to the usual test data set.
Filter filter = new PrefixFilter(Bytes.toBytes("row-1"));

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner.close();

Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = table.get(get);
for (KeyValue kv : result.raw()) {
  System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
}
It is interesting to see how the get()
call fails to return anything, because
it is asking for a row that does not match the
filter prefix. This filter does not make much sense when doing
get()
calls but is highly useful
for scan operations.
The scan is also actively ended when the filter encounters a row key that is larger than the prefix. Combined with a start row, for example, this improves the overall performance of the scan, as the filter knows when to skip the rest of the rows altogether.
PageFilter
You paginate through rows by employing this filter. When you
create the instance, you specify a pageSize
parameter, which controls how many
rows per page should be returned.
Note
There is a fundamental issue with
filtering on physically separate servers. Filters run on different
region servers in parallel and cannot retain or communicate their
current state across those boundaries. Thus, each filter is required to scan at least up to pageSize rows before ending the scan. This makes the PageFilter slightly inefficient, as more rows may be reported to the client than necessary. The final
consolidation on the client obviously has visibility into all
results and can reduce what is accessible through the API
accordingly.
The client code would need to remember the last row that was returned, and then, when another iteration is about to start, set the start row of the scan accordingly, while retaining the same filter properties.
Because pagination is setting a strict limit on the number of rows to be returned, it is possible for the filter to early out the entire scan, once the limit is reached or exceeded. Filters have a facility to indicate that fact and the region servers make use of this hint to stop any further processing.
Example 4-8 puts this together, showing how a client can reset the scan to a new start row on the subsequent iterations.
Filter filter = new PageFilter(15);

int totalRows = 0;
byte[] lastRow = null;
while (true) {
  Scan scan = new Scan();
  scan.setFilter(filter);
  if (lastRow != null) {
    byte[] startRow = Bytes.add(lastRow, POSTFIX);
    System.out.println("start row: " + Bytes.toStringBinary(startRow));
    scan.setStartRow(startRow);
  }
  ResultScanner scanner = table.getScanner(scan);
  int localRows = 0;
  Result result;
  while ((result = scanner.next()) != null) {
    System.out.println(localRows++ + ": " + result);
    totalRows++;
    lastRow = result.getRow();
  }
  scanner.close();
  if (localRows == 0) break;
}
System.out.println("total rows: " + totalRows);
Because HBase sorts and compares row keys lexicographically, and because the start key of a scan is always inclusive, you need to append an extra zero byte to the previously seen key. This ensures that the last seen row key is skipped and the next one, in sorting order, is found. The zero byte is the smallest possible increment, and is therefore safe to use when resetting the scan boundaries: even if a row existed whose key were exactly the previous key plus the trailing zero byte, the scan would still correctly pick it up in the next iteration, because the start key is inclusive.
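The POSTFIX constant used in Example 4-8 is not shown in the listing above; given the description, a plausible definition is simply a single zero byte:
// The smallest possible increment: appending this to the last seen
// row key yields the next possible key in sorting order.
private static final byte[] POSTFIX = new byte[] { 0x00 };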
KeyOnlyFilter
Some applications need to access just the keys of each
KeyValue
, while omitting the actual
data. The KeyOnlyFilter
provides
this functionality by applying the filter’s ability to modify the
processed columns and cells, as they pass through. It does so by
applying the KeyValue.convertToKeyOnly(boolean)
call that
strips out the data part.
The constructor of this filter has a
boolean
parameter, named lenAsVal
. It is handed to the convertToKeyOnly()
call as-is, controlling
what happens to the value part of each KeyValue
instance processed. The default
false
simply sets the value to zero
length, while the opposite true
sets the value to the number representing the length of the original
value.
The latter may be useful to your application when quickly iterating over columns, where the keys already convey meaning and the length can be used to perform a secondary sort, for example. Client API: Best Practices has an example.
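A minimal usage sketch—table handle and data assumed from the earlier examples—could look like this; every returned KeyValue carries an empty (or length-encoding) value:
Filter filter = new KeyOnlyFilter();

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  for (KeyValue kv : result.raw()) {
    // Only the key part is meaningful here; the value has been stripped.
    System.out.println("KV: " + kv + ", value length: " + kv.getValueLength());
  }
}
scanner.close();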
FirstKeyOnlyFilter
If you need to access the first column—as sorted implicitly by HBase—in each row, this filter will provide this feature. Typically this is used by row counter type applications that only need to check if a row exists. Recall that in column-oriented databases a row really is composed of columns, and if there are none, the row ceases to exist.
Another possible use case is relying on the column sorting in lexicographical order, and setting the column qualifier to an epoch value. This would sort the column with the oldest timestamp name as the first to be retrieved. Combined with this filter, it is possible to retrieve the oldest column from every row using a single scan.
This class makes use of another optimization feature provided by the filter framework: it indicates to the region server applying the filter that the current row is done and that it should skip to the next one. This improves the overall performance of the scan, compared to a full table scan.
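A sketch of the row-counting use case mentioned above—again assuming the table handle from the earlier examples—might look like this:
// Only the first KeyValue of each row is returned, which is enough
// to establish that the row exists.
Scan scan = new Scan();
scan.setFilter(new FirstKeyOnlyFilter());

ResultScanner scanner = table.getScanner(scan);
long rowCount = 0;
for (Result result : scanner) {
  rowCount++;
}
scanner.close();
System.out.println("Total rows: " + rowCount);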
InclusiveStopFilter
The row boundaries of a scan are inclusive for the start
row, yet exclusive for the stop row. You can overcome the stop row
semantics using this filter, which includes the
specified stop row. Example 4-9 uses
the filter to start at row-3
, and
stop at row-5
inclusively.
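The listing of Example 4-9 is not reproduced here; based on the description and the output below, its core presumably resembles this sketch:
Filter filter = new InclusiveStopFilter(Bytes.toBytes("row-5"));

Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("row-3"));
scan.setFilter(filter);

ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  System.out.println(result);
}
scanner.close();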
The output on the console, when running the example code, confirms that the filter works as advertised:
Adding rows to table... Results of scan: keyvalues={row-3/colfam1:col-0/1301337961569/Put/vlen=7} keyvalues={row-30/colfam1:col-0/1301337961610/Put/vlen=8} keyvalues={row-31/colfam1:col-0/1301337961612/Put/vlen=8} keyvalues={row-32/colfam1:col-0/1301337961613/Put/vlen=8} keyvalues={row-33/colfam1:col-0/1301337961614/Put/vlen=8} keyvalues={row-34/colfam1:col-0/1301337961615/Put/vlen=8} keyvalues={row-35/colfam1:col-0/1301337961616/Put/vlen=8} keyvalues={row-36/colfam1:col-0/1301337961617/Put/vlen=8} keyvalues={row-37/colfam1:col-0/1301337961618/Put/vlen=8} keyvalues={row-38/colfam1:col-0/1301337961619/Put/vlen=8} keyvalues={row-39/colfam1:col-0/1301337961620/Put/vlen=8} keyvalues={row-4/colfam1:col-0/1301337961571/Put/vlen=7} keyvalues={row-40/colfam1:col-0/1301337961621/Put/vlen=8} keyvalues={row-41/colfam1:col-0/1301337961622/Put/vlen=8} keyvalues={row-42/colfam1:col-0/1301337961623/Put/vlen=8} keyvalues={row-43/colfam1:col-0/1301337961624/Put/vlen=8} keyvalues={row-44/colfam1:col-0/1301337961625/Put/vlen=8} keyvalues={row-45/colfam1:col-0/1301337961626/Put/vlen=8} keyvalues={row-46/colfam1:col-0/1301337961627/Put/vlen=8} keyvalues={row-47/colfam1:col-0/1301337961628/Put/vlen=8} keyvalues={row-48/colfam1:col-0/1301337961629/Put/vlen=8} keyvalues={row-49/colfam1:col-0/1301337961630/Put/vlen=8} keyvalues={row-5/colfam1:col-0/1301337961573/Put/vlen=7}
TimestampsFilter
When you need fine-grained control over what versions are
included in the scan result, this filter provides the means. You have
to hand in a List
of
timestamps:
TimestampsFilter(List<Long> timestamps)
Note
As you have seen throughout the book so far, a version is a specific value of a column at a unique point in time, denoted with a timestamp. When the filter is asking for a list of timestamps, it will attempt to retrieve the column versions with the matching timestamps.
Example 4-10 sets up a filter with three timestamps and adds a time range to the second scan.
List<Long> ts = new ArrayList<Long>();
ts.add(new Long(5));
ts.add(new Long(10));
ts.add(new Long(15));
Filter filter = new TimestampsFilter(ts);

Scan scan1 = new Scan();
scan1.setFilter(filter);
ResultScanner scanner1 = table.getScanner(scan1);
for (Result result : scanner1) {
  System.out.println(result);
}
scanner1.close();

Scan scan2 = new Scan();
scan2.setFilter(filter);
scan2.setTimeRange(8, 12);
ResultScanner scanner2 = table.getScanner(scan2);
for (Result result : scanner2) {
  System.out.println(result);
}
scanner2.close();
Here is the output on the console in an abbreviated form:
Adding rows to table... Results of scan #1: keyvalues={row-1/colfam1:col-10/10/Put/vlen=8, row-1/colfam1:col-15/15/Put/vlen=8, row-1/colfam1:col-5/5/Put/vlen=7} keyvalues={row-10/colfam1:col-10/10/Put/vlen=9, row-10/colfam1:col-15/15/Put/vlen=9, row-10/colfam1:col-5/5/Put/vlen=8} keyvalues={row-100/colfam1:col-10/10/Put/vlen=10, row-100/colfam1:col-15/15/Put/vlen=10, row-100/colfam1:col-5/5/Put/vlen=9} ... Results of scan #2: keyvalues={row-1/colfam1:col-10/10/Put/vlen=8} keyvalues={row-10/colfam1:col-10/10/Put/vlen=9} keyvalues={row-100/colfam1:col-10/10/Put/vlen=10} keyvalues={row-11/colfam1:col-10/10/Put/vlen=9} ...
The first scan, only using the filter, is outputting the column values for all three specified timestamps as expected. The second scan only returns the timestamp that fell into the time range specified when the scan was set up. Both time-based restrictions, the filter and the scanner time range, are doing their job and the result is a combination of both.
ColumnCountGetFilter
You can use this filter to only retrieve a specific maximum number of columns per row. You can set the number using the constructor of the filter:
ColumnCountGetFilter(int n)
Since this filter stops the entire scan once
a row has been found that matches the maximum number of columns
configured, it is not useful for scan operations, and in fact, it was
written to test filters in get()
calls.
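A minimal sketch of its intended use with a get() call—row key and table handle assumed from the earlier examples—might look like this:
// Return at most two columns of the requested row.
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(new ColumnCountGetFilter(2));

Result result = table.get(get);
for (KeyValue kv : result.raw()) {
  System.out.println("KV: " + kv);
}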
ColumnPaginationFilter
Similar to the PageFilter
,
this one can be used to page through columns in a row. Its constructor
has two parameters:
ColumnPaginationFilter(int limit, int offset)
It skips all columns up to the number given
as offset
, and then includes
limit
columns afterward. Example 4-11 has this applied to a
normal scan.
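Example 4-11 itself is not listed here; judging from the output below, its core presumably resembles the following sketch:
// Skip the first 15 columns of each row, then return the next 5.
Filter filter = new ColumnPaginationFilter(5, 15);

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  System.out.println(result);
}
scanner.close();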
Running this example should render the following output:
Adding rows to table... Results of scan: keyvalues={row-01/colfam1:col-15/15/Put/vlen=9, row-01/colfam1:col-16/16/Put/vlen=9, row-01/colfam1:col-17/17/Put/vlen=9, row-01/colfam1:col-18/18/Put/vlen=9, row-01/colfam1:col-19/19/Put/vlen=9} keyvalues={row-02/colfam1:col-15/15/Put/vlen=9, row-02/colfam1:col-16/16/Put/vlen=9, row-02/colfam1:col-17/17/Put/vlen=9, row-02/colfam1:col-18/18/Put/vlen=9, row-02/colfam1:col-19/19/Put/vlen=9} ...
Note
This example slightly changes the way the
rows and columns are numbered by adding a padding to the numeric
counters. For example, the first row is padded to be row-01
. This also shows how padding can be
used to get a more human-readable style of
sorting, for example—as known from a dictionary or telephone
book.
The result includes all 10 rows, starting
each row at column (offset = 15
)
and printing five columns (limit =
5
).
ColumnPrefixFilter
Analog to the PrefixFilter
,
which worked by filtering on row key prefixes, this filter does the
same for columns. You specify a prefix when creating the
filter:
ColumnPrefixFilter(byte[] prefix)
All columns that have the given prefix are then included in the result.
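As a brief sketch—column naming assumed from the earlier examples—this would restrict a scan to all qualifiers starting with "col-1":
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("col-1"));

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  System.out.println(result);
}
scanner.close();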
RandomRowFilter
Finally, there is a filter that shows what is also possible
using the API: including random rows into the result. The constructor
is given a parameter named chance
,
which represents a value between 0.0
and 1.0
:
RandomRowFilter(float chance)
Internally, this class is using a Java
Random.nextFloat()
call to
randomize the row inclusion, and then compares the value with the
chance
given. Giving it a negative
chance value will make the filter exclude all rows, while a value
larger than 1.0
will make it
include all rows.
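A minimal sketch that samples roughly half of all rows—the chance value is just an illustrative choice—could look like this:
// Each row has an (approximately) 50% chance of being included.
Filter filter = new RandomRowFilter(0.5f);

Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  System.out.println(result);
}
scanner.close();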
Decorating Filters
While the provided filters are already very powerful, sometimes it can be useful to modify, or extend, the behavior of a filter to gain additional control over the returned data. Some of this additional control is not dependent on the filter itself, but can be applied to any of them. This is what the decorating filter group of classes is about.
SkipFilter
This filter wraps a given filter and extends it to exclude
an entire row, when the wrapped filter hints for a KeyValue
to be skipped. In other words, as
soon as a filter indicates that a column in a row is omitted, the
entire row is omitted.
Note
The wrapped filter
must implement the filterKeyValue()
method, or the SkipFilter
will not work as
expected.[58] This is because the SkipFilter is only checking the results of that method to decide how to handle the current row. See Table 4-5 for an overview of compatible
filters.
Example 4-12 combines the SkipFilter with a ValueFilter: the value filter first excludes all zero-valued columns, and the wrapping skip filter then drops every partial row, that is, any row in which at least one column was filtered out.
Filter filter1 = new ValueFilter(CompareFilter.CompareOp.NOT_EQUAL,
  new BinaryComparator(Bytes.toBytes("val-0")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner1.close();

Filter filter2 = new SkipFilter(filter1);
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner2) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner2.close();
The example code should print roughly the following results when you execute it—note, though, that the values are randomized, so you should get a slightly different result for every invocation:
Adding rows to table... Results of scan #1: KV: row-01/colfam1:col-00/0/Put/vlen=5, Value: val-4 KV: row-01/colfam1:col-01/1/Put/vlen=5, Value: val-2 KV: row-01/colfam1:col-02/2/Put/vlen=5, Value: val-4 KV: row-01/colfam1:col-03/3/Put/vlen=5, Value: val-3 KV: row-01/colfam1:col-04/4/Put/vlen=5, Value: val-1 KV: row-02/colfam1:col-00/0/Put/vlen=5, Value: val-3 KV: row-02/colfam1:col-01/1/Put/vlen=5, Value: val-1 KV: row-02/colfam1:col-03/3/Put/vlen=5, Value: val-4 KV: row-02/colfam1:col-04/4/Put/vlen=5, Value: val-1 ... Total KeyValue count for scan #1: 122 Results of scan #2: KV: row-01/colfam1:col-00/0/Put/vlen=5, Value: val-4 KV: row-01/colfam1:col-01/1/Put/vlen=5, Value: val-2 KV: row-01/colfam1:col-02/2/Put/vlen=5, Value: val-4 KV: row-01/colfam1:col-03/3/Put/vlen=5, Value: val-3 KV: row-01/colfam1:col-04/4/Put/vlen=5, Value: val-1 KV: row-07/colfam1:col-00/0/Put/vlen=5, Value: val-4 KV: row-07/colfam1:col-01/1/Put/vlen=5, Value: val-1 KV: row-07/colfam1:col-02/2/Put/vlen=5, Value: val-1 KV: row-07/colfam1:col-03/3/Put/vlen=5, Value: val-2 KV: row-07/colfam1:col-04/4/Put/vlen=5, Value: val-4 ... Total KeyValue count for scan #2: 50
The first scan returns all columns that are not zero valued. Since the value is assigned at random, there is a high probability that you will get at least one or more columns of each possible row. Some rows will miss a column—these are the omitted zero-valued ones.
The second scan, on the other hand, wraps
the first filter and forces all partial rows to be dropped. You can
see from the console output how only complete rows are emitted, that
is, those with all five columns the example code creates initially.
The total KeyValue
count for each
scan confirms the more restrictive behavior of the SkipFilter
variant.
WhileMatchFilter
This second decorating filter type works somewhat similarly
to the previous one, but aborts the entire scan once a piece of
information is filtered. This works by checking the wrapped filter and
seeing if it skips a row by its key, or a column of a row because of a
KeyValue
check.[59]
Example 4-13 is a slight variation of the previous example, using different filters to show how the decorating class works.
Filter filter1 = new RowFilter(CompareFilter.CompareOp.NOT_EQUAL,
  new BinaryComparator(Bytes.toBytes("row-05")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner1.close();

Filter filter2 = new WhileMatchFilter(filter1);
scan.setFilter(filter2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner2) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner2.close();
Once you run the example code, you should get this output on the console:
Adding rows to table... Results of scan #1: KV: row-01/colfam1:col-00/0/Put/vlen=9, Value: val-01.00 KV: row-02/colfam1:col-00/0/Put/vlen=9, Value: val-02.00 KV: row-03/colfam1:col-00/0/Put/vlen=9, Value: val-03.00 KV: row-04/colfam1:col-00/0/Put/vlen=9, Value: val-04.00 KV: row-06/colfam1:col-00/0/Put/vlen=9, Value: val-06.00 KV: row-07/colfam1:col-00/0/Put/vlen=9, Value: val-07.00 KV: row-08/colfam1:col-00/0/Put/vlen=9, Value: val-08.00 KV: row-09/colfam1:col-00/0/Put/vlen=9, Value: val-09.00 KV: row-10/colfam1:col-00/0/Put/vlen=9, Value: val-10.00 Total KeyValue count for scan #1: 9 Results of scan #2: KV: row-01/colfam1:col-00/0/Put/vlen=9, Value: val-01.00 KV: row-02/colfam1:col-00/0/Put/vlen=9, Value: val-02.00 KV: row-03/colfam1:col-00/0/Put/vlen=9, Value: val-03.00 KV: row-04/colfam1:col-00/0/Put/vlen=9, Value: val-04.00 Total KeyValue count for scan #2: 4
The first scan used just the RowFilter
to skip one out of 10 rows; the
rest is returned to the client. Adding the WhileMatchFilter
for the second scan shows
its behavior to stop the entire scan operation, once the wrapped
filter omits a row or column. In the example this is row-05
, triggering the end of the
scan.
FilterList
So far you have seen how filters—on their own, or
decorated—are doing the work of filtering out various dimensions of a
table, ranging from rows, to columns, and all the way to versions of
values within a column. In practice, though, you may want to have more
than one filter being applied to reduce the data returned to your client
application. This is what the FilterList
is for.
Note
The FilterList
class implements the same
Filter
interface, just like any
other single-purpose filter. In doing so, it can be used as a drop-in
replacement for those filters, while combining the effects of each
included instance.
You can create an instance of FilterList
while providing various parameters
at instantiation time, using one
of these constructors:
FilterList(List<Filter> rowFilters)
FilterList(Operator operator)
FilterList(Operator operator, List<Filter> rowFilters)
The rowFilters
parameter specifies the list of
filters that are assessed together, using an operator
to combine their results. Table 4-3 lists the possible choices of operators.
The default is MUST_PASS_ALL
, and can
therefore be omitted from the constructor when you do not need a
different one.
Adding filters, after the
FilterList
instance has been created,
can be done with:
void addFilter(Filter filter)
You can only specify one
operator per FilterList
, but you are
free to add other FilterList
instances to an existing FilterList
,
thus creating a hierarchy of filters, combined with the operators you
need.
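As a sketch of such a hierarchy—the row key, qualifier, and value used here are placeholders—an outer list can require a row condition to hold, while a nested inner list lets either of two column conditions pass:
// Inner list: either of the two conditions may match.
FilterList inner = new FilterList(FilterList.Operator.MUST_PASS_ONE);
inner.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
  new BinaryComparator(Bytes.toBytes("col-1"))));
inner.addFilter(new ValueFilter(CompareFilter.CompareOp.EQUAL,
  new SubstringComparator("val-5")));

// Outer list: both the row condition and the inner list must pass.
FilterList outer = new FilterList(FilterList.Operator.MUST_PASS_ALL);
outer.addFilter(new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
  new BinaryComparator(Bytes.toBytes("row-10"))));
outer.addFilter(inner);

Scan scan = new Scan();
scan.setFilter(outer);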
You can further control the execution order of
the included filters by carefully choosing the List
implementation you require. For example,
using ArrayList
would guarantee that
the filters are applied in the order they were added to the list. This
is shown in Example 4-14.
List<Filter> filters = new ArrayList<Filter>();

Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
  new BinaryComparator(Bytes.toBytes("row-03")));
filters.add(filter1);

Filter filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
  new BinaryComparator(Bytes.toBytes("row-06")));
filters.add(filter2);

Filter filter3 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
  new RegexStringComparator("col-0[03]"));
filters.add(filter3);

FilterList filterList1 = new FilterList(filters);

Scan scan = new Scan();
scan.setFilter(filterList1);
ResultScanner scanner1 = table.getScanner(scan);
for (Result result : scanner1) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner1.close();

FilterList filterList2 = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);

scan.setFilter(filterList2);
ResultScanner scanner2 = table.getScanner(scan);
for (Result result : scanner2) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner2.close();
The first scan filters out a lot of details, as at least one of the filters in the list excludes some information. Only where they all let the information pass is it returned to the client.
In contrast, the second scan includes
all rows and columns in the result. This is caused
by setting the FilterList
operator to
MUST_PASS_ONE
, which includes all the
information as soon as a single filter lets it pass. And in this
scenario, all values are passed by at least one of them,
including everything.
Custom Filters
Eventually, you may exhaust the list of supplied filter types and
need to implement your own. This can be done by either implementing the
Filter
interface, or extending the
provided FilterBase
class. The latter
provides default implementations for all methods that are members of the
interface.
The Filter
interface has
the following structure:
public interface Filter extends Writable {
  public enum ReturnCode {
    INCLUDE, SKIP, NEXT_COL, NEXT_ROW, SEEK_NEXT_USING_HINT
  }
  public void reset()
  public boolean filterRowKey(byte[] buffer, int offset, int length)
  public boolean filterAllRemaining()
  public ReturnCode filterKeyValue(KeyValue v)
  public void filterRow(List<KeyValue> kvs)
  public boolean hasFilterRow()
  public boolean filterRow()
  public KeyValue getNextKeyHint(KeyValue currentKV)
}
The interface provides a public enumeration
type, named ReturnCode
, that is used
by the filterKeyValue()
method to
indicate what the execution framework should do next. Instead of blindly
iterating over all values, the filter has the ability to skip a value,
the remainder of a column, or the rest of the entire row. This helps
tremendously in terms of improving performance while retrieving
data.
Note
The servers may still need to scan the
entire row to find matching data, but the optimizations provided by
the filterKeyValue()
return code
can reduce the work required to do so.
Table 4-4 lists the possible values and their meaning.
Return code | Description |
INCLUDE | Include the given KeyValue instance in the result. |
SKIP | Skip the current KeyValue and proceed to the next. |
NEXT_COL | Skip the remainder of the current column, proceeding to the next. This is used by the TimestampsFilter, for example. |
NEXT_ROW | Similar to the previous, but skips the remainder of the current row, moving to the next. The RowFilter makes use of this return code, for example. |
SEEK_NEXT_USING_HINT | Some filters want to skip a variable number of values and use this return code to indicate that the framework should use the getNextKeyHint() method to determine where to skip to. The ColumnPrefixFilter, for example, uses this feature. |
Most of the provided methods are called at various stages in the process of retrieving a row for a client—for example, during a scan operation. Putting them in call order, you can expect them to be executed in the following sequence:
filterRowKey(byte[] buffer, int offset, int length)
The next check is against the row key, using this method of the Filter implementation. You can use it to skip an entire row from being further processed. The RowFilter uses it to suppress entire rows being returned to the client.
filterKeyValue(KeyValue v)
When a row is not filtered (yet), the framework proceeds to invoke this method for every KeyValue that is part of the current row. The ReturnCode indicates what should happen with the current value.
filterRow(List<KeyValue> kvs)
Once all row and value checks have been performed, this method of the filter is called, giving you access to the list of KeyValue instances that have been included by the previous filter methods. The DependentColumnFilter uses it to drop those columns that do not match the reference column.
filterRow()
After everything else was checked and invoked, the final inspection is performed using filterRow(). A filter that uses this functionality is the PageFilter, checking if the number of rows to be returned for one iteration in the pagination process is reached, returning true afterward. The default false would include the current row in the result.
reset()
This resets the filter for every new row the scan is iterating over. It is called by the server, after a row is read, implicitly. This applies to get and scan operations, although obviously it has no effect for the former, as gets only read a single row.
filterAllRemaining()
This method can be used to stop the scan, by returning true. It is used by filters to provide the early out optimizations mentioned earlier. If a filter returns false, the scan is continued, and the aforementioned methods are called. Obviously, this also implies that for get operations this call is not useful.
Figure 4-2 shows the logical flow of the filter methods for a single row. There is a more fine-grained process to apply the filters on a column level, which is not relevant in this context.
Example 4-15 implements a
custom filter, using the methods provided by FilterBase
, overriding only those methods that
need to be changed.
The filter first assumes all rows should be filtered, that is, removed from the result. Only when there is a value in any column that matches the given reference does it include the row, so that it is sent back to the client.
public class CustomFilter extends FilterBase {

  private byte[] value = null;
  private boolean filterRow = true;

  public CustomFilter() {
    super();
  }

  public CustomFilter(byte[] value) {
    this.value = value;
  }

  @Override
  public void reset() {
    this.filterRow = true;
  }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    if (Bytes.compareTo(value, kv.getValue()) == 0) {
      filterRow = false;
    }
    return ReturnCode.INCLUDE;
  }

  @Override
  public boolean filterRow() {
    return filterRow;
  }

  @Override
  public void write(DataOutput dataOutput) throws IOException {
    Bytes.writeByteArray(dataOutput, this.value);
  }

  @Override
  public void readFields(DataInput dataInput) throws IOException {
    this.value = Bytes.readByteArray(dataInput);
  }
}
Example 4-16 uses
the new custom filter to find rows with specific values in it, also
using a FilterList
.
List<Filter> filters = new ArrayList<Filter>();

Filter filter1 = new CustomFilter(Bytes.toBytes("val-05.05"));
filters.add(filter1);

Filter filter2 = new CustomFilter(Bytes.toBytes("val-02.07"));
filters.add(filter2);

Filter filter3 = new CustomFilter(Bytes.toBytes("val-09.00"));
filters.add(filter3);

FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);

Scan scan = new Scan();
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
  for (KeyValue kv : result.raw()) {
    System.out.println("KV: " + kv + ", Value: " + Bytes.toString(kv.getValue()));
  }
}
scanner.close();
Just as with the earlier examples, here is what should appear as output on the console when executing this example:
Adding rows to table... Results of scan: KV: row-02/colfam1:col-00/1301507323088/Put/vlen=9, Value: val-02.00 KV: row-02/colfam1:col-01/1301507323090/Put/vlen=9, Value: val-02.01 KV: row-02/colfam1:col-02/1301507323092/Put/vlen=9, Value: val-02.02 KV: row-02/colfam1:col-03/1301507323093/Put/vlen=9, Value: val-02.03 KV: row-02/colfam1:col-04/1301507323096/Put/vlen=9, Value: val-02.04 KV: row-02/colfam1:col-05/1301507323104/Put/vlen=9, Value: val-02.05 KV: row-02/colfam1:col-06/1301507323108/Put/vlen=9, Value: val-02.06 KV: row-02/colfam1:col-07/1301507323110/Put/vlen=9, Value: val-02.07 KV: row-02/colfam1:col-08/1301507323112/Put/vlen=9, Value: val-02.08 KV: row-02/colfam1:col-09/1301507323113/Put/vlen=9, Value: val-02.09 KV: row-05/colfam1:col-00/1301507323148/Put/vlen=9, Value: val-05.00 KV: row-05/colfam1:col-01/1301507323150/Put/vlen=9, Value: val-05.01 KV: row-05/colfam1:col-02/1301507323152/Put/vlen=9, Value: val-05.02 KV: row-05/colfam1:col-03/1301507323153/Put/vlen=9, Value: val-05.03 KV: row-05/colfam1:col-04/1301507323154/Put/vlen=9, Value: val-05.04 KV: row-05/colfam1:col-05/1301507323155/Put/vlen=9, Value: val-05.05 KV: row-05/colfam1:col-06/1301507323157/Put/vlen=9, Value: val-05.06 KV: row-05/colfam1:col-07/1301507323158/Put/vlen=9, Value: val-05.07 KV: row-05/colfam1:col-08/1301507323158/Put/vlen=9, Value: val-05.08 KV: row-05/colfam1:col-09/1301507323159/Put/vlen=9, Value: val-05.09 KV: row-09/colfam1:col-00/1301507323192/Put/vlen=9, Value: val-09.00 KV: row-09/colfam1:col-01/1301507323194/Put/vlen=9, Value: val-09.01 KV: row-09/colfam1:col-02/1301507323196/Put/vlen=9, Value: val-09.02 KV: row-09/colfam1:col-03/1301507323199/Put/vlen=9, Value: val-09.03 KV: row-09/colfam1:col-04/1301507323201/Put/vlen=9, Value: val-09.04 KV: row-09/colfam1:col-05/1301507323202/Put/vlen=9, Value: val-09.05 KV: row-09/colfam1:col-06/1301507323203/Put/vlen=9, Value: val-09.06 KV: row-09/colfam1:col-07/1301507323204/Put/vlen=9, Value: val-09.07 KV: row-09/colfam1:col-08/1301507323205/Put/vlen=9, Value: val-09.08 KV: row-09/colfam1:col-09/1301507323206/Put/vlen=9, Value: val-09.09
As expected, the entire row that has a column with the value matching one of the references is included in the result.
Filters Summary
Table 4-5 summarizes some of the features and compatibilities related to the provided filter implementations. The ✓ symbol means the feature is available, while ✗ indicates it is missing.
Filter | Batch[a] | Skip[b] | While-Match[c] | List[d] | Early Out[e] | Gets[f] | Scans[g] |
RowFilter | ✓ | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ |
FamilyFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
QualifierFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
ValueFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
DependentColumnFilter | ✗ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
SingleColumnValueFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✗ | ✓ |
SingleColumnValue ExcludeFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✗ | ✓ |
PrefixFilter | ✓ | ✗ | ✓ | ✓ | ✓ | ✗ | ✓ |
PageFilter | ✓ | ✗ | ✓ | ✓ | ✓ | ✗ | ✓ |
KeyOnlyFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
FirstKeyOnlyFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
InclusiveStopFilter | ✓ | ✗ | ✓ | ✓ | ✓ | ✗ | ✓ |
TimestampsFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
ColumnCountGetFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✗ |
ColumnPaginationFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
ColumnPrefixFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✓ | ✓ |
RandomRowFilter | ✓ | ✓ | ✓ | ✓ | ✗ | ✗ | ✓ |
SkipFilter | ✓ | ✓/✗[h] | ✓/✗[h] | ✓ | ✗ | ✗ | ✓ |
WhileMatchFilter | ✓ | ✓/✗[h] | ✓/✗[h] | ✓ | ✓ | ✗ | ✓ |
FilterList | ✓/✗[h] | ✓/✗[h] | ✓/✗[h] | ✓ | ✓/✗[h] | ✓ | ✓ |
[a] Filter supports the scanner batch mode set with Scan.setBatch(). [b] Filter can be used with the decorating SkipFilter class. [c] Filter can be used with the decorating WhileMatchFilter class. [d] Filter can be used with the combining FilterList class. [e] Filter has optimizations to stop a scan early, once there are no more matching rows ahead. [f] Filter can be usefully applied to get() operations. [g] Filter can be usefully applied to scan operations. [h] Depends on the included filters. |
Counters
In addition to the functionality we already discussed, HBase offers another advanced feature: counters. Many applications that collect statistics—such as clicks or views in online advertising—used to collect the data in logfiles that would subsequently be analyzed. Using counters offers the potential of switching to live accounting, foregoing the delayed batch processing step completely.
Introduction to Counters
In addition to the check-and-modify operations you saw earlier, HBase also has a mechanism to treat columns as counters. Otherwise, you would have to lock a row, read the value, increment it, write it back, and eventually unlock the row for other writers to be able to access it subsequently. This can cause a lot of contention, and in the event of a client process crashing, it could leave the row locked until the lease recovery kicks in—which could be disastrous in a heavily loaded system.
The client API provides specialized methods to do the read-and-modify operation atomically in a single client-side call. Earlier versions of HBase only had calls that would involve an RPC for every counter update, while newer versions started to add the same mechanisms used by the CRUD operations—as explained in CRUD Operations—which can bundle multiple counter updates in a single RPC.
Note
While you can update multiple counters, you
are still limited to single rows. Updating counters in multiple rows
would require separate API—and
therefore RPC—calls. The batch()
calls
currently do not support the Increment
instance, though this should
change in the near future.
Before we discuss each type separately, you need to have a few more details regarding how counters work on the column level. Here is an example using the shell that creates a table, increments a counter twice, and then queries the current value:
hbase(main):001:0> create 'counters', 'daily', 'weekly', 'monthly'
0 row(s) in 1.1930 seconds

hbase(main):002:0> incr 'counters', '20110101', 'daily:hits', 1
COUNTER VALUE = 1

hbase(main):003:0> incr 'counters', '20110101', 'daily:hits', 1
COUNTER VALUE = 2

hbase(main):004:0> get_counter 'counters', '20110101', 'daily:hits'
COUNTER VALUE = 2
Every call to incr
returns the new value of the counter.
The final check using get_counter
shows the current value
as expected.
Note
The format of the shell’s incr
command is as follows:
incr '<table>', '<row>', '<column>', [<increment-value>]
You can also access the counter with a get
call, giving you this result:
hbase(main):005:0> get 'counters', '20110101'
COLUMN                CELL
 daily:hits           timestamp=1301570823471, value=\x00\x00\x00\x00\x00\x00\x00\x02
1 row(s) in 0.0600 seconds
This is obviously not very readable, but it shows that a counter is simply a column, like any other. You can also specify a larger increment value:
hbase(main):006:0> incr 'counters', '20110101', 'daily:hits', 20
COUNTER VALUE = 22

hbase(main):007:0> get 'counters', '20110101'
COLUMN                CELL
 daily:hits           timestamp=1301574412848, value=\x00\x00\x00\x00\x00\x00\x00\x16
1 row(s) in 0.0400 seconds

hbase(main):008:0> get_counter 'counters', '20110101', 'daily:hits'
COUNTER VALUE = 22
Accessing the counter directly gives you the
byte
array representation, with the
shell printing the separate bytes as
hexadecimal values. Using the get_counter
once again shows the current value
in a more human-readable format, and confirms that variable increments
are possible and work as expected.
Finally, you can use the increment value of the incr
call to not only increase the counter,
but also retrieve the current value, and decrease it as well. In fact,
you can omit it completely and the default of 1
is assumed:
hbase(main):004:0> incr 'counters', '20110101', 'daily:hits'
COUNTER VALUE = 3

hbase(main):005:0> incr 'counters', '20110101', 'daily:hits'
COUNTER VALUE = 4

hbase(main):006:0> incr 'counters', '20110101', 'daily:hits', 0
COUNTER VALUE = 4

hbase(main):007:0> incr 'counters', '20110101', 'daily:hits', -1
COUNTER VALUE = 3

hbase(main):008:0> incr 'counters', '20110101', 'daily:hits', -1
COUNTER VALUE = 2
Using the increment value—the last parameter
of the incr
command—you can achieve
the behavior shown in Table 4-6.
Value | Effect |
greater than zero | Increase the counter by the given value. |
zero | Retrieve the current value of the counter. Same as using the get_counter shell command. |
less than zero | Decrease the counter by the given value. |
Obviously, using the shell’s incr
command only allows you to increase a
single counter. You can do the same using the client API, described
next.
Single Counters
The first type of increment call is for single counters only: you need to specify the exact column you want to use. The methods, provided by HTable, are as follows:
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
  long amount) throws IOException
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
  long amount, boolean writeToWAL) throws IOException
Given the coordinates of a column, and the increment amount, these methods only differ by the optional writeToWAL parameter—which works the same way as the Put.setWriteToWAL() method. Omitting writeToWAL uses the default value of true, meaning the write-ahead log is active.
Apart from that, you can use them easily, as shown in Example 4-17.
HTable table = new HTable(conf, "counters");

long cnt1 = table.incrementColumnValue(Bytes.toBytes("20110101"),
  Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
long cnt2 = table.incrementColumnValue(Bytes.toBytes("20110101"),
  Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
long current = table.incrementColumnValue(Bytes.toBytes("20110101"),
  Bytes.toBytes("daily"), Bytes.toBytes("hits"), 0);
long cnt3 = table.incrementColumnValue(Bytes.toBytes("20110101"),
  Bytes.toBytes("daily"), Bytes.toBytes("hits"), -1);
cnt1: 1, cnt2: 2, current: 2, cnt3: 1
Just as with the shell commands used earlier, the API calls have the same effect: they increment the counter when using a positive increment value, retrieve the current value when using zero for the increment, and eventually decrease the counter by using a negative increment value.
Multiple Counters
Another way to increment counters is provided by the increment()
call of HTable
. It works similarly to the CRUD-type
operations discussed earlier, using the following method to do the
increment:
Result increment(Increment increment) throws IOException
You must create an instance of the Increment
class and fill it with the
appropriate details—for example,
the counter coordinates. The constructors provided by this class
are:
Increment()
Increment(byte[] row)
Increment(byte[] row, RowLock rowLock)
You must provide a row key when instantiating
an Increment
, which sets the row
containing all the counters that the subsequent call to increment()
should modify.
The optional parameter rowLock
specifies a custom row lock instance,
allowing you to run the entire operation under your exclusive
control—for example, when you want to modify the same row a few times
while protecting it against updates from other writers.
Warning
While you can guard the increment operation against other writers, you currently cannot do this for readers. In fact, there is no atomicity guarantee made for readers.
Since readers are not taking out locks on rows that are incremented, it may happen that they have access to some counters—within one row—that are already updated, and some that are not! This applies to scan and get operations equally.
Once you have decided which row to update and created the
Increment
instance, you need to add
the actual counters—meaning columns—you want to increment, using this
method:
Increment addColumn(byte[] family, byte[] qualifier, long amount)
The difference here, as compared to the
Put
methods, is that there is no
option to specify a version—or timestamp—when dealing with increments:
versions are handled implicitly. Furthermore, there is no addFamily()
equivalent, because counters are
specific columns, and they need to be specified as such. It therefore
makes no sense to add a column family alone.
A special feature of the Increment
class is the ability to take an
optional time range:
Increment setTimeRange(long minStamp, long maxStamp) throws IOException
Setting a time range for a set of counter
increments seems odd in light of the fact that versions are handled
implicitly. The time range is actually passed on to the servers to
restrict the internal get operation from retrieving the current counter
values. You can use it to expire counters, for
example, to partition them by time: when you set the time range to be
restrictive enough, you can mask out older counters from the internal
get, making them look like they are nonexistent. An increment would
assume they are unset and start at 1
again.
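To make this concrete, here is a minimal sketch of such a time-boxed increment, reusing the counters table from the shell examples; the 24-hour cutoff is a made-up value for illustration:

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "counters");

// Only counter cells newer than the cutoff are visible to the internal get,
// so an older counter effectively restarts at 1 (hypothetical 24-hour window).
long cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000;

Increment increment = new Increment(Bytes.toBytes("20110101"));
increment.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
increment.setTimeRange(cutoff, Long.MAX_VALUE);

Result result = table.increment(increment);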
The Increment
class provides additional methods,
which are summarized in Table 4-7.
Similar to the shell example shown earlier, Example 4-18 uses various increment values to increment, retrieve, and decrement the given counters.
Increment increment1 = new Increment(Bytes.toBytes("20110101"));

increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 1);
increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 10);
increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), 10);

Result result1 = table.increment(increment1);

for (KeyValue kv : result1.raw()) {
  System.out.println("KV: " + kv +
    " Value: " + Bytes.toLong(kv.getValue()));
}

Increment increment2 = new Increment(Bytes.toBytes("20110101"));

increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 5);
increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 0);
increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), -5);

Result result2 = table.increment(increment2);

for (KeyValue kv : result2.raw()) {
  System.out.println("KV: " + kv +
    " Value: " + Bytes.toLong(kv.getValue()));
}
When you run the example, the following is output on the console:
KV: 20110101/daily:clicks/1301948275827/Put/vlen=8 Value: 1
KV: 20110101/daily:hits/1301948275827/Put/vlen=8 Value: 1
KV: 20110101/weekly:clicks/1301948275827/Put/vlen=8 Value: 10
KV: 20110101/weekly:hits/1301948275827/Put/vlen=8 Value: 10
KV: 20110101/daily:clicks/1301948275829/Put/vlen=8 Value: 6
KV: 20110101/daily:hits/1301948275829/Put/vlen=8 Value: 2
KV: 20110101/weekly:clicks/1301948275829/Put/vlen=8 Value: 10
KV: 20110101/weekly:hits/1301948275829/Put/vlen=8 Value: 5
When you compare the two sets of increment results, you will notice that this works as expected.
Coprocessors
Earlier we discussed how you can use filters to reduce the amount of data being sent over the network from the servers to the client. With the coprocessor feature in HBase, you can even move part of the computation to where the data lives.
Introduction to Coprocessors
Using the client API, combined with specific selector mechanisms, such as filters, or column family scoping, it is possible to limit what data is transferred to the client. It would be good, though, to take this further and, for example, perform certain operations directly on the server side while only returning a small result set. Think of this as a small MapReduce framework that distributes work across the entire cluster.
A coprocessor enables you to run arbitrary code directly on each region server. More precisely, it executes the code on a per-region basis, giving you trigger-like functionality—similar to stored procedures in the RDBMS world. From the client side, you do not have to take specific actions, as the framework handles the distributed nature transparently.
There is a set of implicit events that you can use to hook into, performing auxiliary tasks. If this is not enough, you can also extend the RPC protocol to introduce your own set of calls, which are invoked from your client and executed on the server on your behalf.
Just as with the custom filters (see Custom Filters), you need to create special Java classes that implement specific interfaces. Once they are compiled, you make these classes available to the servers in the form of a JAR file. The region server process can instantiate these classes and execute them in the correct environment. In contrast to the filters, though, coprocessors can be loaded dynamically as well. This allows you to extend the functionality of a running HBase cluster.
Use cases for coprocessors are, for instance, using hooks into row mutation operations to maintain secondary indexes, or implementing some kind of referential integrity. Filters could be enhanced to become stateful, and therefore make decisions across row boundaries. Aggregate functions, such as sum(), or avg(), known from RDBMSes and SQL, could be moved to the servers to scan the data locally and only return the single number result across the network.
Note
Another good use case for coprocessors is access control. The authentication, authorization, and auditing features added in HBase version 0.92 are based on coprocessors. They are loaded at system startup and use the provided trigger-like hooks to check if a user is authenticated, and authorized to access specific values stored in tables.
The framework already provides classes, based on the coprocessor framework, which you can use to extend from when implementing your own functionality. They fall into two main groups: observer and endpoint. Here is a brief overview of their purpose:
- Observer
This type of coprocessor is comparable to triggers: callback functions (also referred to here as hooks) are executed when certain events occur. This includes user-generated, but also server-internal, automated events.
The interfaces provided by the coprocessor framework are RegionObserver, MasterObserver, and WALObserver, covering region, master, and write-ahead log operations, respectively.
Observers provide you with well-defined event callbacks, for every operation a cluster server may handle.
- Endpoint
Next to event handling there is also a need to add custom operations to a cluster. User code can be deployed to the servers hosting the data to, for example, perform server-local computations.
Endpoints are dynamic extensions to the RPC protocol, adding callable remote procedures. Think of them as stored procedures, as known from RDBMSes. They may be combined with observer implementations to directly interact with the server-side state.
All of these interfaces are based on the Coprocessor
interface to gain common features,
but then implement their own specific functionality.
Finally, coprocessors can be chained, very similar to what the Java Servlet API does with request filters. The following section discusses the various types available in the coprocessor framework.
The Coprocessor Class
All coprocessor classes
must be based on this interface. It defines the
basic contract of a coprocessor and facilitates the management by the
framework itself. The interface provides two enumerations, which are
used throughout the framework: Priority
and State
. Table 4-8 explains the priority values.
Value | Description |
SYSTEM | Highest priority, defines coprocessors that are executed first |
USER | Defines all other coprocessors, which are executed subsequently |
The priority of a coprocessor defines in what order the coprocessors are executed: system-level instances are called before the user-level coprocessors are executed.
Note
Within each priority level, there is also the notion of a sequence number, which keeps track of the order in which the coprocessors were loaded. The number starts with zero, and is increased by one thereafter.
The number itself is not very helpful, but you can rely on the framework to order the coprocessors—in each priority group—ascending by sequence number. This defines their execution order.
Coprocessors are managed by the framework in their own life cycle. To
that effect, the Coprocessor
interface offers two calls:
void start(CoprocessorEnvironment env) throws IOException;
void stop(CoprocessorEnvironment env) throws IOException;
These two methods are called when the
coprocessor class is started, and eventually when it is decommissioned.
The provided CoprocessorEnvironment
instance is used to retain the state across the lifespan of the
coprocessor instance. A coprocessor instance is always contained in a
provided environment. Table 4-9 lists the methods
available from it.
Coprocessors should only deal with what they have been given by their environment. There is a good reason for that, mainly to guarantee that there is no back door for malicious code to harm your data.
Note
Coprocessor implementations should be using
the getTable()
method to access
tables. Note that this class adds certain safety measures to the
default HTable
class. For example,
coprocessors are not allowed to lock a
row.
While there is currently nothing that can
stop you from creating your own HTable
instances inside your coprocessor
code, this is likely to be checked against in the future and possibly
denied.
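To make this more concrete, here is a rough sketch only—the table name and row key are made up, and the exact signature of getTable() should be verified against your HBase version—of how a coprocessor could access another table through its environment from within the start() callback:

@Override
public void start(CoprocessorEnvironment env) throws IOException {
  // obtain a table reference from the environment instead of
  // instantiating an HTable directly
  HTableInterface lookup = env.getTable(Bytes.toBytes("lookup"));
  try {
    Result result = lookup.get(new Get(Bytes.toBytes("somerow")));
    // ... use the result to initialize coprocessor state, for example
  } finally {
    lookup.close();
  }
}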
The start()
and stop()
methods of the Coprocessor
interface are invoked implicitly
by the framework as the instance is going through its life cycle. Each
step in the process has a well-known state. Table 4-10 lists the life-cycle state values as
provided by the coprocessor interface.
Value | Description |
UNINSTALLED | The coprocessor is in its initial state. It has no environment yet, nor is it initialized. |
INSTALLED | The instance is installed into its environment. |
STARTING | This state indicates that the coprocessor is about to be started, i.e., its start() method is about to be invoked. |
ACTIVE | Once the start() call returns, the state is set to active. |
STOPPING | The state set just before the stop() method is called. |
STOPPED | Once stop() returns control to the framework, the state of the coprocessor is set to stopped. |
The final piece of the puzzle is the CoprocessorHost
class that maintains all the
coprocessor instances and their dedicated environments. There are
specific subclasses, depending on where the host is used, in other
words, on the master, region server, and so on.
The trinity of Coprocessor
, CoprocessorEnvironment
, and CoprocessorHost
forms the basis for the
classes that implement the advanced functionality of HBase, depending on
where they are used. They provide the life-cycle support for the
coprocessors, manage their state, and offer the environment for them to
execute as expected. In addition, these classes provide an abstraction
layer that developers can use to easily build their own custom
implementation.
Figure 4-3 shows how the calls from a client are flowing through the list of coprocessors. Note how the order is the same on the incoming and outgoing sides: first are the system-level ones, and then the user ones in the order they were loaded.
Coprocessor Loading
Coprocessors are loaded in a variety of ways. Before we discuss the actual coprocessor types and how to implement your own, we will talk about how to deploy them so that you can try the provided examples.
You can either configure coprocessors to be loaded in a static way, or load them dynamically while the cluster is running. The static method uses the configuration files and table schemas—and is discussed next. Unfortunately, there is not yet an exposed API to load them dynamically.[60]
Loading from the configuration
You can configure globally which coprocessors are loaded when HBase starts. This is done by adding one, or more, of the following to the hbase-site.xml configuration file:
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>
<property>
  <name>hbase.coprocessor.wal.classes</name>
  <value>coprocessor.WALObserverExample, bar.foo.MyWALObserver</value>
</property>
The order of the classes in each configuration property is important, as it defines the execution order. All of these coprocessors are loaded with the system priority. You should configure all globally active classes here so that they are executed first and have a chance to take authoritative actions. Security coprocessors are loaded this way, for example.
Note
The configuration file is the first to be examined as HBase starts. Although you can define additional system-level coprocessors in other places, the ones here are executed first.
Only one of the three possible configuration keys is read by the matching CoprocessorHost implementation. For example, the coprocessors defined in hbase.coprocessor.master.classes are loaded by the MasterCoprocessorHost class.
Table 4-11 shows where each configuration property is used.
The coprocessors defined with hbase.coprocessor.region.classes
are loaded
as defaults when a region is
opened for a table. Note that you cannot specify
for which table, or region, they are loaded: the default coprocessors
are loaded for every table and region. You need
to keep this in mind when designing your own coprocessors.
Loading from the table descriptor
The other option to define what coprocessors to load is the table descriptor. As this is per table, the coprocessors defined here are only loaded for regions of that table—and only by the region servers. In other words, you can only use this approach for region-related coprocessors, not for master or WAL-related ones.
Since they are loaded in the context of a table, they are more targeted compared to the configuration loaded ones, which apply to all tables.
You need to add their definition to the table descriptor
using the HTableDescriptor.setValue()
method. The key
must start with COPROCESSOR
, and the value has to conform to
the following format:
<path-to-jar>|<classname>|<priority>
Here is an example that defines two coprocessors, one with system-level priority, the other with user-level priority:
'COPROCESSOR$1' => \
  'hdfs://localhost:8020/users/leon/test.jar|coprocessor.Test|SYSTEM'
'COPROCESSOR$2' => \
  '/Users/laura/test2.jar|coprocessor.AnotherTest|USER'
The path-to-jar
can either be a fully specified
HDFS location, or any other path supported by the Hadoop FileSystem
class. The second coprocessor
definition, for example, uses a local path instead.
The classname
defines the actual implementation
class. While the JAR may contain many coprocessor classes, only one
can be specified per table attribute. Use the standard Java package
name conventions to specify the class.
The priority
must be either SYSTEM
or USER
. This is case-sensitive and must be
specified exactly this way.
Warning
Avoid using extra whitespace characters in the coprocessor definition. The parsing is quite strict, and adding leading, trailing, or spacing characters will render the entire entry invalid.
Using the $<number>
postfix for the key enforces
the order in which the definitions, and therefore the coprocessors,
are loaded. Although only the prefix of COPROCESSOR
is checked, using the numbered
postfix is the advised way to define them. Example 4-19 shows how this
can be done using the administrative API for HBase.
public class LoadWithTableDescriptorExample {

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();

    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(fs.getUri() + Path.SEPARATOR + "test.jar");

    HTableDescriptor htd = new HTableDescriptor("testtable");
    htd.addFamily(new HColumnDescriptor("colfam1"));
    htd.setValue("COPROCESSOR$1", path.toString() +
      "|" + RegionObserverExample.class.getCanonicalName() +
      "|" + Coprocessor.Priority.USER);

    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.createTable(htd);

    System.out.println(admin.getTableDescriptor(Bytes.toBytes("testtable")));
  }
}
The final check should show you the following result when running this example against a local, standalone HBase cluster:
{NAME => 'testtable', COPROCESSOR$1 => \
  'file:/test.jar|coprocessor.RegionObserverExample|USER', FAMILIES => \
  [{NAME => 'colfam1', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', \
  COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE \
  => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
The coprocessor definition has been successfully applied to the table schema. Once the table is enabled and the regions are opened, the framework will first load the configuration coprocessors and then the ones defined in the table descriptor.
The RegionObserver Class
The first subclass of Coprocessor
we will look into is the one used
at the region level: the RegionObserver
class. You can learn from its
name that it belongs to the group of observer
coprocessors: they have hooks that trigger when a specific region-level
operation occurs.
These operations can be divided into two
groups as well: region life-cycle changes and
client API
calls. We will look into
both in that order.
Handling region life-cycle events
While The Region Life Cycle explains the region life-cycle, Figure 4-4 shows a simplified form.
The observers have the opportunity to hook into the pending open, open, and pending close state changes. For each of them there is a set of hooks that are called implicitly by the framework.
Note
For the sake of brevity, all parameters and exceptions are omitted when referring to the observer calls. Read the online documentation for the full specification.[61] Note, though, that all calls have a special first parameter:
ObserverContext<RegionCoprocessorEnvironment> c
This special CoprocessorEnvironment
wrapper gives you
additional control over what should happen after the hook execution.
See The RegionCoprocessorEnvironment class and The ObserverContext class for the details.
State: pending open
A region is in this state when it is about to be opened. Observing coprocessors can either piggyback or fail this process. To do so, the following calls are available:
void preOpen(...) / void postOpen(...)
These methods are called just before the
region is opened, and just after it was opened. Your coprocessor
implementation can use them, for instance, to indicate to the
framework—in the preOpen()
call—that it should abort the opening process. Or hook into the
postOpen()
call to trigger a
cache warm up, and so on.
After the pending open, but just before the open state, the region server may have to apply records from the write-ahead log (WAL). This, in turn, invokes the following methods of the observer:
void preWALRestore(...) / void postWALRestore(...)
Hooking into these calls gives you fine-grained control over what mutation is applied during the log replay process. You get access to the edit record, which you can use to inspect what is being applied.
State: open
A region is considered open when it is deployed to a region server and fully operational. At this point, all the operations discussed throughout the book can take place; for example, the region’s in-memory store could be flushed to disk, or the region could be split when it has grown too large. The possible hooks are:
void preFlush(...) / void postFlush(...)
void preCompact(...) / void postCompact(...)
void preSplit(...) / void postSplit(...)
This should be quite intuitive by now: the
pre calls are executed
before, while the post
calls are executed after the respective
operation. For example, using the preSplit()
hook, you could effectively
disable the built-in region splitting process and perform these
operations manually.
State: pending close
The last group of hooks for the observers is for regions that go into the pending close state. This occurs when the region transitions from open to closed. Just before, and after, the region is closed the following hooks are executed:
void preClose(..., boolean abortRequested) / void postClose(..., boolean abortRequested)
The abortRequested
parameter indicates why a
region was closed. Usually regions are closed during normal
operation, when, for example, the region is moved to a different
region server for load-balancing reasons. But there also is the
possibility for a region server to have gone rogue and be
aborted to avoid any side effects. When this
happens, all hosted regions are also aborted, and you can see from
the given parameter if that was the case.
Handling client API events
As opposed to the life-cycle events, all client API calls are explicitly sent from a client application to the region server. You have the opportunity to hook into these calls just before they are applied, and just thereafter. Here is the list of the available calls:
void preGet(...) / void postGet(...)
    Called before and after a client makes an HTable.get() request

void prePut(...) / void postPut(...)
    Called before and after a client makes an HTable.put() request

void preDelete(...) / void postDelete(...)
    Called before and after a client makes an HTable.delete() request

boolean preCheckAndPut(...) / boolean postCheckAndPut(...)
    Called before and after a client invokes an HTable.checkAndPut() call

boolean preCheckAndDelete(...) / boolean postCheckAndDelete(...)
    Called before and after a client invokes an HTable.checkAndDelete() call

void preGetClosestRowBefore(...) / void postGetClosestRowBefore(...)
    Called before and after a client invokes an HTable.getClosestRowBefore() call

boolean preExists(...) / boolean postExists(...)
    Called before and after a client invokes an HTable.exists() call

long preIncrementColumnValue(...) / long postIncrementColumnValue(...)
    Called before and after a client invokes an HTable.incrementColumnValue() call

void preIncrement(...) / void postIncrement(...)
    Called before and after a client invokes an HTable.increment() call

InternalScanner preScannerOpen(...) / InternalScanner postScannerOpen(...)
    Called before and after a client invokes an HTable.getScanner() call

boolean preScannerNext(...) / boolean postScannerNext(...)
    Called before and after a client invokes a ResultScanner.next() call

void preScannerClose(...) / void postScannerClose(...)
    Called before and after a client invokes a ResultScanner.close() call
The RegionCoprocessorEnvironment class
The environment instances provided to a coprocessor that is implementing the RegionObserver interface are based on the RegionCoprocessorEnvironment class—which in turn is implementing the CoprocessorEnvironment interface. The latter was discussed in The Coprocessor Class.
On top of the provided methods, the more specific, region-oriented subclass is adding the methods described in Table 4-12.
The getRegion()
call can be used to get a
reference to the hosting HRegion
instance, and to invoke calls this class provides. In addition, your
code can access the shared region server services instance, which is
explained in Table 4-13.
I will not be discussing all the details on the provided functionality, and instead refer you to the Java API documentation.[62]
The ObserverContext class
For the callbacks provided by the RegionObserver
class, there is a special
context handed in as the first parameter to all calls: the ObserverContext
class. It provides access to
the current environment, but also adds the crucial ability to indicate
to the coprocessor framework what it should do after a callback is
completed.
Note
The context instance is the same for all coprocessors in the execution chain, but with the environment swapped out for each coprocessor.
Table 4-14 lists the methods as provided by the context class.
The important context functions are bypass()
and complete()
. These functions give your
coprocessor implementation the option to control the subsequent
behavior of the framework. The complete()
call influences the execution
chain of the coprocessors, while the bypass()
call stops any further default
processing on the server. Use it with the earlier example of avoiding
automated region splits like so:
@Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
  e.bypass();
}
Instead of having to implement your own
RegionObserver
, based on the
interface, you can use the following base class to only implement what
is needed.
The BaseRegionObserver class
This class can be used as the basis for all your
observer-type coprocessors. It has placeholders for all methods required by
the RegionObserver
interface. They
are all left blank, so by default nothing is done when extending this
class. You must override all the callbacks that you are interested in
to add the required functionality.
Example 4-20 is an observer that handles specific row key requests.
public class RegionObserverExample extends BaseRegionObserver {
  public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");

  @Override
  public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Get get, final List<KeyValue> results) throws IOException {
    if (Bytes.equals(get.getRow(), FIXED_ROW)) {
      KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW,
        Bytes.toBytes(System.currentTimeMillis()));
      results.add(kv);
    }
  }
}
Note
The following was added to the hbase-site.xml file to enable the coprocessor:
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverExample</value>
</property>
The class is available to the region
server’s Java Runtime Environment because we have already added the
JAR of the compiled repository to the HBASE_CLASSPATH
variable in hbase-env.sh—see Deployment of Custom Filters for reference.
Do not forget to restart HBase, though, to make the changes to the static configuration files active.
The row key @@@GETTIME@@@
is handled by the observer’s
preGet()
hook, inserting the
current time of the server. Using the HBase Shell—after deploying the
code to servers—you can see this
in action:
hbase(main):001:0> get 'testtable', '@@@GETTIME@@@'
COLUMN                       CELL
 @@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, \
                             value=\x00\x00\x01/s@3\xD8
1 row(s) in 0.0410 seconds

hbase(main):002:0> Time.at(Bytes.toLong( \
  "\x00\x00\x01/s@3\xD8".to_java_bytes) / 1000)
=> Wed Apr 20 16:11:18 +0200 2011
This requires an existing table, because trying to issue a get call to a nonexistent table will raise an error, before the actual get operation is executed. Also, the example does not set the bypass flag, in which case something like the following could happen:
hbase(main):003:0> create 'testtable2', 'colfam1'
0 row(s) in 1.3070 seconds

hbase(main):004:0> put 'testtable2', '@@@GETTIME@@@', \
  'colfam1:qual1', 'Hello there!'
0 row(s) in 0.1360 seconds

hbase(main):005:0> get 'testtable2', '@@@GETTIME@@@'
COLUMN                       CELL
 @@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, \
                             value=\x00\x00\x01/sJ\xBC\xEC
 colfam1:qual1               timestamp=1303309353184, value=Hello there!
2 row(s) in 0.0450 seconds
A new table is created and a row with the
special row key is inserted. Subsequently, the row is retrieved. You
can see how the artificial column is mixed with the actual one stored
earlier. To avoid this issue, Example 4-21 adds the necessary
e.bypass()
call.
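The code of Example 4-21 is not reproduced in this excerpt; a minimal sketch of what it amounts to—Example 4-20 with the added bypass() call—could look like this:

public class RegionObserverWithBypassExample extends BaseRegionObserver {
  public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");

  @Override
  public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
      final Get get, final List<KeyValue> results) throws IOException {
    if (Bytes.equals(get.getRow(), FIXED_ROW)) {
      KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW,
        Bytes.toBytes(System.currentTimeMillis()));
      results.add(kv);
      // skip the default get processing so only the generated column is returned
      e.bypass();
    }
  }
}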
Note
You need to adjust the hbase-site.xml file to point to the new example:
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RegionObserverWithBypassExample</value>
</property>
Just as before, please restart HBase after making these adjustments.
As expected, and using the shell once more, the result is now different:
hbase(main):069:0> get 'testtable2', '@@@GETTIME@@@'
COLUMN                       CELL
 @@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, \
                             value=\x00\x00\x01/s]\x1D4
1 row(s) in 0.0470 seconds
Only the artificial column is returned, and
since the default get operation is bypassed, it is the only column
retrieved. Also note how the timestamp of this column is 9223372036854775807
—which is Long.MAX_VALUE
on purpose. Since the example
creates the KeyValue
instance
without specifying a timestamp, it is set to HConstants.
LATEST_
TIMESTAMP
by default, and that is, in turn, set to Long.MAX_VALUE
. You can amend the example by
adding a timestamp and see how that would be printed when using the
shell (an exercise left to you).
The MasterObserver Class
The second subclass of Coprocessor discussed handles all possible callbacks the master server may initiate. The operations and API calls are explained in Chapter 5, though they can be classified as data definition operations, similar to the DDL used in relational database systems. For that reason, the MasterObserver class provides the following hooks:
void preCreateTable(...) / void postCreateTable(...)
void preDeleteTable(...) / void postDeleteTable(...)
void preModifyTable(...) / void postModifyTable(...)
void preAddColumn(...) / void postAddColumn(...)
void preModifyColumn(...) / void postModifyColumn(...)
void preDeleteColumn(...) / void postDeleteColumn(...)
void preEnableTable(...) / void postEnableTable(...)
void preDisableTable(...) / void postDisableTable(...)
void preMove(...) / void postMove(...)
void preAssign(...) / void postAssign(...)
void preUnassign(...) / void postUnassign(...)
void preBalance(...) / void postBalance(...)
boolean preBalanceSwitch(...) / void postBalanceSwitch(...)
Called before and after the flag for the balancer is changed.
void preShutdown(...)
Called before the cluster shutdown is initiated. There is no post hook, because after the shutdown, there is no longer a cluster to invoke the callback.
void preStopMaster(...)
Called before the master process is stopped. There is no post hook, because after the master has stopped, there is no longer a process to invoke the callback.
The MasterCoprocessorEnvironment class
Similar to how the RegionCoprocessorEnvironment is enclosing a single RegionObserver coprocessor, the MasterCoprocessorEnvironment is wrapping MasterObserver instances. It also implements the CoprocessorEnvironment interface, thus giving you, for instance, access to the getTable() call to access data from within your own implementation.
On top of the provided methods, the more specific, master-oriented subclass adds the one method described in Table 4-15.
Your code can access the shared master services instance, the methods of which are listed and described in Table 4-16.
I will not be discussing all the details on the provided functionality, and instead refer you to the Java API documentation once more.[63]
The BaseMasterObserver class
Either you can base your efforts to implement a MasterObserver
on the interface directly, or
you can extend the BaseMasterObserver
class instead. It
implements the interface while leaving all callback functions empty.
If you were to use this class unchanged, it would not yield any kind
of reaction.
Adding functionality is achieved by overriding the appropriate event methods. You have the choice of hooking your code into the pre and/or post calls.
Example 4-22 uses the post hook after a table was created to perform additional tasks.
public class MasterObserverExample extends BaseMasterObserver {

  @Override
  public void postCreateTable(
      ObserverContext<MasterCoprocessorEnvironment> env,
      HRegionInfo[] regions, boolean sync) throws IOException {
    String tableName = regions[0].getTableDesc().getNameAsString();

    MasterServices services = env.getEnvironment().getMasterServices();
    MasterFileSystem masterFileSystem = services.getMasterFileSystem();
    FileSystem fileSystem = masterFileSystem.getFileSystem();

    Path blobPath = new Path(tableName + "-blobs");
    fileSystem.mkdirs(blobPath);
  }
}
Note
You need to add the following to the hbase-site.xml file for the coprocessor to be loaded by the master process:
<property>
  <name>hbase.coprocessor.master.classes</name>
  <value>coprocessor.MasterObserverExample</value>
</property>
Just as before, restart HBase after making these adjustments.
Once you have activated the coprocessor, it is listening to the said events and will trigger your code automatically. The example is using the supplied services to create a directory on the filesystem. A fictitious application, for instance, could use it to store very large binary objects (known as blobs) outside of HBase.
To trigger the event, you can use the shell like so:
hbase(main):001:0>
create 'testtable', 'colfam1'
0 row(s) in 0.4300 seconds
This creates the table and afterward calls
the coprocessor’s postCreateTable()
method. The Hadoop command-line tool can be used to verify the
results:
$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - larsgeorge supergroup          0 ... /user/larsgeorge/testtable-blobs
There are many things you can implement with
the MasterObserver
coprocessor.
Since you have access to most of the shared master resources through
the MasterServices
instance, you
should be careful what you do, as it can potentially wreak
havoc.
Finally, because the environment is wrapped
in an ObserverContext
, you have the
same extra flow controls, exposed by the bypass()
and complete()
methods. You can use them to
explicitly disable certain operations or skip subsequent coprocessor
execution, respectively.
Endpoints
The earlier RegionObserver
example used a well-known row key to add a computed column during a get
request. It seems that this could suffice to implement other
functionality as well—for example, aggregation functions that return the
sum of all values in a specific column.
Unfortunately, this does not work, as the row key defines which region is handling the request, therefore only sending the computation request to a single server. What we want, though, is a mechanism to send such a request to all regions, and therefore all region servers, so that they can build the sum of the columns they have access to locally. Once each region has returned its partial result, we can aggregate the total on the client side much more easily. If you were to have 1,000 regions and 1 million columns, you would receive 1,000 decimal numbers on the client side—one for each region. This is fast to aggregate for the final result.
If you were to scan the entire table using a purely client API approach, in a worst-case scenario you would transfer all 1 million numbers to build the sum. Moving such computation to the servers where the data resides is a much better option. HBase, though, does not know what you may need, so to overcome this limitation, the coprocessor framework provides you with a dynamic call implementation, represented by the endpoint concept.
The CoprocessorProtocol interface
In order to provide a custom RPC protocol to clients, a
coprocessor implementation defines an interface that extends CoprocessorProtocol
. The interface can
define any methods that the coprocessor wishes to expose. Using this
protocol, you can communicate with the coprocessor instances via the
following calls, provided by HTable
:
<T extends CoprocessorProtocol> T coprocessorProxy(
  Class<T> protocol, byte[] row)

<T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
  Class<T> protocol, byte[] startKey, byte[] endKey,
  Batch.Call<T, R> callable)

<T extends CoprocessorProtocol, R> void coprocessorExec(
  Class<T> protocol, byte[] startKey, byte[] endKey,
  Batch.Call<T, R> callable, Batch.Callback<R> callback)
Since CoprocessorProtocol
instances are associated
with individual regions within the table, the client RPC calls must
ultimately identify which regions should be used in the CoprocessorProtocol
method invocations.
Though regions are seldom handled directly in client code and the
region names may change over time, the coprocessor RPC calls use row
keys to identify which regions should be used for the method
invocations. Clients can call CoprocessorProtocol
methods against one of
the following:
- Single region
This is done by calling
coprocessorProxy()
with a single row key. This returns a dynamic proxy of theCoprocessorProtocol
interface, which uses the region containing the given row key—even if the row does not exist—as the RPC endpoint.- Range of regions
You can call
coprocessorExec()
with a start row key and an end row key. All regions in the table from the one containing the start row key to the one containing the end row key (inclusive) will be used as the RPC endpoints.
Note
The row keys passed as parameters to the
HTable
methods are not passed to the
CoprocessorProtocol
implementations. They are only used
to identify the regions for endpoints of the remote calls.
The Batch class defines two interfaces used for CoprocessorProtocol invocations against multiple regions: clients implement Batch.Call to call methods of the actual CoprocessorProtocol instance. The interface's call() method will be called once per selected region, passing the CoprocessorProtocol instance for the region as a parameter.
Clients can optionally implement Batch.Callback
to be notified of the results
from each region invocation as they complete. The instance’s
void update(byte[] region, byte[] row, R result)
method will be called with the value returned by R
call(T instance)
from each region.
The BaseEndpointCoprocessor class
Implementing an endpoint involves the following two steps:

1. Extend the CoprocessorProtocol interface. This specifies the communication details for the new endpoint: it defines the RPC protocol between the client and the servers.

2. Extend the BaseEndpointCoprocessor class. You need to provide the actual implementation of the endpoint by extending the abstract BaseEndpointCoprocessor class and implementing the protocol interface defined in step 1.
Example 4-23 defines such a protocol interface, extending CoprocessorProtocol to add custom functions to HBase. A client can invoke these remote calls to retrieve the number of rows and KeyValues in each region where it is running.
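The code of Example 4-23 is not included in this excerpt; judging from the implementation in Example 4-24 below, the protocol interface amounts to something like this sketch:

public interface RowCountProtocol extends CoprocessorProtocol {
  long getRowCount() throws IOException;
  long getRowCount(Filter filter) throws IOException;
  long getKeyValueCount() throws IOException;
}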
Step 2 is to combine this new protocol
interface with a class that also extends BaseEndpointCoprocessor
. Example 4-24 uses the environment provided to access
the data using an InternalScanner
instance.
public class RowCountEndpoint extends BaseEndpointCoprocessor
  implements RowCountProtocol {

  private long getCount(Filter filter, boolean countKeyValues)
      throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(1);
    if (filter != null) {
      scan.setFilter(filter);
    }
    RegionCoprocessorEnvironment environment =
      (RegionCoprocessorEnvironment) getEnvironment();
    // use an internal scanner to perform scanning.
    InternalScanner scanner = environment.getRegion().getScanner(scan);
    int result = 0;
    try {
      List<KeyValue> curVals = new ArrayList<KeyValue>();
      boolean done = false;
      do {
        curVals.clear();
        done = scanner.next(curVals);
        result += countKeyValues ? curVals.size() : 1;
      } while (done);
    } finally {
      scanner.close();
    }
    return result;
  }

  @Override
  public long getRowCount() throws IOException {
    return getRowCount(new FirstKeyOnlyFilter());
  }

  @Override
  public long getRowCount(Filter filter) throws IOException {
    return getCount(filter, false);
  }

  @Override
  public long getKeyValueCount() throws IOException {
    return getCount(null, true);
  }
}
Note how the FirstKeyOnlyFilter
is used to reduce the
number of columns being scanned.
Note
You need to add (or amend from the previous examples) the following to the hbase-site.xml file for the endpoint coprocessor to be loaded by the region server process:
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>coprocessor.RowCountEndpoint</value>
</property>
Just as before, restart HBase after making these adjustments.
Example 4-25 showcases
how a client can use the provided calls of HTable
to execute the deployed coprocessor
endpoint functions. Since the calls are sent to each region
separately, there is a need to summarize the total number at the
end.
public class EndpointExample {

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "testtable");
    try {
      Map<byte[], Long> results = table.coprocessorExec(
        RowCountProtocol.class, null, null,
        new Batch.Call<RowCountProtocol, Long>() {
          @Override
          public Long call(RowCountProtocol counter) throws IOException {
            return counter.getRowCount();
          }
        });

      long total = 0;
      for (Map.Entry<byte[], Long> entry : results.entrySet()) {
        total += entry.getValue().longValue();
        System.out.println("Region: " + Bytes.toString(entry.getKey()) +
          ", Count: " + entry.getValue());
      }
      System.out.println("Total Count: " + total);
    } catch (Throwable throwable) {
      throwable.printStackTrace();
    }
  }
}
The code emits the region names, the count for each of them, and eventually the grand total:
Region: testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f., Count: 2
Region: testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87., Count: 3
Total Count: 5
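The optional Batch.Callback discussed earlier can be combined with the same call to be notified per region as soon as its result arrives, instead of collecting the complete map first. The following is only a sketch of that variant, reusing the table and RowCountProtocol from Example 4-25 (the AtomicLong simply lets the anonymous callback update the running total):

final AtomicLong total = new AtomicLong();
try {
  table.coprocessorExec(RowCountProtocol.class, null, null,
    new Batch.Call<RowCountProtocol, Long>() {
      @Override
      public Long call(RowCountProtocol counter) throws IOException {
        return counter.getRowCount();
      }
    },
    new Batch.Callback<Long>() {
      @Override
      public void update(byte[] region, byte[] row, Long result) {
        // called once per region as its partial count comes in
        total.addAndGet(result.longValue());
        System.out.println("Region: " + Bytes.toString(region) +
          ", Count: " + result);
      }
    });
} catch (Throwable throwable) {
  throwable.printStackTrace();
}
System.out.println("Total Count: " + total.get());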
The Batch
class also
offers a more convenient way to access the remote endpoint: using
Batch.forMethod()
, you can retrieve
a fully configured Batch.Call
instance, ready to be sent to the region servers. Example 4-26 amends the previous example to
make use of this shortcut.
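Example 4-26 itself is not reproduced here; the gist of the shortcut, shown only as a sketch and assuming the same table and try/catch scaffolding as Example 4-25, is to let the Batch class construct the Call instance from the method name:

// build the Batch.Call via reflection instead of an anonymous inner class
Batch.Call<RowCountProtocol, Long> call =
  Batch.forMethod(RowCountProtocol.class, "getKeyValueCount");
Map<byte[], Long> results = table.coprocessorExec(
  RowCountProtocol.class, null, null, call);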
The forMethod()
call uses the Java reflection
API to retrieve the named method. The returned Batch.Call
instance will execute the
endpoint function and return the same data types as defined by the
protocol for this method.
However, if you want to perform additional
processing on the results, implementing Batch.Call
directly will provide more power
and flexibility. This can be seen in Example 4-27, which combines the row and
key-value count for each region.
Map<byte[], Pair<Long, Long>> results = table.coprocessorExec(
  RowCountProtocol.class, null, null,
  new Batch.Call<RowCountProtocol, Pair<Long, Long>>() {
    public Pair<Long, Long> call(RowCountProtocol counter)
        throws IOException {
      return new Pair(counter.getRowCount(), counter.getKeyValueCount());
    }
  });

long totalRows = 0;
long totalKeyValues = 0;
for (Map.Entry<byte[], Pair<Long, Long>> entry : results.entrySet()) {
  totalRows += entry.getValue().getFirst().longValue();
  totalKeyValues += entry.getValue().getSecond().longValue();
  System.out.println("Region: " + Bytes.toString(entry.getKey()) +
    ", Count: " + entry.getValue());
}
System.out.println("Total Row Count: " + totalRows);
System.out.println("Total KeyValue Count: " + totalKeyValues);
Running the code will yield the following output:
Region: testtable,,1303420252525.9c336bd2b294a...0647a1f2d13b., Count: {2,4}
Region: testtable,row3,1303420252525.6d7c95de8a7...386cfec7f2., Count: {3,6}
Total Row Count: 5
Total KeyValue Count: 10
The examples so far all used the coprocessorExec()
calls to batch the
requests across all regions, matching the given start and end row
keys. Example 4-28 uses the coprocessorProxy()
call to get a
local, client-side proxy of the endpoint. Since a row key is
specified, the client API will route the proxy calls to the region—and
to the server currently hosting it—that contains the given key,
regardless of whether it actually exists: regions are specified with a
start and end key only, so the match is done by range only.
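Example 4-28 is also not reproduced in this excerpt; in essence, and only as a sketch with an arbitrary row key, the proxy-based variant looks like this:

// the proxy targets only the region that contains the given row key
RowCountProtocol protocol = table.coprocessorProxy(
  RowCountProtocol.class, Bytes.toBytes("row4"));
long rowsInRegion = protocol.getRowCount();
System.out.println("Region Row Count: " + rowsInRegion);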
With the proxy reference, you can invoke any
remote function defined in your CoprocessorProtocol
implementation
from within client code, and it returns the result for the region that
served the request. Figure 4-5 shows the
difference between the two approaches.
HTablePool
Instead of creating an HTable instance for every request from your client application, it makes much more sense to create one initially and subsequently reuse it.
The primary reason for doing so is that creating
an HTable
instance is a fairly
expensive operation that takes a few seconds to complete. In a highly
contended environment with thousands of requests per second, you would not
be able to use this approach at all—creating the HTable
instance would be too slow. You need to
create the instance at startup and use it for the duration of your
client’s life cycle.
There is an additional issue with the HTable
being reused by multiple threads within
the same process.
Warning
The HTable
class is not thread-safe, that is, the local
write buffer is not guarded against concurrent
modifications. Even if you were to use setAutoFlush(true)
(which is the default
currently; see Client-side write buffer)
this is not advisable. Instead, you should use one instance of HTable
for each thread you are running in your
client application.
Clients can solve this problem using the
HTablePool
class. It only serves one
purpose, namely to pool client API instances to the HBase cluster.
Creating the pool is accomplished using one of these constructors:
HTablePool()
HTablePool(Configuration config, int maxSize)
HTablePool(Configuration config, int maxSize,
  HTableInterfaceFactory tableFactory)
The default constructor—the one without any parameters—creates a pool with the configuration found in the classpath, while setting the maximum size to unlimited. This equals calling the second constructor like so:
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
Setting the maxSize
parameter gives you control over how
many HTable
instances a pool is allowed
to contain. The optional tableFactory
parameter can be used to hand in a custom factory class that creates the
actual HTable
instances.
Using the pool is a matter of employing the following calls:
HTableInterface getTable(String tableName)
HTableInterface getTable(byte[] tableName)
void putTable(HTableInterface table)
The getTable()
calls retrieve an HTable
instance from the pool, while the
putTable()
returns it after you are
done using it. Both internally defer some of the work to the mentioned
HTableInterfaceFactory
instance the
pool is configured with.
Note
Setting the maxSize
parameter during the construction of a
pool does not impose an upper limit on the number
of HTableInterface
instances the pool
is allowing you to retrieve. You can call getTable()
as much as you like to get a valid
table reference.
The maximum size of the pool only sets the
number of HTableInterface
instances
retained within the pool, for a given table name. For example, when you
set the size to 5, but then call getTable()
10 times, you have created 10
HTable
instances (assuming you use
the default). Upon returning them using the putTable()
method, five are kept for
subsequent use, while the additional five you requested are simply
ignored. More importantly, the release mechanisms
of the factory are not invoked.
Finally, there are calls to close the pool for specific tables:
void closeTablePool(String tableName) void closeTablePool(byte[] tableName)
Obviously, both do the same thing, with one
allowing you to specify a String
, and
the other a byte
array—use whatever is
more convenient for you.
The close
call of the pool
iterates over the list of retained references for a specific table,
invoking the release mechanism provided by the factory. This is useful for
freeing all resources for a named table, and starting all over again. Keep
in mind that for all resources to be released, you
would need to call these methods for every table name you have used so
far.
Example 4-29 uses these methods to create and use a pool.
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, 5);

HTableInterface[] tables = new HTableInterface[10];

System.out.println("Acquiring tables...");
for (int n = 0; n < 10; n++) {
  tables[n] = pool.getTable("testtable");
  System.out.println(Bytes.toString(tables[n].getTableName()));
}

System.out.println("Releasing tables...");
for (int n = 0; n < 5; n++) {
  pool.putTable(tables[n]);
}

System.out.println("Closing pool...");
pool.closeTablePool("testtable");
You should receive the following output on the console:
Acquiring tables...
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
testtable
Releasing tables...
Closing pool...
Note that using more than the configured maximum size of the pool works as we discussed earlier: we receive more references than were configured. Returning the tables to the pool does not yield any logging or printout, though; it does its work quietly behind the scenes.
Connection Handling
Every instance of HTable
requires a connection to the remote servers. This is internally
represented by the HConnection
class,
and more importantly managed process-wide by the shared HConnectionManager
class. From a user perspective, there is usually no immediate need to deal with either of these
two classes; instead, you simply create a new Configuration
instance, and use that with your
client API calls.
Internally, the connections are keyed in a map,
where the key is the Configuration
instance you are using. In other words, if you create a number of HTable
instances while providing the same
configuration reference, they all share the same underlying HConnection
instance. There are good
reasons for this to happen:
- Share ZooKeeper connections
As each client eventually needs a connection to the ZooKeeper ensemble to perform the initial lookup of where user table regions are located, it makes sense to share this connection once it is established, with all subsequent client instances.
- Cache common resources
Every lookup performed through ZooKeeper, or the
-ROOT-
, or.META.
table, of where user table regions are located requires network round-trips. The location is then cached on the client side to reduce the amount of network traffic, and to speed up the lookup process.Since this list is the same for every local client connecting to a remote cluster, it is equally useful to share it among multiple clients running in the same process. This is accomplished by the shared
HConnection
instance.In addition, when a lookup fails—for instance, when a region was split—the connection has the built-in retry mechanism to refresh the stale cache information. This is then immediately available to all other clients sharing the same connection reference, thus further reducing the number of network round-trips initiated by a client.
Another class that benefits from the same advantages is the HTablePool: all of the pooled HTable instances automatically share the provided configuration instance, and therefore also the shared connection it references. This also means you should always create your own configuration, whenever you plan to instantiate more than one HTable instance. For example:
HTable table1 = new HTable("table1");
//...
HTable table2 = new HTable("table2");
is less efficient than the following code:
Configuration conf = HBaseConfiguration.create();
HTable table1 = new HTable(conf, "table1");
//...
HTable table2 = new HTable(conf, "table2");
The latter implicitly uses the connection sharing, as provided by the HBase client-side API classes.
Note
There are no known performance implications for sharing a connection, even for heavily multithreaded applications.
The drawback of sharing a connection is the
cleanup: when you do not explicitly close a connection, it is kept open
until the client process exits. This can result in many connections that
remain open to ZooKeeper, especially for heavily distributed applications,
such as MapReduce jobs talking to HBase. In a worst-case scenario, you can
run out of available connections, and receive an IOException
instead.
You can avoid this problem by explicitly closing
the shared connection, when you are done using it. This is accomplished
with the close()
method provided by
HTable
. The call decreases an internal
reference count and eventually closes all shared resources, such as the
connection to the ZooKeeper ensemble, and removes the connection reference
from the internal list.
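As a small sketch of the recommended pattern—the table name is arbitrary—closing the table once you are done triggers this cleanup:

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "testtable");
try {
  // ... perform client API calls ...
} finally {
  // decreases the shared connection's reference count and eventually
  // releases the ZooKeeper connection and cached region locations
  table.close();
}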
Every time you reuse a Configuration
instance, the connection manager
internally increases the reference count, so you only have to make sure
you call the close()
method to trigger
the cleanup. There is also an explicit call to clear out a connection, or all
open connections:
static void deleteConnection(Configuration conf, boolean stopProxy)
static void deleteAllConnections(boolean stopProxy)
Since all shared connections are internally
keyed by the configuration instance, you need to provide that instance to
close the associated connection. The boolean stopProxy
parameter lets you further enforce the
cleanup of the entire RPC stack of the client—which is its umbilical cord to the
remote servers. Only use true
when you
do not need any further communication with the server to take
place.
The deleteAllConnections()
call only requires the
boolean stopProxy
flag; it simply
iterates over the entire list of shared connections known to the
connection manager and closes them.
If you are ever in need of using a connection explicitly, you can
make use of the getConnection()
call
like so:
Configuration newConfig = new Configuration(originalConf);
HConnection connection = HConnectionManager.getConnection(newConfig);
// Use the connection to your heart's delight and then when done...
HConnectionManager.deleteConnection(newConfig, true);
The advantage is that you are the sole user of that connection, but you must make sure you close it out properly as well.
[58] The various filter methods are discussed in Custom Filters.
[60] Coprocessors are a fairly recent addition to HBase, and are therefore still in flux. Check with the online documentation and issue tracking system to see what is not yet implemented, or planned to be added.
[62] The Java HBase classes are documented online at http://hbase.apache.org/apidocs/.
[63] The Java HBase classes are documented online at http://hbase.apache.org/apidocs/.