Diviser des fichiers HDFS en plusieurs tables Hive

Diviser des fichiers HDFS en plusieurs tables Hive

By WORMS David

15 sept. 2013

Catégories : Data Engineering | Tags : Flume, HDFS, Hive, Oozie, Pig, SQL

Je vais montrer comment scinder fichier CSV stocké dans HDFS en plusieurs tables Hive en fonction du contenu de chaque enregistrement. Le contexte est simple. Nous utilisons Flume pour collecter les logs de l’ensemble de notre datacenter via syslog. Le flux est déversé dans des fichiers HDFS partitionnés par minute. Oozie surveille les répertoires nouvellement créés et, une fois prêt, on souhaite distribuer le contenu du répertoire dans plusieurs tables Hive, une pour chaque type de log.

Par exemple, nous voulons que les logs ssh se trouvent dans la table ssh. Si nous ne pouvons pas déterminer à quelle catégorie un enregistrement de log est associé, nous le stockons dans une table “xlogs”. Plus tard, lorsque de nouvelles règles sont ajoutées, nous devrions pouvoir parcourir la table “xlogs” et répartir son contenu dans les tables appropriées.

Nous avons besoin de catégoriser chaque enregistrement provenant de syslog et l’envoyer à la table appropriée en fonction de sa valeur. En outre, il est très important de garantir qu’un événement unique ne sera pas stocké dans plusieurs tables.

J’ai réfléchi à 3 stratégies :

  • Utiliser une requête SQL Hive dans Oozie pour lire le fichier source et effectuer la distribution dans Hive.
    Avantage : Avec une seule déclaration, nous pouvons regénérer les fichiers à partir d’un modèle ; bénéficier des fonctionnalités natives de Hive telles que les formats de fichier personnalisés ;
    Inconvénient : se produit après Oozie, le job est donc en batch ;
  • Utiliser une requête SQL Pig dans Oozie pour lire le fichier source et le distribuer dans Hive.
    Avantage : très facile à lire et à exprimer en Latin Query Pig.
    Inconvénient : se produit également après Oozie, en mode batch ;
  • Flume : écrire la catégorie dans un header et l’envoyer avec le corps du message.
    Avantage : simple à écrire ; les catégories sont connues en temps réel, elles sont utiles si les logs sont également transmis à un système de traitement d’événements complexes (CEP) ou similaire, pensez Yahoo ! S4, Storm ou même Splunk ;
    Inconvénient : besoin d’écrire du code Java, simple mais avec des coûts de déploiement plus difficiles ; les ajouts de règles impliquent un redémarrage de Flume et éventuellement une perte d’événements à moins que le modèle ne soit bien défini et que les nouvelles règles soient propagées par le biais de la messagerie.

Je ne vais pas tester la stratégie Flume à moins qu’aucune des stratégies précédentes ne soit utilisable. Ceci est principalement dû au fait que le traitement par lots nous convient et que nous ne pouvons pas investir trop de temps.

La stratégie Hive

Pour Hive, nous avons besoin de fonction conditionnelle ainsi que d’insertion multiple (extension Hive). Voici ce que la documentation nous dit.

Fonction Conditionnelle

if(boolean testCondition, T valueTrue, T valueFalseOrNull)
Return valueTrue when testCondition is true, returns valueFalseOrNull otherwise

COALESCE(T v1, T v2, ...)
Return the first v that is not NULL, or NULL if all v's are NULL

CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
When a = b, returns c; when a = d, return e; else return f

CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END
When a = true, returns b; when c = true, return d; else return e

Multiple Insertions

FROM raw_logs
INSERT INTO TABLE cpus PARTITION (ds=2012-10-25, ts=10) select_statement1
INSERT INTO TABLE apache PARTITION (ds=2012-10-25, ts=10) select_statement2;

Remarque sur les insertions multiples, vous devez rester dans la base de données par défaut, sinon elle ne fonctionne pas avec Hive sous la version 0.10, cela inclut désormais la version distribuée par Cloudera. Le problème est documenté dans Hive-3465.

Nettoyer et créer des tables

Flume Les intercepteurs nous fournissent l’horodatage et les adresses IP des événements. De plus, nous vidons les en-têtes de “facility” et de “criticity” obtenus à partir du protocole syslog.

