Migrating an Airflow KubernetesPodOperator to Dagster
Airlift v2 is under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please reach out to your CSM.
The KubernetesPodOperator in Apache Airflow allows you to execute containerized tasks within Kubernetes pods as part of your data pipelines.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
k8s_hello_world = KubernetesPodOperator(
    task_id="hello_world_task",
    name="hello-world-pod",
    image="bash:latest",
    cmds=["bash", "-cx"],
    arguments=['echo "Hello World!"'],
)
Dagster equivalent
The Dagster equivalent of the KubernetesPodOperator is the PipesK8sClient, which executes a task within a Kubernetes pod:
import dagster_k8s as dg_k8s
import dagster as dg
container_cfg = {
    "name": "hello-world-pod",
    "image": "bash:latest",
    "command": ["bash", "-cx"],
    "args": ['echo "Hello World!"'],
}
@dg.asset
def execute_hello_world_task(context: dg.AssetExecutionContext):
    return (
        dg_k8s.PipesK8sClient()
        .run(
            context=context,
            base_pod_meta={"name": "hello-world-pod"},
            base_pod_spec={"containers": [container_cfg]},
        )
        .get_results()
    )
Migrating the operator
To migrate the operator, you will need to:
- Ensure that your Dagster deployment has access to the Kubernetes cluster.
- Write an asset that executes the task within a Kubernetes pod using the PipesK8sClient.
- Use the Dagster Airlift component to proxy execution of the original task to Dagster.
Step 1: Ensure access to the Kubernetes cluster
First, ensure that your Dagster deployment has access to the Kubernetes cluster where you want to run your tasks. The PipesK8sClient accepts kubeconfig_file, kube_context, and env arguments to configure the Kubernetes client.
Here's an example of what this might look like when configuring the client to access an EKS cluster:
import dagster_k8s as dg_k8s
eks_client = dg_k8s.PipesK8sClient(
    # The client will have automatic access to all
    # environment variables in the execution context.
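    # AWS_CREDENTIALS is a placeholder for a dict of credential
    # environment variables defined elsewhere in your code.
    env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},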
    kubeconfig_file="path/to/kubeconfig",
    kube_context="my-eks-cluster",
)
Step 2: Write an asset that executes the task within a Kubernetes pod
Once you have access to the Kubernetes cluster, you can write an asset that executes the task within a Kubernetes pod using the PipesK8sClient. Unlike the KubernetesPodOperator, the PipesK8sClient allows you to define the pod spec directly in your Python code. For example:
from dagster import AssetExecutionContext, asset
container_cfg = {
    "name": "hello-world-pod",
    "image": "bash:latest",
    "command": ["bash", "-cx"],
    "args": ['echo "Hello World!"'],
}
@asset
def execute_hello_world_task(context: AssetExecutionContext):
    return eks_client.run(
        context=context,
        base_pod_meta={"name": "hello-world-pod"},
        base_pod_spec={"containers": [container_cfg]},
    ).get_results()
In the parameter comparison section of this guide, you'll find a reference for mapping KubernetesPodOperator parameters to PipesK8sClient parameters.
For more information on the full capabilities of the PipesK8sClient, see "Build pipelines with Kubernetes".
Step 3: Use the Dagster Airlift component to proxy execution
Finally, you can use the Dagster Airlift component to proxy the execution of the original task to Dagster. For more information, see "Migrate from Airflow to Dagster at the task level".
Parameter comparison
Here's a comparison of the parameters between the Airflow KubernetesPodOperator and the Dagster PipesK8sClient.
Directly supported arguments
| KubernetesPodOperator argument | PipesK8sClient argument |
|---|---|
| in_cluster | load_incluster_config | 
| cluster_context | kube_context | 
| config_file | kubeconfig_file | 
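As a rough illustration of the table above, the renaming can be expressed as a small lookup. This is only a sketch: `OPERATOR_TO_CLIENT_KWARGS` and `translate_client_kwargs` are hypothetical names introduced here, not part of dagster_k8s or Airflow, and the sample values are placeholders.

```python
# Hypothetical helper for illustration; it is not part of dagster_k8s.
# Keys are KubernetesPodOperator arguments, values are the matching
# PipesK8sClient arguments from the table above.
OPERATOR_TO_CLIENT_KWARGS = {
    "in_cluster": "load_incluster_config",
    "cluster_context": "kube_context",
    "config_file": "kubeconfig_file",
}


def translate_client_kwargs(operator_kwargs):
    """Rename the directly supported arguments and drop everything else."""
    return {
        OPERATOR_TO_CLIENT_KWARGS[name]: value
        for name, value in operator_kwargs.items()
        if name in OPERATOR_TO_CLIENT_KWARGS and value is not None
    }


client_kwargs = translate_client_kwargs(
    {
        "task_id": "hello_world_task",  # not a client argument, so it is dropped
        "cluster_context": "my-eks-cluster",
        "config_file": "path/to/kubeconfig",
    }
)
```

The resulting dictionary could then be unpacked into the client constructor, for example `dg_k8s.PipesK8sClient(**client_kwargs)`.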
Indirectly supported arguments
Many arguments are supported indirectly as keys passed to the base_pod_spec argument:
| KubernetesPodOperator argument | PipesK8sClient key (under base_pod_spec argument) | Description |
|---|---|---|
| volumes | volumes | Volumes to be used by the Pod | 
| affinity | affinity | Node affinity/anti-affinity rules for the Pod | 
| node_selector | nodeSelector | Node selection constraints for the Pod | 
| hostnetwork | hostNetwork | Enable host networking for the Pod | 
| dns_config | dnsConfig | DNS settings for the Pod | 
| dnspolicy | dnsPolicy | DNS policy for the Pod | 
| hostname | hostname | Hostname of the Pod | 
| subdomain | subdomain | Subdomain for the Pod | 
| schedulername | schedulerName | Scheduler to be used for the Pod | 
| service_account_name | serviceAccountName | Service account to be used by the Pod | 
| priority_class_name | priorityClassName | Priority class for the Pod | 
| security_context | securityContext | Security context for the entire Pod | 
| tolerations | tolerations | Tolerations for the Pod | 
| image_pull_secrets | imagePullSecrets | Secrets for pulling container images | 
| termination_grace_period | terminationGracePeriodSeconds | Grace period for Pod termination | 
| active_deadline_seconds | activeDeadlineSeconds | Deadline for the Pod's execution | 
| host_aliases | hostAliases | Additional entries for the Pod's /etc/hosts | 
| init_containers | initContainers | Initialization containers for the Pod | 
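The pod-level renaming in the table above could be sketched as follows. The helper `build_base_pod_spec` and the sample values (`pipeline-runner`, the `disktype` selector) are hypothetical, and the sketch assumes the operator arguments are already plain dicts and lists rather than Kubernetes client model objects.

```python
# Hypothetical helper for illustration; it is not part of dagster_k8s.
# Keys are KubernetesPodOperator arguments, values are pod spec keys,
# copied from the table above.
POD_SPEC_KEYS = {
    "volumes": "volumes",
    "affinity": "affinity",
    "node_selector": "nodeSelector",
    "hostnetwork": "hostNetwork",
    "dns_config": "dnsConfig",
    "dnspolicy": "dnsPolicy",
    "hostname": "hostname",
    "subdomain": "subdomain",
    "schedulername": "schedulerName",
    "service_account_name": "serviceAccountName",
    "priority_class_name": "priorityClassName",
    "security_context": "securityContext",
    "tolerations": "tolerations",
    "image_pull_secrets": "imagePullSecrets",
    "termination_grace_period": "terminationGracePeriodSeconds",
    "active_deadline_seconds": "activeDeadlineSeconds",
    "host_aliases": "hostAliases",
    "init_containers": "initContainers",
}


def build_base_pod_spec(operator_kwargs, containers):
    """Rename pod-level operator arguments and attach the container list."""
    spec = {"containers": containers}
    for name, value in operator_kwargs.items():
        # Skip arguments that are unset or not pod-level settings.
        if name in POD_SPEC_KEYS and value is not None:
            spec[POD_SPEC_KEYS[name]] = value
    return spec


base_pod_spec = build_base_pod_spec(
    {
        "service_account_name": "pipeline-runner",
        "node_selector": {"disktype": "ssd"},
        "active_deadline_seconds": 600,
    },
    containers=[{"name": "hello-world-pod", "image": "bash:latest"}],
)
```

The result has the shape expected by the `base_pod_spec` argument of `PipesK8sClient.run`, as used in the asset examples earlier in this guide.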
The following arguments are supported under the nested containers key of the base_pod_spec argument of the PipesK8sClient:
| KubernetesPodOperator argument | PipesK8sClient key (under base_pod_spec > containers argument) | Description |
|---|---|---|
| image | image | Docker image for the container | 
| cmds | command | Entrypoint command for the container | 
| arguments | args | Arguments for the entrypoint command | 
| ports | ports | List of ports to expose from the container | 
| volume_mounts | volumeMounts | List of volume mounts for the container | 
| env_vars | env | Environment variables for the container | 
| env_from | envFrom | List of sources to populate environment variables | 
| image_pull_policy | imagePullPolicy | Policy for pulling the container image | 
| container_resources | resources | Resource requirements for the container | 
| container_security_context | securityContext | Security context for the container | 
| termination_message_policy | terminationMessagePolicy | Policy for the termination message | 
For a full list, see the Kubernetes container spec documentation.
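Some container-level arguments need a change of shape as well as a change of name. The sketch below covers only `image`, `cmds`, `arguments`, and `env_vars`. It assumes `env_vars` is given as a plain dict, which is converted into the list of name/value entries that the Kubernetes container spec uses for `env`. The function `build_container` and the `GREETING` variable are hypothetical.

```python
# Hypothetical helper for illustration; it is not part of dagster_k8s.
def build_container(name, image, cmds=None, arguments=None, env_vars=None):
    """Build a container entry for base_pod_spec["containers"]."""
    container = {"name": name, "image": image}
    if cmds:
        container["command"] = list(cmds)  # cmds -> command
    if arguments:
        container["args"] = list(arguments)  # arguments -> args
    if env_vars:
        # Assumes a dict of variables; Kubernetes expects a list of
        # {"name": ..., "value": ...} entries with string values.
        container["env"] = [
            {"name": key, "value": str(value)} for key, value in env_vars.items()
        ]
    return container


container_cfg = build_container(
    name="hello-world-pod",
    image="bash:latest",
    cmds=["bash", "-cx"],
    arguments=['echo "Hello World!"'],
    env_vars={"GREETING": "hello"},
)
```

The resulting `container_cfg` can be passed as `base_pod_spec={"containers": [container_cfg]}`, in the same way as the hand-written dictionary in the earlier examples.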
The following arguments are supported under the base_pod_meta argument, which configures the metadata of the pod:
| KubernetesPodOperator argument | PipesK8sClient key (under base_pod_meta argument) |
|---|---|
| name | name | 
| namespace | namespace | 
| labels | labels | 
| annotations | annotations | 
For a full list, see the Kubernetes ObjectMeta spec documentation.