Apache Druid est un système de stockage de données open-source destiné à l’analytics qui peut profiter des capacités d’auto-scaling de Kubernetes de par son architecture distribuée. Cet article est inspiré de la présentation “Apache Druid Auto Scale-out/in for Streaming Data Ingestion on Kubernetes” donnée par Jinchul Kim lors du DataWorks Summit 2019 Europe à Barcelone.

Druid

Druid est un système de stockage de données orienté colonne, distributé et open source qui est fréquemment utilisé dans le cadre d’application BI/OLAP pour effectuer des requêtes analytiques sur de larges quantités de données chaudes ou froides. Druid peut être considéré comme un mélange entre un moteur d’indexation comme Apache Solr ou Elasticsearch, une base de données timeseries comme Prometheus ou OpenTSDB et une base de données OLAP. Druid est actuellement sous incubation dans la fondation Apache. Un cluster Druid est composé de plusieurs rôles/services qui sont conçus pour être scalable facilement. De part des choix d’architectures particulier, Druid est “très” rapide pour certains cas d’usages.

Le processus de “roll-up”

Druid embarque une fonctionnalité de pré-aggrégation appellée le “roll-up”. Lors de l’ingestion des données, le logiciel ne va pas enregistrer les entrées une à une mais des aggrégations de ces dernières basées sur les différents champs de la donnée comme nous pouvons le voir dans l’exemple suivant :

Le roll-up

Ces enregistrements sont stockés en tant que “segments” de sous forme de colonnes. Ces deux fonctionnalités font que Druid est très rapide pour répondre à des requêtes de types GROUP BY ou COUNT  car les résultats ont déjà été calculées à l’ingestion. L’orientation colonne des données sert à lire uniquement les fichiers concernés par la requête.

Roles

Un cluster Druid est composé de plusieurs services conçus pour tourner sur des architectures cloud et distribuées :

MiddleManager : Responsable de l’ingestion des données, il lis les entrées depuis des sources exterieures pour les transformer en segments. Il s’agit du service que nous allons essayer de scaler automatiquement avec Kubernetes.
Historical : Gère le stockage et réponds aux requêtes sur les données déjà ingérées (données “historiques”).
Broker : Reçoit les requêtes d’ingestion et de lecture des données et les réoriente vers le bon service (Historical ou Middlemanager).
Overlord : Gère l’assignation des tâches d’ingestions aux noeuds Middlemanager.
Coordinator : Gère la répartition des segments à traver les noeuds Historical du cluster.

Cela peut sembler compliqué au premier abord, mais avec un peu de pratique les rôles de Druid n’auront plus de secrets pour vous.

Auto-scaling

Druid dispose d’une fonctionnalité d’auto-scaling embarqué mais malheuresement elle est conçue pour ne fonctionner qu’avec Amazon EC2. Il y a donc un réel besoin pour les clusters Druid de pouvoir auto-scaler dans des environnements différents et sur du Kubernetes natif.

Kubernetes

Kubernetes (K8s) est un orchestrateur de containers. Si vous n’êtes pas très à l’aise avec le sujet ou que je vous souhaitez un cours de rattrapage, je vous invite à lire l’excellent article d’Arthur Busser à ce sujet “Installing Kubernetes on CentOS 7”. Un des aspects de Kubernetes qui n’est pas détaillé dans son article est sa faculté à auto-scaler les services qui tournent dessus. Voyons voir comment cela fonctionne.

Horizontal Pod Autoscaler

L’Horizontal Pod Autoscaler (HPA) est une fonctionnalité de Kubernetes qui donne aux utilisateurs la possibilité d’augmenter ou diminuer dynamiquement le nombre de pods d’un Replication Controller, d’un Deployment ou d’un ReplicaSet de manière automatique. Par défaut, cette fonctionnalité se base sur l’usage du CPU en utilisant l’API metrics.k8s.io  du metrics-server (depuis que Heapster a été déprécié) et peut être étendue à des metrics personalisées avec l’API custom.metrics.k8s.io.

L’algorithme de décision du nombre de replicas est le suivant par défaut :

Voici maintenant un exemple dans le contexte de Druid :

Les Middlemanagers sont responsables de l’ingestions sous la forme de tâches (“tasks”). Ces tasks sont réalisées par un Middlemanager une par une. On peut donc décider qu’un cluster Druid avec beaucoup de tâches en attente (tâches attendant un Middlemanager disponible) est un cluster mal dimensionné.

Avec :
currentReplicas : nombre de Middle managers, 3 pour commencer.
desiredMetricValue : nombre de tâches en attente souhaité, décidons que 5 est acceptable.
currentMetricValue : nombre de tâches en attente en ce moment, 10 pour l’exemple.

Dans ce cas là, Kubernetes va scaler le nombre de Middlemanagers en allouant 3 pods supplémentaires. Il y aura donc 6 Middlemanagers prêts à se répartir la charge.

Avec cette formule, on obtiendrait un nombre de Middlemanagers égal à zéro si aucune tâche n’est en attente. Ce n’est évident pas souhaitable et Kubernetes nous propose pour cela de définir une valeur minimum comme nous allons le voir après lors de la démo.

Custom Metrics API

Kubernetes fournit une API pour les metrics définies par l’utilisateur. Cette API peut être implémentée pour pourvoir utiliser des metrics customs dans les fonctionnalités natives de Kubernetes, ici c’est l’autoscaling avec HPA qui nous intéresse.

Il y a quelques implémentations disponibles plus communément appelées des “adapters”. Nous allons utiliser l’adapter Prometheus conçu par DirectXMan12. Prometheus est un projet de la CNCF qui est devenu le nouveau standard dans la récolte, le parsing et le stockage de metrics.

