airflow.providers.google.cloud.triggers.dataflow

模組內容

類別

TemplateJobStartTrigger

Dataflow 觸發器,用於檢查範本化作業是否已完成。

DataflowJobStatusTrigger

觸發器,用於監控 Dataflow 作業是否已達到任何預期的狀態。

DataflowStartYamlJobTrigger

Dataflow 觸發器,用於檢查 Dataflow YAML 作業的狀態。

DataflowJobMetricsTrigger

觸發器,用於檢查與 Dataflow 作業相關聯的指標。

DataflowJobAutoScalingEventTrigger

觸發器,用於檢查與 Dataflow 作業相關聯的自動擴展事件。

DataflowJobMessagesTrigger

觸發器,用於檢查與 Dataflow 作業相關聯的作業訊息。

屬性

DEFAULT_DATAFLOW_LOCATION

airflow.providers.google.cloud.triggers.dataflow.DEFAULT_DATAFLOW_LOCATION = 'us-central1'[原始碼]
class airflow.providers.google.cloud.triggers.dataflow.TemplateJobStartTrigger(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, cancel_timeout=5 * 60)[原始碼]

基底類別: airflow.triggers.base.BaseTrigger

Dataflow 觸發器,用於檢查範本化作業是否已完成。

參數
  • project_id (str | None) – 必填。啟動作業的 Google Cloud 專案 ID。

  • job_id (str) – 必填。作業的 ID。

  • location (str) – 選填。執行作業的位置。如果設定為 None,則會使用 DEFAULT_DATAFLOW_LOCATION 的值

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填。要使用短期憑證模擬的服務帳戶,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予直接前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(範本化)。

  • cancel_timeout (int | None) – 選填。當任務被終止時,運算子應等待管道成功取消的時間長度(秒)。

serialize()[原始碼]

序列化類別引數和類別路徑。

async run()[原始碼]

擷取作業狀態或產生特定事件。

類別的主要迴圈,在其中擷取作業狀態並產生特定事件。

如果作業狀態為成功,則會產生具有成功狀態的 TriggerEvent;如果作業狀態為失敗,則會產生具有錯誤狀態的 TriggerEvent。在任何其他情況下,Trigger 將等待儲存在 self.poll_sleep 變數中的指定時間量。

class airflow.providers.google.cloud.triggers.dataflow.DataflowJobStatusTrigger(job_id, expected_statuses, project_id, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None)[原始碼]

基底類別: airflow.triggers.base.BaseTrigger

觸發器,用於監控 Dataflow 作業是否已達到任何預期的狀態。

參數
  • job_id (str) – 必填。作業的 ID。

  • expected_statuses (set[str]) – 作業的預期狀態。請參閱: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

  • project_id (str | None) – 必填。啟動作業的 Google Cloud 專案 ID。

  • location (str) – 選填。執行作業的位置。如果設定為 None,則會使用 DEFAULT_DATAFLOW_LOCATION 的值。

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • poll_sleep (int) – 檢查作業的兩個連續呼叫之間等待的時間(秒)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填。要使用短期憑證模擬的服務帳戶,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予直接前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(範本化)。

serialize()[原始碼]

序列化類別引數和類別路徑。

async run()[原始碼]

迴圈直到作業達到預期或終止狀態。

如果用戶端傳回預期的作業狀態,則產生具有成功狀態的 TriggerEvent。

如果用戶端傳回非預期的終止作業狀態,或在迴圈期間引發任何例外狀況,則產生具有錯誤狀態的 TriggerEvent。

在任何其他情況下,Trigger 將等待儲存在 self.poll_sleep 變數中的指定時間量。

async_hook()[原始碼]
class airflow.providers.google.cloud.triggers.dataflow.DataflowStartYamlJobTrigger(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, cancel_timeout=5 * 60, expected_terminal_state=None, impersonation_chain=None)[原始碼]

基底類別: airflow.triggers.base.BaseTrigger

Dataflow 觸發器,用於檢查 Dataflow YAML 作業的狀態。

參數
  • job_id (str) – 必填。作業的 ID。

  • project_id (str | None) – 必填。啟動作業的 Google Cloud 專案 ID。

  • location (str) – 執行作業的位置。如果設定為 None,則會使用 DEFAULT_DATAFLOW_LOCATION 的值。

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • poll_sleep (int) – 選填。輪詢 Google Cloud Platform 以取得 Dataflow 作業的時間間隔(秒)。

  • cancel_timeout (int | None) – 選填。當任務被終止時,運算子應等待管道成功取消的時間長度(秒)。

  • expected_terminal_state (str | None) – 選填。Dataflow 作業的預期終止狀態,運算子任務將在此狀態下設定為成功。批次作業預設為 ‘JOB_STATE_DONE’,串流作業預設為 ‘JOB_STATE_RUNNING’。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填。要使用短期憑證模擬的服務帳戶,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予直接前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(範本化)。

serialize()[原始碼]

序列化類別引數和類別路徑。

async run()[原始碼]

擷取作業並根據作業類型和狀態產生事件。

如果作業達到終止狀態,則產生 TriggerEvent。否則,等待儲存在 self.poll_sleep 變數中的指定時間量。

class airflow.providers.google.cloud.triggers.dataflow.DataflowJobMetricsTrigger(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, fail_on_terminal_state=True)[原始碼]

