Google Cloud Dataflow 運算子

Dataflow 是一項用於執行各種資料處理模式的託管服務。這些管線是使用 Apache Beam 程式設計模型建立的,該模型允許批次和串流處理。

先決條件任務

要使用這些運算子,您必須執行以下幾項操作

執行資料管線的方式

根據您的環境、來源檔案,有多種方式可以執行 Dataflow 管線

  • 非範本化管線:如果您有 Java 的 *.jar 檔案或 Python 的 *.py 檔案,開發人員可以在 Airflow 工作站上以本機程序執行管線。這也表示必要系統相依性必須安裝在工作站上。對於 Java,工作站必須安裝 JRE Runtime。對於 Python,則為 Python 解譯器。執行階段版本必須與管線版本相容。這是啟動管線最快的方式,但由於系統相依性經常出現問題,可能會導致問題。請參閱:Java SDK 管線Python SDK 管線 以取得更詳細的資訊。開發人員也可以透過 JSON 格式傳遞管線結構來建立管線,然後執行它以建立 Job。請參閱:JSON 格式管線JSON 格式管線 以取得更詳細的資訊。

  • 範本化管線:程式設計人員可以透過準備範本來使管線獨立於環境,然後在 Google 管理的機器上執行該範本。這樣,環境的變更就不會影響您的管線。範本有兩種型別

    • 傳統範本。開發人員執行管線並建立範本。Apache Beam SDK 在 Cloud Storage 中暫存檔案,建立範本檔案 (類似於工作請求),並將範本檔案儲存在 Cloud Storage 中。請參閱:範本化工作

    • 彈性範本。開發人員將管線封裝到 Docker 映像檔中,然後使用 gcloud 命令列工具來建置彈性範本規格檔案並將其儲存在 Cloud Storage 中。請參閱:範本化工作

  • SQL 管線:開發人員可以將管線編寫為 SQL 陳述式,然後在 Dataflow 中執行它。請參閱:Dataflow SQL

最好先使用非範本化管線測試您的管線,然後在生產環境中使用範本執行管線。

如需管線類型之間差異的詳細資訊,請參閱 Google Cloud 文件中的 Dataflow 範本

啟動非範本化管線

JSON 格式管線

可以透過 JSON 格式傳遞管線結構來建立新的管線。請參閱 DataflowCreatePipelineOperator 這將建立一個新的管線,該管線將在 Dataflow Pipelines UI 上可見。

以下是如何透過執行 DataflowCreatePipelineOperator 來建立 Dataflow 管線的範例

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py[原始碼]

create_pipeline = DataflowCreatePipelineOperator(
    task_id="create_pipeline",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body={
        "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
        "type": PIPELINE_TYPE,
        "workload": {
            "dataflowFlexTemplateRequest": {
                "launchParameter": {
                    "containerSpecGcsPath": GCS_PATH,
                    "jobName": PIPELINE_JOB_NAME,
                    "environment": {"tempLocation": TEMP_LOCATION},
                    "parameters": {
                        "inputFile": INPUT_FILE,
                        "output": OUTPUT,
                    },
                },
                "projectId": GCP_PROJECT_ID,
                "location": GCP_LOCATION,
            }
        },
    },
)

要執行新建立的管線,您可以使用 DataflowRunPipelineOperator

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py[原始碼]

run_pipeline = DataflowRunPipelineOperator(
    task_id="run_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
)

一旦呼叫,DataflowRunPipelineOperator 將傳回執行給定管線所建立的 Google Cloud Dataflow Job

有關 API 用法的更多資訊,請參閱 Google Cloud 文件中的 Data Pipelines API REST 資源

要使用來源檔案 (Java 中的 JAR 或 Python 檔案) 建立新的管線,請使用建立工作運算子。來源檔案可以位於 GCS 或本機檔案系統上。BeamRunJavaPipelineOperatorBeamRunPythonPipelineOperator

Java SDK 管線

