airflow.providers.cncf.kubernetes.operators.pod

在 Kubernetes POD 中執行任務。

模組內容

類別

PodEventType

Kubernetes Pod 發出的事件類型。

KubernetesPodOperator

在 Kubernetes Pod 中執行任務。

屬性

alphanum_lower

KUBE_CONFIG_ENV_VAR

airflow.providers.cncf.kubernetes.operators.pod.alphanum_lower[source]
airflow.providers.cncf.kubernetes.operators.pod.KUBE_CONFIG_ENV_VAR = 'KUBECONFIG'[source]
class airflow.providers.cncf.kubernetes.operators.pod.PodEventType[source]

基底: enum.Enum

Kubernetes Pod 發出的事件類型。

WARNING = 'Warning'[source]
NORMAL = 'Normal'[source]
exception airflow.providers.cncf.kubernetes.operators.pod.PodReattachFailure[source]

基底: airflow.exceptions.AirflowException

當我們預期可以找到 Pod 卻找不到時。

exception airflow.providers.cncf.kubernetes.operators.pod.PodCredentialsExpiredFailure[source]

基底: airflow.exceptions.AirflowException

當 Pod 刷新憑證失敗時。

class airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator(*, kubernetes_conn_id=KubernetesHook.default_conn_name, namespace=None, image=None, name=None, random_name_suffix=True, cmds=None, arguments=None, ports=None, volume_mounts=None, volumes=None, env_vars=None, env_from=None, secrets=None, in_cluster=None, cluster_context=None, labels=None, reattach_on_restart=True, startup_timeout_seconds=120, startup_check_interval_seconds=5, get_logs=True, base_container_name=None, init_container_logs=None, container_logs=None, image_pull_policy=None, annotations=None, container_resources=None, affinity=None, config_file=None, node_selector=None, image_pull_secrets=None, service_account_name=None, hostnetwork=False, host_aliases=None, tolerations=None, security_context=None, container_security_context=None, dnspolicy=None, dns_config=None, hostname=None, subdomain=None, schedulername=None, full_pod_spec=None, init_containers=None, log_events_on_failure=False, do_xcom_push=False, pod_template_file=None, pod_template_dict=None, priority_class_name=None, pod_runtime_info_envs=None, termination_grace_period=None, configmaps=None, skip_on_exit_code=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=2, log_pod_spec_on_failure=True, on_finish_action='delete_pod', is_delete_operator_pod=None, termination_message_policy='File', active_deadline_seconds=None, callbacks=None, progress_callback=None, logging_interval=None, **kwargs)[source]

基底: airflow.models.BaseOperator

在 Kubernetes Pod 中執行任務。

參見

關於如何使用此運算子的更多資訊,請查看指南: KubernetesPodOperator

注意

如果您使用 Google Kubernetes Engine 且 Airflow 未在同一個叢集中執行,請考慮使用 GKEStartPodOperator,這簡化了授權流程。

