Google Kubernetes Engine Operators

Google Kubernetes Engine (GKE) provides a managed environment for deploying, managing, and scaling your containerized applications using Google infrastructure. The GKE environment consists of multiple machines (specifically, Compute Engine instances) grouped together to form a cluster.

Prerequisite Tasks

To use these operators, you must do a few things:

Manage GKE cluster

A cluster is the foundation of GKE - all workloads run on top of the cluster. It is made up on a cluster master and worker nodes. The lifecycle of the master is managed by GKE when creating or deleting a cluster. The worker nodes are represented as Compute Engine VM instances that GKE creates on your behalf when creating a cluster.

Create GKE cluster

Here is an example of a cluster definition:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}

A dict object like this, or a Cluster definition, is required when creating a cluster with GKECreateClusterOperator.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
)

You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
    deferrable=True,
)

Delete GKE cluster

To delete a cluster, use GKEDeleteClusterOperator. This would also delete all the nodes allocated to the cluster.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
)

You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    deferrable=True,
)

Manage workloads on a GKE cluster

GKE works with containerized applications, such as those created on Docker, and deploys them to run on the cluster. These are called workloads, and when deployed on the cluster they leverage the CPU and memory resources of the cluster to run effectively.

Run a Pod on a GKE cluster

There are two operators available in order to run a pod on a GKE cluster:

GKEStartPodOperator extends KubernetesPodOperator to provide authorization using Google Cloud credentials. There is no need to manage the kube_config file, as it will be generated automatically. All Kubernetes parameters (except config_file) are also valid for the GKEStartPodOperator. For more information on KubernetesPodOperator, please look at: KubernetesPodOperator guide.

Using with Private cluster

All clusters have a canonical endpoint. The endpoint is the IP address of the Kubernetes API server that Airflow use to communicate with your cluster master. The endpoint is displayed in Cloud Console under the Endpoints field of the cluster’s Details tab, and in the output of gcloud container clusters describe in the endpoint field.

Private clusters have two unique endpoint values: privateEndpoint, which is an internal IP address, and publicEndpoint, which is an external one. Running GKEStartPodOperator against a private cluster sets the external IP address as the endpoint by default. If you prefer to use the internal IP as the endpoint, you need to set use_internal_ip parameter to True.

Using with Autopilot (serverless) cluster

When running on serverless cluster like GKE Autopilot, the pod startup can sometimes take longer due to cold start. During the pod startup, the status is checked in regular short intervals and warning messages are emitted if the pod has not yet started. You can increase this interval length via the startup_check_interval_seconds parameter, with recommendation of 60 seconds.

Use of XCom

We can enable the usage of XCom on the operator. This works by launching a sidecar container with the pod specified. The sidecar is automatically mounted when the XCom usage is specified and its mount point is the path /airflow/xcom. To provide values to the XCom, ensure your Pod writes it into a file called return.json in the sidecar. The contents of this can then be used downstream in your DAG. Here is an example of it being used:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    do_xcom_push=True,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    in_cluster=False,
    on_finish_action="delete_pod",
)

And then use it in other operators:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)

You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

pod_task_xcom_async = GKEStartPodOperator(
    task_id="pod_task_xcom_async",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom-async",
    in_cluster=False,
    on_finish_action="delete_pod",
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
)

Was this entry helpful?