Splitting HDFS files into multiple Hive tables

By David WORMS

Sep 15, 2013

Categories: Data Engineering
Tags: Flume, Pig, HDFS, Hive, Oozie, SQL

I am going to show how to split a CSV file stored inside HDFS into multiple Hive tables based on the content of each record. The context is simple. We are using Flume to collect logs from all over our datacenter through syslog. The stream is dumped into HDFS files partitioned by minute. Oozie listens for newly created directories and, once a directory is ready, distributes its content across various Hive tables, one for each log category.

For example, we want ssh logs to go to the ssh table. If we cannot determine which category a log record belongs to, we dump it into an “xlogs” table. Later on, when new rules are added, we should be able to iterate through the “xlogs” table and dispatch its records across the appropriate tables.

What we need is to add a category property to each record arriving through syslog and dispatch it to the appropriate table based on its value. It is also very important to guarantee that a single event won’t be stored in multiple categories.

I have been thinking about 3 strategies:

  • Use a Hive SQL query in Oozie to read the source file and do the dispatching into Hive.
    Advantage: a single declarative statement, which we could re-generate on the fly from a template, and which benefits from native Hive functionality such as custom file formats;
    Disadvantage: happens after Oozie has triggered the workflow, so the category is determined in batch mode;
  • Use a Pig Latin script in Oozie to read the source file and do the dispatching into Hive.
    Advantage: the Pig Latin script is very easy to read and write.
    Disadvantage: also happens after Oozie, in batch mode;
  • Flume: write the category into a header and dump it along with the message body.
    Advantage: simple to write; categories are known in real time, which is useful if the logs are also forwarded to a CEP (Complex Event Processing) system or something similar, think Yahoo! S4, Storm or even Splunk;
    Disadvantage: requires writing some Java code, simple but with a higher deployment cost; adding rules implies a restart of Flume and potentially some event loss, unless the model is well defined and new rules are propagated through messaging.

I am not going to test the Flume route unless none of the previous strategies is usable. This is mainly because batch processing is fine for us and we can’t invest too much time.

The Hive route

For Hive, we need conditional functions and multiple inserts (a Hive extension). Here are some extracts from the wiki documentation.

Conditional Functions

if(boolean testCondition, T valueTrue, T valueFalseOrNull)
Returns valueTrue when testCondition is true, valueFalseOrNull otherwise

COALESCE(T v1, T v2, ...)
Returns the first v that is not NULL, or NULL if all v's are NULL

CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
When a = b, returns c; when a = d, returns e; else returns f

CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END
When a = true, returns b; when c = true, returns d; else returns e

Multiple Inserts

FROM raw_logs
INSERT INTO TABLE cpus PARTITION (ds='2012-10-25', ts=10) select_statement1
INSERT INTO TABLE apache PARTITION (ds='2012-10-25', ts=10) select_statement2;

A note about multiple inserts: you should stay inside the default database, otherwise it does not work with Hive versions below 0.10, which includes all the Cloudera distributions as of this writing. The issue is documented in HIVE-3465.
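
In practice, this means creating and referencing the target tables without a database prefix. A minimal sketch of the two patterns (the logs database name below is only an illustration, not part of this setup):

-- Works on Hive below 0.10: unqualified tables in the default database
USE default;
FROM staging
INSERT INTO TABLE kernel SELECT * WHERE facility = 0;
-- To avoid on Hive below 0.10 (HIVE-3465): database-qualified targets
-- FROM staging
-- INSERT INTO TABLE logs.kernel SELECT * WHERE facility = 0;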

Clean and create tables

Flume interceptors provide us with the event timestamp and IP address. Additionally, we dump the facility and severity headers obtained from the syslog protocol.

DROP TABLE IF EXISTS staging; DROP TABLE IF EXISTS kernel; DROP TABLE IF EXISTS crond; DROP TABLE IF EXISTS xlogs;
CREATE EXTERNAL TABLE staging ( ts BIGINT, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/staging';
CREATE EXTERNAL TABLE kernel ( ts BIGINT, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/kernel';
CREATE EXTERNAL TABLE crond ( ts TIMESTAMP, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/hive/crond';
CREATE EXTERNAL TABLE xlogs ( ts TIMESTAMP, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/hive/xlogs';

Insert content into staging table

hadoop fs -rm /user/big/logs/db/staging/data
hadoop fs -put - /user/big/logs/db/staging/data <<CSV
1360691259466,10.120.152.39,18,5,Feb 12 18:47:39 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/raidctl -S
1360691265106,10.120.152.39,18,5,Feb 12 18:47:45 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/fmadm faulty
1360692001830,10.120.152.39,4,6,Feb 12 19:00:01 su: [ID 366847 auth.info] 'su cac' succeeded for root on /dev/???
1360692001895,10.120.152.39,9,6,crond[27302]: (root) CMD (/usr/lib64/sa/sa1 1 1)
1360691267321,10.120.152.39,18,5,Feb 12 18:47:47 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/fmadm faulty
1360691267584,10.120.152.39,18,5,Feb 12 18:47:47 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/fmadm config
CSV
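
Before running the dispatch, a quick check from the Hive CLI could confirm that the staging table sees the six records:

SELECT facility, count(*) FROM staging GROUP BY facility;
-- With the sample above: facility 4 -> 1 record, facility 9 -> 1, facility 18 -> 4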

Hive HQL

set source=staging;
FROM(
  SELECT
    ts,
    ip,
    facility,
    severity,
    message,
    COALESCE(
      if(facility = 0, 'kernel', null),
      if(facility = 9 and message REGEXP '^crond.*', 'crond', null),
      'xlogs'
    ) AS category
  FROM (
    SELECT * FROM ${hiveconf:source}
  ) t
) catlogs
INSERT INTO TABLE kernel SELECT ts, ip, facility, severity, message WHERE category = 'kernel'
INSERT INTO TABLE crond SELECT ts, ip, facility, severity, message WHERE category = 'crond'
INSERT INTO TABLE xlogs SELECT ts, ip, facility, severity, message WHERE category = 'xlogs' AND category != '${hiveconf:source}'
;
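
The ${hiveconf:source} variable suggests running the query from a file rather than typing it interactively. A possible invocation (the dispatch.hql file name is mine, purely for illustration), dropping the set source=staging; line and passing the value from the shell instead:

hive --hiveconf source=staging -f dispatch.hql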

Result

Here, we are lucky to have the COALESCE. It behaves like a “switch…case” statement with the addition of a “default” branch in case no match is found.

It is also quite fast to run.
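
With the six sample records, the outcome is easy to verify (a quick check, not part of the original run):

SELECT count(*) FROM kernel; -- 0: no record has facility 0
SELECT count(*) FROM crond;  -- 1: the single crond line
SELECT count(*) FROM xlogs;  -- 5: everything else falls into the default category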

Re-dispatching xlogs

If we enrich the query with new rules, we can re-execute the same query on the “xlogs” data set. For this, we move the “xlogs” content into the staging table and simply re-run the query. For example, you could add the following statements at the top of your HQL file:

DFS -rm logs/db/staging/*;
DFS -mv logs/db/hive/xlogs/* logs/db/staging;
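
Picking up the ssh example from the introduction, enriching the query means one more if() branch in the COALESCE, plus a matching external table and INSERT clause. As a sketch, the new branch could look like the following (the sshd pattern is only illustrative and would need to match the real log format), inserted just before the 'xlogs' default:

    if(message REGEXP 'sshd', 'ssh', null),

together with one more line in the multi-insert:

INSERT INTO TABLE ssh SELECT ts, ip, facility, severity, message WHERE category = 'ssh'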

Pig Latin

With Pig, I have tried two implementations which are quite close to each other.

Using “split”

A = LOAD '/user/big/logs/db/staging' USING PigStorage(',') AS (ts: long, ip: chararray, facility: int, severity: int, message: chararray);
SPLIT A INTO
  kernel IF facility == 0,
  cron IF (facility == 9 and message matches '^crond.*')
;
STORE kernel INTO '/user/big/logs/db/pig/kernel' USING PigStorage(',');
STORE cron INTO '/user/big/logs/db/pig/cron' USING PigStorage(',');

Using “filter”

A = LOAD '/user/big/logs/db/staging' USING PigStorage(',') AS (ts: long, ip: chararray, facility: int, severity: int, message: chararray);
kernel = FILTER A BY facility == 0;
cron = FILTER A BY facility == 9 and message matches '^crond.*';
STORE kernel INTO '/user/big/logs/db/pig/kernel' USING PigStorage(',');
STORE cron INTO '/user/big/logs/db/pig/cron' USING PigStorage(',');

Result

The two implementations are very easy to read. However, they don’t fulfil our requirements. In both scenarios, an event may be stored in more than one table. Also, there is no mechanism to determine a default category.

Note, this is probably due to my lack of experience with Pig, so please feel free to leave suggestions in the comment section below.

One solution with Pig would have been to register an external UDF, possibly in Ruby or Python. This would probably have worked without too much pain. Maybe the subject of a future post.
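
To give a rough, untested idea of the plumbing only (the rules.py file and the categorize function are invented names; the UDF body, declaring a chararray output and implementing the rules, is left for that future post):

REGISTER 'rules.py' USING jython AS rules;
A = LOAD '/user/big/logs/db/staging' USING PigStorage(',') AS (ts: long, ip: chararray, facility: int, severity: int, message: chararray);
B = FOREACH A GENERATE ts, ip, facility, severity, message, rules.categorize(facility, message) AS category;
SPLIT B INTO
  kernel IF category == 'kernel',
  cron IF category == 'crond',
  xlogs IF category == 'xlogs'
;
STORE kernel INTO '/user/big/logs/db/pig/kernel' USING PigStorage(',');
STORE cron INTO '/user/big/logs/db/pig/cron' USING PigStorage(',');
STORE xlogs INTO '/user/big/logs/db/pig/xlogs' USING PigStorage(',');

Since the UDF returns exactly one category per record, the SPLIT conditions are mutually exclusive and the 'xlogs' value acts as the default.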

Conclusion

I was able to split my records with SQL. Initially, I was expecting the opposite result: impossible to express in SQL and easy to do in Pig. Again, I am sure it is possible to do it in Pig, but with a deeper knowledge of Pig Latin. Flume is still a strategy I consider for the future. While a simple implementation will be fast, a correct implementation will be much harder to write.