對於 Java 管線,必須為 BeamRunJavaPipelineOperator 指定 jar 引數,因為它包含要在 Dataflow 上執行的管線。JAR 可以位於 Airflow 有權下載的 GCS 上,或位於本機檔案系統上 (提供其絕對路徑)。

以下範例說明如何使用儲存在 GCS 上的 jar 在 Java 中建立和執行管線

tests/system/google/cloud/dataflow/example_dataflow_native_java.py[原始碼]

start_java_job = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "job_name": f"java-pipeline-job-{ENV_ID}",
        "check_if_running": CheckJobRunning.IgnoreJob,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
)

以下範例說明如何在 deferrable 模式中使用儲存在 GCS 上的 jar 在 Java 中建立和執行管線

tests/system/google/cloud/dataflow/example_dataflow_native_java.py[原始碼]

start_java_deferrable = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job_deferrable",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "job_name": f"deferrable-java-pipeline-job-{ENV_ID}",
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
    deferrable=True,
)

以下範例說明如何在 Java 中使用儲存在本機檔案系統上的 jar 建立和執行管線

tests/system/google/cloud/dataflow/example_dataflow_native_java.py[原始碼]

start_java_job_local = BeamRunJavaPipelineOperator(
    task_id="start_java_job_local",
    jar=LOCAL_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
    },
)

Python SDK 管線

必須為 BeamRunPythonPipelineOperator 指定 py_file 引數,因為它包含要在 Dataflow 上執行的管線。Python 檔案可以位於 Airflow 有權下載的 GCS 上,或位於本機檔案系統上 (提供其絕對路徑)。

py_interpreter 引數指定執行管線時要使用的 Python 版本,預設為 python3。如果您的 Airflow 執行個體在 Python 2 上執行,請指定 python2 並確保您的 py_file 是 Python 2。為了獲得最佳結果,請使用 Python 3。

如果指定 py_requirements 引數,將建立一個具有指定需求的暫時 Python 虛擬環境,並且管線將在其中執行。

py_system_site_packages 引數指定是否可以從您的 Airflow 執行個體存取虛擬環境中的所有 Python 套件 (如果指定 py_requirements 引數),建議避免使用,除非 Dataflow 工作需要它。

tests/system/google/cloud/dataflow/example_dataflow_native_python.py[原始碼]

start_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)

執行模型

Dataflow 具有多種執行管線的選項。它可以在以下模式下完成:批次非同步 (fire and forget)、批次封鎖 (等待完成) 或串流 (無限期執行)。在 Airflow 中,最佳實務是使用非同步批次管線或串流,並使用感測器來監聽預期的工作狀態。

預設情況下,BeamRunJavaPipelineOperatorBeamRunPythonPipelineOperatorDataflowTemplatedJobStartOperatorDataflowStartFlexTemplateOperator 具有設定為 None 的引數 wait_until_finished,這會導致不同的行為,具體取決於管線的類型

  • 對於串流管線,等待工作開始,

  • 對於批次管線,等待工作完成。

如果 wait_until_finished 設定為 True,運算子將始終等待管線執行結束。如果設定為 False,則僅提交工作。

請參閱:在 Cloud Dataflow 服務上執行時設定 PipelineOptions

非同步執行

Dataflow 批次工作預設為非同步;但是,這取決於應用程式碼 (包含在 JAR 或 Python 檔案中) 及其編寫方式。為了使 Dataflow 工作非同步執行,請確保管線物件未被等待 (未在您的應用程式碼中的 PipelineResult 上呼叫 waitUntilFinishwait_until_finish)。

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[原始碼]

start_python_job_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_async",
    runner=BeamRunnerType.DataflowRunner,
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={
        "job_name": "start_python_job_async",
        "location": LOCATION,
        "wait_until_finished": False,
    },
)

封鎖執行

