airflow.providers.google.cloud.hooks.dataflow
¶
此模組包含 Google Dataflow Hook。
模組內容¶
類別¶
具有 Dataflow 工作狀態的輔助類別。 |
|
具有 Dataflow 工作類型的輔助類別。 |
|
Google Dataflow 的 Hook。 |
|
Dataflow 服務的非同步 hook 類別。 |
函數¶
建立觸發指定函數的回呼函數。 |
屬性¶
- airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[原始碼]¶
建立觸發指定函數的回呼函數。
傳回的回呼函數旨在用作
BeamCommandRunner
中的process_line_callback
。- 參數
on_new_job_id_callback (Callable[[str], None] | None) – 當工作 ID 已知時呼叫的回呼函數
- class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[原始碼]¶
具有 Dataflow 工作狀態的輔助類別。
參考文獻: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
- class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[原始碼]¶
基底類別:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
Google Dataflow 的 Hook。
hook 中所有使用 project_id 的方法都必須使用關鍵字引數而不是位置引數來呼叫。
- start_java_dataflow(job_name, variables, jar, project_id, job_class=None, append_job_name=True, multiple_jobs=False, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[原始碼]¶
啟動 Dataflow java 工作。
- 參數
job_name (str) – 工作的名稱。
variables (dict) – 傳遞給工作的變數。
project_id (str) – 選擇性,啟動工作的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。
jar (str) – 工作的 jar 名稱
job_class (str | None) – 工作的 java 類別名稱。
append_job_name (bool) – 如果必須將唯一後綴附加到工作名稱,則為 True。
multiple_jobs (bool) – 如果要檢查 dataflow 中的多個工作,則為 True
on_new_job_id_callback (Callable[[str], None] | None) – 當工作 ID 已知時呼叫的回呼函數。
location (str) – 工作位置。
- start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[原始碼]¶
使用傳統範本啟動 Dataflow 工作,並等待其完成。
- 參數
job_name (str) – 工作的名稱。
variables (dict) –
工作執行階段環境選項的對應。如果傳遞 environment 引數,則會更新該引數。
另請參閱
如需可能組態的詳細資訊,請參閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 範本的參數
dataflow_template (str) – 範本的 GCS 路徑。
project_id (str) – 選擇性,啟動工作的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。
append_job_name (bool) – 如果必須將唯一後綴附加到工作名稱,則為 True。
on_new_job_id_callback (Callable[[str], None] | None) – (已棄用) 當工作已知時呼叫的回呼函數。
on_new_job_callback (Callable[[dict], None] | None) – 當工作已知時呼叫的回呼函數。
location (str) –
工作位置。
另請參閱
如需可能組態的詳細資訊,請參閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[原始碼]¶
使用傳統範本啟動 Dataflow 工作,並在不等待其完成的情況下結束。
- 參數
job_name (str) – 工作的名稱。
variables (dict) –
工作執行階段環境選項的對應。如果傳遞 environment 引數,則會更新該引數。
另請參閱
如需可能組態的詳細資訊,請參閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 範本的參數
dataflow_template (str) – 範本的 GCS 路徑。
project_id (str) – 選擇性,啟動工作的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。
append_job_name (bool) – 如果必須將唯一後綴附加到工作名稱,則為 True。
location (str) –
工作位置。
另請參閱
如需可能組態的詳細資訊,請參閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- 傳回
Dataflow 工作回應
- 傳回類型
- send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[原始碼]¶
- start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[原始碼]¶
使用彈性範本啟動 Dataflow 工作,並等待其完成。
- 參數
body (dict) – 請求主體。請參閱: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 工作的位置 (例如 europe-west1)
project_id (str) – 擁有工作的 GCP 專案 ID。如果設定為
None
或遺失,則使用 GCP 連線中的預設 project_id。on_new_job_id_callback (Callable[[str], None] | None) – (已棄用) 在偵測到工作 ID 時呼叫的回呼函數。
on_new_job_callback (Callable[[dict], None] | None) – 在偵測到工作時呼叫的回呼函數。
- 傳回
工作
- 傳回類型
- launch_job_with_flex_template(body, location, project_id)[原始碼]¶
使用彈性範本啟動 Dataflow 工作,並在不等待工作完成的情況下結束。
- 參數
body (dict) – 請求主體。請參閱: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 工作的位置 (例如 europe-west1)
project_id (str) – 擁有工作的 GCP 專案 ID。如果設定為
None
或遺失,則使用 GCP 連線中的預設 project_id。
- 傳回
Dataflow 工作回應
- 傳回類型
- launch_beam_yaml_job(*, job_name, yaml_pipeline_file, append_job_name, jinja_variables, options, project_id, location=DEFAULT_DATAFLOW_LOCATION)[原始碼]¶
啟動 Dataflow YAML 工作並執行至完成。
- 參數
job_name (str) – 要指派給 Cloud Dataflow 工作的唯一名稱。
yaml_pipeline_file (str) – 定義要執行之 YAML 管線的檔案路徑。必須是本機檔案或以 ‘gs://’ 開頭的 URL。
append_job_name (bool) – 如果必須將唯一後綴附加到 job_name,則設定為 True。
jinja_variables (dict[str, str] | None) – Jinja2 變數的字典,用於具體化 yaml 管線檔案。
options (dict[str, Any] | None) – 其他 gcloud 或 Beam 工作參數。它必須是一個字典,其鍵與 gcloud 中的選用旗標名稱相符。支援的旗標列表可在以下位置找到: https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。請注意,如果旗標不需要值,則其字典值必須為 True 或 None。例如,–log-http 旗標可以作為 {‘log-http’: True} 傳遞。
project_id (str) – 擁有工作的 GCP 專案 ID。
location (str) – 工作區域端點的區域 ID。預設為 ‘us-central1’。
on_new_job_callback – 一旦已知工作,將工作傳遞給運算子的回呼函數。
- 傳回
工作 ID。
- 傳回類型
- start_python_dataflow(job_name, variables, dataflow, py_options, project_id, py_interpreter='python3', py_requirements=None, py_system_site_packages=False, append_job_name=True, on_new_job_id_callback=None, location=DEFAULT_DATAFLOW_LOCATION)[原始碼]¶
啟動 Dataflow 工作。
- 參數
job_name (str) – 工作的名稱。
variables (dict) – 傳遞給工作的變數。
dataflow (str) – Dataflow 程序名稱。
project_id (str) – 擁有工作的 GCP 專案 ID。如果設定為
None
或遺失,則使用 GCP 連線中的預設 project_id。py_interpreter (str) – beam 管線的 Python 版本。如果為 None,則預設為 python3。若要追蹤 beam 支援的 python 版本和相關問題,請查看: https://issues.apache.org/jira/browse/BEAM-1251
py_requirements (list[str] | None) –
要安裝的其他 python 套件。如果將值傳遞給此參數,則會建立一個新的虛擬環境並安裝其他套件。
如果您的系統上未安裝 apache-beam 套件,或者您想要使用其他版本,您也可以安裝該套件。
py_system_site_packages (bool) –
是否在您的 virtualenv 中包含 system_site_packages。請參閱 virtualenv 文件以取得更多資訊。
只有在
py_requirements
參數不是 None 時,此選項才相關。append_job_name (bool) – 如果必須將唯一後綴附加到工作名稱,則為 True。
project_id – 選擇性,啟動工作的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。
on_new_job_id_callback (Callable[[str], None] | None) – 當工作 ID 已知時呼叫的回呼函數。
location (str) – 工作位置。
- is_job_dataflow_running(name, project_id, location=None, variables=None)[原始碼]¶
檢查 Dataflow 中的任務是否仍在執行。
- cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
取消具有指定名稱前綴或任務 ID 的任務。
參數
name
和job_id
互斥。
- start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[source]¶
啟動 Dataflow SQL 查詢。
- 參數
job_name (str) – 要指派給 Cloud Dataflow 工作的唯一名稱。
query ( str ) – 要執行的 SQL 查詢。
options ( dict[str, Any] ) – 要執行的任務參數。 更多資訊,請參閱: https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令參考
location (str) – Dataflow 工作的位置 (例如 europe-west1)
project_id (str) – 擁有工作的 GCP 專案 ID。如果設定為
None
或遺失,則使用 GCP 連線中的預設 project_id。on_new_job_id_callback ( Callable[[str], None] | None ) – (已棄用) 當任務 ID 已知時呼叫的回呼函數。
on_new_job_callback ( Callable[[dict], None] | None ) – 當任務已知時呼叫的回呼函數。
- 傳回
新的任務物件
- get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
取得具有指定任務 ID 的任務。
- 參數
job_id ( str ) – 要取得的任務 ID。
project_id (str) – 選擇性,啟動工作的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。
location ( str ) – Dataflow 任務的位置 (例如 europe-west1)。 請參閱: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 傳回
工作
- 傳回類型
- fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
取得具有指定任務 ID 的任務指標。
- 參數
job_id ( str ) – 要取得的任務 ID。
project_id (str) – 選擇性,啟動工作的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。
location ( str ) – Dataflow 任務的位置 (例如 europe-west1)。 請參閱: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 傳回
JobMetrics。 請參閱: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics
- 傳回類型
- fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
取得具有指定任務 ID 的任務訊息。
- fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
取得具有指定任務 ID 的任務自動調整事件。
- wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[source]¶
等待 Dataflow 任務完成。
- 參數
job_name ( str ) – 執行 DataFlow 任務時要使用的 ‘jobName’ (可使用範本)。 這最終會設定在管道選項中,因此
options
中任何具有鍵'jobName'
的項目都將被覆寫。location ( str ) – 任務執行的位置
project_id (str) – 選擇性,啟動工作的 Google Cloud 專案 ID。如果設定為 None 或遺失,則使用 Google Cloud 連線中的預設 project_id。
job_id ( str | None ) – Dataflow 任務 ID
multiple_jobs ( bool ) – 如果管道建立多個任務,則監控所有任務
- create_data_pipeline(body, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
建立新的 Dataflow Data Pipelines 執行個體。
- 參數
body ( dict ) – 請求主體 (包含 Pipeline 的執行個體)。 請參閱: https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body
project_id (str) – 擁有工作的 GCP 專案 ID。
location ( str ) – 將 Data Pipelines 執行個體導向的位置 (例如 us-central1)。
以 JSON 格式傳回建立的 Data Pipelines 執行個體。
- get_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
擷取新的 Dataflow Data Pipelines 執行個體。
- 參數
以 JSON 格式傳回建立的 Data Pipelines 執行個體。
- run_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
執行 Dataflow Data Pipeline 執行個體。
- 參數
以 JSON 格式傳回建立的任務。
- class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]¶
基底類別:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
Dataflow 服務的非同步 hook 類別。
- async initialize_client(client_class)[source]¶
初始化給定類別的物件。
此方法用於初始化非同步用戶端。 由於 Dataflow 服務使用大量的類別,因此決定使用從 GoogleBaseHook 類別的方法接收的憑證以相同的方式初始化它們。 :param client_class: Google Cloud SDK 的類別
- async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
取得具有指定任務 ID 的任務。
- 參數
job_id ( str ) – 要取得的任務 ID。
project_id ( str ) – 要在其中啟動任務的 Google Cloud 專案 ID。 如果設定為 None 或遺失,則會使用 Google Cloud 連線中的預設 project_id。
job_view ( int ) – 選擇性。 JobView 物件,決定傳回資料的表示方式
location ( str ) – 選擇性。 Dataflow 任務的位置 (例如 europe-west1)。 請參閱: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
取得具有指定任務 ID 的任務狀態。
- 參數
job_id ( str ) – 要取得的任務 ID。
project_id ( str ) – 要在其中啟動任務的 Google Cloud 專案 ID。 如果設定為 None 或遺失,則會使用 Google Cloud 連線中的預設 project_id。
job_view ( int ) – 選擇性。 JobView 物件,決定傳回資料的表示方式
location ( str ) – 選擇性。 Dataflow 任務的位置 (例如 europe-west1)。 請參閱: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]¶
列出任務。
- 參數
jobs_filter ( int | None ) – 選擇性。 此欄位篩選掉並傳回指定任務狀態的任務。
project_id ( str | None ) – 選擇性。 要在其中啟動任務的 Google Cloud 專案 ID。 如果設定為 None 或遺失,則會使用 Google Cloud 連線中的預設 project_id。
location ( str | None ) – 選擇性。 Dataflow 任務的位置 (例如 europe-west1)。
page_size ( int | None ) – 選擇性。 如果任務很多,則將回應限制為最多這麼多。
page_token ( str | None ) – 選擇性。 將此設定為先前回應的 ‘next_page_token’ 欄位,以在長列表中請求其他結果。
- async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
從 MessagesV1Beta3AsyncClient 傳回 ListJobMessagesAsyncPager 物件。
此方法包裝了 MessagesV1Beta3AsyncClient 的類似方法。 ListJobMessagesAsyncPager 可以迭代以提取與特定任務 ID 關聯的訊息。
如需更多詳細資訊,請參閱 MessagesV1Beta3AsyncClient 方法描述,網址為: https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient
- 參數
job_id ( str ) – 要取得訊息的 Dataflow 任務 ID。
project_id ( str | None ) – 選擇性。 要在其中啟動任務的 Google Cloud 專案 ID。 如果設定為 None 或遺失,則會使用 Google Cloud 連線中的預設 project_id。
minimum_importance ( int ) – 選擇性。 篩選以僅取得重要性 >= level 的訊息。 如需更多詳細資訊,請參閱以下描述: https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance
page_size ( int | None ) – 選擇性。 如果指定,則決定要傳回的最大訊息數。 如果未指定,服務可能會選擇適當的預設值,或可能會傳回任意大量的結果。
page_token ( str | None ) – 選擇性。 如果提供,則應為先前呼叫傳回的 next_page_token 值。 這將導致傳回下一頁的結果。
start_time ( google.protobuf.timestamp_pb2.Timestamp | None ) – 選擇性。 如果指定,則僅傳回時間戳記 >= start_time 的訊息。 預設值為任務建立時間 (即訊息的開頭)。
end_time ( google.protobuf.timestamp_pb2.Timestamp | None ) – 選擇性。 如果指定,則僅傳回時間戳記 < end_time 的訊息。 預設值為目前時間。
location ( str | None ) – 選擇性。 包含 job_id 指定之任務的 [區域端點] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。
- async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
從 MetricsV1Beta3AsyncClient 傳回 JobMetrics 物件。
此方法包裝了 MetricsV1Beta3AsyncClient 的類似方法。
如需更多詳細資訊,請參閱 MetricsV1Beta3AsyncClient 方法描述,網址為: https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient
- 參數
job_id ( str ) – 要取得指標的 Dataflow 任務 ID。
project_id ( str | None ) – 選擇性。 要在其中啟動任務的 Google Cloud 專案 ID。 如果設定為 None 或遺失,則會使用 Google Cloud 連線中的預設 project_id。
start_time ( google.protobuf.timestamp_pb2.Timestamp | None ) – 選擇性。 僅傳回自此時間以來已變更的指標資料。 預設值為傳回任務所有指標的所有資訊。
location ( str | None ) – 選擇性。 包含 job_id 指定之任務的 [區域端點] (https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。