
Migrating an Airflow KubernetesPodOperator to Dagster


Airlift v2 is under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please reach out to your CSM.

The KubernetesPodOperator in Apache Airflow allows you to execute containerized tasks within Kubernetes pods as part of your data pipelines. For example, the following task runs a pod from the bash:latest image that prints "Hello World!":


```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k8s_hello_world = KubernetesPodOperator(
    task_id="hello_world_task",
    name="hello-world-pod",
    image="bash:latest",
    cmds=["bash", "-cx"],
    arguments=['echo "Hello World!"'],
)
```

Dagster equivalent

In Dagster, the equivalent of the KubernetesPodOperator is the PipesK8sClient, which runs the task in a Kubernetes pod from within an asset:

```python
import dagster_k8s as dg_k8s

import dagster as dg

container_cfg = {
    "name": "hello-world-pod",
    "image": "bash:latest",
    "command": ["bash", "-cx"],
    "args": ['echo "Hello World!"'],
}


@dg.asset
def execute_hello_world_task(context: dg.AssetExecutionContext):
    return (
        dg_k8s.PipesK8sClient()
        .run(
            context=context,
            base_pod_meta={"name": "hello-world-pod"},
            base_pod_spec={"containers": [container_cfg]},
        )
        .get_results()
    )
```

Migrating the operator

To migrate the operator, you will need to:

  1. Ensure that your Dagster deployment has access to the Kubernetes cluster.
  2. Write an asset that executes the task within a Kubernetes pod using the PipesK8sClient.
  3. Use the Dagster Airlift component to proxy execution of the original task to Dagster.

Step 1: Ensure access to the Kubernetes cluster

First, make sure that your Dagster deployment has access to the Kubernetes cluster where you want to run your tasks. To configure the underlying Kubernetes client, the PipesK8sClient accepts the load_incluster_config, kubeconfig_file, and kube_context arguments. Its env argument sets additional environment variables for the launched pod.

Here's an example of what this might look like when configuring the client to access an EKS cluster:

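```python
import os

import dagster_k8s as dg_k8s

# Hypothetical: forward AWS credentials from the Dagster process's environment.
AWS_CREDENTIALS = {
    key: os.environ[key] for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
}

eks_client = dg_k8s.PipesK8sClient(
    # Extra environment variables to set in the launched pod's container.
    env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},
    kubeconfig_file="path/to/kubeconfig",
    kube_context="my-eks-cluster",
)
```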
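The same Dagster code might run both inside the cluster and from a workstation. In that case, you may prefer to decide at runtime whether to use in-cluster configuration or a kubeconfig file. The sketch below shows one way to do that. It assumes that the presence of the KUBERNETES_SERVICE_HOST environment variable, which Kubernetes normally sets in pod containers, is a sufficient signal. The helper name k8s_client_kwargs and its default path and context are hypothetical; only the returned keys are PipesK8sClient arguments.

```python
import os
from typing import Any, Mapping


def k8s_client_kwargs(
    environ: Mapping[str, str] = os.environ,
    kubeconfig_file: str = "path/to/kubeconfig",
    kube_context: str = "my-eks-cluster",
) -> dict[str, Any]:
    """Hypothetical helper: choose PipesK8sClient connection arguments."""
    # Kubernetes normally injects KUBERNETES_SERVICE_HOST into pod containers,
    # so its presence suggests this code is running inside the cluster.
    if "KUBERNETES_SERVICE_HOST" in environ:
        return {"load_incluster_config": True}
    # Otherwise, fall back to an explicit kubeconfig file and context.
    return {"kubeconfig_file": kubeconfig_file, "kube_context": kube_context}
```

You can then unpack the result into the constructor, for example dg_k8s.PipesK8sClient(env={"AWS_REGION": "us-west-2"}, **k8s_client_kwargs()).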

Step 2: Write an asset that executes the task within a Kubernetes pod

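Once you have access to the Kubernetes cluster, you can write an asset that executes the task within a Kubernetes pod using the PipesK8sClient. The KubernetesPodOperator takes the pod's settings as individual operator arguments. With the PipesK8sClient, you instead describe the pod with plain Python dictionaries that follow the Kubernetes API, passed through the base_pod_meta and base_pod_spec arguments. For example: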

```python
from dagster import AssetExecutionContext, asset

container_cfg = {
    "name": "hello-world-pod",
    "image": "bash:latest",
    "command": ["bash", "-cx"],
    "args": ['echo "Hello World!"'],
}


@asset
def execute_hello_world_task(context: AssetExecutionContext):
    # eks_client is the PipesK8sClient configured in Step 1.
    return eks_client.run(
        context=context,
        base_pod_meta={"name": "hello-world-pod"},
        base_pod_spec={"containers": [container_cfg]},
    ).get_results()
```

In the parameter comparison section of this guide, you'll find a reference for mapping KubernetesPodOperator parameters to PipesK8sClient parameters.

For more information on the full capabilities of the PipesK8sClient, see "Build pipelines with Kubernetes".

Step 3: Use the Dagster Airlift component to proxy execution

Finally, you can use the Dagster Airlift component to proxy the execution of the original task to Dagster. For more information, see "Migrate from Airflow to Dagster at the task level".

Parameter comparison

Here's a comparison of the parameters between the Airflow KubernetesPodOperator and the Dagster PipesK8sClient.

Directly supported arguments

| KubernetesPodOperator argument | PipesK8sClient argument |
| --- | --- |
| in_cluster | load_incluster_config |
| cluster_context | kube_context |
| config_file | kubeconfig_file |
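When porting many operators, it can help to apply this table mechanically. The following sketch renames the three connection arguments listed above and rejects anything else. The helper pipes_client_kwargs and the KPO_TO_PIPES_CLIENT_ARGS dictionary are hypothetical illustrations, not part of dagster_k8s.

```python
from typing import Any

# Renames taken from the "Directly supported arguments" table.
KPO_TO_PIPES_CLIENT_ARGS = {
    "in_cluster": "load_incluster_config",
    "cluster_context": "kube_context",
    "config_file": "kubeconfig_file",
}


def pipes_client_kwargs(kpo_kwargs: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: rename KubernetesPodOperator connection arguments."""
    unsupported = set(kpo_kwargs) - set(KPO_TO_PIPES_CLIENT_ARGS)
    if unsupported:
        raise ValueError(f"No direct PipesK8sClient equivalent: {sorted(unsupported)}")
    # Drop None values so those PipesK8sClient parameters keep their defaults.
    return {
        KPO_TO_PIPES_CLIENT_ARGS[key]: value
        for key, value in kpo_kwargs.items()
        if value is not None
    }
```

The returned dictionary can be unpacked into dg_k8s.PipesK8sClient(...). Arguments that are not in the table raise an error rather than being silently dropped.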

Indirectly supported arguments

Many arguments are supported indirectly as keys passed to the base_pod_spec argument:

| KubernetesPodOperator argument | PipesK8sClient key (under base_pod_spec argument) | Description |
| --- | --- | --- |
| volumes | volumes | Volumes to be used by the Pod |
| affinity | affinity | Node affinity/anti-affinity rules for the Pod |
| node_selector | nodeSelector | Node selection constraints for the Pod |
| hostnetwork | hostNetwork | Enable host networking for the Pod |
| dns_config | dnsConfig | DNS settings for the Pod |
| dnspolicy | dnsPolicy | DNS policy for the Pod |
| hostname | hostname | Hostname of the Pod |
| subdomain | subdomain | Subdomain for the Pod |
| schedulername | schedulerName | Scheduler to be used for the Pod |
| service_account_name | serviceAccountName | Service account to be used by the Pod |
| priority_class_name | priorityClassName | Priority class for the Pod |
| security_context | securityContext | Security context for the entire Pod |
| tolerations | tolerations | Tolerations for the Pod |
| image_pull_secrets | imagePullSecrets | Secrets for pulling container images |
| termination_grace_period | terminationGracePeriodSeconds | Grace period for Pod termination |
| active_deadline_seconds | activeDeadlineSeconds | Deadline for the Pod's execution |
| host_aliases | hostAliases | Additional entries for the Pod's /etc/hosts |
| init_containers | initContainers | Initialization containers for the Pod |
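As a sketch of how the table above can be applied, the hypothetical helper below builds a base_pod_spec dictionary from KubernetesPodOperator-style keyword arguments. It assumes the values are already plain dictionaries and lists in Kubernetes API form. Objects from kubernetes.client.models (such as V1Toleration) would need converting first, which this sketch does not do.

```python
from typing import Any

# Pod-level renames from the table above (KubernetesPodOperator -> pod spec key).
KPO_TO_POD_SPEC_KEYS = {
    "volumes": "volumes",
    "affinity": "affinity",
    "node_selector": "nodeSelector",
    "hostnetwork": "hostNetwork",
    "dns_config": "dnsConfig",
    "dnspolicy": "dnsPolicy",
    "hostname": "hostname",
    "subdomain": "subdomain",
    "schedulername": "schedulerName",
    "service_account_name": "serviceAccountName",
    "priority_class_name": "priorityClassName",
    "security_context": "securityContext",
    "tolerations": "tolerations",
    "image_pull_secrets": "imagePullSecrets",
    "termination_grace_period": "terminationGracePeriodSeconds",
    "active_deadline_seconds": "activeDeadlineSeconds",
    "host_aliases": "hostAliases",
    "init_containers": "initContainers",
}


def base_pod_spec_from_kpo(
    containers: list[dict[str, Any]], **kpo_kwargs: Any
) -> dict[str, Any]:
    """Hypothetical helper: build a base_pod_spec dict for PipesK8sClient.run."""
    spec: dict[str, Any] = {"containers": containers}
    for key, value in kpo_kwargs.items():
        if key not in KPO_TO_POD_SPEC_KEYS:
            raise ValueError(f"Not a pod-level argument: {key}")
        # Skip unset arguments instead of writing explicit nulls into the spec.
        if value is not None:
            spec[KPO_TO_POD_SPEC_KEYS[key]] = value
    return spec
```

The result is intended to be passed as base_pod_spec=... to PipesK8sClient.run.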

The following arguments are supported under the nested containers key of the base_pod_spec argument of the PipesK8sClient:

| KubernetesPodOperator argument | PipesK8sClient key (under base_pod_spec > containers argument) | Description |
| --- | --- | --- |
| image | image | Docker image for the container |
| cmds | command | Entrypoint command for the container |
| arguments | args | Arguments for the entrypoint command |
| ports | ports | List of ports to expose from the container |
| volume_mounts | volumeMounts | List of volume mounts for the container |
| env_vars | env | Environment variables for the container |
| env_from | envFrom | List of sources to populate environment variables |
| image_pull_policy | imagePullPolicy | Policy for pulling the container image |
| container_resources | resources | Resource requirements for the container |
| container_security_context | securityContext | Security context for the container |
| termination_message_policy | terminationMessagePolicy | Policy for the termination message |
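Container-level arguments can be translated into an entry for the containers list in the same way. The hypothetical helper below applies the table above. It also converts env_vars given as a plain dict, which KubernetesPodOperator accepts, into the list of name/value entries that the Kubernetes container spec expects. It assumes plain Python values rather than kubernetes.client model objects.

```python
from typing import Any

# Container-level renames from the table above.
KPO_TO_CONTAINER_KEYS = {
    "image": "image",
    "cmds": "command",
    "arguments": "args",
    "ports": "ports",
    "volume_mounts": "volumeMounts",
    "env_vars": "env",
    "env_from": "envFrom",
    "image_pull_policy": "imagePullPolicy",
    "container_resources": "resources",
    "container_security_context": "securityContext",
    "termination_message_policy": "terminationMessagePolicy",
}


def container_from_kpo(name: str, **kpo_kwargs: Any) -> dict[str, Any]:
    """Hypothetical helper: build one entry for base_pod_spec["containers"]."""
    container: dict[str, Any] = {"name": name}
    for key, value in kpo_kwargs.items():
        if key not in KPO_TO_CONTAINER_KEYS:
            raise ValueError(f"Not a container-level argument: {key}")
        if value is None:
            continue
        # The container spec wants env as a list of {"name", "value"} entries.
        if key == "env_vars" and isinstance(value, dict):
            value = [{"name": k, "value": v} for k, v in value.items()]
        container[KPO_TO_CONTAINER_KEYS[key]] = value
    return container
```

Applied to the Airflow example at the top of this guide, container_from_kpo("hello-world-pod", image="bash:latest", cmds=["bash", "-cx"], arguments=['echo "Hello World!"']) produces the same dictionary as container_cfg in the Dagster equivalent.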

For a full list, see the Kubernetes container spec documentation.

The following arguments are supported under the base_pod_meta argument, which configures the metadata of the pod:

| KubernetesPodOperator argument | PipesK8sClient key (under base_pod_meta argument) |
| --- | --- |
| name | name |
| namespace | namespace |
| labels | labels |
| annotations | annotations |

For a full list, see the Kubernetes ObjectMeta spec documentation.