KubernetesPodOperator

KubernetesPodOperator 允許您在 Kubernetes 叢集上建立和執行 Pod。

注意

如果您使用託管 Kubernetes,請考慮使用專門的 KPO 運算子,因為它可以簡化 Kubernetes 授權流程

注意

使用此運算子 **不** 需要 Kubernetes 執行器

此運算子如何運作?

KubernetesPodOperator 使用 Kubernetes API 在 Kubernetes 叢集中啟動 Pod。透過提供映像檔 URL 和包含選用引數的命令,此運算子使用 Kube Python Client 產生 Kubernetes API 請求,以動態啟動這些個別的 Pod。使用者可以使用 config_file 參數指定 kubeconfig 檔案,否則運算子將預設為 ~/.kube/config

KubernetesPodOperator 啟用任務層級的資源組態,並且最適合透過公開 PyPI 儲存庫無法取得的自訂 Python 相依性。它也允許使用者使用 pod_template_file 參數提供範本 YAML 檔案。最終,它允許 Airflow 作為工作流程協調器 — 無論這些工作流程是以何種語言撰寫。

偵錯 KubernetesPodOperator

您可以透過在運算子的實例上呼叫 dry_run(),列印出在執行階段將建立的 Pod 的 Kubernetes 宣告。

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k = KubernetesPodOperator(
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    do_xcom_push=True,
)

k.dry_run()

引數優先順序

當 KPO 定義 Pod 物件時,KubernetesPodOperator 引數之間可能會有重疊。一般而言,優先順序順序為 KPO 欄位特定引數(例如,secretscmdsaffinity)、更通用的範本 full_pod_specpod_template_filepod_template_dict,然後依序為 V1Pod(預設)。

對於 namespace,如果未透過任何這些方法提供命名空間,那麼我們將首先嘗試取得目前的命名空間(如果任務已在 Kubernetes 中執行),如果失敗,我們將使用 default 命名空間。

對於 Pod 名稱,如果未明確提供,我們將使用 task_id。預設會新增隨機後綴,因此 Pod 名稱通常並不重要。

如何在 Pod 中使用叢集 ConfigMap、Secret 和 Volume?

若要新增 ConfigMap、Volume 和其他 Kubernetes 原生物件,我們建議您匯入 Kubernetes 模型 API,如下所示

from kubernetes.client import models as k8s

透過此 API 物件,您可以存取 Python 類別形式的所有 Kubernetes API 物件。使用此方法將確保正確性和類型安全性。雖然我們已移除幾乎所有 Kubernetes 便利類別,但我們保留了 Secret 類別,以簡化產生密碼 Volume/環境變數的流程。

tests/system/cncf/kubernetes/example_kubernetes.py[原始碼]

secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)

configmaps = [
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]

volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)

port = k8s.V1ContainerPort(name="http", container_port=80)

init_container_volume_mounts = [
    k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]

init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]

init_container = k8s.V1Container(
    name="init-container",
    image="ubuntu:16.04",
    env=init_environments,
    volume_mounts=init_container_volume_mounts,
    command=["bash", "-cx"],
    args=["echo 10"],
)

affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
                    ]
                ),
            )
        ]
    ),
    pod_affinity=k8s.V1PodAffinity(
        required_during_scheduling_ignored_during_execution=[
            k8s.V1WeightedPodAffinityTerm(
                weight=1,
                pod_affinity_term=k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
                        ]
                    ),
                    topology_key="failure-domain.beta.kubernetes.io/zone",
                ),
            )
        ]
    ),
)

tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]

KubernetesPodOperator 與 Kubernetes 物件規格之間的差異

KubernetesPodOperator 可以視為 Kubernetes 物件規格定義的替代方案,該定義能夠在 DAG 環境中的 Airflow 排程器中執行。如果使用運算子,則無需為您想要執行的 Pod 建立等效的 YAML/JSON 物件規格。YAML 檔案仍然可以使用 pod_template_file 提供,甚至可以使用 full_pod_spec 參數在 Python 中建構 Pod 規格,這需要 Kubernetes V1Pod

如何使用私有映像檔(容器登錄檔)?

