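Google Cloud Composer Operators

Cloud Composer is a fully managed workflow orchestration service that enables you to create, schedule, monitor, and manage workflows that span clouds and on-premises data centers.

Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.

By using Cloud Composer instead of a local instance of Apache Airflow, you get the benefits of Airflow without the overhead of installing and managing it yourself. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows rather than your infrastructure.

For more information about the service, visit the Cloud Composer product documentation.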

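Create an environment

Before you create a Cloud Composer environment, you need to define it. For more information about the fields you can pass when creating an environment, see the Cloud Composer create environment API.

A simple environment configuration can look as follows:

tests/system/google/cloud/composer/example_cloud_composer.py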


ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")

ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2-airflow-2"},
    }
}

With this configuration, you can create the environment using CloudComposerCreateEnvironmentOperator:

tests/system/google/cloud/composer/example_cloud_composer.py

create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerCreateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)
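The environment IDs above are built with f-strings and `.replace("_", "-")` because Cloud Composer environment names do not allow underscores. The sketch below is a hypothetical helper (`make_environment_id` is not part of the provider) that mirrors that construction and also validates the result. It assumes the naming rule stated in the Cloud Composer documentation: a lowercase letter followed by up to 63 lowercase letters, digits, or hyphens, not ending in a hyphen. Uppercase characters in the DAG ID would still produce an invalid name, because the replacement only fixes underscores.

```python
import re

# Assumed naming rule: a lowercase letter, then up to 63 lowercase letters,
# digits, or hyphens, and the name must not end with a hyphen.
ENVIRONMENT_NAME_RE = re.compile(r"[a-z](?:[a-z0-9-]{0,62}[a-z0-9])?")


def make_environment_id(dag_id: str, env_id: str, prefix: str = "test") -> str:
    """Hypothetical helper mirroring f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")."""
    environment_id = f"{prefix}-{dag_id}-{env_id}".replace("_", "-")
    # fullmatch so that trailing characters (including newlines) are rejected
    if not ENVIRONMENT_NAME_RE.fullmatch(environment_id):
        raise ValueError(f"invalid Cloud Composer environment ID: {environment_id!r}")
    return environment_id


print(make_environment_id("example_composer", "dev"))
```

Failing early on an invalid name keeps the error in DAG parsing rather than in the create request.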

Get an environment

To get an environment, you can use

CloudComposerGetEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)
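Environment resources are identified by names of the form `projects/{project}/locations/{location}/environments/{environment}`. If a downstream task needs the individual parts, a small parser like the one below can split them apart. This is a hypothetical helper, and it assumes the environment returned by the get task is a dict whose `name` key holds that full resource name.

```python
def parse_environment_name(name: str) -> dict:
    """Split "projects/{p}/locations/{l}/environments/{e}" into its parts."""
    parts = name.split("/")
    # Expect exactly three key/value pairs in a fixed order.
    if len(parts) != 6 or parts[0::2] != ["projects", "locations", "environments"]:
        raise ValueError(f"unexpected environment resource name: {name!r}")
    return {"project": parts[1], "location": parts[3], "environment": parts[5]}


print(parse_environment_name("projects/my-project/locations/us-central1/environments/my-env"))
```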

List environments

To list environments, you can use

CloudComposerListEnvironmentsOperator

tests/system/google/cloud/composer/example_cloud_composer.py

list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)
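To pick one environment out of the listing, you can match on the end of its resource name. The sketch assumes the list task yields dicts that each carry a `name` key of the form `projects/{project}/locations/{location}/environments/{environment}`. `find_environment` is a hypothetical helper, not a provider function.

```python
from typing import List, Optional


def find_environment(environments: List[dict], environment_id: str) -> Optional[dict]:
    """Return the first environment whose resource name ends with the given ID."""
    # Including the "/environments/" segment avoids matching "my-env" against "other-my-env".
    suffix = f"/environments/{environment_id}"
    for env in environments:
        if env.get("name", "").endswith(suffix):
            return env
    return None
```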

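Update an environment

You can update an environment by providing an environment configuration and an updateMask. In the updateMask argument, you specify the path, relative to Environment, of each field to update. For more information about updateMask and other parameters, see the Cloud Composer update environment API.

An example of a new environment configuration and updateMask:

tests/system/google/cloud/composer/example_cloud_composer.py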

UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
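In the example, the mask path `labels.label` is simply the dotted path to the single leaf value in `UPDATED_ENVIRONMENT`. When the update dict grows, the same rule can be applied mechanically. The hypothetical helper below collects the dotted path of every leaf. It assumes each leaf path is one the Update Environment API accepts; some fields may only be updatable as a whole, and path spelling must match what the API expects, so check the API reference before relying on a generated mask.

```python
def build_update_mask(update: dict, prefix: str = "") -> dict:
    """Derive {"paths": [...]} from the dotted path of every leaf in a nested dict."""
    paths = []
    for key, value in update.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict) and value:
            # Recurse into nested maps, e.g. {"labels": {"label": ...}} -> "labels.label"
            paths.extend(build_update_mask(value, f"{path}.")["paths"])
        else:
            # Non-dict values and empty dicts are treated as whole-field updates
            paths.append(path)
    return {"paths": paths}


print(build_update_mask({"labels": {"label": "testing"}}))
```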

To update an environment, you can use: CloudComposerUpdateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerUpdateEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)

Delete an environment

To delete an environment, you can use

CloudComposerDeleteEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

Alternatively, you can define the same operator in deferrable mode: CloudComposerDeleteEnvironmentOperator

tests/system/google/cloud/composer/example_cloud_composer.py

defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)

List Composer images

You can also list all supported Cloud Composer images:

CloudComposerListImageVersionsOperator

tests/system/google/cloud/composer/example_cloud_composer.py

image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)

Run Airflow CLI commands

You can run Airflow CLI commands in your environments using: CloudComposerRunAirflowCLICommandOperator

tests/system/google/cloud/composer/example_cloud_composer.py

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)

Alternatively, you can define the same operator in deferrable mode:

tests/system/google/cloud/composer/example_cloud_composer.py

defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)
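The `command` argument is a single CLI string. The Cloud Composer ExecuteAirflowCommand API takes the command, subcommand, and parameters as separate fields. The sketch below shows one way such a string could map onto those fields. It is a hypothetical helper for illustration only, and the operator's own parsing may differ. The example command `dags list -o json` is likewise an assumption, not the value of `COMMAND` in the test file.

```python
import shlex


def split_airflow_command(command: str) -> dict:
    """Hypothetical split of an Airflow CLI string into command/subcommand/parameters."""
    tokens = shlex.split(command)  # respects quoting, e.g. --conf '{"a": 1}'
    if not tokens:
        raise ValueError("empty Airflow CLI command")
    # Treat the second token as a subcommand unless it is already a flag.
    has_sub = len(tokens) > 1 and not tokens[1].startswith("-")
    return {
        "command": tokens[0],
        "subcommand": tokens[1] if has_sub else "",
        "parameters": tokens[2:] if has_sub else tokens[1:],
    }


print(split_airflow_command("dags list -o json"))
```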

Check if a DAG run has completed

You can use a sensor to check whether a DAG run has completed in your environment, using: CloudComposerDAGRunSensor

tests/system/google/cloud/composer/example_cloud_composer.py

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)

Alternatively, you can define the same sensor in deferrable mode:

tests/system/google/cloud/composer/example_cloud_composer.py

defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)
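The `allowed_states` argument expresses which final states count as "done". The sketch below illustrates that idea on a list of run records. It is not the sensor's implementation: how the sensor fetches runs and which time window it inspects are outside this sketch. It assumes each run is a dict with a `state` key and treats an empty list as "not yet complete". `dag_runs_reached_allowed_states` is a hypothetical helper.

```python
from typing import Iterable


def dag_runs_reached_allowed_states(dag_runs: Iterable[dict], allowed_states: Iterable[str]) -> bool:
    """True only if at least one run exists and every run is in an allowed state."""
    allowed = set(allowed_states)
    states = [run.get("state") for run in dag_runs]
    return bool(states) and all(state in allowed for state in states)


print(dag_runs_reached_allowed_states([{"state": "success"}], ["success"]))
```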
