airflow.triggers.external_task

模組內容

類別

WorkflowTrigger

用於監控 Apache Airflow 中任務、任務群組和 DAG 執行的觸發器。

TaskStateTrigger

非同步等待不同 DAG 中的任務針對特定邏輯日期完成。

DagStateTrigger

非同步等待 DAG 針對特定邏輯日期完成。

class airflow.triggers.external_task.WorkflowTrigger(external_dag_id, execution_dates, external_task_ids=None, external_task_group_id=None, failed_states=None, skipped_states=None, allowed_states=None, poke_interval=2.0, soft_fail=False, **kwargs)[source]

基底類別: airflow.triggers.base.BaseTrigger

用於監控 Apache Airflow 中任務、任務群組和 DAG 執行的觸發器。

參數
  • external_dag_id (str) – 外部 DAG 的 ID。

  • execution_dates (list) – 外部 DAG 的執行日期列表。

  • external_task_ids (Collection[str] | None) – 要等待的外部任務 ID 集合。

  • external_task_group_id (str | None) – 要等待的外部任務群組 ID。

  • failed_states (Iterable[str] | None) – 視為外部任務失敗的狀態。

  • skipped_states (Iterable[str] | None) – 視為外部任務跳過的狀態。

  • allowed_states (Iterable[str] | None) – 視為外部任務成功的狀態。

  • poke_interval (float) – 輪詢外部任務的間隔(秒)。

  • soft_fail (bool) – 若為 True,則觸發器不會因為外部任務失敗而使整個 DAG 失敗。

serialize()[source]

序列化觸發器參數和模組路徑。

async run()[source]

定期檢查任務、任務群組或 DAG 狀態。

class airflow.triggers.external_task.TaskStateTrigger(dag_id, execution_dates, trigger_start_time, states=None, task_id=None, poll_interval=2.0)[source]

基底類別: airflow.triggers.base.BaseTrigger

非同步等待不同 DAG 中的任務針對特定邏輯日期完成。

參數
  • dag_id (str) – 包含您要等待的任務的 dag_id

  • task_id (str | None) – 包含您要等待的任務的 task_id。

  • states (list[str] | None) – 允許的狀態,預設為 ['success']

  • execution_dates (list[datetime.datetime]) – 任務執行時間間隔

  • poll_interval (float) – 檢查狀態的時間間隔(秒)。預設值為 5 秒。

  • trigger_start_time (datetime.datetime) – 觸發器啟動時的 Datetime 格式時間。用於控制觸發器的執行,以防止在資料庫中不存在指定的 DAG 名稱時發生無限迴圈。它將等待一段時間,時間長度等於從觸發器啟動時起的 _timeout_sec 參數,如果執行時間超過預期,觸發器將以 ‘timeout’ 狀態終止。

serialize()[source]

序列化 TaskStateTrigger 參數和類別路徑。

async run()[source]

定期在資料庫中檢查以查看 DAG 是否存在且處於執行狀態。

如果找到,則等待直到指定的任務達到其中一個預期狀態。如果在觸發器的啟動執行程序後 _timeout_sec 秒後,具有指定名稱的 DAG 未處於執行狀態,則以 ‘timeout’ 狀態終止。

count_running_dags(session)[source]

計算資料庫中有多少個 DAG 實例處於執行狀態。

count_tasks(*, session=NEW_SESSION)[source]

計算資料庫中有多少個任務實例符合我們的標準。

class airflow.triggers.external_task.DagStateTrigger(dag_id, states, execution_dates, poll_interval=5.0)[source]

基底類別: airflow.triggers.base.BaseTrigger

非同步等待 DAG 針對特定邏輯日期完成。

參數
  • dag_id (str) – 包含您要等待的任務的 dag_id

  • states (list[airflow.utils.state.DagRunState]) – 允許的狀態,預設為 ['success']

  • execution_dates (list[datetime.datetime]) – DAG 執行時的邏輯日期。

  • poll_interval (float) – 檢查狀態的時間間隔(秒)。預設值為 5.0 秒。

serialize()[source]

序列化 DagStateTrigger 參數和類別路徑。

async run()[source]

定期檢查 DAG 執行是否存在,以及是否已達到其中一個狀態。

count_dags(*, session=NEW_SESSION)[source]

計算資料庫中有多少個 DAG 執行符合我們的標準。

此條目是否有幫助?