Google Cloud Workflows 運算子¶
您可以使用 Workflows 建立無伺服器工作流程,將一系列無伺服器任務以您定義的順序連結在一起。結合 Google Cloud API、無伺服器產品(如 Cloud Functions 和 Cloud Run)以及對外部 API 的呼叫,以建立彈性的無伺服器應用程式。
如需更多關於此服務的資訊,請造訪Workflows 生產環境文件 <產品文件。
先決條件任務¶
若要使用這些運算子,您必須完成幾件事
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 函式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
建立工作流程¶
若要建立工作流程,請使用 WorkflowsCreateWorkflowOperator
。
create_workflow = WorkflowsCreateWorkflowOperator(
task_id="create_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow=WORKFLOW,
workflow_id=WORKFLOW_ID,
)
工作流程的定義方式應與此範例類似
WORKFLOW_CONTENT = """
- getLanguage:
assign:
- inputLanguage: "English"
- readWikipedia:
call: http.get
args:
url: https://www.wikipedia.org/
query:
action: opensearch
search: ${inputLanguage}
result: wikiResult
- returnResult:
return: ${wikiResult}
"""
WORKFLOW = {
"description": "Test workflow",
"labels": {"airflow-version": "dev"},
"source_contents": WORKFLOW_CONTENT,
}
如需更多關於撰寫工作流程的資訊,請查看官方生產環境文件 <產品文件。
更新工作流程¶
若要更新工作流程,請使用 WorkflowsUpdateWorkflowOperator
。
update_workflow = WorkflowsUpdateWorkflowOperator(
task_id="update_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
update_mask=FieldMask(paths=["name", "description"]),
)
取得工作流程¶
若要取得工作流程,請使用 WorkflowsGetWorkflowOperator
。
get_workflow = WorkflowsGetWorkflowOperator(
task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
列出工作流程¶
若要列出工作流程,請使用 WorkflowsListWorkflowsOperator
。
list_workflows = WorkflowsListWorkflowsOperator(
task_id="list_workflows",
location=LOCATION,
project_id=PROJECT_ID,
)
刪除工作流程¶
若要刪除工作流程,請使用 WorkflowsDeleteWorkflowOperator
。
delete_workflow = WorkflowsDeleteWorkflowOperator(
task_id="delete_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
建立執行個體¶
若要建立執行個體,請使用 WorkflowsCreateExecutionOperator
。由於 API 限制,此運算子不具備冪等性。
create_execution = WorkflowsCreateExecutionOperator(
task_id="create_execution",
location=LOCATION,
project_id=PROJECT_ID,
execution=EXECUTION,
workflow_id=WORKFLOW_ID,
)
建立運算子不會等待執行完成。若要等待執行結果,請使用 WorkflowExecutionSensor
。
wait_for_execution = WorkflowExecutionSensor(
task_id="wait_for_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
取得執行個體¶
若要取得執行個體,請使用 WorkflowsGetExecutionOperator
。
get_execution = WorkflowsGetExecutionOperator(
task_id="get_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
列出執行個體¶
若要列出執行個體,請使用 WorkflowsListExecutionsOperator
。預設情況下,此運算子只會傳回最近 60 分鐘內的執行個體。
list_executions = WorkflowsListExecutionsOperator(
task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
取消執行個體¶
若要取消執行個體,請使用 WorkflowsCancelExecutionOperator
。
cancel_execution = WorkflowsCancelExecutionOperator(
task_id="cancel_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=SLEEP_WORKFLOW_ID,
execution_id=cancel_execution_id,
)