airflow.sensors.external_task

模組內容

類別

ExternalDagLink

ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。

ExternalTaskSensor

等待不同的 DAG、任務群組或任務完成特定的邏輯日期。

ExternalTaskMarker

使用此 operator 指示不同 DAG 上的任務依賴於此任務。

ExternalTaskSensorLink

此外部連結已棄用。

基底類別: airflow.models.baseoperatorlink.BaseOperatorLink

ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。

它允許使用者存取使用 ExternalTaskSensor 等待或由 ExternalTaskMarker 清除的 DAG。

name = '外部 DAG'[原始碼]

連結到外部系統。

注意:此函數的舊簽名為 (self, operator, dttm: datetime)。 執行時仍支援該簽名,但已棄用。

參數
返回

連結到外部系統

返回類型

str

class airflow.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[原始碼]

基底類別: airflow.sensors.base.BaseSensorOperator

等待不同的 DAG、任務群組或任務完成特定的邏輯日期。

如果 external_task_group_idexternal_task_id 均為 None(預設值),則感測器會等待 DAG。 external_task_group_idexternal_task_id 的值不能同時設定。

預設情況下,ExternalTaskSensor 將等待外部任務成功,屆時它也會成功。但是,預設情況下,如果外部任務失敗,它不會失敗,而是會繼續檢查狀態直到感測器逾時(因此讓您有時間重試外部任務,而無需清除感測器)。

預設情況下,如果外部任務跳過,ExternalTaskSensor 不會跳過。 若要變更此設定,只需設定 skipped_states=[TaskInstanceState.SKIPPED] 即可。 請注意,如果您正在監控多個任務,且一個任務進入錯誤狀態,而另一個任務進入跳過狀態,則外部任務將對其首先看到的狀態做出反應。 如果兩者同時發生,則失敗狀態優先。

可以透過設定導致感測器失敗的狀態來變更預設行為,例如,透過設定 allowed_states=[DagRunState.FAILED]failed_states=[DagRunState.SUCCESS],您將翻轉行為以獲得一個感測器,該感測器在外部任務失敗時變為綠色,而在外部任務成功時立即變為紅色!

請注意,檢查 failed_states 時會遵循 soft_fail。 因此,如果外部任務進入失敗狀態且 soft_fail == True,則感測器將跳過而不是失敗。 因此,設定 soft_fail=Truefailed_states=[DagRunState.SKIPPED] 將導致感測器在外部任務跳過時跳過。 然而,這是一個刻意的範例—如果您想要此行為,請考慮使用 skipped_states。 使用 skipped_states 允許感測器在目標失敗時跳過,但仍會在逾時時進入失敗狀態。 如上所述使用 soft_fail == True 將導致感測器在目標失敗時跳過,並且在逾時時也會跳過。

參數
  • external_dag_id (str) – 包含您要等待的任務的 dag_id。 (已套用範本)

  • external_task_id (str | None) – 包含您要等待的任務的 task_id。 (已套用範本)

  • external_task_ids (Collection[str] | None) – 您要等待的 task_id 清單。 (已套用範本) 如果 None(預設值),則感測器會等待 DAG。 external_task_id 或 external_task_ids 可以傳遞給 ExternalTaskSensor,但不能同時傳遞兩者。

  • external_task_group_id (str | None) – 包含您要等待的任務的 task_group_id。 (已套用範本)

  • allowed_states (Iterable[str] | None) – 允許狀態的 Iterable,預設值為 ['success']

  • skipped_states (Iterable[str] | None) – 使此任務標記為跳過的狀態 Iterable,預設值為 None

  • failed_states (Iterable[str] | None) – 失敗或不允許狀態的 Iterable,預設值為 None

  • execution_delta (datetime.timedelta | None) – 與先前執行時間的時間差,用於查找,預設值與當前任務或 DAG 的邏輯日期相同。 對於昨天,請使用 [正數!] datetime.timedelta(days=1)。 execution_delta 或 execution_date_fn 可以傳遞給 ExternalTaskSensor,但不能同時傳遞兩者。

  • execution_date_fn (Callable | None) – 接收目前執行的邏輯日期作為第一個位置引數,並且可選擇接收上下文字典中可用的任意數量的關鍵字引數的函數,並傳回要查詢的所需邏輯日期。 execution_delta 或 execution_date_fn 可以傳遞給 ExternalTaskSensor,但不能同時傳遞兩者。

  • check_existence (bool) – 設定為 True 以檢查外部任務是否存在(當 external_task_id 不是 None 時)或檢查要等待的 DAG 是否存在(當 external_task_id 為 None 時),如果外部任務或 DAG 不存在,則立即停止等待(預設值:False)。

  • poll_interval (float) – 輪詢週期(秒),用於檢查狀態

  • deferrable (bool) – 在可延遲模式下執行感測器

template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][原始碼]
ui_color = '#4db7db'[原始碼]
poke(context, session=NEW_SESSION)[原始碼]

覆寫衍生此類別時。

execute(context)[原始碼]

在 worker 上執行,如果 deferrable 設定為 True,則使用觸發器延遲。

execute_complete(context, event=None)[原始碼]

觸發器觸發時執行 - 立即返回。

get_count(dttm_filter, session, states)[原始碼]

取得針對 dttm 篩選器和狀態的記錄計數。

參數
  • dttm_filter – 執行日期的日期時間篩選器

  • session – airflow session 物件

  • states – 任務或 dag 狀態

返回

符合篩選器的記錄計數

返回類型

int

get_external_task_group_task_ids(session, dttm_filter)[原始碼]
class airflow.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, execution_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[原始碼]

基底類別: airflow.operators.empty.EmptyOperator

使用此 operator 指示不同 DAG 上的任務依賴於此任務。

當選取「遞迴」清除此任務時,Airflow 將清除另一個 DAG 上的任務及其下游任務(以遞迴方式)。 傳遞依賴項會被追蹤,直到達到 recursion_depth。

參數
  • external_dag_id (str) – 包含需要清除的依賴任務的 dag_id。

  • external_task_id (str) – 需要清除的依賴任務的 task_id。

  • execution_date (str | datetime.datetime | None) – 需要清除的依賴任務執行的邏輯日期。

  • recursion_depth (int) – 允許的傳遞依賴項最大層級。 預設值為 10。 這主要用於防止循環依賴。 如有必要,可以增加此數字。 但是,過多層級的傳遞依賴項會使在 Web UI 中清除任務的速度變慢。

template_fields = ['external_dag_id', 'external_task_id', 'execution_date'][原始碼]
ui_color = '#4db7db'[原始碼]
classmethod get_serialized_fields()[原始碼]

序列化 ExternalTaskMarker 以精確包含這些欄位 + templated_fields 。

基底類別: ExternalDagLink

此外部連結已棄用。

請使用 airflow.sensors.external_task.ExternalDagLink

__attrs_post_init__()[原始碼]

此條目是否有幫助?