Google Cloud Dataflow 運算子¶
Dataflow 是一項用於執行各種資料處理模式的託管服務。這些管線是使用 Apache Beam 程式設計模型建立的,該模型允許批次和串流處理。
先決條件任務¶
要使用這些運算子,您必須執行以下幾項操作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'如需詳細資訊,請參閱 安裝。
執行資料管線的方式¶
根據您的環境、來源檔案,有多種方式可以執行 Dataflow 管線
非範本化管線:如果您有 Java 的
*.jar
檔案或 Python 的*.py
檔案,開發人員可以在 Airflow 工作站上以本機程序執行管線。這也表示必要系統相依性必須安裝在工作站上。對於 Java,工作站必須安裝 JRE Runtime。對於 Python,則為 Python 解譯器。執行階段版本必須與管線版本相容。這是啟動管線最快的方式,但由於系統相依性經常出現問題,可能會導致問題。請參閱:Java SDK 管線、Python SDK 管線 以取得更詳細的資訊。開發人員也可以透過 JSON 格式傳遞管線結構來建立管線,然後執行它以建立 Job。請參閱:JSON 格式管線 和 JSON 格式管線 以取得更詳細的資訊。範本化管線:程式設計人員可以透過準備範本來使管線獨立於環境,然後在 Google 管理的機器上執行該範本。這樣,環境的變更就不會影響您的管線。範本有兩種型別
SQL 管線:開發人員可以將管線編寫為 SQL 陳述式,然後在 Dataflow 中執行它。請參閱:Dataflow SQL
最好先使用非範本化管線測試您的管線,然後在生產環境中使用範本執行管線。
如需管線類型之間差異的詳細資訊,請參閱 Google Cloud 文件中的 Dataflow 範本。
啟動非範本化管線¶
JSON 格式管線¶
可以透過 JSON 格式傳遞管線結構來建立新的管線。請參閱 DataflowCreatePipelineOperator
這將建立一個新的管線,該管線將在 Dataflow Pipelines UI 上可見。
以下是如何透過執行 DataflowCreatePipelineOperator 來建立 Dataflow 管線的範例
create_pipeline = DataflowCreatePipelineOperator(
task_id="create_pipeline",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body={
"name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
"type": PIPELINE_TYPE,
"workload": {
"dataflowFlexTemplateRequest": {
"launchParameter": {
"containerSpecGcsPath": GCS_PATH,
"jobName": PIPELINE_JOB_NAME,
"environment": {"tempLocation": TEMP_LOCATION},
"parameters": {
"inputFile": INPUT_FILE,
"output": OUTPUT,
},
},
"projectId": GCP_PROJECT_ID,
"location": GCP_LOCATION,
}
},
},
)
要執行新建立的管線,您可以使用 DataflowRunPipelineOperator
run_pipeline = DataflowRunPipelineOperator(
task_id="run_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
)
一旦呼叫,DataflowRunPipelineOperator 將傳回執行給定管線所建立的 Google Cloud Dataflow Job。
有關 API 用法的更多資訊,請參閱 Google Cloud 文件中的 Data Pipelines API REST 資源。
要使用來源檔案 (Java 中的 JAR 或 Python 檔案) 建立新的管線,請使用建立工作運算子。來源檔案可以位於 GCS 或本機檔案系統上。BeamRunJavaPipelineOperator
或 BeamRunPythonPipelineOperator
Java SDK 管線¶
對於 Java 管線,必須為 BeamRunJavaPipelineOperator
指定 jar
引數,因為它包含要在 Dataflow 上執行的管線。JAR 可以位於 Airflow 有權下載的 GCS 上,或位於本機檔案系統上 (提供其絕對路徑)。
以下範例說明如何使用儲存在 GCS 上的 jar 在 Java 中建立和執行管線
start_java_job = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_java_job",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": f"java-pipeline-job-{ENV_ID}",
"check_if_running": CheckJobRunning.IgnoreJob,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
)
以下範例說明如何在 deferrable 模式中使用儲存在 GCS 上的 jar 在 Java 中建立和執行管線
start_java_deferrable = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_java_job_deferrable",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": f"deferrable-java-pipeline-job-{ENV_ID}",
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
deferrable=True,
)
以下範例說明如何在 Java 中使用儲存在本機檔案系統上的 jar 建立和執行管線
start_java_job_local = BeamRunJavaPipelineOperator(
task_id="start_java_job_local",
jar=LOCAL_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
},
)
Python SDK 管線¶
必須為 BeamRunPythonPipelineOperator
指定 py_file
引數,因為它包含要在 Dataflow 上執行的管線。Python 檔案可以位於 Airflow 有權下載的 GCS 上,或位於本機檔案系統上 (提供其絕對路徑)。
py_interpreter
引數指定執行管線時要使用的 Python 版本,預設為 python3
。如果您的 Airflow 執行個體在 Python 2 上執行,請指定 python2
並確保您的 py_file
是 Python 2。為了獲得最佳結果,請使用 Python 3。
如果指定 py_requirements
引數,將建立一個具有指定需求的暫時 Python 虛擬環境,並且管線將在其中執行。
py_system_site_packages
引數指定是否可以從您的 Airflow 執行個體存取虛擬環境中的所有 Python 套件 (如果指定 py_requirements
引數),建議避免使用,除非 Dataflow 工作需要它。
start_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_python_job",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)
執行模型¶
Dataflow 具有多種執行管線的選項。它可以在以下模式下完成:批次非同步 (fire and forget)、批次封鎖 (等待完成) 或串流 (無限期執行)。在 Airflow 中,最佳實務是使用非同步批次管線或串流,並使用感測器來監聽預期的工作狀態。
預設情況下,BeamRunJavaPipelineOperator
、BeamRunPythonPipelineOperator
、DataflowTemplatedJobStartOperator
和 DataflowStartFlexTemplateOperator
具有設定為 None
的引數 wait_until_finished
,這會導致不同的行為,具體取決於管線的類型
對於串流管線,等待工作開始,
對於批次管線,等待工作完成。
如果 wait_until_finished
設定為 True
,運算子將始終等待管線執行結束。如果設定為 False
,則僅提交工作。
請參閱:在 Cloud Dataflow 服務上執行時設定 PipelineOptions
非同步執行¶
Dataflow 批次工作預設為非同步;但是,這取決於應用程式碼 (包含在 JAR 或 Python 檔案中) 及其編寫方式。為了使 Dataflow 工作非同步執行,請確保管線物件未被等待 (未在您的應用程式碼中的 PipelineResult
上呼叫 waitUntilFinish
或 wait_until_finish
)。
start_python_job_async = BeamRunPythonPipelineOperator(
task_id="start_python_job_async",
runner=BeamRunnerType.DataflowRunner,
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={
"job_name": "start_python_job_async",
"location": LOCATION,
"wait_until_finished": False,
},
)
封鎖執行¶
為了使 Dataflow 工作執行並等待完成,請確保在應用程式碼中等待管線物件。對於 Java SDK,可以透過在從 pipeline.run()
傳回的 PipelineResult
上呼叫 waitUntilFinish
來完成,對於 Python SDK,可以透過在從 pipeline.run()
傳回的 PipelineResult
上呼叫 wait_until_finish
來完成。
應避免使用封鎖工作,因為在 Airflow 上執行時會發生背景程序。此程序會持續執行以等待 Dataflow 工作完成,並因此增加 Airflow 的資源消耗。
串流執行¶
要執行串流 Dataflow 工作,請確保設定串流選項 (對於 Python) 或從無界限的資料來源 (例如 Pub/Sub) 在您的管線中讀取 (對於 Java)。
start_streaming_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_streaming_python_job",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"temp_location": GCS_TMP,
"input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
"output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
"streaming": True,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)
將引數 drain_pipeline
設定為 True
允許透過排空串流工作而不是在終止工作執行個體期間取消串流工作來停止串流工作。
請參閱 停止執行中的管線。
範本化工作¶
範本提供在 Cloud Storage 上暫存管線並從那裡執行的能力。這在開發工作流程中提供了彈性,因為它將管線的開發與暫存和執行步驟分開。Dataflow 有兩種範本類型:傳統範本和彈性範本。如需更多資訊,請參閱 Dataflow 範本的官方文件。
以下範例說明如何使用 DataflowTemplatedJobStartOperator
使用傳統範本執行 Dataflow 工作
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start_template_job",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
wait_until_finished=True,
)
對於此動作,您也可以在 deferrable 模式下使用運算子
start_template_job_deferrable = DataflowTemplatedJobStartOperator(
task_id="start_template_job_deferrable",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
deferrable=True,
)
請參閱 可與此運算子搭配使用的 Google 提供範本清單。
以下範例說明如何使用 DataflowStartFlexTemplateOperator
使用彈性範本執行 Dataflow 工作
start_flex_template_job = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
wait_until_finished=True,
)
對於此動作,您也可以在 deferrable 模式下使用運算子
start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job_deferrable",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
deferrable=True,
)
Dataflow SQL¶
Dataflow SQL 支援 ZetaSQL 查詢語法的變體,並包含用於執行 Dataflow 串流工作的其他串流擴充功能。
以下範例說明如何使用 DataflowStartSqlJobOperator
執行 Dataflow SQL 工作
start_sql = DataflowStartSqlJobOperator(
task_id="start_sql_query",
job_name=DATAFLOW_SQL_JOB_NAME,
query=f"""
SELECT
emp_name as employee,
salary as employee_salary
FROM
bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
WHERE salary >= 1000;
""",
options={
"bigquery-project": PROJECT_ID,
"bigquery-dataset": BQ_SQL_DATASET,
"bigquery-table": BQ_SQL_TABLE_OUTPUT,
},
location=LOCATION,
do_xcom_push=True,
)
警告
此運算子需要 gcloud
命令 (Google Cloud SDK) 必須安裝在 Airflow 工作站上 <https://cloud.google.com/sdk/docs/install>`__
請參閱 Dataflow SQL 參考資料。
Dataflow YAML¶
Beam YAML 是一種無程式碼 SDK,用於透過使用 YAML 檔案來設定 Apache Beam 管線。您可以使用 Beam YAML 來撰寫和執行 Beam 管線,而無需編寫任何程式碼。此 API 可用於定義串流和批次管線。
以下範例說明如何使用 DataflowStartYamlJobOperator
執行 Dataflow YAML 工作
start_dataflow_yaml_job = DataflowStartYamlJobOperator(
task_id="start_dataflow_yaml_job",
job_name=DATAFLOW_YAML_JOB_NAME,
yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
append_job_name=True,
deferrable=False,
region=REGION,
project_id=PROJECT_ID,
jinja_variables=BQ_VARIABLES,
)
此運算子可以透過傳遞 deferrable=True
作為參數,在 deferrable 模式下執行。
start_dataflow_yaml_job_def = DataflowStartYamlJobOperator(
task_id="start_dataflow_yaml_job_def",
job_name=DATAFLOW_YAML_JOB_NAME,
yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
append_job_name=True,
deferrable=True,
region=REGION,
project_id=PROJECT_ID,
jinja_variables=BQ_VARIABLES_DEF,
expected_terminal_state=DataflowJobStatus.JOB_STATE_DONE,
)
警告
此運算子需要 gcloud
命令 (Google Cloud SDK) 必須安裝在 Airflow 工作站上 <https://cloud.google.com/sdk/docs/install>`__
請參閱 Dataflow YAML 參考資料。
停止管線¶
要停止一個或多個 Dataflow 管線,您可以使用 DataflowStopJobOperator
。串流管線預設會排空,將 drain_pipeline
設定為 False
將會取消它們。提供 job_id
以停止特定工作,或提供 job_name_prefix
以停止所有具有提供名稱前綴的工作。
stop_dataflow_job = DataflowStopJobOperator(
task_id="stop_dataflow_job",
location=LOCATION,
job_name_prefix="start-python-pipeline",
)
請參閱:停止執行中的管線。
刪除管線¶
要刪除 Dataflow 管線,您可以使用 DataflowDeletePipelineOperator
。以下是如何使用此運算子的範例
delete_pipeline = DataflowDeletePipelineOperator(
task_id="delete_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
感測器¶
當工作以非同步方式觸發時,可以使用感測器來執行特定工作屬性的檢查。
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait_for_python_job_async_done",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location=LOCATION,
)
此運算子可以透過傳遞 deferrable=True
作為參數,在 deferrable 模式下執行。
wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
task_id="wait_for_beam_python_pipeline_job_status_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
location=LOCATION,
deferrable=True,
)
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait_for_python_job_async_metric",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
)
此運算子可以透過傳遞 deferrable=True
作為參數,在 deferrable 模式下執行。
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
task_id="wait_for_beam_python_pipeline_job_metric_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
deferrable=True,
)
def check_message(messages: list[dict]) -> bool:
"""Check message"""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait_for_python_job_async_message",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_message,
fail_on_terminal_state=False,
)
此運算子可以透過傳遞 deferrable=True
作為參數,在 deferrable 模式下執行。
def check_job_message(messages: list[dict]) -> bool:
"""Check job message."""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
task_id="wait_for_beam_python_pipeline_job_message_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_job_message,
fail_on_terminal_state=False,
deferrable=True,
)
DataflowJobAutoScalingEventsSensor
.
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event"""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_python_job_async_autoscaling_event",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
)
此運算子可以透過傳遞 deferrable=True
作為參數,在 deferrable 模式下執行。
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event."""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
deferrable=True,
)