Auto-scaling Druid with Kubernetes

Auto-scaling Druid with Kubernetes

Do you like our work......we hire!

Never miss our publications about Open Source, big data and distributed systems, low frequency of one email every two months.

Apache Druid is an open-source analytics data store which could leverage the auto-scaling abilities of Kubernetes due to its distributed nature and its reliance on memory. I was inspired by the talk ”Apache Druid Auto Scale-out/in for Streaming Data Ingestion on Kubernetes” by Jinchul Kim during DataWorks Summit 2019 Europe in Barcelona.

Druid

Druid is a column-oriented, open-source, distributed data store commonly used in business intelligence/OLAP applications to analyze high volumes of real-time and historical data. It can be seen as a mix between a search engine like Apache Solr and Elasticsearch, a time-series Database (TSDB) like Prometheus, OpenTSDB and an OLAP Database. It is currently incubating to become a top-level Apache project. A Druid cluster is fragmented in multiple roles/services which are designed to be highly scalable. Most importantly, because of the specificities of its architecture and implementation, Druid is really fast when it comes to specific use cases.

Rollup ingestion

Druid has a pre-aggregation feature called the “Rollup”. At ingestion time, the system will not store individual records. Instead, it stores aggregations of these records based on the dimensions of the data as we can see in this example:

Rollup

These records are stored as segments in a columnar fashion. The combination of these two features is what makes Druid so fast for analytics queries such as counts or groupBys because the data was already computed at ingestion time. The columnar orientation is also practical because it only reads the data it needs to answer the query.

Roles

A Druid installation is made of multiple services designed to run on a distributed, cloud-friendly architecture.

  • Middlemanager: Responsible for ingesting data, reading from external data sources into segments. This is the service we will focus on in this article trying to automatically scale it with Kubernetes.
  • Historical: Handle the storage and answer the queries on data already ingested (ie: historical data).
  • Broker: Receive both ingestion and querying requests and forward them to the right service (Historical or Middlemanager).
  • Overlord: Deal with the assignation of ingestion tasks to the Middlemanager nodes.
  • Coordinator: Deal with the balancing of the segments across the cluster’s Historical nodes.

It might seem complex at first sight but with a little practice of Druid, every role will seem like a no-brainer.

Auto-scaling

Druid has a built-in auto-scaling ability but unfortunately, the only implementation at the time of this writing is coupled with Amazon EC2. There is a real need for Druid clusters to support auto-scaling in complementary environments such as for other Cloud providers and on native Kubernetes platforms.

Kubernetes

Kubernetes (K8s) is a container orchestrator. If you are not too familiar with Kubernetes or wish to get a refresh, I invite you to read Arthur’s excellent article “Installing Kubernetes on CentOS 7”. One aspect of Kubernetes that is not covered in his article is the ability to do auto-scaling. Let’s see how this works.

Horizontal Pod Autoscaler

Horizontal Pod Autoscaler (HPA) is a feature that allows a user to let Kubernetes automatically increase or decrease the number of pods in a Replication Controller, Deployment or ReplicaSet. It is based by default on CPU usage using the metrics.k8s.io API provided by metrics-server (since Heapster as been deprecated) and can be extended with user-defined custom metrics using the custom.metrics.k8s.io API.

The default algorithm for deciding the number of replicas is the following:

desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]

Example in a Druid context:

The Middlemanagers are responsible for doing the ingestion in forms of tasks. As tasks are done by a Middlemanager one at a time, we can declare a Druid installation with many pending tasks (ie: tasks waiting for a Middlemanager to be available) to not be scaled properly.

With:
- currentReplicas: number of Middle managers, 3 in the beginning.
- desiredMetricValue: number of pending tasks that we want, let’s decide 5 is acceptable.
- currentMetricValue: number of pending tasks, at the moment of computation 10.

desiredReplicas = ceil[3 * (10 / 5)] = 6

In this example, Kubernetes will scale up by the Middlemanager pods times 2, resulting in 6 Middlemanagers to handle the workload.

With this formula, zero pending tasks would lead to zero replicas of the Middlemanager which is obviously not what we want. Kubernetes is able to set a hard limit minimum for us as we will see in the demo part of the article.

Custom Metrics API

