Apache Druid is an open-source analytics data store that, thanks to its distributed nature and its reliance on memory, can leverage the auto-scaling abilities of Kubernetes. I was inspired by the talk “Apache Druid Auto Scale-out/in for Streaming Data Ingestion on Kubernetes” by Jinchul Kim during DataWorks Summit 2019 Europe in Barcelona.

Druid

Druid is a column-oriented, open-source, distributed data store commonly used in business intelligence/OLAP applications to analyze high volumes of real-time and historical data. It can be seen as a mix between a search engine like Apache Solr or Elasticsearch, a time-series database (TSDB) like Prometheus or OpenTSDB, and an OLAP database. It is currently incubating to become a top-level Apache project. A Druid cluster is split into multiple roles/services which are designed to be highly scalable. Most importantly, because of the specificities of its architecture and implementation, Druid is *really* fast when it comes to specific use cases.

Rollup ingestion

Druid has a pre-aggregation feature called the “Rollup”. At ingestion time, the system will not store individual records. Instead, it stores aggregations of these records based on the dimensions of the data as we can see in this example:

The roll-up process

These records are stored as segments in a columnar fashion. The combination of these two features is what makes Druid so fast for analytics queries such as counts or groupBys, because the data has already been computed at ingestion time. The columnar orientation is also practical because Druid only reads the columns it needs to answer the query.

Roles

A Druid installation is made of multiple services designed to run on a distributed, cloud-friendly architecture.

– Middlemanager: Responsible for ingesting data, reading from external data sources and producing segments. This is the service we will focus on in this article, as we will try to automatically scale it with Kubernetes.
– Historical: Handles the storage and answers queries on data that has already been ingested (i.e. historical data).
– Broker: Receives both ingestion and querying requests and forwards them to the right service (Historical or Middlemanager).
– Overlord: Deals with the assignment of ingestion tasks to the Middlemanager nodes.
– Coordinator: Deals with the balancing of segments across the cluster’s Historical nodes.

It might seem complex at first sight but, with a little practice with Druid, every role will seem like a no-brainer.

Auto-scaling

Druid has a built-in auto-scaling ability but, unfortunately, the only implementation at the time of writing is coupled with Amazon EC2. There is a real need for Druid clusters to support auto-scaling in other environments, such as other cloud providers and native Kubernetes platforms.

Kubernetes

Kubernetes (K8s) is a container orchestrator. If you are not too familiar with Kubernetes or wish to get a refresher, I invite you to read Arthur’s excellent article “Installing Kubernetes on CentOS 7”. One aspect of Kubernetes that is not covered in his article is the ability to do auto-scaling. Let’s see how this works.

Horizontal Pod Autoscaler

Horizontal Pod Autoscaler (HPA) is a feature that lets Kubernetes automatically increase or decrease the number of pods in a ReplicationController, Deployment, ReplicaSet or StatefulSet. By default it is based on CPU usage, using the metrics.k8s.io API provided by metrics-server (since Heapster has been deprecated), and it can be extended with user-defined custom metrics through the custom.metrics.k8s.io API.

The default algorithm for deciding the number of replicas is the following:
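desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue)

where ceil rounds up to the nearest whole number of pods.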

Example in a Druid context:

The Middlemanagers are responsible for performing the ingestion in the form of tasks. Since a Middlemanager works on tasks one at a time, we can consider a Druid installation with many pending tasks (i.e. tasks waiting for a Middlemanager to become available) to be improperly scaled.

With:
– currentReplicas: the number of Middlemanagers, 3 in the beginning.
– desiredMetricValue: the number of pending tasks that we want, let’s say 5 is acceptable.
– currentMetricValue: the number of pending tasks at the moment of computation, here 10.

In this example, Kubernetes will double the number of Middlemanager pods: ceil(3 × 10 / 5) = 6, giving us 6 Middlemanagers to handle the workload.

With this formula, zero pending tasks would lead to zero Middlemanager replicas, which is obviously not what we want. Kubernetes lets us set a hard minimum, as we will see in the demo part of the article.

Custom Metrics API

Kubernetes provides an API for user-defined metrics. This API can be implemented to serve custom metrics usable by Kubernetes’ built-in capabilities; in our case, we are interested in using them with the HPA.

There are a few implementations available, known as “adapters”. We will use the Prometheus adapter designed by DirectXMan12. Prometheus is a Cloud Native Computing Foundation (CNCF) project that has become a standard for metrics scraping, parsing and storage.

It is also worth mentioning that there is a boilerplate project if you wish to implement your own Custom Metrics API.

Demo

For this demo, I deployed a 3-worker Kubernetes cluster by following Arthur’s tutorial. I also installed a Druid cluster on it using the incubating Helm chart for Druid. Helm is a package manager for Kubernetes that simplifies a lot of things: it helps us deploy common apps in Kubernetes without reinventing the wheel.

Here is a look at our cluster:
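One way to list the Druid pods from the command line; the grep filter assumes the pod names contain “druid”, which depends on the Helm release name:

kubectl get pods | grep druid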


As we can see, we have 3 Middlemanagers, so we are in the nominal situation described by currentReplicas in the previous calculation.

The Coordinator web UI can confirm that:

To get started, we will use stefanprodan’s k8s-prom-hpa GitHub project, as it is an excellent starting point for using the HPA with custom metrics from Prometheus. It contains most of the resources we need for this use case.

Let’s create a Prometheus deployment in our Kubernetes cluster:
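With the k8s-prom-hpa repository cloned and from its root directory, this is a matter of applying the manifests it provides (the exact files may vary between versions of the project):

kubectl create -f ./prometheus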

Our Prometheus is now accessible on any node’s IP through the NodePort configured in ./prometheus/prometheus-svc.yaml (31990):

We can see in the “Graph” tab that we already have a lot of cool metrics from Kubernetes: CPU usage, memory usage, disk usage, etc. This is because Prometheus’ scrapers are configured to read directly from the Kubernetes REST API using kubernetes_sd_configs, as we can see in ./prometheus/prometheus-cfg.yaml.

There are also additional configurations that modify the labeling and the names of the metrics.

These metrics are nice, but there is nothing yet that would allow us to automatically scale Druid based on the prerequisites of our previous example.

We now need to gather metrics from Druid and get Prometheus to scrape them.

For this purpose, and as this is only a POC and not something I want to run in production, I wrote a really simple Prometheus exporter that exposes just one metric: the number of pending ingestion tasks reported by the Overlord.
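A minimal sketch of such an exporter, built with the prometheus_client library, could look like this; the Overlord URL, the listening port and the metric name druid_pending_tasks are assumptions for illustration:

import time
import requests
from prometheus_client import Gauge, start_http_server

# Assumed Overlord service address inside the cluster
OVERLORD_URL = "http://druid-overlord:8090"

# The single metric we expose: the number of pending ingestion tasks
PENDING_TASKS = Gauge("druid_pending_tasks", "Number of pending Druid ingestion tasks")

def collect():
    # The Overlord lists the tasks waiting for a worker on this endpoint
    response = requests.get(OVERLORD_URL + "/druid/indexer/v1/pendingTasks")
    response.raise_for_status()
    PENDING_TASKS.set(len(response.json()))

if __name__ == "__main__":
    start_http_server(8000)  # serve the metrics on port 8000
    while True:
        collect()
        time.sleep(10)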

Then we can configure a Prometheus scrape job to fetch these metrics. This happens in ./prometheus/prometheus-cfg.yaml, since the configuration file (/etc/prometheus/prometheus.yml) of the Prometheus deployment we launched on Kubernetes is defined in a ConfigMap:
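A scrape job for our exporter could look like the following snippet inside that ConfigMap; the job name, the target address and the extra labels are assumptions, and the exact labels to add depend on how the adapter is configured to map metrics to Kubernetes objects:

scrape_configs:
  - job_name: 'druid-exporter'
    static_configs:
      - targets: ['druid-exporter:8000']
        labels:
          namespace: 'default'
          pod: 'druid-exporter'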

Notice that we are telling Prometheus to add labels to these metrics.

After restarting Prometheus, we can see the metric showing up in Prometheus:

Now we are ready to deploy the Prometheus adapter:
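With the k8s-prom-hpa project, this boils down to something like the following; the make target and the directory name come from that project and may vary between versions:

# generate the TLS certificates used by the adapter's API service
make certs
# deploy the Prometheus custom metrics adapter
kubectl create -f ./custom-metrics-api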

The adapter queries the metrics from Prometheus, parses them and makes them available via the custom metrics API. Let’s see if we can fetch our metric:
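One way to check is to query the custom metrics API directly; the namespace and the metric name below match the assumptions made for the exporter:

kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta1/namespaces/default/pods/*/druid_pending_tasks" | jq .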

It works: we can see that the number of pending tasks (currently 3) is now a native Kubernetes metric.

We can now create the Horizontal Pod Autoscaler. Here is what it looks like:
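A sketch of such a manifest, using the autoscaling/v2beta1 API; the HPA name, the maximum number of replicas and the metric name are assumptions for illustration:

apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  name: druid-middle-manager-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: druid-middle-manager
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Pods
    pods:
      metricName: druid_pending_tasks
      targetAverageValue: 5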

It is pretty straightforward; we just need to define the following:

– A name for the HPA.

– A target to which it applies: here, our druid-middle-manager StatefulSet.

– A minimum and maximum number of replicas: this is useful to prevent the HPA from scaling like crazy in one direction or the other and getting things out of hand.

– A metric with its target value, from which the HPA will compute the desired number of replicas.

A few seconds after creating the HPA, we can describe this Kubernetes resource to see how it is behaving:
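For example, assuming the HPA name used above:

kubectl describe hpa druid-middle-manager-hpa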

We currently have 4 pending tasks; the HPA tells us that this is acceptable compared to the objective (targetAverageValue) we have set:

Now let’s try to push our Druid cluster a bit further by triggering a lot of ingestion tasks at the same time. After a few seconds, the HPA description should look like this:

And finally after waiting a few more seconds:

We did it! The StatefulSet has been scaled up and we now have 6 Middlemanagers up and running to balance the load.

What’s next?

As this demo proved, auto-scaling Druid with Kubernetes is possible, but there are a few things we could have done better. For starters, we could (and should) have a better Prometheus exporter for Druid’s metrics, as the one we used in the demo is very limited. This project from Wikimedia seems interesting: it is a configurable endpoint for Druid’s HTTP emitter module that receives the metrics and exposes them in a Prometheus-friendly format. This Python app would have to be Dockerized in order to run alongside our cluster in Kubernetes. The Helm chart we used to deploy Druid could also use a little rework to be tweakable enough to support this configuration.

The HPA we configured scales the Middlemanagers for data ingestion. We could also imagine a similar process for querying this data. By monitoring the read metrics of the cluster, we could automatically scale the Druid Historical role to serve more clients at the same time.

HPA is a good solution for auto-scaling Druid, but it is not really viable for existing bare-metal Druid clusters. Hopefully, the development team behind the project will come up with a more open implementation than the EC2 one.