為了使 Dataflow 工作執行並等待完成,請確保在應用程式碼中等待管線物件。對於 Java SDK,可以透過在從 pipeline.run() 傳回的 PipelineResult 上呼叫 waitUntilFinish 來完成,對於 Python SDK,可以透過在從 pipeline.run() 傳回的 PipelineResult 上呼叫 wait_until_finish 來完成。

應避免使用封鎖工作,因為在 Airflow 上執行時會發生背景程序。此程序會持續執行以等待 Dataflow 工作完成,並因此增加 Airflow 的資源消耗。

串流執行

要執行串流 Dataflow 工作,請確保設定串流選項 (對於 Python) 或從無界限的資料來源 (例如 Pub/Sub) 在您的管線中讀取 (對於 Java)。

tests/system/google/cloud/dataflow/example_dataflow_streaming_python.py[原始碼]

start_streaming_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_streaming_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "temp_location": GCS_TMP,
        "input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
        "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
        "streaming": True,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)

將引數 drain_pipeline 設定為 True 允許透過排空串流工作而不是在終止工作執行個體期間取消串流工作來停止串流工作。

請參閱 停止執行中的管線

範本化工作

範本提供在 Cloud Storage 上暫存管線並從那裡執行的能力。這在開發工作流程中提供了彈性,因為它將管線的開發與暫存和執行步驟分開。Dataflow 有兩種範本類型:傳統範本和彈性範本。如需更多資訊,請參閱 Dataflow 範本的官方文件

以下範例說明如何使用 DataflowTemplatedJobStartOperator 使用傳統範本執行 Dataflow 工作

tests/system/google/cloud/dataflow/example_dataflow_template.py[原始碼]

start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start_template_job",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    wait_until_finished=True,
)

對於此動作,您也可以在 deferrable 模式下使用運算子

tests/system/google/cloud/dataflow/example_dataflow_template.py[原始碼]

start_template_job_deferrable = DataflowTemplatedJobStartOperator(
    task_id="start_template_job_deferrable",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    deferrable=True,
)

請參閱 可與此運算子搭配使用的 Google 提供範本清單

以下範例說明如何使用 DataflowStartFlexTemplateOperator 使用彈性範本執行 Dataflow 工作

tests/system/google/cloud/dataflow/example_dataflow_template.py[原始碼]

start_flex_template_job = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    wait_until_finished=True,
)

對於此動作,您也可以在 deferrable 模式下使用運算子

tests/system/google/cloud/dataflow/example_dataflow_template.py[原始碼]

start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job_deferrable",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    deferrable=True,
)

Dataflow SQL

Dataflow SQL 支援 ZetaSQL 查詢語法的變體,並包含用於執行 Dataflow 串流工作的其他串流擴充功能。

以下範例說明如何使用 DataflowStartSqlJobOperator 執行 Dataflow SQL 工作

tests/system/google/cloud/dataflow/example_dataflow_sql.py[原始碼]

start_sql = DataflowStartSqlJobOperator(
    task_id="start_sql_query",
    job_name=DATAFLOW_SQL_JOB_NAME,
    query=f"""
        SELECT
            emp_name as employee,
            salary as employee_salary
        FROM
            bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
        WHERE salary >= 1000;
    """,
    options={
        "bigquery-project": PROJECT_ID,
        "bigquery-dataset": BQ_SQL_DATASET,
        "bigquery-table": BQ_SQL_TABLE_OUTPUT,
    },
    location=LOCATION,
    do_xcom_push=True,
)

警告

此運算子需要 gcloud 命令 (Google Cloud SDK) 必須安裝在 Airflow 工作站上 <https://cloud.google.com/sdk/docs/install>`__

請參閱 Dataflow SQL 參考資料

Dataflow YAML

Beam YAML 是一種無程式碼 SDK,用於透過使用 YAML 檔案來設定 Apache Beam 管線。您可以使用 Beam YAML 來撰寫和執行 Beam 管線,而無需編寫任何程式碼。此 API 可用於定義串流和批次管線。

以下範例說明如何使用 DataflowStartYamlJobOperator 執行 Dataflow YAML 工作

