Google Cloud Workflows 運算子

您可以使用 Workflows 建立無伺服器工作流程,將一系列無伺服器任務以您定義的順序連結在一起。結合 Google Cloud API、無伺服器產品(如 Cloud Functions 和 Cloud Run)以及對外部 API 的呼叫,以建立彈性的無伺服器應用程式。

如需更多關於此服務的資訊,請造訪Workflows 生產環境文件 <產品文件

先決條件任務

若要使用這些運算子,您必須完成幾件事

建立工作流程

若要建立工作流程,請使用 WorkflowsCreateWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)

工作流程的定義方式應與此範例類似

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

WORKFLOW_CONTENT = """
- getLanguage:
    assign:
        - inputLanguage: "English"
- readWikipedia:
    call: http.get
    args:
        url: https://www.wikipedia.org/
        query:
            action: opensearch
            search: ${inputLanguage}
    result: wikiResult
- returnResult:
    return: ${wikiResult}
"""

WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}

如需更多關於撰寫工作流程的資訊,請查看官方生產環境文件 <產品文件

更新工作流程

若要更新工作流程,請使用 WorkflowsUpdateWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

update_workflow = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask=FieldMask(paths=["name", "description"]),
)

取得工作流程

若要取得工作流程,請使用 WorkflowsGetWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

列出工作流程

若要列出工作流程,請使用 WorkflowsListWorkflowsOperator

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)

刪除工作流程

若要刪除工作流程,請使用 WorkflowsDeleteWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

建立執行個體

若要建立執行個體,請使用 WorkflowsCreateExecutionOperator。由於 API 限制,此運算子不具備冪等性。

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)

建立運算子不會等待執行完成。若要等待執行結果,請使用 WorkflowExecutionSensor

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

取得執行個體

若要取得執行個體,請使用 WorkflowsGetExecutionOperator

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

列出執行個體

若要列出執行個體,請使用 WorkflowsListExecutionsOperator。預設情況下,此運算子只會傳回最近 60 分鐘內的執行個體。

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

取消執行個體

若要取消執行個體,請使用 WorkflowsCancelExecutionOperator

tests/system/google/cloud/workflows/example_workflows.py[原始碼]

cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id=cancel_execution_id,
)

這個條目是否有幫助?