Kuberenetes provides an API for user-defined metrics. This API can be implemented to serve custom metrics that can be used for Kubernetes’ built-in capabilities, in our case we are interested in using these with the HPA.

There are a few implementations available known as “adapters”. We will use the Prometheus adapter designed by DirectXMan12. Prometheus is Cloud Native Computing Foundation (CNCF) project has become a standard in terms of metrics scraping, parsing, and database.

It is also worth to mention that there is a boilerplate if you wish to implement your own Custom Metrics API.

Demo

For this demo, I deployed a 3 workers Kubernetes cluster by following Arthur’s tutorial. I also installed a Druid cluster on this Kubernetes using Helm’s Druid chart. Helm is a package manager for Kubernetes that simplifies a lot of things. It helps us deploys commons apps in Kubernetes without reinventing the wheel.

I deployed a Druid cluster using the incubating Helm chart. Here is a look at our cluster:

kubectl get pods -n druid -o wide
NAME                                READY  STATUS       RESTARTS   AGE     IP            NOMINATED NODE  READINESS GATES
druid-broker-5c6b4dd495-ppmvx       1/1    Running      2          65m     10.244.3.141  <none>          <none>
druid-coordinator-748f4fd656-vkvjq  1/1    Running      1          65m     10.244.2.133  <none>          <none>
druid-historical-0                  1/1    Running      0          65m     10.244.3.143  <none>          <none>
druid-middle-manager-0              1/1    Running      0          65m     10.244.3.144  <none>          <none>
druid-middle-manager-1              1/1    Running      0          66m   10.244.3.146  <none>          <none>
druid-middle-manager-2              1/1    Running      0          67m     10.244.3.147  <none>          <none>
druid-mysql-6764889c67-f7l5r        1/1    Running      0          65m     10.244.2.131  <none>          <none>
druid-overlord-5fcd7c49cd-nh764     1/1    Running      1          65m     10.244.3.142  <none>          <none>
druid-zookeeper-0                   1/1    Running      0          65m     10.244.2.132  <none>          <none>
druid-zookeeper-1                   1/1    Running      0          47h     10.244.3.145  <none>          <none>
druid-zookeeper-2                   1/1    Running      0          65m     10.244.1.147  <none>          <none>

As we can see, we have 3 Middlemanagers, so we are in a nominal situation with our currentReplicas in the previous calculation.

The Coordinator web UI can confirm that:

Workers in Druid Coordinator UI

To get started, we will use stefanprodan’s k8s-prom-hpa GitHub project as it is an excellent starting point for using the HPA with custom metrics from Prometheus. It contains most resources we need for this use case.

Let’s create a Prometheus deployment in our Kubernetes cluster:

kubectl create -f prometheus/
configmap/prometheus-config created
deployment.apps/prometheus created
clusterrole.rbac.authorization.k8s.io/prometheus created
serviceaccount/prometheus created
clusterrolebinding.rbac.authorization.k8s.io/prometheus created
service/prometheus created

Our Prometheus is now accessible through the configured port in ./prometheus/prometheus-svc.yaml (31990):

Prometheus UI

We can see in the “Graph” tab that we already have a lot of cool metrics from Kubernetes: CPU Usage, Memory Usage, Disk Usage, etc. It is because Prometheus’s scrapers are configured to read directly from Kubernetes’s REST API using as we can see in ./prometheus/prometheus-cfg.yaml.

There are also additional configurations to modify the labeling and names of the metrics.

These metrics are nice but there is nothing yet that would allow us to automatically scale Druid based on the pre-requisites of our previous example.

We now need to gather metrics from Druid and get Prometheus to scrape them.

For this purpose and as it is only a POC and not something I want to run in production, I wrote a really simple Prometheus exporter to expose just one metric, here is the code:

http = require('http')
axios = require('axios')

get_num_pending_tasks = () ->
  axios.get "http://#{env.HOST}:#{env.PORT}/druid/indexer/v1/pendingTasks"
    .then (response) ->
      return response.data.length

server = http.createServer (req, res) ->
  res.writeHead 200
  res.end """
    container_druid_num_pending_tasks #{await get_num_pending_tasks()}
  """
  return
server.listen 8080

