I am going to show how to split a file stored as CSV inside HDFS into multiple Hive tables based on the content of each record. The context is simple. We are using Flume to collect logs from all over our datacenter through syslog. The stream is dumped into HDFS files partitioned by minute. Oozie listens for newly created directories and, once a directory is ready, distributes its content across various Hive tables, one for each log category.

For example, we want ssh logs to go to the ssh table. If we cannot determine which category a log record belongs to, we dump it into an “xlogs” table. Later on, when appropriate new rules are added, we should be able to iterate through the “xlogs” table and dispatch its records across the appropriate tables.

What we need is to add a category property to each record arriving through syslog and dispatch it to the appropriate table based on its value. It is also very important to guarantee that a single event won’t be stored in multiple categories.

I have been thinking about 3 strategies:

  • Use a Hive SQL query in Oozie to read the source file and do the dispatching into Hive.
    Advantage: a single declarative statement, which could be re-generated on the fly from a template; benefits from native Hive functionality such as custom file formats.
    Disadvantage: happens downstream in Oozie, so the category is determined on a batch basis.
  • Use a Pig script in Oozie to read the source file and do the dispatching into Hive.
    Advantage: the Pig Latin query is very easy to read and express.
    Disadvantage: also happens downstream in Oozie, in batch mode.
  • Flume: write the category into a header and dump it along with the message body.
    Advantage: simple to write; categories are known in real time, which is useful if the logs are also forwarded to a CEP (Complex Event Processing) system or something similar, think Yahoo! S4, Storm or even Splunk.
    Disadvantage: requires some Java code, simple to write but with a higher deployment cost; adding rules implies a restart of Flume and potentially some event loss, unless the model is well defined and new rules are propagated through messaging.

I am not going to test the Flume route unless neither of the previous strategies is usable. This is mainly because batch processing is fine for us and we can’t invest too much time.

The Hive route

For Hive, we need conditional functions and multi-table inserts (a Hive extension). Here are some extracts from the wiki documentation.

Conditional Functions
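
The two entries we rely on are “COALESCE” and “CASE”; their signatures, as described in the Hive language manual, are:

    COALESCE(T v1, T v2, ...)
        Returns the first v that is not NULL, or NULL if all v's are NULL.
    CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
        Returns c when a equals b, e when a equals d, and f otherwise.
    CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END
        Returns b when a is true, d when c is true, and e otherwise.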

Multiple Inserts
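
The multi-table insert extension lets a single query scan the source once and write into several tables. Its general form, taken from the Hive language manual, is roughly:

    FROM from_statement
    INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
    [INSERT OVERWRITE TABLE tablename2 [PARTITION ...] select_statement2]
    [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;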

A note about multi-table inserts: you should stay inside the default database, otherwise it does not work with Hive versions below 0.10; this includes all the Cloudera-distributed versions as of this writing. The issue is documented in HIVE-3465.

Clean and create tables

Flume interceptors provide us with the event timestamp and IP address. Additionally, we dump the facility and severity headers obtained from the syslog protocol.
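
As an illustration, here is a minimal sketch of the table definitions. The table and column names (staging, ssh, cron, xlogs, event_time, hostname, facility, severity, message) are assumptions, and the comma delimiter matches the CSV dump described above.

    -- Staging table matching the CSV files dumped by Flume (column names are assumptions)
    DROP TABLE IF EXISTS staging;
    CREATE TABLE staging (
      event_time STRING,
      hostname   STRING,
      facility   STRING,
      severity   STRING,
      message    STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

    -- One table per log category, plus "xlogs" for uncategorized records;
    -- everything stays in the default database (see the HIVE-3465 note above)
    DROP TABLE IF EXISTS ssh;
    CREATE TABLE ssh LIKE staging;
    DROP TABLE IF EXISTS cron;
    CREATE TABLE cron LIKE staging;
    DROP TABLE IF EXISTS xlogs;
    CREATE TABLE xlogs LIKE staging;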

Insert content into staging table
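
A minimal sketch, assuming the minute partition written by Flume sits under a path such as the one below (the path itself is hypothetical):

    -- Move the raw CSV files produced by Flume into the staging table
    -- (LOAD DATA INPATH moves the files, it does not copy them)
    LOAD DATA INPATH '/user/flume/syslog/2012/12/01/10/05'
    OVERWRITE INTO TABLE staging;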

Hive HQL
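
Here is a sketch of the dispatching query under the same assumed schema; the matching rules (sshd, CRON) are examples only, and COALESCE falls back to 'xlogs' when no CASE expression returns a category:

    FROM (
      SELECT
        event_time, hostname, facility, severity, message,
        -- Each CASE returns NULL when its rule does not match;
        -- COALESCE picks the first match and defaults to 'xlogs'
        COALESCE(
          CASE WHEN message LIKE '%sshd%' THEN 'ssh'  END,
          CASE WHEN message LIKE '%CRON%' THEN 'cron' END,
          'xlogs') AS category
      FROM staging
    ) src
    INSERT OVERWRITE TABLE ssh
      SELECT event_time, hostname, facility, severity, message WHERE category = 'ssh'
    INSERT OVERWRITE TABLE cron
      SELECT event_time, hostname, facility, severity, message WHERE category = 'cron'
    INSERT OVERWRITE TABLE xlogs
      SELECT event_time, hostname, facility, severity, message WHERE category = 'xlogs';

Because each record is assigned exactly one category value, it lands in exactly one table, which gives us the guarantee that no event is stored in multiple categories.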

Result

Here, we are lucky to have “COALESCE”. Combined with “CASE”, it behaves like a “switch…case” statement with the addition of a “default” branch in case no match is found.

It is also quite fast to run.

Re-dispatching xlogs

If we enrich the query with new rules, we can re-execute the same query on the “xlogs” data set. For this, we move the “xlogs” content into the staging table and simply re-run the query. For example, you could add the following statements at the top of your HQL file:
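
A minimal sketch under the same assumed table layout; since the main query ends with an INSERT OVERWRITE into “xlogs”, re-running it rewrites “xlogs” with only the records that are still uncategorized:

    -- Move the current "xlogs" content back into the staging table
    -- before re-running the dispatching query with the new rules
    INSERT OVERWRITE TABLE staging
    SELECT event_time, hostname, facility, severity, message FROM xlogs;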

Pig Latin

With Pig, I have tried two implementations which are quite close to each other.

Using “split”
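
A sketch of the first attempt; the path, schema and matching rules are assumptions, and HCatStorer is one possible way to write into the Hive tables:

    logs = LOAD '/user/flume/syslog/2012/12/01/10/05' USING PigStorage(',')
           AS (event_time:chararray, hostname:chararray, facility:chararray,
               severity:chararray, message:chararray);

    -- A record matching several rules is sent to several outputs,
    -- and a record matching none of them is silently dropped
    SPLIT logs INTO
      ssh_logs  IF message MATCHES '.*sshd.*',
      cron_logs IF message MATCHES '.*CRON.*';

    STORE ssh_logs  INTO 'ssh'  USING org.apache.hcatalog.pig.HCatStorer();
    STORE cron_logs INTO 'cron' USING org.apache.hcatalog.pig.HCatStorer();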

Using “filter”
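
And the second attempt, with one FILTER per category, under the same assumptions:

    logs = LOAD '/user/flume/syslog/2012/12/01/10/05' USING PigStorage(',')
           AS (event_time:chararray, hostname:chararray, facility:chararray,
               severity:chararray, message:chararray);

    -- Independent filters: the same record can pass several of them,
    -- and there is still no "default" relation for unmatched records
    ssh_logs  = FILTER logs BY message MATCHES '.*sshd.*';
    cron_logs = FILTER logs BY message MATCHES '.*CRON.*';

    STORE ssh_logs  INTO 'ssh'  USING org.apache.hcatalog.pig.HCatStorer();
    STORE cron_logs INTO 'cron' USING org.apache.hcatalog.pig.HCatStorer();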

Result

The two implementations are very easy to read. However, they don’t fulfil our requirements. In both scenarios, an event may be stored in more than one table. Also, there is no mechanism to determine a default category.

Note that this is probably due to my lack of experience with Pig; please feel free to leave suggestions in the comment section below.

One solution with Pig would have been to register an external UDF, possibly in Ruby or Python. This would probably have worked without too much pain. Maybe the subject of a future post.

Conclusion

I was able to split my records with SQL. Initially, I was expecting the contrary result: impossible to express in SQL and easy to do in Pig. Again, I am sure it is possible to do it in Pig, but with a deeper knowledge of Pig Latin. Flume is still a strategy I am considering for the future. While a simple implementation would be fast, a correct implementation would be much harder to write.