預設情況下,KubernetesPodOperator 將尋找 Dockerhub 上公開託管的映像檔。若要從私有登錄檔(例如 ECR、GCR、Quay 或其他)提取映像檔,您必須建立 Kubernetes Secret,以表示從私有登錄檔存取映像檔的憑證,最終在 image_pull_secrets 參數中指定。

使用 kubectl 建立 Secret

kubectl create secret docker-registry testquay \
    --docker-server=quay.io \
    --docker-username=<Profile name> \
    --docker-password=<password>

然後在您的 Pod 中使用它,如下所示

tests/system/cncf/kubernetes/example_kubernetes.py[原始碼]

    quay_k8s = KubernetesPodOperator(
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        on_finish_action="delete_pod",
        in_cluster=True,
        task_id="task-two",
        get_logs=True,
    )

此外,對於此動作,您可以在可延遲模式中使用運算子

tests/system/cncf/kubernetes/example_kubernetes_async.py[原始碼]

    quay_k8s_async = KubernetesPodOperator(
        task_id="kubernetes_private_img_task_async",
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        on_finish_action="delete_pod",
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

定期擷取和顯示容器日誌的範例

tests/system/cncf/kubernetes/example_kubernetes_async.py[原始碼]

    kubernetes_task_async_log = KubernetesPodOperator(
        task_id="kubernetes_task_async_log",
        namespace="kubernetes_task_async_log",
        in_cluster=False,
        name="astro_k8s_test_pod",
        image="ubuntu",
        cmds=[
            "bash",
            "-cx",
            (
                "i=0; "
                "while [ $i -ne 100 ]; "
                "do i=$(($i+1)); "
                "echo $i; "
                "sleep 1; "
                "done; "
                "mkdir -p /airflow/xcom/; "
                'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json'
            ),
        ],
        do_xcom_push=True,
        deferrable=True,
        get_logs=True,
        logging_interval=5,
    )

XCom 如何運作?

KubernetesPodOperator 處理 XCom 值的方式與其他運算子不同。為了從您的 Pod 傳遞 XCom 值,您必須將 do_xcom_push 指定為 True。這將建立一個與 Pod 並行執行的 Sidecar 容器。Pod 必須將 XCom 值寫入此位置的 /airflow/xcom/return.json 路徑。

注意

無效的 JSON 內容將會失敗,例如 echo 'hello' > /airflow/xcom/return.json 會失敗,而 echo '\"hello\"' > /airflow/xcom/return.json 則會成功

請參閱以下範例,了解其運作方式

tests/system/cncf/kubernetes/example_kubernetes.py[原始碼]

    write_xcom = KubernetesPodOperator(
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

注意

XCOM 將僅針對標記為 State.SUCCESS 的任務推送。

此外,對於此動作,您可以在可延遲模式中使用運算子

tests/system/cncf/kubernetes/example_kubernetes_async.py[原始碼]

    write_xcom_async = KubernetesPodOperator(
        task_id="kubernetes_write_xcom_task_async",
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

    pod_task_xcom_result_async = BashOperator(
        task_id="pod_task_xcom_result_async",
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
    )

    write_xcom_async >> pod_task_xcom_result_async

在電子郵件警示中包含錯誤訊息

寫入 /dev/termination-log 的任何內容都將由 Kubernetes 擷取,並在任務失敗時包含在例外訊息中。

k = KubernetesPodOperator(
    task_id="test_error_message",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-error-message",
    email="airflow@example.com",
    email_on_failure=True,
)

在此處閱讀更多關於 termination-log 的資訊 這裡

KubernetesPodOperator 回呼

KubernetesPodOperator 支援不同的回呼,可用於在 Pod 的生命週期中觸發動作。為了使用它們,您需要建立 KubernetesPodOperatorCallback 的子類別,並覆寫您想要使用的回呼方法。然後,您可以使用 callbacks 參數將您的回呼類別傳遞給運算子。

支援以下回呼

  • on_sync_client_creation:在建立同步用戶端後呼叫

  • on_pod_creation:在建立 Pod 後呼叫

  • on_pod_starting:在 Pod 啟動後呼叫

  • on_pod_completion:在 Pod 完成時呼叫

  • on_pod_cleanup:在清理/刪除 Pod 後呼叫

  • on_operator_resuming:從延遲狀態恢復任務時

  • progress_callback:在容器日誌的每一行呼叫

目前,回呼方法不會在非同步模式中呼叫,此支援將在未來新增。

範例:

import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback


class MyCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
        client.create_namespaced_service(
            namespace=pod.metadata.namespace,
            body=k8s.V1Service(
                metadata=k8s.V1ObjectMeta(
                    name=pod.metadata.name,
                    labels=pod.metadata.labels,
                    owner_references=[
                        k8s.V1OwnerReference(
                            api_version=pod.api_version,
                            kind=pod.kind,
                            name=pod.metadata.name,
                            uid=pod.metadata.uid,
                            controller=True,
                            block_owner_deletion=True,
                        )
                    ],
                ),
                spec=k8s.V1ServiceSpec(
                    selector=pod.metadata.labels,
                    ports=[
                        k8s.V1ServicePort(
                            name="http",
                            port=80,
                            target_port=80,
                        )
                    ],
                ),
            ),
        )


k = KubernetesPodOperator(
    task_id="test_callback",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-callback",
    callbacks=MyCallback,
)

傳遞密碼

切勿使用環境變數將密碼(例如連線驗證資訊)傳遞給 Kubernetes Pod 運算子。任何有權限查看和描述 Kubernetes 中 POD 的人都可以看到這些環境變數。相反地,請透過原生 Kubernetes Secrets 傳遞您的密碼,或使用 Airflow 中的連線和變數。對於後者,您需要在映像檔中安裝 apache-airflow 套件,其版本與您執行 Kubernetes Pod 運算子的 Airflow 版本相同)。

SparkKubernetesOperator

SparkKubernetesOperator 允許您在 Kubernetes 叢集上建立和執行 Spark 工作。它基於 spark-on-k8s-operator 專案。

此運算子簡化了介面,並接受不同的參數來設定和執行 Kubernetes 上的 Spark 應用程式。與 KubernetesOperator 類似,我們新增了邏輯,以便在提交工作後等待工作完成、管理錯誤處理、從驅動程式 Pod 擷取日誌,以及刪除 Spark 工作的功能。它也支援開箱即用的 Kubernetes 功能,例如處理 Volume、Config Map、Secret 等。

此運算子如何運作?

運算子透過在 Kubernetes 中產生 SparkApplication 自訂資源定義 (CRD) 來啟動 Spark 任務。此 SparkApplication 任務隨後會使用使用者指定的參數產生驅動程式和必要的執行器 Pod。運算子會持續監控任務的進度,直到任務成功或失敗為止。它會從驅動程式 Pod 擷取日誌,並將其顯示在 Airflow UI 中。

使用範例

為了建立 SparkKubernetesOperator 任務,您必須提供一個基本範本,其中包含 Spark 組態和 Kubernetes 相關的資源組態。此範本可以是 YAML 或 JSON 格式,作為運算子的起點。以下是您可以利用的範例範本

spark_job_template.yaml

spark:
  apiVersion: sparkoperator.k8s.io/v1beta2
  version: v1beta2
  kind: SparkApplication
  apiGroup: sparkoperator.k8s.io
  metadata:
    namespace: ds
  spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    sparkVersion: 3.0.0
    successfulRunHistoryLimit: 1
    restartPolicy:
      type: Never
    imagePullPolicy: Always
    hadoopConf: {}
    imagePullSecrets: []
    dynamicAllocation:
      enabled: false
      initialExecutors: 1
      minExecutors: 1
      maxExecutors: 1
    labels: {}
    driver:
      serviceAccount: default
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
    executor:
      instances: 1
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
kubernetes:
  # example:
  # env_vars:
  # - name: TEST_NAME
  #   value: TEST_VALUE
  env_vars: []

  # example:
  # env_from:
  # - name: test
  #   valueFrom:
  #     secretKeyRef:
  #       name: mongo-secret
  #       key: mongo-password
  env_from: []

  # example:
  # node_selector:
  #   karpenter.sh/provisioner-name: spark
  node_selector: {}

  # example: https://kubernetes.dev.org.tw/docs/concepts/scheduling-eviction/assign-pod-node/
  # affinity:
  #   nodeAffinity:
  #     requiredDuringSchedulingIgnoredDuringExecution:
  #       nodeSelectorTerms:
  #       - matchExpressions:
  #         - key: beta.kubernetes.io/instance-type
  #           operator: In
  #           values:
  #           - r5.xlarge
  affinity:
    nodeAffinity: {}
    podAffinity: {}
    podAntiAffinity: {}

  # example: https://kubernetes.dev.org.tw/docs/concepts/scheduling-eviction/taint-and-toleration/
  # type: list
  # tolerations:
  # - key: "key1"
  #   operator: "Equal"
  #   value: "value1"
  #   effect: "NoSchedule"
  tolerations: []

  # example:
  # config_map_mounts:
  #   snowflake-default: /mnt/tmp
  config_map_mounts: {}

  # example:
  # volume_mounts:
  # - name: config
  #   mountPath: /airflow
  volume_mounts: []

  # https://kubernetes.dev.org.tw/docs/concepts/storage/volumes/
  # example:
  # volumes:
  # - name: config
  #   persistentVolumeClaim:
  #     claimName: airflow
  volumes: []

  # read config map into an env variable
  # example:
  # from_env_config_map:
  # - configmap_1
  # - configmap_2
  from_env_config_map: []

  # load secret into an env variable
  # example:
  # from_env_secret:
  # - secret_1
  # - secret_2
  from_env_secret: []

  in_cluster: true
  conn_id: kubernetes_default
  kube_config_file: null
  cluster_context: null

重要

  • 範本檔案包含兩個主要類別:sparkkubernetes

    • spark:此區段包含任務的 Spark 組態,反映 Spark API 範本的結構。

    • kubernetes:此區段包含任務的 Kubernetes 資源組態,直接對應於 Kubernetes API 文件。每個資源類型都在範本中包含一個範例。

  • 要使用的指定基礎映像檔是 gcr.io/spark-operator/spark-py:v3.1.1

  • 確保 Spark 程式碼已嵌入在映像檔中、使用 persistentVolume 掛載,或可從外部位置存取,例如 S3 儲存桶。

接下來,使用以下程式碼建立任務

SparkKubernetesOperator(
    task_id="spark_task",
    image="gcr.io/spark-operator/spark-py:v3.1.1",  # OR custom image using that
    code_path="local://path/to/spark/code.py",
    application_file="spark_job_template.yaml",  # OR spark_job_template.json
    dag=dag,
)

注意:或者,application_file 也可以是 JSON 檔案。請參閱以下範例

spark_job_template.json

{
  "spark": {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "version": "v1beta2",
    "kind": "SparkApplication",
    "apiGroup": "sparkoperator.k8s.io",
    "metadata": {
      "namespace": "ds"
    },
    "spec": {
      "type": "Python",
      "pythonVersion": "3",
      "mode": "cluster",
      "sparkVersion": "3.0.0",
      "successfulRunHistoryLimit": 1,
      "restartPolicy": {
        "type": "Never"
      },
      "imagePullPolicy": "Always",
      "hadoopConf": {},
      "imagePullSecrets": [],
      "dynamicAllocation": {
        "enabled": false,
        "initialExecutors": 1,
        "minExecutors": 1,
        "maxExecutors": 1
      },
      "labels": {},
      "driver": {
        "serviceAccount": "default",
        "container_resources": {
          "gpu": {
            "name": null,
            "quantity": 0
          },
          "cpu": {
            "request": null,
            "limit": null
          },
          "memory": {
            "request": null,
            "limit": null
          }
        }
      },
      "executor": {
        "instances": 1,
        "container_resources": {
          "gpu": {
            "name": null,
            "quantity": 0
          },
          "cpu": {
            "request": null,
            "limit": null
          },
          "memory": {
            "request": null,
            "limit": null
          }
        }
      }
    }
  },
  "kubernetes": {
    "env_vars": [],
    "env_from": [],
    "node_selector": {},
    "affinity": {
      "nodeAffinity": {},
      "podAffinity": {},
      "podAntiAffinity": {}
    },
    "tolerations": [],
    "config_map_mounts": {},
    "volume_mounts": [
      {
        "name": "config",
        "mountPath": "/airflow"
      }
    ],
    "volumes": [
      {
        "name": "config",
        "persistentVolumeClaim": {
          "claimName": "hsaljoog-airflow"
        }
      }
    ],
    "from_env_config_map": [],
    "from_env_secret": [],
    "in_cluster": true,
    "conn_id": "kubernetes_default",
    "kube_config_file": null,
    "cluster_context": null
  }
}

除了使用 YAML 或 JSON 檔案之外,另一種方法是直接傳遞 template_spec 欄位,而不是 application_file,如果您不想使用檔案進行組態設定。

KubernetesJobOperator

KubernetesJobOperator 允許您在 Kubernetes 叢集上建立和執行 Job。

注意

如果您使用託管 Kubernetes,請考慮使用專門的 KJO 運算子,因為它可以簡化 Kubernetes 授權流程

注意

使用此運算子 **不** 需要 Kubernetes 執行器

此運算子如何運作?

KubernetesJobOperator 使用 Kubernetes API 在 Kubernetes 叢集中啟動 Job。此運算子使用 Kube Python Client 產生 Kubernetes API 請求,以動態啟動此 Job。使用者可以使用 config_file 參數指定 kubeconfig 檔案,否則運算子將預設為 ~/.kube/config。它也允許使用者使用 job_template_file 參數提供範本 YAML 檔案。

tests/system/cncf/kubernetes/example_kubernetes_job.py[原始碼]

k8s_job = KubernetesJobOperator(
    task_id="job-task",
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

KubernetesJobOperator 也支援可延遲模式

tests/system/cncf/kubernetes/example_kubernetes_job.py[原始碼]

k8s_job_def = KubernetesJobOperator(
    task_id="job-task-def",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME + "-def",
    wait_until_job_complete=True,
    deferrable=True,
)

KubernetesPodOperatorKubernetesJobOperator 之間的差異

KubernetesJobOperator 是用於建立 Job 的運算子。Job 會建立一個或多個 Pod,並將繼續重試 Pod 的執行,直到指定數量的 Pod 成功終止。隨著 Pod 成功完成,Job 會追蹤成功的完成次數。當達到指定數量的成功完成次數時,Job 即完成。使用者可以使用組態參數(例如 activeDeadlineSecondsbackoffLimit)限制 Job 重試執行的次數。此運算子不是使用用於建立 Pod 的 template 參數,而是使用 KubernetesPodOperator。這表示使用者可以在 KubernetesJobOperator 中使用 KubernetesPodOperator 中的所有參數。

關於 Job 的更多資訊請參閱此處:Kubernetes Job 文件

KubernetesDeleteJobOperator

KubernetesDeleteJobOperator 允許您刪除 Kubernetes 叢集上的 Job。

tests/system/cncf/kubernetes/example_kubernetes_job.py[原始碼]

delete_job_task = KubernetesDeleteJobOperator(
    task_id="delete_job_task",
    name=k8s_job.output["job_name"],
    namespace=JOB_NAMESPACE,
    wait_for_completion=True,
    delete_on_status="Complete",
    poll_interval=1.0,
)

KubernetesPatchJobOperator

KubernetesPatchJobOperator 允許您更新 Kubernetes 叢集上的 Job。

tests/system/cncf/kubernetes/example_kubernetes_job.py[原始碼]

update_job = KubernetesPatchJobOperator(
    task_id="update-job-task",
    namespace="default",
    name=k8s_job.output["job_name"],
    body={"spec": {"suspend": False}},
)

KubernetesInstallKueueOperator

KubernetesInstallKueueOperator 允許您在 Kubernetes 叢集中安裝 Kueue 組件

tests/system/cncf/kubernetes/example_kubernetes_kueue.py[原始碼]

install_kueue = KubernetesInstallKueueOperator(
    task_id="install_kueue",
    kueue_version="v0.9.1",
)

參考

如需更多資訊,請參閱

KubernetesStartKueueJobOperator

KubernetesStartKueueJobOperator 允許您在 Kubernetes 叢集中啟動 Kueue 工作

tests/system/cncf/kubernetes/example_kubernetes_kueue.py[原始碼]

start_kueue_job = KubernetesStartKueueJobOperator(
    task_id="kueue_job",
    queue_name=QUEUE_NAME,
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
    wait_until_job_complete=True,
)

如需更多資訊,請參閱

此條目是否有幫助?