Then we can configure a Prometheus scraper to fetch these metrics, this will happen in ./prometheus/prometheus-cfg.yml as the config file (/etc/prometheus/prometheus.yml) of the Prometheus deployment we launched on Kubernetes is defined with a ConfigMap:

...
- job_name: 'druid_prometheus_exporter'
  metrics_path: /
  scheme: http
  static_configs:
  - targets:
    - edge01.metal.ryba:49160
    labels:
      container_name: 'druid'
      pod_name: 'druid-middle-manager-0'
      namespace: 'druid'
...

Notice that we are telling Prometheus to add labels to these metrics.

After restarting Prometheus, we can see the metric showing up in Prometheus:

Druid pending tasks in Prometheus

Now we are ready to deploy the Prometheus adapter:

kubectl create -f custom-metrics-api/
secret/cm-adapter-serving-certs created
clusterrolebinding.rbac.authorization.k8s.io/custom-metrics:system:auth-delegator create
rolebinding.rbac.authorization.k8s.io/custom-metrics-auth-reader created
deployment.extensions/custom-metrics-apiserver create
clusterrolebinding.rbac.authorization.k8s.io/custom-metrics-resource-reader created
serviceaccount/custom-metrics-apiserver created
service/custom-metrics-apiserver created
apiservice.apiregistration.k8s.io/v1beta1.custom.metrics.k8s.io created
clusterrole.rbac.authorization.k8s.io/custom-metrics-server-resources created
clusterrole.rbac.authorization.k8s.io/custom-metrics-resource-reader created
clusterrolebinding.rbac.authorization.k8s.io/hpa-controller-custom-metrics created

The adapter queries the metrics from Prometheus, parse them and makes them available via the custom metrics API. Let’s see if we can fetch our metric:

kubectl get --raw "/api/custom.metrics.k8s.io/v1beta1/namespaces/druid/pods/*/druid_num_pending_tasks"
{"kind": "MetricValueList","apiVersion":"custom.metrics.k8s.io/v1beta1","metadata":{"selfLink":"/apis/custom.metrics.k8s.io/v1beta1/namespaces/druid/pods/%2A/druid_num_pending_tasks"},"items":[{"describedObject":{"kind":"Pod","namespace":"druid","name":"druid-middle-manager-0","apiVersion":"/__internal"},"metricName":"druid_num_pending_tasks","timestamp":"2019-04-17T13:08:45Z","value":"3"}]}

It works, we can see that the number of pending tasks (currently 3) is now a native Kubernetes metric.

We can now create the Horizontal Pod AutoScaler, here is what it looks like:

---
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  namespace: druid
  name: druid-mm
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: druid-middle-manager
  minReplicas: 3
  maxReplicas: 16
  metrics:
  - type: Pods
    pods:
      metricName: druid_num_pending_tasks
      targetAverageValue: 5

It is pretty straightforward, we just need to define the followings:

- A name for the HPA.

- A target on which it will apply: here, our druid-middle-manager StatefulSet.

- A min and max number of replicas: this is useful to avoid the HPA to scale like crazy in a direction or the other and getting things out of hands.

- A metric with it’s preferred value, from which the HPA will compute the number of preferred replicas.

A few seconds after creating the HPA, we can describe this Kubernetes resource to see how it is behaving:

We currently have 4 tasks pending, the HPA tells us that it is acceptable compared to the objective (targetAverageValue) that we have set:

kubectl describe -f druid/middlemanager-hpa.yaml
Name:                                 druid-mm
Namespace:                            druid
Labels:                               <none>
Annotations:                          <none>
CreationTimestamp:                    Wed, 17 Apr 2019 13:41:50 +0000  
Reference:                            StatefulSet/druid-middle-manager
Metrics:                              ( current / target )
  "druid_num_pending_tasks" on pods:  4 / 5
Min replicas:                         1
Max replicas:                         16
StatefulSet pods:                     3 current / 3 desired
Conditions:
  Type            Status  Reason              Message
  ----            ------  ------              --------
  AbleToScale     True    ReadyForNewScale    recommended size matches current size
  ScalingActive   True    ValidMetricFound    the HPA was able to successfully calculate a replica count from pods metric druid_num_pending_tasks
  ScalingLimited  False   DesiredWithinRange  the desired count is within the acceptable range
Events:           <none>

