airflow.providers.cncf.kubernetes.utils.pod_manager

啟動 POD。

模組內容

類別

PodPhase

可能的 Pod 階段。

PodOperatorHookProtocol

定義 KubernetesPodOperator 所依賴方法的協議。

PodLoggingStatus

fetch_container_logs 退出時,返回 Pod 的狀態和上次日誌時間。

PodManager

創建、監控 Kubernetes Pod,並與之互動,以便與 KubernetesPodOperator 一起使用。

OnFinishAction

Pod 完成時要採取的動作。

函數

should_retry_start_pod(exception)

檢查 Exception 是否指示暫時性錯誤並值得重試。

get_container_status(pod, container_name)

檢索容器狀態。

container_is_running(pod, container_name)

檢查 V1Pod pod 以確定 container_name 是否正在運行。

container_is_completed(pod, container_name)

檢查 V1Pod pod 以確定 container_name 是否已完成。

container_is_succeeded(pod, container_name)

檢查 V1Pod pod 以確定 container_name 是否已完成並成功。

container_is_wait(pod, container_name)

檢查 V1Pod pod 以確定 container_name 是否正在等待。

container_is_terminated(pod, container_name)

檢查 V1Pod pod 以確定 container_name 是否已終止。

get_container_termination_message(pod, container_name)

check_exception_is_kubernetes_api_unauthorized(exc)

is_log_group_marker(line)

檢查該行是否為日誌群組標記,例如 ::group::::endgroup::

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchFailedException[source]

基礎:airflow.exceptions.AirflowException

當在 KubernetesPodOperator 中啟動 Pod 失敗時。

airflow.providers.cncf.kubernetes.utils.pod_manager.should_retry_start_pod(exception)[source]

檢查 Exception 是否指示暫時性錯誤並值得重試。

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodPhase[source]

可能的 Pod 階段。

請參閱 https://kubernetes.dev.org.tw/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase

PENDING = 'Pending'[source]
RUNNING = 'Running'[source]
FAILED = 'Failed'[source]
SUCCEEDED = 'Succeeded'[source]
terminal_states[source]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodOperatorHookProtocol[source]

基礎:Protocol

定義 KubernetesPodOperator 所依賴方法的協議。

KubernetesPodOperator 的子類別,例如 GKEStartPodOperator,可能會使用未擴展 KubernetesHook 的 Hook。我們使用此協議來記錄 KPO 使用的方法,並確保這些方法存在於其他此類 Hook 上。

property core_v1_client: kubernetes.client.CoreV1Api[source]

取得已驗證的客戶端物件。

property is_in_cluster: bool[source]

揭露 Hook 是否配置了 load_incluster_config

get_pod(name, namespace)[source]

從 Kubernetes API 讀取 Pod 物件。

get_namespace()[source]

返回連線中定義的命名空間。

get_xcom_sidecar_container_image()[source]

返回連線中定義的 xcom sidecar 映像。

get_xcom_sidecar_container_resources()[source]

返回連線中定義的 xcom sidecar 資源。

airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_status(pod, container_name)[source]

檢索容器狀態。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running(pod, container_name)[source]

檢查 V1Pod pod 以確定 container_name 是否正在運行。

如果該容器存在且正在運行,則返回 True。否則返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_completed(pod, container_name)[source]

檢查 V1Pod pod 以確定 container_name 是否已完成。

如果該容器存在且已完成,則返回 True。否則返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_succeeded(pod, container_name)[source]

檢查 V1Pod pod 以確定 container_name 是否已完成並成功。

如果該容器存在、已完成且已成功,則返回 True。否則返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_wait(pod, container_name)[source]

檢查 V1Pod pod 以確定 container_name 是否正在等待。

如果該容器存在且正在等待,則返回 True。否則返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_terminated(pod, container_name)[source]

檢查 V1Pod pod 以確定 container_name 是否已終止。

