Google Cloud Batch 運算子¶
Cloud Batch 是一項全方位託管的批次服務,用於在 Google 的基礎架構上排程、佇列和執行批次作業。
如需更多關於此服務的資訊,請造訪 Google Cloud Batch 文件說明。
先決條件任務¶
若要使用這些運算子,您必須完成幾件事項
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
如 Google Cloud 文件說明中所述,為您的專案啟用計費功能。
如 Cloud Console 文件說明中所述,啟用 API。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'安裝提供詳細資訊,請參閱安裝。
提交作業¶
在 Cloud Batch 中提交作業之前,您需要定義它。如需更多關於 Job 物件欄位的資訊,請造訪 Google Cloud Batch Job 說明。
一個簡單的作業配置範例如下
def _create_job():
runnable = batch_v1.Runnable()
runnable.container = batch_v1.Runnable.Container()
runnable.container.image_uri = "gcr.io/google-containers/busybox"
runnable.container.entrypoint = "/bin/sh"
runnable.container.commands = [
"-c",
"echo Hello world! This is task ${BATCH_TASK_INDEX}.\
This job has a total of ${BATCH_TASK_COUNT} tasks.",
]
task = batch_v1.TaskSpec()
task.runnables = [runnable]
resources = batch_v1.ComputeResource()
resources.cpu_milli = 2000
resources.memory_mib = 16
task.compute_resource = resources
task.max_retry_count = 2
group = batch_v1.TaskGroup()
group.task_count = 2
group.task_spec = task
policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = "e2-standard-4"
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy = batch_v1.AllocationPolicy()
allocation_policy.instances = [instances]
job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.labels = {"env": "testing", "type": "container"}
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
return job
透過此配置,我們可以提交作業: CloudBatchSubmitJobOperator
submit1 = CloudBatchSubmitJobOperator(
task_id="submit-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
job=_create_job(),
dag=dag,
deferrable=False,
)
或者您可以在可延遲模式中定義相同的運算子: CloudBatchSubmitJobOperator
submit2 = CloudBatchSubmitJobOperator(
task_id="submit-job2",
project_id=PROJECT_ID,
region=REGION,
job_name=job2_name,
job=batch_v1.Job.to_dict(_create_job()),
dag=dag,
deferrable=True,
)
請注意,此運算子會等待作業完成執行,且 Job 的字典表示法會推送至 XCom。
列出作業的任務¶
若要列出特定作業的任務,您可以使用
list_tasks = CloudBatchListTasksOperator(
task_id=list_tasks_task_name, project_id=PROJECT_ID, region=REGION, job_name=job1_name, dag=dag
)
此運算子接受兩個選用參數:「limit」以限制傳回的任務數量,以及「filter」以僅列出符合篩選條件的任務。
列出作業¶
若要列出作業,您可以使用
list_jobs = CloudBatchListJobsOperator(
task_id=list_jobs_task_name,
project_id=PROJECT_ID,
region=REGION,
limit=10,
filter=f"name:projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name_prefix}*",
dag=dag,
)
此運算子接受兩個選用參數:「limit」以限制傳回的任務數量,以及「filter」以僅列出符合篩選條件的任務。
刪除作業¶
若要刪除作業,您可以使用
delete_job1 = CloudBatchDeleteJobOperator(
task_id="delete-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
dag=dag,
trigger_rule=TriggerRule.ALL_DONE,
)
請注意,此運算子會等待作業被刪除,且已刪除的 Job 的字典表示法會推送至 XCom。