Deploying a secured Flink cluster on Kubernetes

When deploying secured Flink applications inside Kubernetes, you are faced with two choices. Assuming your Kubernetes is secure, you may rely on the underlying platform or rely on Flink native solutions to secure your application from the inside. Note, those two solutions are not mutually exclusive.

Relying on Kubernetes native security

Kubernetes definition templates for Flink are available online and provide a reliable starting point. The JobManager deployment maintains 1 replica. Kubernetes is in charge of redeployment in case of failure.

Network isolation

By default, all Pods in a cluster may communicate freely with each other. Said differently, when a Pod is started, it is assigned a unique IP which is accessible from by the other Pods. By using Network Policies, you can isolate the services running in Pods from each other. A network policy is a set of network traffic rules applied to a given group of Pods in a Kubernetes cluster. You can restrict the access within a namespace or for the Pods which constitute an application. With Network Policies enforced, your application will be isolated from the outside world.

Encryption and authentication

However, the communications internal to the cluster will be sent as is, the same applies to the outside world, and you are left with no authentication. To encrypt all communications with SSL, you may then introduce a service mesh like Istio which will deploy quite transparently while providing additional benefits you may, or may not need including load balancing, monitoring or resiliency features (retry, timeout, deadlines, …). The usage of a service mesh is particularly attractive when your application runs on a zero-trust network.

Also, access to the application from the outside provides no authentication and you are left with the requirement to provide a service to authenticate accesses, which may also be implemented with a service mesh solution, or deny all external and direct access your application and deploy a trusted Pod in charge of proxying the communications.

The latest is similar to the approach described by Marc Rooding and Niels Dennissen from ING in their talk Automating Flink Deployments to Kubernetes where the CI/CD chain pushed deployment instructions and resources to a trusted Pod deployed along their Flink application.

Relying on Flink native security features

In the second approach, Edward Alexander Rojas Clavijo presented in hist talk Deploying a secured Flink cluster on Kubernetes how to integrate Flink native solutions. While fairly straightforward in a Hadoop YARN environment, both SSL and Kerberos present some challenges due to the dynamic nature of the IP and DNS assignment inside a Kubernetes cluster.

Architecture

The architecture described below involves Kafka as a source and a sink as well as HDFS and ElasticSearch as sinks.

The JobManager service uses a dedicated hostname which can be referenced by the TaskManagers instances as well as the job submissions scripts. To each individual TaskManager is assigned a Pod. The number of containers is aligned with the number of jobs which are endless and stateful. On update, the TaskManager deploys a checkpoint.

ConfigMap are used to generate the Flink configuration including memory and checkpoints managements.

Guaranteed secure communication with SSL

By default, every Flink communication between the JobManager and the TaskManagers is performing hostname validation. The traditional approach to generating public and private certificates for each component is hard to integrate with dynamic IP and hostnames. It implies contacting a PKI server to generate the certificate on the container startup before the Flink component has started.

In their first iteration, the team used wildcard certificates.

It starts with the registration of a service resource for the JobManager. Kubertenetes service gets a fixed hostname and communications are routed to the Pod by Kubernetes.

The taskManagers rely on stateful set definitions. A stateful Set provides a predicated hostname. For example, the FQDN “flink-tm-0.flink-tm-svc.namesapce.cluster.local” defines the TaskManager instance “flink-tm-0” inside the service “flink-tm-svc” inside the namespace “namespace”. Isolating all the TaskManagers inside a single domain name makes it possible to generate a wildcard certificate available to the TaskManager.

In the Pods performing job submissions,  the CLI is not configured to perform hostname validation. The certificate authority must be trusted by the JobManager.

The limitation of using wildcard certificates is that it restrains the deployment of the Flink application to a single namespace. Deployment to a new namespace means generating a new certificate.

The final solution which was selected consists of generating one single purpose certificate pushed to the Truststores and Keystores with certificates and hostname validation being deactivated.

Starting with version 1.6, Flink can perform mutual validation which respects the recommended approach for Kubernetes. The content and password of the Truststores and Keystores are exposed as a Kubernetes secret, which is base64 encoded and mounted into the Pods.

Communication with sources and sinks

In the presented architecture, the Flink application is communicating with Kafka, HDFS, and Elasticsearch.

To enable Kerberos authentication between two Flink components, it is required to provide access to the Keytab files to every component. A Kubernetes secret contains the file encoded in base64 and is exposed to every Pod. The ConfigMap is used to send the Kerberos properties such as the service principals.

The Kafka security protocol is “SASL_PLAINTEXT” and requires the Kerberos service name provided by the ConfigMap.

The various SSL certificates of the source and sink services are exposed as secrets mounted into the Pods. The DockerFile definition mounts and ensures access rights to “$JAVA_HOME/lib/security/cacerts” which contains the required Certificate Authorities.

The ElasticSearch endpoint is accessed through REST requests. They created a custom sink because Flink prior to version 1.6 is incompatible with ElasticSearch in version 6. In their custom HTTP client, the SSL hostname verifier was invalidated.

Communication between the Flink TaskManager and the Kubernetes Physical Volume

The Flink state is persisted using a Physical Volume exposing an NFS server. Because the NFS is not capable of encrypting the data, encryption is handled at the application level. This answers the requirement of in-flight and at-rest (not supported natively by NFS) encryption.

What’s next

The talk ended on the future actions to improve the security of the system, including enforcing Kubernetes security config:

  • Use RBAC
  • Restrict access to kubectl
  • Use Network Policy
  • Pod security Policy

As well as using the Job Cluster Container Entrypoint present in Flink 1.6 which embeds both the Flink runtime and the user application inside a container.


Also published on Medium.

By |2018-10-09T11:25:29+00:00October 8th, 2018|Categories: Big Data, Cyber Security|Tags: , , , , , |0 Comments

About the Author:

Passionate with programming, data and entrepreneurship, I participate in shaping Adaltas to be a team of talented engineers to share our skills and experiences.

Leave A Comment