Google Cloud Tasks

Cloud Tasks 是一項全受管服務,可讓您管理大量分散式任務的執行、調度和傳遞。使用 Cloud Tasks,您可以在使用者或服務對服務請求之外非同步執行工作。

如需有關此服務的更多資訊,請造訪Cloud Tasks 產品文件

先決條件任務

若要使用這些運算子,您必須執行一些操作

佇列操作

建立佇列

若要建立新的佇列,請使用 CloudTasksQueueCreateOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

create_queue = CloudTasksQueueCreateOperator(
    location=LOCATION,
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_queue",
)

刪除佇列

若要刪除佇列,請使用 CloudTasksQueueDeleteOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

delete_queue = CloudTasksQueueDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_queue",
)

恢復佇列

若要恢復佇列,請使用 CloudTasksQueueResumeOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

resume_queue = CloudTasksQueueResumeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="resume_queue",
)

暫停佇列

若要暫停佇列,請使用 CloudTasksQueuePauseOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

pause_queue = CloudTasksQueuePauseOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="pause_queue",
)

清除佇列

若要清除佇列,請使用 CloudTasksQueuePurgeOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

purge_queue = CloudTasksQueuePurgeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="purge_queue",
)

取得佇列

若要取得佇列,請使用 CloudTasksQueueGetOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

get_queue = CloudTasksQueueGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="get_queue",
)

get_queue_result = BashOperator(
    task_id="get_queue_result",
    bash_command=f"echo {get_queue.output}",
)

更新佇列

若要更新佇列,請使用 CloudTasksQueueUpdateOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

update_queue = CloudTasksQueueUpdateOperator(
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
    task_id="update_queue",
)

列出佇列

若要列出所有佇列,請使用 CloudTasksQueuesListOperator

tests/system/google/cloud/tasks/example_queue.py[原始碼]

list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")

任務操作

建立任務

若要在特定佇列中建立新任務,請使用 CloudTasksTaskCreateOperator

tests/system/google/cloud/tasks/example_tasks.py[原始碼]

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

取得任務

若要取得特定佇列中的任務,請使用 CloudTasksTaskGetOperator

tests/system/google/cloud/tasks/example_tasks.py[原始碼]

tasks_get = CloudTasksTaskGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="tasks_get",
)

執行任務

若要執行特定佇列中的任務,請使用 CloudTasksTaskRunOperator

tests/system/google/cloud/tasks/example_tasks.py[原始碼]

run_task = CloudTasksTaskRunOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    task_id="run_task",
)

列出任務

若要列出特定佇列中的所有任務,請使用 CloudTasksTasksListOperator

tests/system/google/cloud/tasks/example_tasks.py[原始碼]

list_tasks = CloudTasksTasksListOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="list_tasks",
)

刪除任務

若要從特定佇列中刪除任務,請使用 CloudTasksTaskDeleteOperator

tests/system/google/cloud/tasks/example_tasks.py[原始碼]

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

參考資料

如需更多資訊,請參閱

此條目是否有幫助?