Google Cloud Tasks¶
Cloud Tasks 是一項全受管服務,可讓您管理大量分散式任務的執行、調度和傳遞。使用 Cloud Tasks,您可以在使用者或服務對服務請求之外非同步執行工作。
如需有關此服務的更多資訊,請造訪Cloud Tasks 產品文件
先決條件任務¶
若要使用這些運算子,您必須執行一些操作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
如 Google Cloud 文件所述,為您的專案啟用計費功能。
如 Cloud Console 文件所述,啟用 API。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱安裝。
佇列操作¶
建立佇列¶
若要建立新的佇列,請使用 CloudTasksQueueCreateOperator
create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_queue",
)
刪除佇列¶
若要刪除佇列,請使用 CloudTasksQueueDeleteOperator
delete_queue = CloudTasksQueueDeleteOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="delete_queue",
)
恢復佇列¶
若要恢復佇列,請使用 CloudTasksQueueResumeOperator
resume_queue = CloudTasksQueueResumeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="resume_queue",
)
暫停佇列¶
若要暫停佇列,請使用 CloudTasksQueuePauseOperator
pause_queue = CloudTasksQueuePauseOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="pause_queue",
)
清除佇列¶
若要清除佇列,請使用 CloudTasksQueuePurgeOperator
purge_queue = CloudTasksQueuePurgeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="purge_queue",
)
取得佇列¶
若要取得佇列,請使用 CloudTasksQueueGetOperator
get_queue = CloudTasksQueueGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="get_queue",
)
get_queue_result = BashOperator(
task_id="get_queue_result",
bash_command=f"echo {get_queue.output}",
)
更新佇列¶
若要更新佇列,請使用 CloudTasksQueueUpdateOperator
update_queue = CloudTasksQueueUpdateOperator(
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
task_id="update_queue",
)
列出佇列¶
若要列出所有佇列,請使用 CloudTasksQueuesListOperator
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
任務操作¶
建立任務¶
若要在特定佇列中建立新任務,請使用 CloudTasksTaskCreateOperator
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)
取得任務¶
若要取得特定佇列中的任務,請使用 CloudTasksTaskGetOperator
tasks_get = CloudTasksTaskGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="tasks_get",
)
執行任務¶
若要執行特定佇列中的任務,請使用 CloudTasksTaskRunOperator
run_task = CloudTasksTaskRunOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
task_id="run_task",
)
列出任務¶
若要列出特定佇列中的所有任務,請使用 CloudTasksTasksListOperator
list_tasks = CloudTasksTasksListOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="list_tasks",
)
刪除任務¶
若要從特定佇列中刪除任務,請使用 CloudTasksTaskDeleteOperator
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)