Now let’s try to push our Druid a bit further by triggering a lot of ingestion tasks at the same time. After a few seconds, the HPA describe should look like this:

kubectl describe hpa druid-mm -n druid
Name:                                 druid-mm
Namespace:                            druid
Labels:                               <none>
Annotations:                          <none>
CreationTimestamp:                    Wed, 17 Apr 2019 13:59:33 +0000  
Reference:                            StatefulSet/druid-middle-manager
Metrics:                              ( current / target )
  "druid_num_pending_tasks" on pods:  25 / 5
Min replicas:                         1
Max replicas:                         16
StatefulSet pods:                     3 current / 6 desired
Conditions:
  Type            Status  Reason              Message
  ----            ------  ------              --------
  AbleToScale     True    SucceededRescale    the HPA controller was able to update the target scale to 6
  ScalingActive   True    ValidMetricFound    the HPA was able to successfully calculate a replica count from pods metric druid_num_pending_tasks
  ScalingLimited  True   ScaleUpLimit  the desired replica count is increasing faster than the maximum scale rate
Events:           <none>

And finally after waiting a few more seconds:

kubectl get pods -n druid -o wide
NAME                                READY  STATUS       RESTARTS   AGE     IP            NOMINATED NODE  READINESS GATES
druid-broker-5c6b4dd495-ppmvx       1/1    Running      66         47h     10.244.3.141  <none>          <none>
druid-coordinator-748f4fd656-vkvjq  1/1    Running      1          47h     10.244.2.133  <none>          <none>
druid-historical-0                  1/1    Running      40         47h     10.244.3.143  <none>          <none>
druid-middle-manager-0              1/1    Running      3          47h     10.244.3.144  <none>          <none>
druid-middle-manager-1              1/1    Running      0          5h38m   10.244.3.148  <none>          <none>
druid-middle-manager-2              1/1    Running      0          30m     10.244.3.150  <none>          <none>
druid-middle-manager-3              1/1    Running      0          5m      10.244.3.150  <none>          <none>
druid-middle-manager-4              1/1    Running      0          4m      10.244.3.150  <none>          <none>
druid-middle-manager-5              1/1    Running      0          3m      10.244.3.150  <none>          <none>
druid-mysql-6764889c67-f7l5r        1/1    Running      0          47h     10.244.2.131  <none>          <none>
druid-overlord-5fcd7c49cd-nh764     1/1    Running      0          47h     10.244.3.142  <none>          <none>
druid-zookeeper-0                   1/1    Running      0          47h     10.244.2.132  <none>          <none>
druid-zookeeper-1                   1/1    Running      0          47h     10.244.3.145  <none>          <none>
druid-zookeeper-2                   1/1    Running      0          47h     10.244.1.147  <none>          <none>

We did it! The StatefulSet has been scaled up and we now have 6 Middlemanagers up and running to balance the load.

What’s next?

As this demo proved, the auto-scaling of Druid with Kubernetes is possible but there are a few things we could have done better. For starters, we could (and should) have a better Prometheus exporters for Druid’s metrics like the one we used in the demo is very limited. This project from Wikimedia seems interesting, it is a configurable endpoint for Druid’s http-emiter-module, receives the metrics and exposes them in a Prometheus friendly format. This python app would have to be Dockerized in order to run alongside our cluster in Kubernetes. The Helm chart we used to deploy Druid could also use a little re-work to be tweakable enough to support this configuration.

The HPA we configured scaled the MiddleManagers for ingestion data. We could also imagine a similar process for the querying of these data. By monitoring the read metrics of the cluster, we could automatically scale the Druid Historical role to serve more clients at the same time.

HPA is a good solution for auto-scaling Druid but it is not really viable for existing bare metal Druid cluster. Hopefully, the development team behind the project will come up with a more open implementation than the EC2 one.

Share this article

Canada - Morocco - France

We are a team of Open Source enthusiasts doing consulting in Big Data, Cloud, DevOps, Data Engineering, Data Science…

We provide our customers with accurate insights on how to leverage technologies to convert their use cases to projects in production, how to reduce their costs and increase the time to market.

If you enjoy reading our publications and have an interest in what we do, contact us and we will be thrilled to cooperate with you.

Support Ukrain