This quick How-To post will teach you how to use HBase sink(s) in Flume. There are currently two generic HBase sinks available: hbase() and attr2hbase(). The former requires one to be more explicit when providing mapping from event data to HBase record, while the latter allows adding record cells dynamically based on event data. Despite this difference, these two sinks have a lot in common.
Configuring Sink(s)
Both sinks write a single or no records into an HBase table based on a single Flume event. Both sinks have three attributes (at least) in common: table, writeBufferSize and writeToWal. These attributes control the HBase client behavior.
The attribute table is the name of the output HBase table. Yes, that means that currently one sink can be configured to output into just one table. If you desperately need to write to multiple tables you can use Flume’s native features to configure several sinks and direct each to the desired HBase table.
The writeBufferSize corresponds to the attribute of HTable with the same name and defines client-side buffer (in bytes). If writeBufferSize has a non-zero value, the HTable’s autoFlush is set to false. In this case the sink buffers data and sends them in chunks of the writeBufferSize size to the server. This decreases the number of remote invocations and helps improve performance. By default the sink sends data on every received event. While it seems obvious that one would want to specify a bigger buffer to improve performance, determining the best buffer size depends on the use-case: no data will be sent to server before the buffer is full, which may break some “near real-time” models. The default writeBufferSize for HTable is 2 MB. Another important factor to consider is that the greater client-side buffer you have the more data you might loose in case of HBase failure [11]. Please find more details about this in Reliability Issues part of this post.
The writeToWal corresponds to the attribute of the HBase Put with the same name. If set to false the Put operations will not be written into HBase’s edit log. While this means fewer operations to perform and hence much better performance, this approach isn’t reliable: every non-persistent change (HBase likes to keep as much data as it can in memory to keep things fast) will be lost in case of HBase failure [1]. One would typically want to skip writing edits to WAL in case losing some portion of data is acceptable (e.g. when doing non mission-critical log analysis) or during a bulk import (it goes much faster without writing to WAL) which can be restarted in case of failure.
HBase Sink: hbase()
As of this writing the hbase() sink has the following semantics:
hbase("table", "rowkey", "cf1", "c1", "val1"[,"cf2", "c2", "val2", ....] {, writeBufferSize=int, writeToWal=true|false})
As you can see, one is asked to provide the values to be placed into the HBase record explicitly. One has to use expressions in values of “rowkey”, “cf1”, “c1”, “val1”, etc. to make it usable. Expressions available with any event type are “%{hostname}” (or “%{host}”), “%{nanos}”, “%{priority}”, “%{body}” (and soon “%{timestamp}” [8]) which resolve to event properties. You can also fetch value of any event’s custom attribute with the help of “%{attribute_name}”. For example, with the following configuration an HBase table would be populated with records so that one record corresponds to one event with event’s nanos (event.getNanos()) as row key and two cells in event_colfam column family: body and host with values of event’s body and hostname respectively.
hbase("table", "%{nanos}", "event_colfam", "body", "%{body}", "event_colfam", "host", "%{host}")
HBase Sink: attr2hbase()
This sink was originally contributed to Flume by Sematext, as we needed it for our products that make use of Flume and HBase.
As of this writing the attr2hbase() has the following semantic:
attr2hbase("table"[,"sysFamily"[,"writeBody"[,"attrPrefix"[,"writeBufferSize" [,"writeToWal"]]]]])
The attr2hbase() sink is used to write event attributes that correspond to a particular format (name starts with a specified prefix) into an HBase record. The names of columns (qualifiers) and column families are determined dynamically based on event’s attribute names. Thus, compared to the hbase() sink, you don’t have to list all possible event attributes you want to store in HBase along with their destination column families and qualifiers (columns). Your source and/or decorators can produce any (reasonable) number of attributes, with dynamic names (e.g. depending on the values) and they will be written into HBase if attr2hbase() sink is configured correctly. In other words, with attr2hbase() one can a) define column names at run-time and b) add whatever number of columns the business logic requires (also at run-time, based on the data being processed). E.g. in case email messages are processed we can store each recipient in a separate cell in an HBase record. You may want to implement your own decorator for that (keep reading to learn how to do this) due to some limitations [9].
sysFamily holds the name of the column family that is used to store “system” data (event timestamp, host, priority). In case this parameter is absent or equals “”, the sink doesn’t write “system” data. E.g. with configuration like this:
attr2hbase( "mytable", "sysColfam")
each record will have the following cells:
sysColFam:timestamp=<event’s timestamp>, sysColFam:host=<event’s host>, sysColFam:priority=<event’s prioirty>:
hbase(main):002:0> scan 'mytable' ROW COLUMN+CELL 123 column=sysColfam:host, timestamp=1309275728725, value=testhost 123 column=sysColfam:priority, timestamp=1309275728725, value=INFO 123 column=sysColfam:timestamp, timestamp=1309275728725, value=x00x00x010xD6xEA+T
writeBody indicates whether event body should be written with other “system” data. By default, (when this parameter is absent or equals ””) the attribute body is not written. This parameter should have the “column-family:qualifier” format in order for the sink to write the body to the specific column-family:qualifier.
attrPrefix defines which attributes will be written to HBase: every attribute with the name prefixed with attrPrefix parameter’s value is written. The attribute key should be in the following format to be properly written into HBase:
“<attrPrefix><colfam>:<qual>”
The default value of attrPrefix is “2hb_”. This means that all attributes with names “2hb_<colfam>:<qual>” should be written to HBase. Attribute with key “<attrPrefix>” must contain row key for Put, otherwise, if no row can be extracted, the event is skipped and no record is written to the HBase table. The table below shows how event attributes are handled: each cell shows where the event attribute (first column) will be written based on the sink parameters (second and other cells in table header).
Event’s attr (“name”->”value”) attrPrefix=”2hb_”, sysFamily=null attrPrefix=”2hb_”, sysFamily=”sysfam” attrPrefix=””, sysFamily=”sysfam” attrPrefix=””, sysFamily=null “any”->”foo” – – sysfam:any->foo – “colfam:col”->”foo” – – colfam:col->foo colfam:col->foo “2hb_any”->”foo” – sysfam:any->foo sysfam:2hb_any->foo – “2hb_colfam:col”->”foo” colfam:col->foo colfam:col->foo 2hb_colfam:col->foo 2hb_colfam:col->foo
Example
This sink is usually used with the decorators that perform light transformation of event data into attributes with specific names. We say “light transformation” here to avoid getting into the discussion about whether Flume is meant for ETL or just for reliable data delivery. You can use different standard Flume decorators [2], like “value()”, “select()”, “regex()”, “split()” and others. For example, the following setting will make Flume write a new record into HBase table for every line in a tailed file:
'tail("/tmp/some.log")' 'split( ":", 0 , "2hb_colfam:ts") split( ":", 1 ,"2hb_colfam:value") format("%{nanos}:") split(":", 0, "2hb_") attr2hbase( "mytable", "", "", "2hb_", "1000", "false" )'
NOTE: we have to use format() + split() decorators as workaround for FLUME-676 [3]: value() sink doesn’t support EL values. So with format() we put %{nanos} into the body and then from there we put it into attribute with name “2hb_”. The written records will have row key whose value will hold the nanos of the event timestamp (“2hb_” attribute value) and two values pased from log line. Here’s the example output:
$ echo "2238947398:56" >> /tmp/some.log $ hbase shell hbase(main):025:0> scan 'mytable' ROW COLUMN+CELL 9656555664717551 column=colfam:ts, timestamp=1308748686334, value=2238947398 9656555664717551 column=colfam:value, timestamp=1308748686334, value=56
Custom Decorator
The attr2hbase sink is often used with custom decorator that “prepares” events so that they contain attributes ready to be written into an HBase table. Implementing a custom decorator is very simple. Example code:
public class CustomDecorator<S extends EventSink> extends EventSinkDecorator<S> { public CustomDecorator(String param) { super(null); // TODO: do some initialization } public void append(Event e) throws IOException { // TODO: transform event e here super.append(e); } public static SinkFactory.SinkDecoBuilder builder() { return new SinkFactory.SinkDecoBuilder() { @Override public EventSinkDecorator<EventSink> build(Context context, String... argv) { if (argv.length != 1) { throw new IllegalArgumentException(" usage: CustomDecorator("param")"); } return new CustomDecorator<EventSink>(argv[0]); } }; } public static List<Pair<String, SinkFactory.SinkDecoBuilder>> getDecoratorBuilders() { return Arrays.asList(new Pair<String, SinkFactory.SinkDecoBuilder>("CustomDec", builder())); } }
To make your custom decorator visible to Flume you need to register it in flume-site.xml:
<configuration> <property> <name>flume.plugin.classes</name> <value>your.own.CustomDecorator</value> </property> ... </configuration>
Reliability Issues
There are certain reliability details you will want to think about when using HBase sinks.
Temporary HBase Connection Loss Requires Sink Restart
If default HBase configuration is used, when connection from sink node to HBase cluster breaks for several minutes (e.g. network problems or cluster maintenance downtime) the sink stops working. Unfortunately, it does not recover after cluster becomes accessible again. To fix this behavior you can adjust configuration properties “hbase.client.pause” and “hbase.client.retries.number” in hbase-site.xml which should be in Flume’s classpath. The default behavior (i.e. the issue) is going to be fixed in a future release [10].
Loss of Data Buffered on Client-side
Despite using reliable data delivery approaches (like agentE2ESink), there is a loss of data possible if client-side buffer is used (i.e. if writeBufferSize > 0) when failure happens during the buffer flushing [11]. This occurs because the ACK about data being received is sent before the actual write to an HBase table is done – writes are performed only on buffer flush, not on per-event basis. Thus, in case of a write failure during flushing data from the client-side will be lost as ACKs will fool Flume into thinking data have already been persisted in HBase.
Other Considerations
Row Key
As usually, one of the most important design decisions when using HBase is around row key format/values. In initial versions of HBase sinks the default row key was %{nanos}, as it seems to be unique and records seems to be ordered by arrival time. However, we suggest you consider using a different row key value/format, and here’s why. Usually when users import data into HBase they use either some kind of UUID or a timestamp (inverted) based approach. The former helps achieve better performance by distributing write load between multiple regions. The latter is good when one needs fast scans of imported data based on time ranges and when import load is bearable. However, when we use nanos, which are currently obtained from System.nanoTime() Java call, we don’t achieve any of these advantages: keys from each event-producer go in sequence, so all write load is distributed over just a few regions and region servers at a time [12], and System.nanoTime() is not really suitable for use in scans for fetching data from a time interval. Moreover, we cannot even rely on records being written from different Flume nodes being ordered by time of arrival, as nanos are not necessarily in sync across JVMs. Thus, it may make more sense to use pure random row keys (although Flume has no %{random} or anything similar, as far as we know) or a timestamp (inverted) [8] (along with [12] if that is possible). There might be more work needed with timestamp-based approach: there is a possibility to get events with the exact same timestamp during high load. Thus, adding an extra hash would help here (or even using “%{timstamp}%{nanos}”).
Setup Instructions
The easiest way to install Flume is to use CDH3 [4]. Then you need to add flume-plugin-hbasesink jar into flume lib dir. You can compile it from Flume sources [5] or download a compiled jar [6]. You also need to add HBase jar into Flume’s lib dir. It can be downloaded from Maven repositories [7] or copied from your HBase setup dir (so that the jar used on client-side by Flume sink is the same as the one on the server-side, which is important). The last step is to add plugin(s) to Flume’s configuration file (flume-site.xml):
<configuration> <property> <name>flume.plugin.classes</name> <value>com.cloudera.flume.hbase.HBaseSink,com.cloudera.flume.hbase.Attr2HBaseEventSink</value> </property> ... </configuration>
As both HBase and Flume use Zookeeper, it makes sense to share the Zookeeper quorum, too.
If you read this far, you may want to know that we are hiring and are happy users of and contributors to Flume, HBase, and a few other projects.
[1] http://www.larsgeorge.com/2010/01/hbase-architecture-101-write-ahead-log.html
[2] http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_flume_sink_decorator_catalog
[3] https://issues.cloudera.org/browse/FLUME-676
[4] https://ccp.cloudera.com/display/CDHDOC/Flume+Installation
[5] https://github.com/abaranau/flume-plugin-hbasesink-compiled
[6] https://github.com/cloudera/flume
[7] e.g. https://repository.cloudera.com/content/groups/public/org/apache/hbase/hbase/
[8] https://issues.cloudera.org/browse/FLUME-688
[9] https://issues.cloudera.org/browse/FLUME-689
[10] https://issues.cloudera.org/browse/FLUME-685
[11] https://issues.cloudera.org/browse/FLUME-390
[12] https://github.com/sematext/HBaseWD