Google Kubernetes Engine 運算子

Google Kubernetes Engine (GKE) 提供一個受管理的環境,讓您使用 Google 基礎架構部署、管理及擴展您的容器化應用程式。GKE 環境包含多部機器 (具體來說是 Compute Engine 執行個體) 組合在一起以形成叢集。

先決條件任務

若要使用這些運算子,您必須執行幾項操作

管理 GKE 叢集

叢集是 GKE 的基礎 — 所有工作負載都在叢集之上執行。它由叢集主節點和工作節點組成。主節點的生命週期由 GKE 在建立或刪除叢集時管理。工作節點以 Compute Engine VM 執行個體的形式呈現,GKE 在您建立叢集時代表您建立這些執行個體。

建立 GKE 叢集

以下是叢集定義的範例

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}

像這樣的 dict 物件,或 Cluster 定義,是在使用 GKECreateClusterOperator 建立叢集時所必需的。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
)

您可以對此動作使用可延遲模式,以便非同步執行運算子。當運算子知道必須等待時,這將讓您能夠釋出 Worker,並將恢復運算子的工作交給觸發器。因此,當運算子暫停 (延遲) 時,它不會佔用 Worker 插槽,而且您的叢集將在閒置的運算子或感測器上浪費更少的資源

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
    deferrable=True,
)

在叢集中安裝特定版本的 Kueue

Kueue 是一個雲端原生工作排程器,可與預設 Kubernetes 排程器、工作控制器和叢集自動擴展器協同運作,以提供端對端批次系統。Kueue 實作工作佇列,根據配額和在團隊之間公平共用資源的階層,決定工作應何時等待以及何時開始。Kueue 支援 Autopilot 叢集、具有節點自動佈建的標準 GKE 和一般自動擴展節點集區。若要在您的叢集上安裝和使用 Kueue,請使用 GKEStartKueueInsideClusterOperator,如此範例所示

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py[source]

    add_kueue_cluster = GKEStartKueueInsideClusterOperator(
        task_id="add_kueue_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        cluster_name=CLUSTER_NAME,
        kueue_version="v0.6.2",
    )

刪除 GKE 叢集

若要刪除叢集,請使用 GKEDeleteClusterOperator。這也會刪除配置給叢集的所有節點。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
)

您可以對此動作使用可延遲模式,以便非同步執行運算子。當運算子知道必須等待時,這將讓您能夠釋出 Worker,並將恢復運算子的工作交給觸發器。因此,當運算子暫停 (延遲) 時,它不會佔用 Worker 插槽,而且您的叢集將在閒置的運算子或感測器上浪費更少的資源

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    deferrable=True,
)

管理 GKE 叢集上的工作負載

GKE 可與容器化應用程式 (例如在 Docker 上建立的應用程式) 搭配運作,並將其部署到叢集上執行。這些稱為工作負載,當部署在叢集上時,它們會利用叢集的 CPU 和記憶體資源來有效執行。

在 GKE 叢集上執行 Pod

有兩個運算子可用於在 GKE 叢集上執行 Pod

GKEStartPodOperator 擴展 KubernetesPodOperator 以提供使用 Google Cloud 憑證的授權。無需管理 kube_config 檔案,因為它會自動產生。所有 Kubernetes 參數 (config_file 除外) 也適用於 GKEStartPodOperator。如需 KubernetesPodOperator 的詳細資訊,請參閱:KubernetesPodOperator 指南。

搭配私有叢集使用

所有叢集都有標準端點。端點是 Kubernetes API 伺服器的 IP 位址,Airflow 使用此位址與您的叢集主節點通訊。端點會顯示在 Cloud Console 的叢集「詳細資料」分頁的「端點」欄位中,以及 gcloud container clusters describe 的輸出中。

私有叢集有兩個獨特的端點值:privateEndpoint (內部 IP 位址) 和 publicEndpoint (外部 IP 位址)。預設情況下,針對私有叢集執行 GKEStartPodOperator 會將外部 IP 位址設定為端點。如果您偏好使用內部 IP 作為端點,則需要將 use_internal_ip 參數設定為 True

搭配 Autopilot (無伺服器) 叢集使用

在無伺服器叢集 (例如 GKE Autopilot) 上執行時,Pod 啟動有時可能需要較長時間,因為需要冷啟動。在 Pod 啟動期間,會定期以短時間間隔檢查狀態,如果 Pod 尚未啟動,則會發出警告訊息。您可以透過 startup_check_interval_seconds 參數增加此間隔長度,建議為 60 秒。