參數
  • kubernetes_conn_id (str | None) – Kubernetes 叢集的 kubernetes 連線 ID

  • namespace (str | None) – 在 Kubernetes 中運行的命名空間。

  • image (str | None) – 您想要啟動的 Docker 映像。預設為 hub.docker.com,但完整 URL 將指向自訂倉庫。(樣板化)

  • name (str | None) – 任務將在其中運行的 Pod 名稱,將用於(如果 random_name_suffix 為 True,則加上隨機後綴)產生 Pod ID(DNS-1123 子網域,僅包含 [a-z0-9.-])。

  • random_name_suffix (bool) – 如果為 True,將產生隨機後綴。

  • cmds (list[str] | None) – 容器的進入點。(樣板化)如果未提供,則使用 Docker 映像的進入點。

  • arguments (list[str] | None) – 進入點的參數。(樣板化)如果未提供,則使用 Docker 映像的 CMD。

  • ports (list[kubernetes.client.models.V1ContainerPort] | None) – 啟動的 Pod 的端口。

  • volume_mounts (list[kubernetes.client.models.V1VolumeMount] | None) – 啟動的 Pod 的 volumeMounts。

  • volumes (list[kubernetes.client.models.V1Volume] | None) – 啟動的 Pod 的 Volumes。包括 ConfigMaps 和 PersistentVolumes。

  • env_vars (list[kubernetes.client.models.V1EnvVar] | dict[str, str] | None) – 在容器中初始化的環境變數。(樣板化)

  • env_from (list[kubernetes.client.models.V1EnvFromSource] | None) – (可選)用於在容器中填充環境變數的來源列表。

  • secrets (list[airflow.providers.cncf.kubernetes.secret.Secret] | None) – 要注入到容器中的 Kubernetes secrets。它們可以作為環境變數或磁碟區中的檔案公開。

  • in_cluster (bool | None) – 使用 in-cluster 配置執行 Kubernetes 客戶端。

  • cluster_context (str | None) – 指向 Kubernetes 叢集的上下文。當 in_cluster 為 True 時忽略。如果為 None,則使用 current-context。(樣板化)

  • reattach_on_restart (bool) – 如果 worker 在 Pod 運行時死掉,在下次嘗試時重新連接並監控。如果為 False,則始終為每次嘗試建立一個新的 Pod。

  • labels (dict | None) – 應用於 Pod 的標籤。(樣板化)

  • startup_timeout_seconds (int) – 啟動 Pod 的超時時間(秒)。

  • startup_check_interval_seconds (int) – 檢查 Pod 是否已啟動的間隔時間(秒)

  • get_logs (bool) – 取得基礎容器的 stdout 作為任務日誌。

  • init_container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – 將日誌發佈到 stdout 的 init 容器列表。接受容器序列、單個容器名稱或 True。如果為 True,則發佈所有容器日誌。

  • container_logs (collections.abc.Iterable[str] | str | Literal[True] | None) – 將日誌發佈到 stdout 的容器列表。接受容器序列、單個容器名稱或 True。如果為 True,則發佈所有容器日誌。與 get_logs 參數結合使用。預設值為基礎容器。

  • image_pull_policy (str | None) – 指定一個策略來快取或總是拉取映像。

  • annotations (dict | None) – 您可以附加到 Pod 的非識別元數據。可以是大量資料,並且可以包含標籤不允許的字元。(樣板化)

  • container_resources (kubernetes.client.models.V1ResourceRequirements | None) – 啟動的 Pod 的資源。(樣板化)

  • affinity (kubernetes.client.models.V1Affinity | None) – 啟動的 Pod 的親和性調度規則。

  • config_file (str | None) – Kubernetes 配置文件的路徑。(樣板化)如果未指定,預設值為 ~/.kube/config

  • node_selector (dict | None) – 包含一組調度規則的字典。(樣板化)

  • image_pull_secrets (list[kubernetes.client.models.V1LocalObjectReference] | None) – 要給予 Pod 的任何映像拉取密鑰。如果需要多個密鑰,請提供逗號分隔的列表:secret_a,secret_b

  • service_account_name (str | None) – 服務帳戶的名稱

  • hostnetwork (bool) – 如果為 True,則在 Pod 上啟用主機網路。

  • host_aliases (list[kubernetes.client.models.V1HostAlias] | None) – 應用於 Pod 中容器的主機別名列表。

  • tolerations (list[kubernetes.client.models.V1Toleration] | None) – Kubernetes 容忍度列表。

  • security_context (kubernetes.client.models.V1PodSecurityContext | dict | None) – Pod 應運行的安全選項 (PodSecurityContext)。

  • container_security_context (kubernetes.client.models.V1SecurityContext | dict | None) – 容器應運行的安全選項。

  • dnspolicy (str | None) – Pod 的 dnspolicy。

  • dns_config (kubernetes.client.models.V1PodDNSConfig | None) – Pod 的 DNS 配置(IP 地址、搜索、選項)。

  • hostname (str | None) – Pod 的主機名稱。

  • subdomain (str | None) – Pod 的子網域。

  • schedulername (str | None) – 為 Pod 指定一個 schedulername

  • full_pod_spec (kubernetes.client.models.V1Pod | None) – 完整的 podSpec

  • init_containers (list[kubernetes.client.models.V1Container] | None) – 啟動的 Pod 的 init 容器

  • log_events_on_failure (bool) – 如果發生故障,記錄 Pod 的事件

  • do_xcom_push (bool) – 如果為 True,當容器完成時,容器中 /airflow/xcom/return.json 文件的內容也將被推送到 XCom。

  • pod_template_file (str | None) – Pod 模板檔案的路徑(樣板化)

  • pod_template_dict (dict | None) – Pod 模板字典(樣板化)

  • priority_class_name (str | None) – 啟動的 Pod 的優先級類名稱

  • pod_runtime_info_envs (list[kubernetes.client.models.V1EnvVar] | None) – (可選)容器中要設定的環境變數列表。

  • termination_grace_period (int | None) – 如果任務在 UI 中被終止,終止寬限期,預設為 Kubernetes 預設值

  • configmaps (list[str] | None) – (可選)從中收集 ConfigMap 以填充環境變數的 ConfigMap 名稱列表。目標 ConfigMap 的 Data 欄位內容將表示為鍵值對的環境變數。擴展 env_from。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果任務以此退出代碼退出,則將任務保持在 skipped 狀態(預設值:None)。如果設定為 None,任何非零退出代碼都將被視為失敗。

  • base_container_name (str | None) – Pod 中基礎容器的名稱。如果 get_logs 為 True,則此容器的日誌將顯示為此任務日誌的一部分。預設值為 None。如果為 None,將查詢類別變數 BASE_CONTAINER_NAME(預設為 “base”)以取得要使用的基礎容器名稱。

  • deferrable (bool) – 在可延遲模式下運行運算符。

  • poll_interval (float) – 輪詢狀態的間隔時間(秒)。僅在可延遲模式下使用。

  • log_pod_spec_on_failure (bool) – 如果發生故障,記錄 Pod 的規範

  • on_finish_action (str) – 當 Pod 達到最終狀態或執行被中斷時,該怎麼做。如果為 “delete_pod”,則無論其狀態如何,都將刪除 Pod;如果為 “delete_succeeded_pod”,則僅刪除成功的 Pod。您可以設定為 “keep_pod” 以保留 Pod。

  • termination_message_policy (str) – 基礎容器的終止訊息策略。預設值為 “File”

  • active_deadline_seconds (int | None) – active_deadline_seconds,會轉譯為 V1PodSpec 中的 active_deadline_seconds。

  • callbacks (type[airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback] | None) – KubernetesPodOperatorCallback 實例,包含 KubernetesPodOperator 不同步驟的回呼方法。

  • logging_interval (int | None) – 任務在延遲狀態下,應該恢復以獲取最新日誌的最大時間(秒)。如果 None,則任務將保持延遲狀態,直到 Pod 完成,並且在那之前不會顯示任何日誌。

