目錄
KubernetesPodOperator¶
KubernetesPodOperator
允許您在 Kubernetes 叢集上建立和執行 Pod。
注意
如果您使用託管 Kubernetes,請考慮使用專門的 KPO 運算子,因為它可以簡化 Kubernetes 授權流程
注意
使用此運算子 **不** 需要 Kubernetes 執行器。
此運算子如何運作?¶
KubernetesPodOperator
使用 Kubernetes API 在 Kubernetes 叢集中啟動 Pod。透過提供映像檔 URL 和包含選用引數的命令,此運算子使用 Kube Python Client 產生 Kubernetes API 請求,以動態啟動這些個別的 Pod。使用者可以使用 config_file
參數指定 kubeconfig 檔案,否則運算子將預設為 ~/.kube/config
。
KubernetesPodOperator
啟用任務層級的資源組態,並且最適合透過公開 PyPI 儲存庫無法取得的自訂 Python 相依性。它也允許使用者使用 pod_template_file
參數提供範本 YAML 檔案。最終,它允許 Airflow 作為工作流程協調器 — 無論這些工作流程是以何種語言撰寫。
偵錯 KubernetesPodOperator¶
您可以透過在運算子的實例上呼叫 dry_run()
,列印出在執行階段將建立的 Pod 的 Kubernetes 宣告。
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
k = KubernetesPodOperator(
name="hello-dry-run",
image="debian",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
)
k.dry_run()
引數優先順序¶
當 KPO 定義 Pod 物件時,KubernetesPodOperator
引數之間可能會有重疊。一般而言,優先順序順序為 KPO 欄位特定引數(例如,secrets
、cmds
、affinity
)、更通用的範本 full_pod_spec
、pod_template_file
、pod_template_dict
,然後依序為 V1Pod
(預設)。
對於 namespace
,如果未透過任何這些方法提供命名空間,那麼我們將首先嘗試取得目前的命名空間(如果任務已在 Kubernetes 中執行),如果失敗,我們將使用 default
命名空間。
對於 Pod 名稱,如果未明確提供,我們將使用 task_id。預設會新增隨機後綴,因此 Pod 名稱通常並不重要。
如何在 Pod 中使用叢集 ConfigMap、Secret 和 Volume?¶
若要新增 ConfigMap、Volume 和其他 Kubernetes 原生物件,我們建議您匯入 Kubernetes 模型 API,如下所示
from kubernetes.client import models as k8s
透過此 API 物件,您可以存取 Python 類別形式的所有 Kubernetes API 物件。使用此方法將確保正確性和類型安全性。雖然我們已移除幾乎所有 Kubernetes 便利類別,但我們保留了 Secret
類別,以簡化產生密碼 Volume/環境變數的流程。
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
configmaps = [
k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
port = k8s.V1ContainerPort(name="http", container_port=80)
init_container_volume_mounts = [
k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]
init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]
init_container = k8s.V1Container(
name="init-container",
image="ubuntu:16.04",
env=init_environments,
volume_mounts=init_container_volume_mounts,
command=["bash", "-cx"],
args=["echo 10"],
)
affinity = k8s.V1Affinity(
node_affinity=k8s.V1NodeAffinity(
preferred_during_scheduling_ignored_during_execution=[
k8s.V1PreferredSchedulingTerm(
weight=1,
preference=k8s.V1NodeSelectorTerm(
match_expressions=[
k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
]
),
)
]
),
pod_affinity=k8s.V1PodAffinity(
required_during_scheduling_ignored_during_execution=[
k8s.V1WeightedPodAffinityTerm(
weight=1,
pod_affinity_term=k8s.V1PodAffinityTerm(
label_selector=k8s.V1LabelSelector(
match_expressions=[
k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
]
),
topology_key="failure-domain.beta.kubernetes.io/zone",
),
)
]
),
)
tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
KubernetesPodOperator
與 Kubernetes 物件規格之間的差異¶
KubernetesPodOperator
可以視為 Kubernetes 物件規格定義的替代方案,該定義能夠在 DAG 環境中的 Airflow 排程器中執行。如果使用運算子,則無需為您想要執行的 Pod 建立等效的 YAML/JSON 物件規格。YAML 檔案仍然可以使用 pod_template_file
提供,甚至可以使用 full_pod_spec
參數在 Python 中建構 Pod 規格,這需要 Kubernetes V1Pod
。
如何使用私有映像檔(容器登錄檔)?¶
預設情況下,KubernetesPodOperator
將尋找 Dockerhub 上公開託管的映像檔。若要從私有登錄檔(例如 ECR、GCR、Quay 或其他)提取映像檔,您必須建立 Kubernetes Secret,以表示從私有登錄檔存取映像檔的憑證,最終在 image_pull_secrets
參數中指定。
使用 kubectl
建立 Secret
kubectl create secret docker-registry testquay \
--docker-server=quay.io \
--docker-username=<Profile name> \
--docker-password=<password>
然後在您的 Pod 中使用它,如下所示
quay_k8s = KubernetesPodOperator(
namespace="default",
image="quay.io/apache/bash",
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
cmds=["bash", "-cx"],
arguments=["echo", "10", "echo pwd"],
labels={"foo": "bar"},
name="airflow-private-image-pod",
on_finish_action="delete_pod",
in_cluster=True,
task_id="task-two",
get_logs=True,
)
此外,對於此動作,您可以在可延遲模式中使用運算子
quay_k8s_async = KubernetesPodOperator(
task_id="kubernetes_private_img_task_async",
namespace="default",
image="quay.io/apache/bash",
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
cmds=["bash", "-cx"],
arguments=["echo", "10", "echo pwd"],
labels={"foo": "bar"},
name="airflow-private-image-pod",
on_finish_action="delete_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
)
定期擷取和顯示容器日誌的範例
kubernetes_task_async_log = KubernetesPodOperator(
task_id="kubernetes_task_async_log",
namespace="kubernetes_task_async_log",
in_cluster=False,
name="astro_k8s_test_pod",
image="ubuntu",
cmds=[
"bash",
"-cx",
(
"i=0; "
"while [ $i -ne 100 ]; "
"do i=$(($i+1)); "
"echo $i; "
"sleep 1; "
"done; "
"mkdir -p /airflow/xcom/; "
'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json'
),
],
do_xcom_push=True,
deferrable=True,
get_logs=True,
logging_interval=5,
)
XCom 如何運作?¶
KubernetesPodOperator
處理 XCom 值的方式與其他運算子不同。為了從您的 Pod 傳遞 XCom 值,您必須將 do_xcom_push
指定為 True
。這將建立一個與 Pod 並行執行的 Sidecar 容器。Pod 必須將 XCom 值寫入此位置的 /airflow/xcom/return.json
路徑。
注意
無效的 JSON 內容將會失敗,例如 echo 'hello' > /airflow/xcom/return.json
會失敗,而 echo '\"hello\"' > /airflow/xcom/return.json
則會成功
請參閱以下範例,了解其運作方式
write_xcom = KubernetesPodOperator(
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="write-xcom",
do_xcom_push=True,
on_finish_action="delete_pod",
in_cluster=True,
task_id="write-xcom",
get_logs=True,
)
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
write_xcom >> pod_task_xcom_result
注意
XCOM 將僅針對標記為 State.SUCCESS
的任務推送。
此外,對於此動作,您可以在可延遲模式中使用運算子
write_xcom_async = KubernetesPodOperator(
task_id="kubernetes_write_xcom_task_async",
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="write-xcom",
do_xcom_push=True,
on_finish_action="delete_pod",
in_cluster=True,
get_logs=True,
deferrable=True,
)
pod_task_xcom_result_async = BashOperator(
task_id="pod_task_xcom_result_async",
bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
)
write_xcom_async >> pod_task_xcom_result_async
在電子郵件警示中包含錯誤訊息¶
寫入 /dev/termination-log
的任何內容都將由 Kubernetes 擷取,並在任務失敗時包含在例外訊息中。
k = KubernetesPodOperator(
task_id="test_error_message",
image="alpine",
cmds=["/bin/sh"],
arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
name="test-error-message",
email="airflow@example.com",
email_on_failure=True,
)
在此處閱讀更多關於 termination-log 的資訊 這裡。
KubernetesPodOperator 回呼¶
KubernetesPodOperator
支援不同的回呼,可用於在 Pod 的生命週期中觸發動作。為了使用它們,您需要建立 KubernetesPodOperatorCallback
的子類別,並覆寫您想要使用的回呼方法。然後,您可以使用 callbacks
參數將您的回呼類別傳遞給運算子。
支援以下回呼
on_sync_client_creation:在建立同步用戶端後呼叫
on_pod_creation:在建立 Pod 後呼叫
on_pod_starting:在 Pod 啟動後呼叫
on_pod_completion:在 Pod 完成時呼叫
on_pod_cleanup:在清理/刪除 Pod 後呼叫
on_operator_resuming:從延遲狀態恢復任務時
progress_callback:在容器日誌的每一行呼叫
目前,回呼方法不會在非同步模式中呼叫,此支援將在未來新增。
範例:¶
import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
class MyCallback(KubernetesPodOperatorCallback):
@staticmethod
def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
client.create_namespaced_service(
namespace=pod.metadata.namespace,
body=k8s.V1Service(
metadata=k8s.V1ObjectMeta(
name=pod.metadata.name,
labels=pod.metadata.labels,
owner_references=[
k8s.V1OwnerReference(
api_version=pod.api_version,
kind=pod.kind,
name=pod.metadata.name,
uid=pod.metadata.uid,
controller=True,
block_owner_deletion=True,
)
],
),
spec=k8s.V1ServiceSpec(
selector=pod.metadata.labels,
ports=[
k8s.V1ServicePort(
name="http",
port=80,
target_port=80,
)
],
),
),
)
k = KubernetesPodOperator(
task_id="test_callback",
image="alpine",
cmds=["/bin/sh"],
arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
name="test-callback",
callbacks=MyCallback,
)
SparkKubernetesOperator¶
SparkKubernetesOperator
允許您在 Kubernetes 叢集上建立和執行 Spark 工作。它基於 spark-on-k8s-operator 專案。
此運算子簡化了介面,並接受不同的參數來設定和執行 Kubernetes 上的 Spark 應用程式。與 KubernetesOperator 類似,我們新增了邏輯,以便在提交工作後等待工作完成、管理錯誤處理、從驅動程式 Pod 擷取日誌,以及刪除 Spark 工作的功能。它也支援開箱即用的 Kubernetes 功能,例如處理 Volume、Config Map、Secret 等。
此運算子如何運作?¶
運算子透過在 Kubernetes 中產生 SparkApplication 自訂資源定義 (CRD) 來啟動 Spark 任務。此 SparkApplication 任務隨後會使用使用者指定的參數產生驅動程式和必要的執行器 Pod。運算子會持續監控任務的進度,直到任務成功或失敗為止。它會從驅動程式 Pod 擷取日誌,並將其顯示在 Airflow UI 中。
使用範例¶
為了建立 SparkKubernetesOperator 任務,您必須提供一個基本範本,其中包含 Spark 組態和 Kubernetes 相關的資源組態。此範本可以是 YAML 或 JSON 格式,作為運算子的起點。以下是您可以利用的範例範本
spark_job_template.yaml
spark:
apiVersion: sparkoperator.k8s.io/v1beta2
version: v1beta2
kind: SparkApplication
apiGroup: sparkoperator.k8s.io
metadata:
namespace: ds
spec:
type: Python
pythonVersion: "3"
mode: cluster
sparkVersion: 3.0.0
successfulRunHistoryLimit: 1
restartPolicy:
type: Never
imagePullPolicy: Always
hadoopConf: {}
imagePullSecrets: []
dynamicAllocation:
enabled: false
initialExecutors: 1
minExecutors: 1
maxExecutors: 1
labels: {}
driver:
serviceAccount: default
container_resources:
gpu:
name: null
quantity: 0
cpu:
request: null
limit: null
memory:
request: null
limit: null
executor:
instances: 1
container_resources:
gpu:
name: null
quantity: 0
cpu:
request: null
limit: null
memory:
request: null
limit: null
kubernetes:
# example:
# env_vars:
# - name: TEST_NAME
# value: TEST_VALUE
env_vars: []
# example:
# env_from:
# - name: test
# valueFrom:
# secretKeyRef:
# name: mongo-secret
# key: mongo-password
env_from: []
# example:
# node_selector:
# karpenter.sh/provisioner-name: spark
node_selector: {}
# example: https://kubernetes.dev.org.tw/docs/concepts/scheduling-eviction/assign-pod-node/
# affinity:
# nodeAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# nodeSelectorTerms:
# - matchExpressions:
# - key: beta.kubernetes.io/instance-type
# operator: In
# values:
# - r5.xlarge
affinity:
nodeAffinity: {}
podAffinity: {}
podAntiAffinity: {}
# example: https://kubernetes.dev.org.tw/docs/concepts/scheduling-eviction/taint-and-toleration/
# type: list
# tolerations:
# - key: "key1"
# operator: "Equal"
# value: "value1"
# effect: "NoSchedule"
tolerations: []
# example:
# config_map_mounts:
# snowflake-default: /mnt/tmp
config_map_mounts: {}
# example:
# volume_mounts:
# - name: config
# mountPath: /airflow
volume_mounts: []
# https://kubernetes.dev.org.tw/docs/concepts/storage/volumes/
# example:
# volumes:
# - name: config
# persistentVolumeClaim:
# claimName: airflow
volumes: []
# read config map into an env variable
# example:
# from_env_config_map:
# - configmap_1
# - configmap_2
from_env_config_map: []
# load secret into an env variable
# example:
# from_env_secret:
# - secret_1
# - secret_2
from_env_secret: []
in_cluster: true
conn_id: kubernetes_default
kube_config_file: null
cluster_context: null
重要
範本檔案包含兩個主要類別:
spark
和kubernetes
。spark:此區段包含任務的 Spark 組態,反映 Spark API 範本的結構。
kubernetes:此區段包含任務的 Kubernetes 資源組態,直接對應於 Kubernetes API 文件。每個資源類型都在範本中包含一個範例。
要使用的指定基礎映像檔是
gcr.io/spark-operator/spark-py:v3.1.1
。確保 Spark 程式碼已嵌入在映像檔中、使用 persistentVolume 掛載,或可從外部位置存取,例如 S3 儲存桶。
接下來,使用以下程式碼建立任務
SparkKubernetesOperator(
task_id="spark_task",
image="gcr.io/spark-operator/spark-py:v3.1.1", # OR custom image using that
code_path="local://path/to/spark/code.py",
application_file="spark_job_template.yaml", # OR spark_job_template.json
dag=dag,
)
注意:或者,application_file 也可以是 JSON 檔案。請參閱以下範例
spark_job_template.json
{
"spark": {
"apiVersion": "sparkoperator.k8s.io/v1beta2",
"version": "v1beta2",
"kind": "SparkApplication",
"apiGroup": "sparkoperator.k8s.io",
"metadata": {
"namespace": "ds"
},
"spec": {
"type": "Python",
"pythonVersion": "3",
"mode": "cluster",
"sparkVersion": "3.0.0",
"successfulRunHistoryLimit": 1,
"restartPolicy": {
"type": "Never"
},
"imagePullPolicy": "Always",
"hadoopConf": {},
"imagePullSecrets": [],
"dynamicAllocation": {
"enabled": false,
"initialExecutors": 1,
"minExecutors": 1,
"maxExecutors": 1
},
"labels": {},
"driver": {
"serviceAccount": "default",
"container_resources": {
"gpu": {
"name": null,
"quantity": 0
},
"cpu": {
"request": null,
"limit": null
},
"memory": {
"request": null,
"limit": null
}
}
},
"executor": {
"instances": 1,
"container_resources": {
"gpu": {
"name": null,
"quantity": 0
},
"cpu": {
"request": null,
"limit": null
},
"memory": {
"request": null,
"limit": null
}
}
}
}
},
"kubernetes": {
"env_vars": [],
"env_from": [],
"node_selector": {},
"affinity": {
"nodeAffinity": {},
"podAffinity": {},
"podAntiAffinity": {}
},
"tolerations": [],
"config_map_mounts": {},
"volume_mounts": [
{
"name": "config",
"mountPath": "/airflow"
}
],
"volumes": [
{
"name": "config",
"persistentVolumeClaim": {
"claimName": "hsaljoog-airflow"
}
}
],
"from_env_config_map": [],
"from_env_secret": [],
"in_cluster": true,
"conn_id": "kubernetes_default",
"kube_config_file": null,
"cluster_context": null
}
}
除了使用 YAML 或 JSON 檔案之外,另一種方法是直接傳遞 template_spec
欄位,而不是 application_file,如果您不想使用檔案進行組態設定。
KubernetesJobOperator¶
KubernetesJobOperator
允許您在 Kubernetes 叢集上建立和執行 Job。
注意
如果您使用託管 Kubernetes,請考慮使用專門的 KJO 運算子,因為它可以簡化 Kubernetes 授權流程
GKEStartJobOperator
運算子適用於 Google Kubernetes Engine。
注意
使用此運算子 **不** 需要 Kubernetes 執行器。
此運算子如何運作?¶
KubernetesJobOperator
使用 Kubernetes API 在 Kubernetes 叢集中啟動 Job。此運算子使用 Kube Python Client 產生 Kubernetes API 請求,以動態啟動此 Job。使用者可以使用 config_file
參數指定 kubeconfig 檔案,否則運算子將預設為 ~/.kube/config
。它也允許使用者使用 job_template_file
參數提供範本 YAML 檔案。
k8s_job = KubernetesJobOperator(
task_id="job-task",
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME,
)
KubernetesJobOperator
也支援可延遲模式
k8s_job_def = KubernetesJobOperator(
task_id="job-task-def",
namespace="default",
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME + "-def",
wait_until_job_complete=True,
deferrable=True,
)
KubernetesPodOperator
和 KubernetesJobOperator
之間的差異¶
KubernetesJobOperator
是用於建立 Job 的運算子。Job 會建立一個或多個 Pod,並將繼續重試 Pod 的執行,直到指定數量的 Pod 成功終止。隨著 Pod 成功完成,Job 會追蹤成功的完成次數。當達到指定數量的成功完成次數時,Job 即完成。使用者可以使用組態參數(例如 activeDeadlineSeconds
和 backoffLimit
)限制 Job 重試執行的次數。此運算子不是使用用於建立 Pod 的 template
參數,而是使用 KubernetesPodOperator
。這表示使用者可以在 KubernetesJobOperator
中使用 KubernetesPodOperator
中的所有參數。
關於 Job 的更多資訊請參閱此處:Kubernetes Job 文件
KubernetesDeleteJobOperator¶
KubernetesDeleteJobOperator
允許您刪除 Kubernetes 叢集上的 Job。
delete_job_task = KubernetesDeleteJobOperator(
task_id="delete_job_task",
name=k8s_job.output["job_name"],
namespace=JOB_NAMESPACE,
wait_for_completion=True,
delete_on_status="Complete",
poll_interval=1.0,
)
KubernetesPatchJobOperator¶
KubernetesPatchJobOperator
允許您更新 Kubernetes 叢集上的 Job。
update_job = KubernetesPatchJobOperator(
task_id="update-job-task",
namespace="default",
name=k8s_job.output["job_name"],
body={"spec": {"suspend": False}},
)
KubernetesInstallKueueOperator¶
KubernetesInstallKueueOperator
允許您在 Kubernetes 叢集中安裝 Kueue 組件
install_kueue = KubernetesInstallKueueOperator(
task_id="install_kueue",
kueue_version="v0.9.1",
)
KubernetesStartKueueJobOperator¶
KubernetesStartKueueJobOperator
允許您在 Kubernetes 叢集中啟動 Kueue 工作
start_kueue_job = KubernetesStartKueueJobOperator(
task_id="kueue_job",
queue_name=QUEUE_NAME,
namespace="default",
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name="test-pi",
suspend=True,
container_resources=k8s.V1ResourceRequirements(
requests={
"cpu": 1,
"memory": "200Mi",
},
),
wait_until_job_complete=True,
)
如需更多資訊,請參閱