Il est également possible d’implémenter son propre Custom Metrics API à l’aide du boilerplate fourni.

Démo

Pour cette démo, j’ai déployé un cluster Kubernetes de 3 workers à l’aide du tutoriel d’Arthur. J’ai également installé un cluster Druid sur ce Kubernetes à l’aide de Helm et d’une chart associée. Helm est le package manager de Kubernetes qui simplifie énormément de choses. Il est très utile pour déployer des applications standards sur Kubernetes sans avoir à réinventer la roue.

Voici à quoi ressemble notre cluster Druid :

Comme nous pouvons le voir, nous avons 3 Middlemanagers. Nous sommes donc dans une situation nominale par rapport à notre calcul ci-dessus.

La Web UI du Coordinator peut nous le confirmer :

Pour nous lancer, nous allons utiliser le projet GitHub k8s-prom-hpa de stefanprodan parce qu’il s’agit d’un excellent point de départ pour utiliser le HPA avec des metrics de Prometheus. Il contient toutes les ressources dont nous allons avoir besoin pour ce cas d’usage.

Déployons maintenant Prometheus dans notre cluster Kubernetes :

Notre Prometheus est accessible au port que nous avons défini dans ./prometheus/prometheus-svc.yaml (31990):

Nous pouvons constater dans l’onglet “Graph” que nous avons un paquet de metrics venant de Kubernetes comme l’utilisation CPU, RAM et disques par exemple. C’est parce que les scrapers Prometheus sont configurés pour lire directement depuis l’API REST de Kubernetes avec la configuration “<kubernetes_sd_config>” comme défini dans ./prometheus/prometheus-cfg.yaml.

Il y a également des configurations pour modifier les labels et noms des metrics.

Ces metrics sont intéressantes mais pour l’instant rien ne nous permets de d’auto scaler Druid selon nos critères définis précédemment.

Nous allons donc devoir récolter des metrics depuis Druid et laisser Prometheus les scraper.

Pour ce besoin et comme il s’agit uniquement d’un POC et pas de la production, j’ai écrit un un exporter Prometheus “très” basique pour exposer une seule metric, voici le code :

On peut ensuite configurer un scraper Prometheus pour récupérer ces metrics, cela se passe dans le fichier ./prometheus/prometheus-cfg.yml qui défini le ConfigMap qui servira a générer le vrai fichier de configuration dans le container ( /etc/prometheus/prometheus.yml) :

Notons que nous demandons à Prometheus d’ajouter des labels à ces metrics.

Après avoir redémarré Prometheus on peut voir notre metric apparaître :

Nous sommes désormais prêts à deployer l’adapter Prometheus :

L’adapter requête Prometheus pour récuperer les metrics, les parses et les expose sur la Custom Metrics API. Voyons voir si l’on peut trouver notre metric :

Cela fonctionne, nous pouvons voir qu’actuellement 3 tâches sont en pending et il s’agit d’une metric Kubernetes.

Nous pouvons désormais créer l’Horizontal Pod AutoScaler, voici la manière dont il est défini :

C’est plutôt facile à comprendre, il faut simplement définir :

– Un nom pour le HPA ;

– Une cible sur laquelle le HPA s’appliquera : ici notre StatefulSet “druid-middle-manager” ;

– Un minimum et maximum acceptables de replicas : c’est grâce à cela que l’on peut éviter de scaler n’importe comment en cas de fort ou faible nombre de requêtes ;

– Une metric avec sa valeur que nous souhaitons obtenir. C’est à partir de celle-ci que le HPA calcule le nombre de replicas.

Après avoir créer le HPA, on peut décrire cette ressource Kubernetes pour voir comment elle agit :

Nous avons actuellement 4 tâches en attente, le HPA nous indique donc que c’est un nombre acceptable comparé à l’objectif (targetAverageValue) que nous avons fixé :

Maintenant, nous allons soumettre notre cluster Druid à une grosse charge en déclanchant beaucoup de tâches d’ingestion en simultanément. Après quelques secondes, voici à quoi ressemble le HPA :

Et finalement, nous obtenons :

Ça fonctionne ! Le StatefulSet a été scalé et nous avons maintenant 6 Middlemanager qui tournent et sont prêts à se répartir la charge.

Autre chose ?

Comme nous l’avons prouvé avec cette démo, l’auto-scaling de Druid avec Kubernetes est possible mais nous aurions pu mieux réaliser certaines choses. Pour commencer, nous aurions besoin d’un vrai exporter Prometheus pour les metrics de Druid. En effet celui que nous avons utilisé est très limité. Ce projet de Wikimedia a l’air très intéressant, il s’agit d’un endpoint qui utilise le http-emiter-module de Druid pour recevoir les metrics directement et les exposer au format compatible Prometheus. Cette application Python devrait être Dockerisée pour pouvoir tourner dans notre cluster Kubernetes. Le paquet Helm que nous avons utilisé pour déployer Druid aurait également besoin d’un certain nombre de modifications pour être compatible.

Le HPA que nous avons configuré a permis de scaler les Middlemanagers pour l’ingestion des données. Nous pouvons également imaginer un procédé similaire pour le requêtage de ces données. En surveillant les metrics de lecture du cluster, nous pourrions augmenter ou diminuer dynamiquement le nombre de noeud “Historical” pour pouvoir répondre à plus de reqûetes en parallèle.

Le HPA est une bonne solution pour faire de l’auto-sclaing avec Druid mais n’est pas vraiment viable dans le cadre d’un cluster Druid bare metal (non dockerisé dans Kubernetes). Espérons maitnenant que l’équipe qui maintient Druid pourra concevoir une implémentation plus flexible que celle qui existe actuellement avec EC2.