Deux Hive UDAF pour convertir une aggregation vers une map

Deux Hive UDAF pour convertir une aggregation vers une map

By WORMS David

6 mars 2012

Catégories : Data Engineering | Tags : Analytique, HBase, HDFS, Hive

Je publie deux nouvelles fonctions UDAF pour Hive pour aider avec les map dans Apache Hive. Le code source est disponible sur GitHub dans deux classes Java : “UDAFToMap” et “UDAFToOrderedMap” ou vous pouvez télécharger le fichier jar. La première fonction convertit une agrégation en une map et utilise en interne un HashMap Java. La deuxième fonction étend la première. Elle convertit une agrégation en une sorted map et utilise en interne un TreeMap Java.

API

Pour rappel, un UDF représente une fonction définie par l’utilisateur et un UDAF correspond à la fonction d’agrégation définie par l’utilisateur. Tandis qu’une UDF est une fonction scalaire sur une ou plusieurs colonnes d’une même ligne (par exemple, la fonction CONCAT en SQL), une UDAF fonctionne sur une agrégation d’une ou de plusieurs colonnes (par exemple, la fonctionMAX dans SQL ) sur plusieurs lignes.

MAP to_map(primitive, primitive_or_complex)
MAP to_ordered_map(primitive, primitive_or_complex)

Les deux fonctions partagent la même API. Ils acceptent deux arguments, le premier est la clé et le second est la valeur. Les clés peuvent être n’importe quel type primitif. Les valeurs peuvent être un type primitif ou complexe. Dans Hive, les types primitifs sont les nombres entier, booleen, float, string, date et binaire, tandis que les types complexes sont structure, map et array.

Motivation

En travaillant sur des séries temporelles dans Hive, j’ai créé ces fonctions pour répondre aux besoins suivants :

  • Étant donné que les fichiers UDF sont beaucoup plus simples à implémenter qu’une UDAF, il est pratique d’écrire et d’utiliser un fichier UDF prenant le résultat d’un fichier générique UDAF en entrée, par exemple :
    sélectionnez MY_UDF (MY_UDAF (colonne_1, colonne_2)) dans le groupe my_table par la colonne_3.
  • La base de donnée HBase que nous modélisons utilise des clés pour stocker les identifiants client et les colonnes pour stocker les dates. Importer des données de Hive vers HBase peut être effectué de deux manières : avec une instruction select ou avec une stratégie compliquée de chargement en bloc plutôt bas niveau. L’approche par select utilise une map et nous n’avons trouvé aucun moyen de convertir le résultat d’une agrégation en une map. Nous avons dû insérer les données ligne par ligne qui, à notre avis, étaient moins efficaces. En utilisant une stratégie de chargement en masse, nous avons également pû regrouper notre ensemble de données dans une sorted map afin de refléter la mise en page finale du format de fichier de stockage HBase.
  • Nous expérimentons un format de fichier HDFS personnalisé intégré à Hive, qui doit en interne regrouper les données de mesure d’un client pour optimiser son stockage. Nous avons d’abord écrit une implémentation qui prend en entrée un ensemble de résultats ordonné. Cela fonctionne mais l’implémentation est plus compliquée. De plus, en raison de la nature de HDFS et de MapReduce, nous n’avions aucune garantie que toutes les données d’un seul client seraient stockées dans un même fichier, ce qui nous empêcherait une optimisation complète. À nouveau, il était indispensable de structurer les données sous forme de map.

Utilisation

Considérant un ensemble de données sources composé de 4 colonnes comme suit (ID client, horodatage, valeur du compteur, état du compteur) :

195100,1199145600,607527807,B
185100,1199145600,775031942,A
195100,1199156400,607532682,B
185100,1199156400,775032584,A
195100,1199167200,607535485,B
185100,1199167200,775033200,A
195100,1199178000,582924326,C
185100,1199178000,775034241,A
195100,1199188800,582927007,C
185100,1199188800,775035698,A
195100,1199199600,582929212,C
185100,1199199600,775036891,A
195100,1199210400,582932070,C
185100,1199210400,775038268,A
195100,1199221200,582935353,B
185100,1199221200,775039703,A

La source CSV est importée dans Hive avec les déclarations suivantes :

DROP TABLE source;
CREATE TABLE source (
    customer INT,
    emission INT,
    value INT,
    state STRING
)
ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH './sample/data.csv' OVERWRITE INTO TABLE source;

Nous pouvons maintenant déclarer nos deux UDAF :

ADD JAR ./target/adaltas-hive-udf-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION to_map as 'com.adaltas.UDAFToMap';
CREATE TEMPORARY FUNCTION to_ordered_map as 'com.adaltas.UDAFToOrderedMap';

Et nous pouvons enfin les utiliser avec les requêtes suivantes :

 # HashMap implementation
SELECT 
    `customer`, to_map(from_unixtime(emission), array(value,state)) 
FROM `source` 
GROUP BY `customer`;
 # Ordered HashMap implementation
SELECT 
    `customer`, to_ordered_map(from_unixtime(emission), array(value,state)) 
FROM `source` 
GROUP BY `customer`;

La sortie de la dernière instruction select ressemble à ceci :

185100  {
    "2008-01-01 01:00:00":[fusion_builder_container hundred_percent="yes" overflow="visible"][fusion_builder_row][fusion_builder_column type="1_1" background_position="left top" background_color="" border_size="" border_color="" border_style="solid" spacing="yes" background_image="" background_repeat="no-repeat" padding="" margin_top="0px" margin_bottom="0px" class="" id="" animation_type="" animation_speed="0.3" animation_direction="left" hide_on_mobile="no" center_content="no" min_height="none"]["775031942","A"],
    "2008-01-01 04:00:00":["775032584","A"],
    "2008-01-01 07:00:00":["775033200","A"],
    "2008-01-01 10:00:00":["775034241","A"],
    "2008-01-01 13:00:00":["775035698","A"],
    "2008-01-01 16:00:00":["775036891","A"],
    "2008-01-01 19:00:00":["775038268","A"],
    "2008-01-01 22:00:00":["775039703","A"] }
195100  {
    "2008-01-01 01:00:00":["607527807","B"],
    "2008-01-01 04:00:00":["607532682","B"],
    "2008-01-01 07:00:00":["607535485","B"],
    "2008-01-01 10:00:00":["582924326","C"],
    "2008-01-01 13:00:00":["582927007","C"],
    "2008-01-01 16:00:00":["582929212","C"],
    "2008-01-01 19:00:00":["582932070","C"],
    "2008-01-01 22:00:00":["582935353","B"]}

Si vous avez cloné notre repository hive-udf sur GitHub, vous pouvez exécuter l’exemple ci-dessus avec la commande mvn install && hive -f sample/to_map.hive.

En écrivant cet article, j’ai également publié le ticket JIRA HIVE-2843 proposant l’intégration du code source dans Hive.

Canada - Morocco - France

International locations

10 rue de la Kasbah
2393 Rabbat
Canada

Nous sommes une équipe passionnées par l'Open Source, le Big Data et les technologies associées telles que le Cloud, le Data Engineering, la Data Sciencem le DevOps…

Nous fournissons à nos clients un savoir faire reconnu sur la manière d'utiliser les technologies pour convertir leurs cas d'usage en projets exploités en production, sur la façon de réduire les coûts et d'accélérer les livraisons de nouvelles fonctionnalités.

Si vous appréciez la qualité de nos publications, nous vous invitons à nous contacter en vue de coopérer ensemble.