Google Cloud Batch Operators

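Cloud Batch is a fully managed batch service for scheduling, queueing, and executing batch jobs on Google's infrastructure.

For more information about the service, see the Google Cloud Batch documentation.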

Prerequisite Tasks

To use these operators, you must complete a few prerequisite steps.

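Submit a job

Before you submit a job in Cloud Batch, you need to define it. For more information about the fields of the Job object, see the Google Cloud Batch Job description.

A simple job configuration can look as follows: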

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

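from google.cloud import batch_v1


def _create_job():
    """Build a Batch Job that runs a busybox container as two tasks."""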
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = [
        "-c",
        "echo Hello world! This is task ${BATCH_TASK_INDEX}.\
          This job has a total of ${BATCH_TASK_COUNT} tasks.",
    ]

    task = batch_v1.TaskSpec()
    task.runnables = [runnable]

    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000
    resources.memory_mib = 16
    task.compute_resource = resources
    task.max_retry_count = 2

    group = batch_v1.TaskGroup()
    group.task_count = 2
    group.task_spec = task
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}

    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    return job
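
Since the job can also be supplied as a dictionary (the deferrable example on this page passes batch_v1.Job.to_dict(_create_job())), it may help to see roughly what such a dictionary looks like. The following is a minimal sketch, not the provider's canonical form: create_job_dict is a hypothetical helper, the keys simply mirror the attribute names used in _create_job above, the command string is shortened, and writing the logs destination by name rather than as an integer is an assumption.

```python
import json


def create_job_dict() -> dict:
    """Hypothetical helper: the job from _create_job() written as a plain dict."""
    return {
        "task_groups": [
            {
                "task_count": 2,
                "task_spec": {
                    "runnables": [
                        {
                            "container": {
                                "image_uri": "gcr.io/google-containers/busybox",
                                "entrypoint": "/bin/sh",
                                "commands": [
                                    "-c",
                                    "echo Hello from task ${BATCH_TASK_INDEX}.",
                                ],
                            }
                        }
                    ],
                    # 2000 milli-CPU corresponds to 2 vCPUs.
                    "compute_resource": {"cpu_milli": 2000, "memory_mib": 16},
                    "max_retry_count": 2,
                },
            }
        ],
        "allocation_policy": {
            "instances": [{"policy": {"machine_type": "e2-standard-4"}}]
        },
        "labels": {"env": "testing", "type": "container"},
        # Assumption: the enum is given by name; Job.to_dict may emit an integer.
        "logs_policy": {"destination": "CLOUD_LOGGING"},
    }


job_dict = create_job_dict()
# A plain dict survives a JSON round trip unchanged, so it is easy to store or template.
round_tripped = json.loads(json.dumps(job_dict))
```

If the operator accepts this shape in your provider version, it could be passed as job=create_job_dict(); verify against the Job reference before relying on it.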


With this configuration, we can submit the job using CloudBatchSubmitJobOperator:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

submit1 = CloudBatchSubmitJobOperator(
    task_id="submit-job1",
    project_id=PROJECT_ID,
    region=REGION,
    job_name=job1_name,
    job=_create_job(),
    dag=dag,
    deferrable=False,
)

Alternatively, you can define the same operator in deferrable mode by setting deferrable=True on CloudBatchSubmitJobOperator:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

submit2 = CloudBatchSubmitJobOperator(
    task_id="submit-job2",
    project_id=PROJECT_ID,
    region=REGION,
    job_name=job2_name,
    job=batch_v1.Job.to_dict(_create_job()),
    dag=dag,
    deferrable=True,
)

Note that this operator waits for the job to finish executing, and that the dictionary representation of the Job is pushed to XCom.
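
Because the pushed value is a dictionary, a downstream task can read fields from it directly. The sketch below shows one way to do that; summarize_job is a hypothetical helper, the sample dictionary stands in for the value a downstream task would pull from XCom, and only the "name" and "labels" keys are assumed to be present.

```python
def summarize_job(job: dict) -> str:
    """Hypothetical helper: one-line summary of a Job dict pulled from XCom."""
    name = job.get("name", "<unnamed>")
    labels = job.get("labels") or {}
    # Sort the labels so the summary is stable regardless of dict ordering.
    label_text = ", ".join(f"{key}={value}" for key, value in sorted(labels.items()))
    return f"{name} [{label_text}]" if label_text else name


# Illustrative stand-in for the dictionary pushed by the submit task.
sample_job = {
    "name": "projects/my-project/locations/us-central1/jobs/job1",
    "labels": {"type": "container", "env": "testing"},
}
summary = summarize_job(sample_job)
```

Using .get with defaults keeps the helper tolerant of fields that are missing from the dictionary.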

List a job's tasks

To list the tasks of a certain job, you can use CloudBatchListTasksOperator:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

list_tasks = CloudBatchListTasksOperator(
    task_id=list_tasks_task_name,
    project_id=PROJECT_ID,
    region=REGION,
    job_name=job1_name,
    dag=dag,
)

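The operator takes two optional parameters: "limit", which caps the number of tasks returned, and "filter", which lists only the tasks that match the filter.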
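
To illustrate what a limit means for a paged listing, here is a small sketch that stops pulling results once the cap is reached. It models only the semantics described above and is not the provider's implementation; take_limited and fake_tasks are hypothetical names, and rejecting a negative limit is an assumption.

```python
from itertools import islice
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def take_limited(items: Iterable[T], limit: Optional[int] = None) -> List[T]:
    """Return at most `limit` items; None means no cap (hypothetical helper)."""
    if limit is not None and limit < 0:
        raise ValueError("limit must be non-negative")
    # islice stops consuming the source as soon as `limit` items were yielded.
    return list(islice(items, limit))


pulled = []


def fake_tasks():
    """Stand-in for a lazily paged task listing; records what was fetched."""
    for index in range(10):
        pulled.append(index)
        yield f"task-{index}"


first_three = take_limited(fake_tasks(), limit=3)
```

The point of the lazy slice is that a small limit avoids fetching results that would be thrown away.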

List jobs

To list the jobs, you can use CloudBatchListJobsOperator:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

list_jobs = CloudBatchListJobsOperator(
    task_id=list_jobs_task_name,
    project_id=PROJECT_ID,
    region=REGION,
    limit=10,
    filter=f"name:projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name_prefix}*",
    dag=dag,
)

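The operator takes two optional parameters: "limit", which caps the number of jobs returned, and "filter", which lists only the jobs that match the filter.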
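
The filter string in the example above is assembled from the project, region, and a job name prefix. A small sketch of a helper that builds the same string is shown here; job_name_prefix_filter is a hypothetical name, the argument values are illustrative, and the format is copied from the example rather than from the filter grammar itself.

```python
def job_name_prefix_filter(project_id: str, region: str, prefix: str) -> str:
    """Hypothetical helper: build the name-prefix filter used in the example."""
    for label, value in (("project_id", project_id), ("region", region), ("prefix", prefix)):
        if not value:
            # An empty component would silently broaden or break the filter.
            raise ValueError(f"{label} must not be empty")
    return f"name:projects/{project_id}/locations/{region}/jobs/{prefix}*"


example_filter = job_name_prefix_filter("my-project", "us-central1", "batch-job")
```

The result could then be passed as filter=example_filter to the list operator.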

Delete a job

To delete a job, you can use CloudBatchDeleteJobOperator:

tests/system/google/cloud/cloud_batch/example_cloud_batch.py

delete_job1 = CloudBatchDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=REGION,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

Note that this operator waits for the job to be deleted, and that the dictionary representation of the deleted Job is pushed to XCom.
