Scaling Airflow Workers in EKS

In a previous post, we deployed a Kubernetes cluster on AWS (EKS) and ran Airflow on it using Fargate nodes. In this post, we show how to scale the workers so that the cluster delivers its full power when needed while the cost of idle time stays minimal.

Peter Jung
Cloud Engineer

Workers are an essential part of Airflow. They are the daemons that actually execute the logic of tasks. When a task is scheduled, it is sent to a queue that the workers listen to, and one of them picks it up and runs it.

Airflow's executors are the mechanism by which task instances are run.

Depending on your needs, you can select from the following:

Local Executors

  • Debug Executor
  • Local Executor
  • Sequential Executor

As the name suggests, local executors are great for debugging on your local machine, but they are not meant for production use.

Remote Executors

  • Celery Executor
  • CeleryKubernetes Executor
  • Dask Executor
  • Kubernetes Executor
  • LocalKubernetes Executor

Of the production options, we will look at the Celery and Kubernetes executors, along with CeleryKubernetesExecutor, which combines the two.

CeleryExecutor and KEDA

Celery is the default executor in Airflow's Helm chart. Celery workers manage one or more celeryd processes that execute the desired tasks.

You can limit the number of tasks that run concurrently on one worker with the environment variable AIRFLOW__CELERY__WORKER_CONCURRENCY.

In the simplest deployment, we could spawn a fixed number of workers, each allowed to run a fixed number of tasks in parallel. If too many tasks are scheduled, the extra ones simply wait until a slot frees up. That is fine as long as you don't mind unnecessarily long waiting times. The main downside appears when only a few tasks are scheduled, or none at all: some workers are then running without processing anything, and you are paying for nothing!

With newer versions of Airflow and CeleryExecutor, you can use KEDA to auto-scale the workers according to the number of tasks in the queue. Simply put, KEDA queries the Airflow metadata database every n seconds for the number of tasks in the RUNNING or QUEUED state, and if that number exceeds the task slots the current workers provide, it spawns new workers. You can also cap the number of workers running at a time so that autoscaling does not drain your resources.

With the Airflow Helm chart, enabling KEDA is simple: all you need to do is configure a few variables in values.yaml:

workers:
  # Overridden by KEDA (if enabled).
  replicas: 1

  keda:
    # CeleryExecutor or CeleryKubernetesExecutor required.
    # Disabled persistence required.
    enabled: true
    cooldownPeriod: 60
    maxReplicaCount: 10
    minReplicaCount: 0

Keep in mind that KEDA requires persistence on your workers to be disabled, and that it supports only CeleryExecutor and CeleryKubernetesExecutor.
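
The scaling rule described above can be sketched in a few lines of Python. This is a simplified model for intuition, not the chart's actual implementation: the function `desired_workers` and its parameters are hypothetical, and it assumes the autoscaler aims for just enough workers to cover every running or queued task at the configured concurrency, clamped to the minimum and maximum replica counts from values.yaml. Cooldown behavior is not modeled.

```python
import math


def desired_workers(
    running: int,
    queued: int,
    worker_concurrency: int,
    min_replicas: int = 0,
    max_replicas: int = 10,
) -> int:
    """Hypothetical model of the worker count an autoscaler would aim for."""
    if worker_concurrency <= 0:
        raise ValueError("worker_concurrency must be positive")
    # One worker handles up to `worker_concurrency` tasks at once, so round up.
    needed = math.ceil((running + queued) / worker_concurrency)
    # Clamp to the configured bounds (minReplicaCount / maxReplicaCount).
    return max(min_replicas, min(needed, max_replicas))


if __name__ == "__main__":
    # Assumed concurrency of 16 tasks per worker, for illustration only.
    print(desired_workers(running=0, queued=0, worker_concurrency=16))      # 0
    print(desired_workers(running=10, queued=30, worker_concurrency=16))    # 3
    print(desired_workers(running=100, queued=200, worker_concurrency=16))  # 10
```

With no tasks in flight the model scales to zero workers, and a large backlog is capped at the maximum of 10, which mirrors the two cost controls discussed in this section.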

KubernetesExecutor

We have seen how we can use KEDA to drive auto-scaling of workers based on the number of tasks. As great as that is, it has one main drawback: it scales based on the number of tasks, not on the resources needed to run them.

It is easy to imagine a situation where each worker is allowed to run a fixed number of tasks concurrently, let's say 2, and we have two different kinds of tasks. The first one is lightweight: it only reads data from an external database and reports it via email. The second one is more resource-heavy, as it uses a machine learning model to make predictions on the data.

Depending on how many resources you allocate to your workers, several situations are possible. To be safe, you allocate enough resources for Airflow to schedule multiple heavy tasks on a single worker. In that case, however, Airflow can also schedule multiple light tasks on a single worker, meaning the worker will be under-utilized and the cost unnecessarily high.
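
To make the trade-off concrete, here is a small back-of-the-envelope sketch. All numbers are assumptions chosen for illustration (a heavy task needing 4 vCPUs, a light task needing 0.25 vCPU, a worker concurrency of 2), and `worker_utilization` is a hypothetical helper, not part of Airflow.

```python
from typing import Sequence


def worker_utilization(task_cpus: Sequence[float], worker_cpu: float) -> float:
    """Return the fraction of a worker's CPU allocation used by its tasks."""
    if worker_cpu <= 0:
        raise ValueError("worker_cpu must be positive")
    return sum(task_cpus) / worker_cpu


# Assumed, illustrative requirements in vCPUs; these are not measured values.
HEAVY_TASK_CPU = 4.0
LIGHT_TASK_CPU = 0.25
WORKER_CONCURRENCY = 2

# Size the worker so that it can safely run two heavy tasks at once.
WORKER_CPU = HEAVY_TASK_CPU * WORKER_CONCURRENCY  # 8 vCPUs

if __name__ == "__main__":
    heavy = worker_utilization([HEAVY_TASK_CPU] * WORKER_CONCURRENCY, WORKER_CPU)
    light = worker_utilization([LIGHT_TASK_CPU] * WORKER_CONCURRENCY, WORKER_CPU)
    print(f"two heavy tasks use {heavy:.2%} of the worker's CPU")
    print(f"two light tasks use {light:.2%} of the worker's CPU")
```

Under these assumptions, a worker sized for two heavy tasks is fully used when it runs them, but uses only a small fraction of its allocation (about 6%) when it happens to pick up two light tasks.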

Right now, KEDA does not take the resource needs of tasks into account. In our case, such heavy tasks are the exception rather than the rule, so we run them with KubernetesExecutor.

Instead of spawning workers that can run multiple tasks, the Kubernetes executor creates one pod for each scheduled task. The advantage of this approach is that you can use Kubernetes scaling features right out of the box, without worrying about whether your workers have enough resources to run all the allocated tasks. Another advantage is that, since each task is a Kubernetes pod, you can modify its template on a per-task basis.

When using the TaskFlow API, simply add arguments specifying your desired pod:

from airflow.decorators import task
from kubernetes.client import models as k8s

@task(
   executor_config={
       "pod_override": k8s.V1Pod(
           spec=k8s.V1PodSpec(
               containers=[
                   k8s.V1Container(
                       name="base",
                       resources=k8s.V1ResourceRequirements(
                           limits={"cpu": 4, "memory": "8Gi"},
                           requests={"cpu": 4, "memory": "8Gi"},
                       ),
                   )
               ]
           )
       )
   },
)
def my_task() -> None:
   ...

Now, this task will always be executed with 4 vCPUs and 8 GiB of memory. It's worth noting that the default manifest for pods created by the Kubernetes executor is located in the pod template file and can be changed if you need to. You are also not limited to modifying the resources: you can override anything from the pod template file. For example, you can add a node selector if you want the task to be executed only on EC2 nodes with an available GPU, mount a persistent volume, or do anything else that you can do with standard pods.

CeleryKubernetesExecutor

In practice, you may benefit from using both the Celery and Kubernetes executors. The main benefit of CeleryExecutor is that multiple tasks can be executed on one worker, meaning that if the worker is already up and running, your other tasks get executed within a few seconds. If you have a lot of tasks with stable (most probably small) resource requirements, it's safe to use KEDA to drive autoscaling.

The disadvantage of Kubernetes pods is that they take some time to start. Even a task that runs for just a few seconds will take at least 1-2 minutes from start to finish, with most of that time spent waiting for the pod to become ready.

With CeleryKubernetesExecutor, you get the best of both worlds. It creates two queues: one is listened to by the Celery workers, and the other is served by the Kubernetes executor. You then simply choose the right fit for each task like this:

from airflow.decorators import task

@task(
    queue="target_queue",
)
def my_task() -> None:
    ...

By default, the queue for Celery is called “default” and the queue for Kubernetes is called “kubernetes”.
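
If many tasks need this decision, the choice of queue can be made from a task's resource needs. The sketch below is one possible approach, not something Airflow provides: `pick_queue` and its thresholds are hypothetical, and it assumes the default queue names mentioned above.

```python
# Default queue names as described above; adjust if your deployment overrides them.
CELERY_QUEUE = "default"
KUBERNETES_QUEUE = "kubernetes"


def pick_queue(
    cpu: float,
    memory_gib: float,
    max_celery_cpu: float = 1.0,
    max_celery_memory_gib: float = 2.0,
) -> str:
    """Hypothetical helper: route small tasks to Celery, heavy ones to Kubernetes.

    The thresholds are illustrative assumptions and should reflect what a
    single Celery worker slot can comfortably handle in your cluster.
    """
    if cpu <= max_celery_cpu and memory_gib <= max_celery_memory_gib:
        return CELERY_QUEUE
    return KUBERNETES_QUEUE


if __name__ == "__main__":
    # A light reporting task stays on the already running Celery workers.
    print(pick_queue(cpu=0.25, memory_gib=0.5))
    # A heavy prediction task gets its own pod via the Kubernetes queue.
    print(pick_queue(cpu=4, memory_gib=8))
```

The result can be passed straight to the decorator, for example `@task(queue=pick_queue(cpu=4, memory_gib=8))`, so that the routing rule lives in one place instead of being repeated across DAGs.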

In this post, you have seen how to utilize two different executor types to improve the performance of your tasks while optimizing the costs at the same time. If you would like our team to provide assistance with your Airflow project, contact us today!