Google Kubernetes Engine 運算子¶
Google Kubernetes Engine (GKE) 提供一個受管理的環境,讓您使用 Google 基礎架構部署、管理及擴展您的容器化應用程式。GKE 環境包含多部機器 (具體來說是 Compute Engine 執行個體) 組合在一起以形成叢集。
先決條件任務¶
若要使用這些運算子,您必須執行幾項操作
使用 Cloud Console 選取或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'安裝的詳細資訊請參閱 安裝。
管理 GKE 叢集¶
叢集是 GKE 的基礎 — 所有工作負載都在叢集之上執行。它由叢集主節點和工作節點組成。主節點的生命週期由 GKE 在建立或刪除叢集時管理。工作節點以 Compute Engine VM 執行個體的形式呈現,GKE 在您建立叢集時代表您建立這些執行個體。
建立 GKE 叢集¶
以下是叢集定義的範例
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}
像這樣的 dict 物件,或 Cluster
定義,是在使用 GKECreateClusterOperator
建立叢集時所必需的。
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
)
您可以對此動作使用可延遲模式,以便非同步執行運算子。當運算子知道必須等待時,這將讓您能夠釋出 Worker,並將恢復運算子的工作交給觸發器。因此,當運算子暫停 (延遲) 時,它不會佔用 Worker 插槽,而且您的叢集將在閒置的運算子或感測器上浪費更少的資源
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
deferrable=True,
)
在叢集中安裝特定版本的 Kueue¶
Kueue 是一個雲端原生工作排程器,可與預設 Kubernetes 排程器、工作控制器和叢集自動擴展器協同運作,以提供端對端批次系統。Kueue 實作工作佇列,根據配額和在團隊之間公平共用資源的階層,決定工作應何時等待以及何時開始。Kueue 支援 Autopilot 叢集、具有節點自動佈建的標準 GKE 和一般自動擴展節點集區。若要在您的叢集上安裝和使用 Kueue,請使用 GKEStartKueueInsideClusterOperator
,如此範例所示
add_kueue_cluster = GKEStartKueueInsideClusterOperator(
task_id="add_kueue_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
kueue_version="v0.6.2",
)
刪除 GKE 叢集¶
若要刪除叢集,請使用 GKEDeleteClusterOperator
。這也會刪除配置給叢集的所有節點。
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
)
您可以對此動作使用可延遲模式,以便非同步執行運算子。當運算子知道必須等待時,這將讓您能夠釋出 Worker,並將恢復運算子的工作交給觸發器。因此,當運算子暫停 (延遲) 時,它不會佔用 Worker 插槽,而且您的叢集將在閒置的運算子或感測器上浪費更少的資源
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
deferrable=True,
)
管理 GKE 叢集上的工作負載¶
GKE 可與容器化應用程式 (例如在 Docker 上建立的應用程式) 搭配運作,並將其部署到叢集上執行。這些稱為工作負載,當部署在叢集上時,它們會利用叢集的 CPU 和記憶體資源來有效執行。
在 GKE 叢集上執行 Pod¶
有兩個運算子可用於在 GKE 叢集上執行 Pod
GKEStartPodOperator
擴展 KubernetesPodOperator
以提供使用 Google Cloud 憑證的授權。無需管理 kube_config
檔案,因為它會自動產生。所有 Kubernetes 參數 (config_file
除外) 也適用於 GKEStartPodOperator
。如需 KubernetesPodOperator
的詳細資訊,請參閱:KubernetesPodOperator 指南。
搭配私有叢集使用¶
所有叢集都有標準端點。端點是 Kubernetes API 伺服器的 IP 位址,Airflow 使用此位址與您的叢集主節點通訊。端點會顯示在 Cloud Console 的叢集「詳細資料」分頁的「端點」欄位中,以及 gcloud container clusters describe
的輸出中。
私有叢集有兩個獨特的端點值:privateEndpoint
(內部 IP 位址) 和 publicEndpoint
(外部 IP 位址)。預設情況下,針對私有叢集執行 GKEStartPodOperator
會將外部 IP 位址設定為端點。如果您偏好使用內部 IP 作為端點,則需要將 use_internal_ip
參數設定為 True
。
搭配 Autopilot (無伺服器) 叢集使用¶
在無伺服器叢集 (例如 GKE Autopilot) 上執行時,Pod 啟動有時可能需要較長時間,因為需要冷啟動。在 Pod 啟動期間,會定期以短時間間隔檢查狀態,如果 Pod 尚未啟動,則會發出警告訊息。您可以透過 startup_check_interval_seconds
參數增加此間隔長度,建議為 60 秒。
XCom 的使用¶
我們可以在運算子上啟用 XCom 的使用。這會透過啟動具有指定 Pod 的 Sidecar 容器來運作。當指定 XCom 使用方式時,會自動掛載 Sidecar,且其掛載點是路徑 /airflow/xcom
。若要將值提供給 XCom,請確保您的 Pod 將其寫入 Sidecar 中名為 return.json
的檔案。然後可以在 DAG 的下游使用此檔案的內容。以下是其使用範例
pod_task_xcom = GKEStartPodOperator(
task_id="pod_task_xcom",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
do_xcom_push=True,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom",
in_cluster=False,
on_finish_action="delete_pod",
)
然後在其他運算子中使用它
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
您可以對此動作使用可延遲模式,以便非同步執行運算子。當運算子知道必須等待時,這將讓您能夠釋出 Worker,並將恢復運算子的工作交給觸發器。因此,當運算子暫停 (延遲) 時,它不會佔用 Worker 插槽,而且您的叢集將在閒置的運算子或感測器上浪費更少的資源
pod_task_xcom_async = GKEStartPodOperator(
task_id="pod_task_xcom_async",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom-async",
in_cluster=False,
on_finish_action="delete_pod",
do_xcom_push=True,
deferrable=True,
get_logs=True,
)
在 GKE 叢集上執行 Job¶
有兩個運算子可用於在 GKE 叢集上執行 Job
GKEStartJobOperator
擴展 KubernetesJobOperator
以提供使用 Google Cloud 憑證的授權。無需管理 kube_config
檔案,因為它會自動產生。所有 Kubernetes 參數 (config_file
除外) 也適用於 GKEStartJobOperator
。
job_task = GKEStartJobOperator(
task_id="job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME,
)
GKEStartJobOperator
也支援可延遲模式。請注意,只有在 wait_until_job_complete
參數設定為 True
時,此模式才有意義。
job_task_def = GKEStartJobOperator(
task_id="job_task_def",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME_DEF,
wait_until_job_complete=True,
deferrable=True,
)
若要在啟用 Kueue 的 GKE 叢集上執行 Job,請使用 GKEStartKueueJobOperator
。
kueue_job_task = GKEStartKueueJobOperator(
task_id="kueue_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
queue_name=QUEUE_NAME,
namespace="default",
parallelism=3,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name="test-pi",
suspend=True,
container_resources=k8s.V1ResourceRequirements(
requests={
"cpu": 1,
"memory": "200Mi",
},
),
)
刪除 GKE 叢集上的 Job¶
有兩個運算子可用於刪除 GKE 叢集上的 Job
GKEDeleteJobOperator
擴展 KubernetesDeleteJobOperator
以提供使用 Google Cloud 憑證的授權。無需管理 kube_config
檔案,因為它會自動產生。所有 Kubernetes 參數 (config_file
除外) 也適用於 GKEDeleteJobOperator
。
delete_job = GKEDeleteJobOperator(
task_id="delete_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=JOB_NAME,
namespace=JOB_NAMESPACE,
)
依指定名稱擷取 Job 的相關資訊¶
您可以使用 GKEDescribeJobOperator
,透過提供 Job 的名稱和命名空間來擷取現有 Job 的詳細描述。
describe_job_task = GKEDescribeJobOperator(
task_id="describe_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
job_name=job_task.output["job_name"],
namespace="default",
cluster_name=CLUSTER_NAME,
)
擷取 Job 列表¶
您可以使用 GKEListJobsOperator
擷取現有 Job 的列表。如果提供 namespace
參數,輸出將包含指定命名空間中的 Job。如果未指定 namespace
參數,則會輸出所有命名空間中的資訊。
list_job_task = GKEListJobsOperator(
task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)
在 GKE 叢集中建立資源¶
您可以使用 GKECreateCustomResourceOperator
,在指定的 Google Kubernetes Engine 叢集中建立資源。
create_resource_task = GKECreateCustomResourceOperator(
task_id="create_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
刪除 GKE 叢集中的資源¶
您可以使用 GKEDeleteCustomResourceOperator
,刪除指定的 Google Kubernetes Engine 叢集中的資源。
delete_resource_task = GKEDeleteCustomResourceOperator(
task_id="delete_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
暫停 GKE 叢集上的 Job¶
您可以使用 GKESuspendJobOperator
,暫停指定 Google Kubernetes Engine 叢集中的 Job。
suspend_job = GKESuspendJobOperator(
task_id="suspend_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)
恢復 GKE 叢集上的 Job¶
您可以使用 GKEResumeJobOperator
,恢復指定 Google Kubernetes Engine 叢集中的 Job。
resume_job = GKEResumeJobOperator(
task_id="resume_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)