airflow.providers.cncf.kubernetes.operators.spark_kubernetes

模組內容

類別

SparkKubernetesOperator

在 kubernetes 叢集中建立 sparkApplication 物件。

class airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator(*, image=None, code_path=None, namespace='default', name=None, application_file=None, template_spec=None, get_logs=True, do_xcom_push=False, success_run_history_limit=1, startup_timeout_seconds=600, log_events_on_failure=False, reattach_on_restart=True, delete_on_termination=True, kubernetes_conn_id='kubernetes_default', random_name_suffix=True, **kwargs)[原始碼]

基底類別: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator

在 kubernetes 叢集中建立 sparkApplication 物件。

另請參閱

關於 Spark Application Object 的更多詳細資訊,請參閱參考資料: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication

參數
  • image (str | None) – 您想要啟動的 Docker 映像檔。預設為 hub.docker.com,

  • code_path (str | None) – 映像檔中 spark 程式碼的路徑,

  • namespace (str) – 要放置 sparkApplication 的 kubernetes 命名空間

  • name (str | None) – 任務將在其中執行的 Pod 名稱,將用於 (如果 random_name_suffix 為 True,則加上隨機後綴) 生成 Pod ID (DNS-1123 子網域,僅包含 [a-z0-9.-])。

  • application_file (str | None) – sparkApplication 的 kubernetes custom_resource_definition 檔案路徑

  • template_spec – kubernetes sparkApplication 規格

  • get_logs (bool) – 取得容器的 stdout 作為任務的日誌。

  • do_xcom_push (bool) – 如果為 True,容器完成時,容器中 /airflow/xcom/return.json 檔案的內容也將被推送到 XCom。

  • success_run_history_limit (int) – 要保留的應用程式過去成功執行次數。

  • startup_timeout_seconds – 啟動 Pod 的逾時秒數。

  • log_events_on_failure (bool) – 如果發生故障,則記錄 Pod 的事件

  • reattach_on_restart (bool) – 如果排程器在 Pod 執行時死掉,則重新附加並監控

  • delete_on_termination (bool) – 當 Pod 達到最終狀態或執行中斷時要執行的操作。如果為 True (預設值),則刪除 Pod;如果為 False,則保留 Pod。

  • kubernetes_conn_id (str) – 連接到 Kubernetes 叢集的連線

  • random_name_suffix (bool) – 如果為 True,則將隨機後綴添加到 Pod 名稱

property template_body[原始碼]

CustomObjectLauncher 的範本主體。

template_fields = ['application_file', 'namespace', 'template_spec', 'kubernetes_conn_id'][原始碼]
template_fields_renderers[原始碼]
template_ext = ('yaml', 'yml', 'json')[原始碼]
ui_color = '#f4a460'[原始碼]
BASE_CONTAINER_NAME = 'spark-kubernetes-driver'[原始碼]
manage_template_specs()[原始碼]
create_job_name()[原始碼]
static create_labels_for_pod(context=None, include_try_number=True)[原始碼]

產生 Pod 的標籤,以便在 Operator 崩潰時追蹤 Pod。

參數
  • include_try_number (bool) – 將嘗試次數新增至標籤

  • context (dict | None) – airflow DAG 提供的任務上下文

回傳

dict。

回傳類型

dict

pod_manager()[原始碼]
find_spark_job(context)[原始碼]
get_or_create_spark_crd(launcher, context)[原始碼]
process_pod_deletion(pod, *, reraise=True)[原始碼]
hook()[原始碼]
client()[原始碼]
custom_obj_api()[原始碼]
execute(context)[原始碼]

根據 deferrable 參數,以非同步或同步方式執行 Pod。

on_kill()[原始碼]

覆寫此方法以在任務實例被終止時清除子行程。

在 Operator 中使用 threading、subprocess 或 multiprocessing 模組的任何情況都需要清理,否則會留下無效的行程。

patch_already_checked(pod, *, reraise=True)[原始碼]

新增「已檢查」註釋,以確保我們不會在重試時重新附加。

dry_run()[原始碼]

印出此 Operator 將建立的 spark 任務。

這個條目有幫助嗎?