如果該容器存在且已終止,則返回 True。否則返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_termination_message(pod, container_name)[source]
airflow.providers.cncf.kubernetes.utils.pod_manager.check_exception_is_kubernetes_api_unauthorized(exc)[source]
exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchTimeoutException[source]

基礎:airflow.exceptions.AirflowException

當 Pod 在指定的超時時間內未離開 `Pending` 階段時。

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodNotFoundException[source]

基礎:airflow.exceptions.AirflowException

預期的 Pod 在 kube-api 中不存在。

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLoggingStatus[source]

fetch_container_logs 退出時,返回 Pod 的狀態和上次日誌時間。

running: bool[source]
last_log_time: pendulum.DateTime | None[source]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager(kube_client, callbacks=None)[source]

基礎:airflow.utils.log.logging_mixin.LoggingMixin

創建、監控 Kubernetes Pod,並與之互動,以便與 KubernetesPodOperator 一起使用。

run_pod_async(pod, **kwargs)[source]

異步運行 POD。

delete_pod(pod)[source]

刪除 POD。

create_pod(pod)[source]

異步啟動 Pod。

await_pod_start(pod, startup_timeout=120, startup_check_interval=1)[source]

等待 Pod 達到 Pending 以外的階段。

參數
  • pod (kubernetes.client.models.v1_pod.V1Pod) –

  • startup_timeout (int) – Pod 啟動的超時時間(秒)(如果 Pod 待定時間過長,則任務失敗)

  • startup_check_interval (int) – 檢查之間的間隔(秒)

返回

返回類型

None

await_container_completion(pod, container_name)[source]

等待給定 Pod 中的給定容器完成。

參數
  • pod (kubernetes.client.models.v1_pod.V1Pod) – 將被監控的 Pod 規範

  • container_name (str) – Pod 內要監控的容器名稱

await_pod_completion(pod, istio_enabled=False, container_name=base)[source]

監控 Pod 並返回最終狀態。

參數
  • istio_enabled (bool) – Istio 是否在命名空間中啟用

  • pod (kubernetes.client.models.v1_pod.V1Pod) – 將被監控的 Pod 規範

  • container_name (str) – Pod 內容器的名稱

返回

tuple[State, str | None]

返回類型

kubernetes.client.models.v1_pod.V1Pod

parse_log_line(line)[source]

解析 K8s 日誌行並返回最終狀態。

參數

line (str) – k8s 日誌行

返回

時間戳記和日誌訊息

返回類型

tuple[pendulum.DateTime | None, str]

container_is_running(pod, container_name)[source]

讀取 Pod 並檢查容器是否正在運行。

container_is_terminated(pod, container_name)[source]

讀取 Pod 並檢查容器是否已終止。

read_pod_logs(pod, container_name, tail_lines=None, timestamps=False, since_seconds=None, follow=True, post_termination_timeout=120, **kwargs)[source]

從 POD 讀取日誌。

read_pod_events(pod)[source]

從 POD 讀取事件。

read_pod(pod)[source]

讀取 POD 資訊。

await_xcom_sidecar_container_start(pod, timeout=900, log_interval=30)[source]

檢查 sidecar 容器是否已達到「Running」狀態,然後再執行 do_xcom_push。

extract_xcom(pod)[source]

檢索 XCom 值並終止 xcom sidecar 容器。

extract_xcom_json(pod)[source]

檢索 XCom 值並檢查 xcom json 是否有效。

extract_xcom_kill(pod)[source]

終止 xcom sidecar 容器。

class airflow.providers.cncf.kubernetes.utils.pod_manager.OnFinishAction[source]

基礎: str, enum.Enum

Pod 完成時要採取的動作。

KEEP_POD = 'keep_pod'[原始碼]
DELETE_POD = 'delete_pod'[原始碼]
DELETE_SUCCEEDED_POD = 'delete_succeeded_pod'[原始碼]
airflow.providers.cncf.kubernetes.utils.pod_manager.is_log_group_marker(line)[原始碼]

檢查該行是否為日誌群組標記,例如 ::group::::endgroup::

這個條目是否有幫助?