BASE_CONTAINER_NAME = 'base'[source]
ISTIO_CONTAINER_NAME = 'istio-proxy'[source]
KILL_ISTIO_PROXY_SUCCESS_MSG = 'HTTP/1.1 200'[source]
POD_CHECKED_KEY = 'already_checked'[source]
POST_TERMINATION_TIMEOUT = 120[source]
template_fields: collections.abc.Sequence[str] = ('image', 'cmds', 'annotations', 'arguments', 'env_vars', 'labels', 'config_file',...[source]
template_fields_renderers[source]
pod_manager()[source]
hook()[source]
client()[source]
find_pod(namespace, context, *, exclude_checked=True)[source]

傳回此任務實例是否已存在的正在運行的 Pod。

log_matching_pod(pod, context)[source]
get_or_create_pod(pod_request_obj, context)[source]
await_pod_start(pod)[source]
extract_xcom(pod)[source]

擷取 xcom 值並終止 xcom sidecar 容器。

execute(context)[source]

根據 deferrable 參數,異步或同步運行 Pod。

execute_sync(context)[source]
await_init_containers_completion(pod)[source]
await_pod_completion(pod)[source]
execute_async(context)[source]
convert_config_file_to_dict()[source]

將傳遞的 config_file 轉換為字典表示形式。

invoke_defer_method(last_log_time=None)[source]

重新定義在子類別中使用的觸發器。

trigger_reentry(context, event)[source]

從觸發器重新進入點。

如果 logging_interval 為 None,則在這一點,Pod 應該已完成,我們只需獲取日誌並退出。

如果 logging_interval 不為 None,則可能是 Pod 仍在運行,我們只需獲取最新的日誌並再次延遲回觸發器。

post_complete_action(*, pod, remote_pod, **kwargs)[source]

在運算符完成 deferrable_execution 邏輯後必須執行的動作。

cleanup(pod, remote_pod)[source]
is_istio_enabled(pod)[source]

通過檢查命名空間標籤來檢查 Pod 的命名空間是否啟用了 Istio。

kill_istio_sidecar(pod)[source]
process_pod_deletion(pod, *, reraise=True)[source]
patch_already_checked(pod, *, reraise=True)[source]

新增 “already checked” 標籤,以確保我們不會在重試時重新附加。

on_kill()[source]

覆寫此方法以在任務實例被終止時清理子進程。

在運算符內使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下無效的進程。

build_pod_request_obj(context=None)[source]

基於 Pod 範本檔案、完整 Pod 規格和其他運算符參數,傳回 V1Pod 物件。

V1Pod 屬性是從運算符參數、完整 Pod 規格、Pod 範本檔案(依優先順序)衍生而來。

dry_run()[source]

印出此運算符將建立的 Pod 定義。

不包含特定於任務實例的標籤(因為在 dry_run 中沒有任務實例),並排除所有空元素。

process_duplicate_label_pods(pod_list)[source]

修補或刪除具有重複標籤的現有 Pod。

這是為了處理僅當 reattach_on_restart 標誌為 False 時可能發生的邊緣情況,並且先前的運行嘗試因任務進程被叢集或其他進程外部終止而失敗。

如果任務進程在外部被終止,它會中斷程式碼執行並立即退出任務。因此,先前嘗試中建立的 Pod 將不會被 cleanup() 方法正確刪除或修補。

傳回新建立的 Pod 以用於下一次運行嘗試。

此條目是否有幫助?