基底類別: airflow.triggers.base.BaseTrigger

觸發器,用於檢查與 Dataflow 作業相關聯的指標。

參數
  • job_id (str) – 必填。作業的 ID。

  • project_id (str | None) – 必填。啟動作業的 Google Cloud 專案 ID。

  • location (str) – 選填。執行作業的位置。如果設定為 None,則會使用 DEFAULT_DATAFLOW_LOCATION 的值。

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • poll_sleep (int) – 檢查作業的兩個連續呼叫之間等待的時間(秒)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填。要使用短期憑證模擬的服務帳戶,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予直接前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(範本化)。

  • fail_on_terminal_state (bool) – 如果設定為 True,則當作業達到終止狀態時,觸發器將產生具有錯誤狀態的 TriggerEvent。

serialize()[原始碼]

序列化類別引數和類別路徑。

async run()[原始碼]

迴圈直到傳回終止作業狀態或任何作業指標。

如果用戶端傳回任何作業指標,且 fail_on_terminal_state 屬性為 False,則產生具有成功狀態的 TriggerEvent。

如果用戶端傳回具有終止狀態值的作業狀態,且 fail_on_terminal_state 屬性為 True,則產生具有錯誤狀態的 TriggerEvent。

如果在迴圈期間引發任何例外狀況,則產生具有錯誤狀態的 TriggerEvent。

在任何其他情況下,Trigger 將等待儲存在 self.poll_sleep 變數中的指定時間量。

async get_job_metrics()[原始碼]

等待 Dataflow 用戶端回應,然後以序列化清單形式傳回。

async_hook()[原始碼]
class airflow.providers.google.cloud.triggers.dataflow.DataflowJobAutoScalingEventTrigger(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, fail_on_terminal_state=True)[原始碼]

基底類別: airflow.triggers.base.BaseTrigger

觸發器,用於檢查與 Dataflow 作業相關聯的自動擴展事件。

參數
  • job_id (str) – 必填。作業的 ID。

  • project_id (str | None) – 必填。啟動作業的 Google Cloud 專案 ID。

  • location (str) – 選填。執行作業的位置。如果設定為 None,則會使用 DEFAULT_DATAFLOW_LOCATION 的值。

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • poll_sleep (int) – 檢查作業的兩個連續呼叫之間等待的時間(秒)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填。要使用短期憑證模擬的服務帳戶,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予直接前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(範本化)。

  • fail_on_terminal_state (bool) – 如果設定為 True,則當作業達到終止狀態時,觸發器將產生具有錯誤狀態的 TriggerEvent。

serialize()[原始碼]

序列化類別引數和類別路徑。

async run()[原始碼]

迴圈直到傳回終止作業狀態或任何自動擴展事件。

如果用戶端傳回任何自動擴展事件,且 fail_on_terminal_state 屬性為 False,則產生具有成功狀態的 TriggerEvent。

如果用戶端傳回具有終止狀態值的作業狀態,且 fail_on_terminal_state 屬性為 True,則產生具有錯誤狀態的 TriggerEvent。

如果在迴圈期間引發任何例外狀況,則產生具有錯誤狀態的 TriggerEvent。

在任何其他情況下,Trigger 將等待儲存在 self.poll_sleep 變數中的指定時間量。

async list_job_autoscaling_events()[原始碼]

等待 Dataflow 用戶端回應,然後以序列化清單形式傳回。

async_hook()[原始碼]
class airflow.providers.google.cloud.triggers.dataflow.DataflowJobMessagesTrigger(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, fail_on_terminal_state=True)[原始碼]

基底類別: airflow.triggers.base.BaseTrigger

觸發器,用於檢查與 Dataflow 作業相關聯的作業訊息。

參數
  • job_id (str) – 必填。作業的 ID。

  • project_id (str | None) – 必填。啟動作業的 Google Cloud 專案 ID。

  • location (str) – 選填。執行作業的位置。如果設定為 None,則會使用 DEFAULT_DATAFLOW_LOCATION 的值。

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • poll_sleep (int) – 檢查作業的兩個連續呼叫之間等待的時間(秒)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 選填。要使用短期憑證模擬的服務帳戶,或取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予直接前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(範本化)。

  • fail_on_terminal_state (bool) – 如果設定為 True,則當作業達到終止狀態時,觸發器將產生具有錯誤狀態的 TriggerEvent。

serialize()[原始碼]

序列化類別引數和類別路徑。

async run()[原始碼]

迴圈直到傳回終止作業狀態或任何作業訊息。

如果用戶端傳回任何作業訊息,且 fail_on_terminal_state 屬性為 False,則產生具有成功狀態的 TriggerEvent。

如果用戶端傳回具有終止狀態值的作業狀態,且 fail_on_terminal_state 屬性為 True,則產生具有錯誤狀態的 TriggerEvent。

如果在迴圈期間引發任何例外狀況,則產生具有錯誤狀態的 TriggerEvent。

在任何其他情況下,Trigger 將等待儲存在 self.poll_sleep 變數中的指定時間量。

async list_job_messages()[原始碼]

等待 Dataflow 用戶端回應,然後以序列化清單形式傳回。

async_hook()[原始碼]

這個條目有幫助嗎?