airflow.providers.cncf.kubernetes.operators.job

執行 Kubernetes Job。

模組內容

類別

KubernetesJobOperator

執行 Kubernetes Job。

KubernetesDeleteJobOperator

刪除 Kubernetes Job。

KubernetesPatchJobOperator

更新 Kubernetes Job。

屬性

log

airflow.providers.cncf.kubernetes.operators.job.log[source]
class airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator(*, job_template_file=None, full_job_spec=None, backoff_limit=None, completion_mode=None, completions=None, manual_selector=None, parallelism=None, selector=None, suspend=None, ttl_seconds_after_finished=None, wait_until_job_complete=False, job_poll_interval=10, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基底類別: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator

執行 Kubernetes Job。

另請參閱

關於如何使用此運算子的更多資訊,請參閱指南: KubernetesJobOperator

注意

如果您使用 Google Kubernetes Engine 且 Airflow 未在同一個叢集中執行,請考慮使用 GKEStartJobOperator,這可以簡化授權流程。

參數
  • job_template_file (str | None) – Job 範本檔案路徑 (可使用 Jinja 範本)

  • full_job_spec (kubernetes.client.models.V1Job | None) – 完整的 JobSpec

  • backoff_limit (int | None) – 指定在將此 Job 標記為失敗之前重試的次數。預設值為 6

  • completion_mode (str | None) – CompletionMode 指定如何追蹤 Pod 完成次數。可以是 NonIndexed (預設) 或 Indexed

  • completions (int | None) – 指定 Job 應成功完成的 Pod 數量。

  • manual_selector (bool | None) – manualSelector 控制 Pod 標籤和 Pod 選擇器的產生方式。

  • parallelism (int | None) – 指定 Job 在任何給定時間應執行的最大 Pod 數量。

  • selector (kubernetes.client.models.V1LabelSelector | None) – 此 V1JobSpec 的選擇器。

  • suspend (bool | None) – Suspend 指定 Job 控制器是否應建立 Pod。

  • ttl_seconds_after_finished (int | None) – ttlSecondsAfterFinished 限制已完成執行的 Job (無論是 Complete 或 Failed) 的生命週期。

  • wait_until_job_complete (bool) – 是否等待已啟動的 Job 完成執行 (無論是 Complete 或 Failed)。預設值為 False。

  • job_poll_interval (float) – 輪詢 Job 狀態的間隔秒數。預設值為 10。如果參數 wait_until_job_complete 設定為 True 則會使用。

  • deferrable (bool) – 以可延遲模式執行運算子。請注意,參數 wait_until_job_complete 必須設定為 True。

template_fields: collections.abc.Sequence[str][source]
hook()[source]
job_client()[source]
create_job(job_request_obj)[source]
execute(context)[source]

根據 deferrable 參數,以非同步或同步方式執行 Pod。

execute_deferrable()[source]
execute_complete(context, event, **kwargs)[source]
static deserialize_job_template_file(path)[source]

從檔案產生 Job。

很遺憾,我們需要存取 Kubernetes Client 中的私有方法 _ApiClient__deserialize_model。此問題已在此處追蹤: https://github.com/kubernetes-client/python/issues/977

參數

path (str) – 檔案路徑

回傳

kubernetes.client.models.V1Job

回傳類型

kubernetes.client.models.V1Job

on_kill()[source]

覆寫此方法以在任務執行個體被終止時清除子程序。

在運算子中使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清除,否則會留下無效程序。

build_job_request_obj(context=None)[source]

根據 Job 範本檔案、完整 Job 規格和其他運算子參數回傳 V1Job 物件。

V1Job 屬性衍生自 (依優先順序) 運算子參數、完整 Job 規格、Job 範本檔案。

static reconcile_jobs(base_job, client_job)[source]

合併 Kubernetes Job 物件。

參數
  • base_job (kubernetes.client.models.V1Job) – 具有基本屬性,如果這些屬性存在於 client job 中則會被覆寫,如果不存在於 client_job 中則會保留

  • client_job (kubernetes.client.models.V1Job | None) – 用戶端想要建立的 Job。

回傳

已合併的 Job

回傳類型

kubernetes.client.models.V1Job

這無法以遞迴方式完成,因為某些欄位會被覆寫,而另一些欄位則會被串連。

static reconcile_job_specs(base_spec, client_spec)[source]

合併 Kubernetes JobSpec 物件。

參數
  • base_spec (kubernetes.client.models.V1JobSpec | None) – 具有基本屬性,如果這些屬性存在於 client_spec 中則會被覆寫,如果不存在於 client_spec 中則會保留

  • client_spec (kubernetes.client.models.V1JobSpec | None) – 用戶端想要建立的 spec。

回傳

已合併的 spec

回傳類型

kubernetes.client.models.V1JobSpec | None

class airflow.providers.cncf.kubernetes.operators.job.KubernetesDeleteJobOperator(*, name, namespace, kubernetes_conn_id=KubernetesHook.default_conn_name, config_file=None, in_cluster=None, cluster_context=None, delete_on_status=None, wait_for_completion=False, poll_interval=10.0, **kwargs)[source]

基底類別: airflow.models.BaseOperator

刪除 Kubernetes Job。

另請參閱

關於如何使用此運算子的更多資訊,請參閱指南: KubernetesDeleteJobOperator

參數
  • name (str) – Job 的名稱。

  • namespace (str) – 要在 Kubernetes 中執行的命名空間。

  • kubernetes_conn_id (str | None) – Kubernetes 叢集的 kubernetes 連線 ID

  • config_file (str | None) – Kubernetes 組態檔的路徑。(可使用 Jinja 範本) 如果未指定,預設值為 ~/.kube/config

  • in_cluster (bool | None) – 使用叢集內組態執行 Kubernetes Client。

  • cluster_context (str | None) – 指向 Kubernetes 叢集的內容。當 in_cluster 為 True 時會忽略。如果為 None,則使用 current-context。(可使用 Jinja 範本)

  • delete_on_status (str | None) – 根據 Job 狀態執行刪除操作的條件。值: None - 無論 Job 狀態為何都刪除 Job,“Complete” - 僅刪除成功完成的 Job,“Failed” - 僅刪除失敗的 Job。(預設值: None)

  • wait_for_completion (bool) – 是否等待 Job 完成。(預設值: False)

  • poll_interval (float) – 輪詢 Job 狀態的間隔秒數。當設定 delete_on_status 參數時使用。(預設值: 10.0)

template_fields: collections.abc.Sequence[str] = ('config_file', 'name', 'namespace', 'cluster_context')[source]
hook()[source]
client()[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 Jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.cncf.kubernetes.operators.job.KubernetesPatchJobOperator(*, name, namespace, body, kubernetes_conn_id=KubernetesHook.default_conn_name, config_file=None, in_cluster=None, cluster_context=None, **kwargs)[source]

基底類別: airflow.models.BaseOperator

更新 Kubernetes Job。

另請參閱

關於如何使用此運算子的更多資訊,請參閱指南: KubernetesPatchJobOperator

參數
  • name (str) – Job 的名稱

  • namespace (str) – 要在 Kubernetes 中執行的命名空間

  • body (object) – 具有更新參數的 Job json 物件 https://kubernetes.dev.org.tw/docs/reference/generated/kubernetes-api/v1.25/#job-v1-batch 例如 {"spec": {"suspend": True}}

  • kubernetes_conn_id (str | None) – Kubernetes 叢集的 kubernetes 連線 ID

  • config_file (str | None) – Kubernetes 組態檔的路徑。(可使用 Jinja 範本) 如果未指定,預設值為 ~/.kube/config

  • in_cluster (bool | None) – 使用叢集內組態執行 Kubernetes Client。

  • cluster_context (str | None) – 指向 Kubernetes 叢集的內容。當 in_cluster 為 True 時會忽略。如果為 None,則使用 current-context。(可使用 Jinja 範本)

template_fields: collections.abc.Sequence[str] = ('config_file', 'name', 'namespace', 'body', 'cluster_context')[source]
hook()[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 Jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

這個條目是否有幫助?