airflow.providers.standard.sensors.external_task

模組內容

類別

ExternalDagLink

ExternalTaskSensor 和 ExternalTaskMarker 的運算子連結。

ExternalTaskSensor

等待不同的 DAG、任務群組或任務完成特定的邏輯日期。

ExternalTaskMarker

使用此運算子來指示不同 DAG 上的任務依賴於此任務。

基底類別: airflow.models.baseoperatorlink.BaseOperatorLink

ExternalTaskSensor 和 ExternalTaskMarker 的運算子連結。

它允許使用者存取透過 ExternalTaskSensor 等待或由 ExternalTaskMarker 清除的 DAG。

name = '外部 DAG'[source]

連結到外部系統。

注意:此函數的舊簽名為 (self, operator, dttm: datetime)。 執行時仍支援該簽名,但已棄用。

參數
返回

外部系統的連結

返回類型

str

class airflow.providers.standard.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基底類別: airflow.sensors.base.BaseSensorOperator

等待不同的 DAG、任務群組或任務完成特定的邏輯日期。

如果 external_task_group_idexternal_task_id 均為 None (預設值),則感測器會等待 DAG。 external_task_group_idexternal_task_id 的值不能同時設定。

預設情況下,ExternalTaskSensor 將等待外部任務成功,屆時它也會成功。但是,預設情況下,如果外部任務失敗,它不會失敗,而是會繼續檢查狀態,直到感測器逾時 (因此讓您有時間重試外部任務,而無需清除感測器)。

預設情況下,如果外部任務跳過,ExternalTaskSensor 將不會跳過。 若要變更此行為,只需設定 skipped_states=[TaskInstanceState.SKIPPED] 即可。 請注意,如果您正在監控多個任務,並且一個任務進入錯誤狀態,而另一個任務進入跳過狀態,則外部任務將對其首先看到的狀態做出反應。 如果兩者同時發生,則失敗狀態優先。

可以透過設定導致感測器失敗的狀態來更改預設行為,例如,透過設定 allowed_states=[DagRunState.FAILED]failed_states=[DagRunState.SUCCESS],您將翻轉行為,以獲得一個感測器,當外部任務失敗時變為綠色,而當外部任務成功時立即變為紅色!

請注意,在檢查 failed_states 時,會遵循 soft_fail。 因此,如果外部任務進入失敗狀態且 soft_fail == True,則感測器將 _skip_ 而不是失敗。 因此,如果設定 soft_fail=Truefailed_states=[DagRunState.SKIPPED],將導致感測器在外部任務跳過時跳過。 然而,這是一個牽強的範例 — 如果您想要此行為,請考慮使用 skipped_states。 使用 skipped_states 允許感測器在目標失敗時跳過,但仍會在逾時時進入失敗狀態。 如上所述使用 soft_fail == True 將導致感測器在目標失敗時跳過,並且在逾時時也跳過。

參數
  • external_dag_id (str) – 包含您要等待的任務的 dag_id。 (可使用範本)

  • external_task_id (str | None) – 包含您要等待的任務的 task_id。 (可使用範本)

  • external_task_ids (collections.abc.Collection[str] | None) – 您要等待的 task_id 清單。 (可使用範本) 如果 None (預設值),則感測器會等待 DAG。 external_task_id 或 external_task_ids 可以傳遞至 ExternalTaskSensor,但不能同時傳遞兩者。

  • external_task_group_id (str | None) – 包含您要等待的任務的 task_group_id。 (可使用範本)

  • allowed_states (collections.abc.Iterable[str] | None) – 允許狀態的可迭代物件,預設值為 ['success']

  • skipped_states (collections.abc.Iterable[str] | None) – 使此任務標記為跳過的狀態的可迭代物件,預設值為 None

  • failed_states (collections.abc.Iterable[str] | None) – 失敗或不允許狀態的可迭代物件,預設值為 None

  • execution_delta (datetime.timedelta | None) – 與先前執行的時間差以查看,預設值與目前任務或 DAG 的邏輯日期相同。 對於昨天,請使用 [正值!] datetime.timedelta(days=1)。 execution_delta 或 execution_date_fn 可以傳遞至 ExternalTaskSensor,但不能同時傳遞兩者。

  • execution_date_fn (Callable | None) – 接收目前執行的邏輯日期作為第一個位置引數,以及選擇性地接收內容字典中可用的任意數量的關鍵字引數的函數,並返回要查詢的所需邏輯日期。 execution_delta 或 execution_date_fn 可以傳遞至 ExternalTaskSensor,但不能同時傳遞兩者。

  • check_existence (bool) – 設定為 True 以檢查外部任務是否存在 (當 external_task_id 不是 None 時) 或檢查要等待的 DAG 是否存在 (當 external_task_id 為 None 時),如果外部任務或 DAG 不存在,則立即停止等待 (預設值:False)。

  • poll_interval (float) – 檢查狀態的輪詢週期 (秒)

  • deferrable (bool) – 在可延遲模式下執行感測器

template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][source]
ui_color = '#4db7db'[source]
poke(context, session=NEW_SESSION)[source]

覆寫衍生此類別時。

execute(context)[source]

在工作節點上執行,如果 deferrable 設定為 True,則使用觸發器延遲。

execute_complete(context, event=None)[source]

當觸發器觸發時執行 - 立即返回。

get_count(dttm_filter, session, states)[source]

取得針對 dttm 篩選器和狀態的記錄計數。

參數
  • dttm_filter – 邏輯日期的日期時間篩選器

  • session – airflow session 物件

  • states – 任務或 dag 狀態

返回

針對篩選器的記錄計數

返回類型

int

get_external_task_group_task_ids(session, dttm_filter)[source]
class airflow.providers.standard.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, logical_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[source]

基底類別: airflow.operators.empty.EmptyOperator

使用此運算子來指示不同 DAG 上的任務依賴於此任務。

當選取「遞迴」清除此任務時,Airflow 將以遞迴方式清除另一個 DAG 上的任務及其下游任務。 傳遞依賴項會追蹤到達到 recursion_depth 為止。

參數
  • external_dag_id (str) – 包含需要清除的相依任務的 dag_id。

  • external_task_id (str) – 需要清除的相依任務的 task_id。

  • logical_date (str | datetime.datetime | None) – 需要清除的相依任務執行的邏輯日期。

  • recursion_depth (int) – 允許的傳遞依賴項最大層級。 預設值為 10。 這主要用於防止循環依賴項。 如果需要,可以增加此數字。 但是,過多層級的傳遞依賴項會使在 Web UI 中清除任務的速度變慢。

template_fields = ['external_dag_id', 'external_task_id', 'logical_date'][source]
ui_color = '#4db7db'[source]
classmethod get_serialized_fields()[source]

序列化 ExternalTaskMarker 以精確包含這些欄位 + templated_fields 。

此條目是否有幫助?