tests/system/google/cloud/dataflow/example_dataflow_yaml.py[原始碼]

start_dataflow_yaml_job = DataflowStartYamlJobOperator(
    task_id="start_dataflow_yaml_job",
    job_name=DATAFLOW_YAML_JOB_NAME,
    yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
    append_job_name=True,
    deferrable=False,
    region=REGION,
    project_id=PROJECT_ID,
    jinja_variables=BQ_VARIABLES,
)

此運算子可以透過傳遞 deferrable=True 作為參數,在 deferrable 模式下執行。

tests/system/google/cloud/dataflow/example_dataflow_yaml.py[原始碼]

start_dataflow_yaml_job_def = DataflowStartYamlJobOperator(
    task_id="start_dataflow_yaml_job_def",
    job_name=DATAFLOW_YAML_JOB_NAME,
    yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
    append_job_name=True,
    deferrable=True,
    region=REGION,
    project_id=PROJECT_ID,
    jinja_variables=BQ_VARIABLES_DEF,
    expected_terminal_state=DataflowJobStatus.JOB_STATE_DONE,
)

警告

此運算子需要 gcloud 命令 (Google Cloud SDK) 必須安裝在 Airflow 工作站上 <https://cloud.google.com/sdk/docs/install>`__

請參閱 Dataflow YAML 參考資料

停止管線

要停止一個或多個 Dataflow 管線,您可以使用 DataflowStopJobOperator。串流管線預設會排空,將 drain_pipeline 設定為 False 將會取消它們。提供 job_id 以停止特定工作,或提供 job_name_prefix 以停止所有具有提供名稱前綴的工作。

tests/system/google/cloud/dataflow/example_dataflow_native_python.py[原始碼]

stop_dataflow_job = DataflowStopJobOperator(
    task_id="stop_dataflow_job",
    location=LOCATION,
    job_name_prefix="start-python-pipeline",
)

請參閱:停止執行中的管線

刪除管線

要刪除 Dataflow 管線,您可以使用 DataflowDeletePipelineOperator。以下是如何使用此運算子的範例

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py[原始碼]

delete_pipeline = DataflowDeletePipelineOperator(
    task_id="delete_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

感測器

當工作以非同步方式觸發時,可以使用感測器來執行特定工作屬性的檢查。

DataflowJobStatusSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[原始碼]

wait_for_python_job_async_done = DataflowJobStatusSensor(
    task_id="wait_for_python_job_async_done",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    location=LOCATION,
)

此運算子可以透過傳遞 deferrable=True 作為參數,在 deferrable 模式下執行。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[原始碼]

wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
    task_id="wait_for_beam_python_pipeline_job_status_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
    location=LOCATION,
    deferrable=True,
)

DataflowJobMetricsSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[原始碼]

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_python_job_async_metric = DataflowJobMetricsSensor(
    task_id="wait_for_python_job_async_metric",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
)

此運算子可以透過傳遞 deferrable=True 作為參數,在 deferrable 模式下執行。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[原始碼]

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
    task_id="wait_for_beam_python_pipeline_job_metric_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobMessagesSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[原始碼]

def check_message(messages: list[dict]) -> bool:
    """Check message"""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_python_job_async_message = DataflowJobMessagesSensor(
    task_id="wait_for_python_job_async_message",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_message,
    fail_on_terminal_state=False,
)

此運算子可以透過傳遞 deferrable=True 作為參數,在 deferrable 模式下執行。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[原始碼]

def check_job_message(messages: list[dict]) -> bool:
    """Check job message."""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
    task_id="wait_for_beam_python_pipeline_job_message_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_job_message,
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobAutoScalingEventsSensor.

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py[原始碼]

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event"""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_python_job_async_autoscaling_event",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
)

此運算子可以透過傳遞 deferrable=True 作為參數,在 deferrable 模式下執行。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[原始碼]

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event."""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
    deferrable=True,
)

這個條目是否有幫助?