XCom 的使用

我們可以在運算子上啟用 XCom 的使用。這會透過啟動具有指定 Pod 的 Sidecar 容器來運作。當指定 XCom 使用方式時,會自動掛載 Sidecar,且其掛載點是路徑 /airflow/xcom。若要將值提供給 XCom,請確保您的 Pod 將其寫入 Sidecar 中名為 return.json 的檔案。然後可以在 DAG 的下游使用此檔案的內容。以下是其使用範例

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    do_xcom_push=True,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    in_cluster=False,
    on_finish_action="delete_pod",
)

然後在其他運算子中使用它

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)

您可以對此動作使用可延遲模式,以便非同步執行運算子。當運算子知道必須等待時,這將讓您能夠釋出 Worker,並將恢復運算子的工作交給觸發器。因此,當運算子暫停 (延遲) 時,它不會佔用 Worker 插槽,而且您的叢集將在閒置的運算子或感測器上浪費更少的資源

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

pod_task_xcom_async = GKEStartPodOperator(
    task_id="pod_task_xcom_async",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom-async",
    in_cluster=False,
    on_finish_action="delete_pod",
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
)

在 GKE 叢集上執行 Job

有兩個運算子可用於在 GKE 叢集上執行 Job

GKEStartJobOperator 擴展 KubernetesJobOperator 以提供使用 Google Cloud 憑證的授權。無需管理 kube_config 檔案,因為它會自動產生。所有 Kubernetes 參數 (config_file 除外) 也適用於 GKEStartJobOperator

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

job_task = GKEStartJobOperator(
    task_id="job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

GKEStartJobOperator 也支援可延遲模式。請注意,只有在 wait_until_job_complete 參數設定為 True 時,此模式才有意義。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

job_task_def = GKEStartJobOperator(
    task_id="job_task_def",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME_DEF,
    wait_until_job_complete=True,
    deferrable=True,
)

若要在啟用 Kueue 的 GKE 叢集上執行 Job,請使用 GKEStartKueueJobOperator

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py[source]

kueue_job_task = GKEStartKueueJobOperator(
    task_id="kueue_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    queue_name=QUEUE_NAME,
    namespace="default",
    parallelism=3,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
)

刪除 GKE 叢集上的 Job

有兩個運算子可用於刪除 GKE 叢集上的 Job

GKEDeleteJobOperator 擴展 KubernetesDeleteJobOperator 以提供使用 Google Cloud 憑證的授權。無需管理 kube_config 檔案,因為它會自動產生。所有 Kubernetes 參數 (config_file 除外) 也適用於 GKEDeleteJobOperator

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

delete_job = GKEDeleteJobOperator(
    task_id="delete_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=JOB_NAME,
    namespace=JOB_NAMESPACE,
)

依指定名稱擷取 Job 的相關資訊

您可以使用 GKEDescribeJobOperator,透過提供 Job 的名稱和命名空間來擷取現有 Job 的詳細描述。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

describe_job_task = GKEDescribeJobOperator(
    task_id="describe_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    job_name=job_task.output["job_name"],
    namespace="default",
    cluster_name=CLUSTER_NAME,
)

擷取 Job 列表

您可以使用 GKEListJobsOperator 擷取現有 Job 的列表。如果提供 namespace 參數,輸出將包含指定命名空間中的 Job。如果未指定 namespace 參數,則會輸出所有命名空間中的資訊。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

list_job_task = GKEListJobsOperator(
    task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)

在 GKE 叢集中建立資源

您可以使用 GKECreateCustomResourceOperator,在指定的 Google Kubernetes Engine 叢集中建立資源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py[source]

create_resource_task = GKECreateCustomResourceOperator(
    task_id="create_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

刪除 GKE 叢集中的資源

您可以使用 GKEDeleteCustomResourceOperator,刪除指定的 Google Kubernetes Engine 叢集中的資源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py[source]

delete_resource_task = GKEDeleteCustomResourceOperator(
    task_id="delete_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

暫停 GKE 叢集上的 Job

您可以使用 GKESuspendJobOperator,暫停指定 Google Kubernetes Engine 叢集中的 Job。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

suspend_job = GKESuspendJobOperator(
    task_id="suspend_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

恢復 GKE 叢集上的 Job

您可以使用 GKEResumeJobOperator,恢復指定 Google Kubernetes Engine 叢集中的 Job。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

resume_job = GKEResumeJobOperator(
    task_id="resume_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

這篇文章對您有幫助嗎?