DROP TABLE IF EXISTS staging; DROP TABLE IF EXISTS crond; DROP TABLE IF EXISTS xlogs;
CREATE EXTERNAL TABLE staging ( ts BIGINT, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/staging';
CREATE EXTERNAL TABLE kernel ( ts BIGINT, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/kernel';
CREATE EXTERNAL TABLE crond ( ts TIMESTAMP, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/hive/crond';
CREATE EXTERNAL TABLE xlogs ( ts TIMESTAMP, ip STRING, facility TINYINT, severity TINYINT, message STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/big/logs/db/hive/xlogs';

Insérer le contenu dans la table staging

hadoop filesystem -rm /user/big/logs/db/staging/data
hadoop filesystem -put - /user/big/logs/db/staging/data <<CSV
1360691259466,10.120.152.39,18,5,Feb 12 18:47:39 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/raidctl -S
1360691265106,10.120.152.39,18,5,Feb 12 18:47:45 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/fmadm faulty
1360692001830,10.120.152.39,4,6,Feb 12 19:00:01 su: [ID 366847 auth.info] 'su cac' succeeded for root on /dev/???
1360692001895,10.120.152.39,9,6,crond[27302]: (root) CMD (/usr/lib64/sa/sa1 1 1)
1360691267321,10.120.152.39,18,5,Feb 12 18:47:47 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/fmadm faulty
1360691267584,10.120.152.39,18,5,Feb 12 18:47:47 sudo: [ID 702911 local2.notice]   patrol : TTY=unknown ; PWD=/home/patrol ; USER=root ; COMMAND=/usr/sbin/fmadm config
CSV

Hive HQL

set source=staging;
FROM(
  select
    ts,
    ip,
    facility,
    severity,
    message,
    COALESCE(
      if(facility = 0, 'kernel', null),
      if(facility = 9 and message REGEXP '^crond.*', 'crond', null),
      'xlogs'
    ) AS category
  FROM (
    select * from ${hiveconf:source}
  ) t
) catlogs
INSERT INTO TABLE kernel SELECT ts, ip, facility, severity, message WHERE category = 'kernel'
INSERT INTO TABLE crond SELECT ts, ip, facility, severity, message WHERE category = 'crond'
INSERT INTO TABLE xlogs SELECT ts, ip, facility, severity, message WHERE category = 'xlogs' and category != '${hiveconf:source}'
;

Resultat

Ici, nous avons la chance d’avoir la COALESCE. Il se comporte comme une instruction “switch … case” avec l’ajout d’une fonctionnalité “par défaut” au cas où aucune correspondance ne serait trouvée.

C’est aussi assez rapide à exécuter.

Re-dispatching xlogs

Si nous enrichissons la requête avec de nouvelles règles, nous pouvons réexécuter la même requête sur l’ensemble de données “xlogs”. Pour cela, nous déplaçons le contenu “xlogs” dans la table de transfert et réexécutons simplement la requête. Par exemple, vous pouvez ajouter les instructions suivantes en haut de votre fichier HQL :

DFS -rm logs/db/staging/*;
DFS -mv logs/db/hive/xlogs/* logs/db/staging;

Pig Latin

Avec Pig, j’ai essayé deux implémentations qui sont assez approches l’une de l’autre.

En utilisant “split”

A = LOAD '/user/big/logs/db/staging' USING PigStorage(',') AS (ts: long, ip: chararray, facility: int, severity: int, message: chararray);
SPLIT A INTO
  kernel IF facility == 0,
  cron IF (facility == 9 and message matches '^crond.*')
;
STORE kernel INTO '/user/big/logs/db/pig/kernel' USING PigStorage(',');
STORE cron INTO '/user/big/logs/db/pig/cron' USING PigStorage(',');

En utilisant “filter”

A = LOAD '/user/big/logs/db/staging' USING PigStorage(',') AS (ts: long, ip: chararray, facility: int, severity: int, message: chararray);
kernel = FILTER A BY facility == 0;
cron = FILTER A BY facility == 9 and message matches '^crond.*';
;
STORE kernel INTO '/user/big/logs/db/pig/kernel' USING PigStorage(',');
STORE cron INTO '/user/big/logs/db/pig/cron' USING PigStorage(',');

Resultat

Les deux implémentations sont très faciles à lire. Cependant, elles ne répondent pas à nos exigences. Dans les deux scénarios, un événement peut être stocké dans plusieurs tables. En outre, il n’existe aucun mécanisme pour déterminer une catégorie par défaut.

Notez que ceci est probablement dû à mon manque d’expérience avec Pig, veuillez renseigner gratuitement les suggestions ci-dessous.

Une solution avec Pig aurait été d’enregistrer une UDF externe, éventuellement en Ruby ou en Python. Cela aurait probablement du travail sans trop de douleur. Peut-être le sujet d’un futur post.

Conclusion

J’ai pu diviser mes enregistrements avec SQL. Au départ, je m’attendais à un résultat contraire : impossible à exprimer en SQL et facile à faire dans Pig. Encore une fois, je suis sûr qu’il est possible de le faire avec Pig mais avec une connaissance plus approfondie du Latin Pig. Flume est toujours une stratégie que je considère pour l’avenir. Alors qu’une implémentation simple sera rapide, une implémentation correcte sera beaucoup plus difficile à écrire.

Canada - Morocco - France

International locations

10 rue de la Kasbah
2393 Rabbat
Canada

Nous sommes une équipe passionnées par l'Open Source, le Big Data et les technologies associées telles que le Cloud, le Data Engineering, la Data Sciencem le DevOps…

Nous fournissons à nos clients un savoir faire reconnu sur la manière d'utiliser les technologies pour convertir leurs cas d'usage en projets exploités en production, sur la façon de réduire les coûts et d'accélérer les livraisons de nouvelles fonctionnalités.

Si vous appréciez la qualité de nos publications, nous vous invitons à nous contacter en vue de coopérer ensemble.