airflow.sensors.external_task
¶
模組內容¶
類別¶
ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。 |
|
等待不同的 DAG、任務群組或任務完成特定的邏輯日期。 |
|
使用此 operator 指示不同 DAG 上的任務依賴於此任務。 |
|
此外部連結已棄用。 |
- class airflow.sensors.external_task.ExternalDagLink[原始碼]¶
基底類別:
airflow.models.baseoperatorlink.BaseOperatorLink
ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。
它允許使用者存取使用 ExternalTaskSensor 等待或由 ExternalTaskMarker 清除的 DAG。
- get_link(operator, *, ti_key)[原始碼]¶
連結到外部系統。
注意:此函數的舊簽名為
(self, operator, dttm: datetime)
。 執行時仍支援該簽名,但已棄用。- 參數
operator (airflow.models.baseoperator.BaseOperator) – 與此連結相關聯的 Airflow operator 物件。
ti_key (airflow.models.taskinstancekey.TaskInstanceKey) – 要返回連結的 TaskInstance ID。
- 返回
連結到外部系統
- 返回類型
- class airflow.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[原始碼]¶
基底類別:
airflow.sensors.base.BaseSensorOperator
等待不同的 DAG、任務群組或任務完成特定的邏輯日期。
如果 external_task_group_id 和 external_task_id 均為
None
(預設值),則感測器會等待 DAG。 external_task_group_id 和 external_task_id 的值不能同時設定。預設情況下,ExternalTaskSensor 將等待外部任務成功,屆時它也會成功。但是,預設情況下,如果外部任務失敗,它不會失敗,而是會繼續檢查狀態直到感測器逾時(因此讓您有時間重試外部任務,而無需清除感測器)。
預設情況下,如果外部任務跳過,ExternalTaskSensor 不會跳過。 若要變更此設定,只需設定
skipped_states=[TaskInstanceState.SKIPPED]
即可。 請注意,如果您正在監控多個任務,且一個任務進入錯誤狀態,而另一個任務進入跳過狀態,則外部任務將對其首先看到的狀態做出反應。 如果兩者同時發生,則失敗狀態優先。可以透過設定導致感測器失敗的狀態來變更預設行為,例如,透過設定
allowed_states=[DagRunState.FAILED]
和failed_states=[DagRunState.SUCCESS]
,您將翻轉行為以獲得一個感測器,該感測器在外部任務失敗時變為綠色,而在外部任務成功時立即變為紅色!請注意,檢查 failed_states 時會遵循
soft_fail
。 因此,如果外部任務進入失敗狀態且soft_fail == True
,則感測器將跳過而不是失敗。 因此,設定soft_fail=True
和failed_states=[DagRunState.SKIPPED]
將導致感測器在外部任務跳過時跳過。 然而,這是一個刻意的範例—如果您想要此行為,請考慮使用skipped_states
。 使用skipped_states
允許感測器在目標失敗時跳過,但仍會在逾時時進入失敗狀態。 如上所述使用soft_fail == True
將導致感測器在目標失敗時跳過,並且在逾時時也會跳過。- 參數
external_dag_id (str) – 包含您要等待的任務的 dag_id。 (已套用範本)
external_task_id (str | None) – 包含您要等待的任務的 task_id。 (已套用範本)
external_task_ids (Collection[str] | None) – 您要等待的 task_id 清單。 (已套用範本) 如果
None
(預設值),則感測器會等待 DAG。 external_task_id 或 external_task_ids 可以傳遞給 ExternalTaskSensor,但不能同時傳遞兩者。external_task_group_id (str | None) – 包含您要等待的任務的 task_group_id。 (已套用範本)
allowed_states (Iterable[str] | None) – 允許狀態的 Iterable,預設值為
['success']
skipped_states (Iterable[str] | None) – 使此任務標記為跳過的狀態 Iterable,預設值為
None
failed_states (Iterable[str] | None) – 失敗或不允許狀態的 Iterable,預設值為
None
execution_delta (datetime.timedelta | None) – 與先前執行時間的時間差,用於查找,預設值與當前任務或 DAG 的邏輯日期相同。 對於昨天,請使用 [正數!] datetime.timedelta(days=1)。 execution_delta 或 execution_date_fn 可以傳遞給 ExternalTaskSensor,但不能同時傳遞兩者。
execution_date_fn (Callable | None) – 接收目前執行的邏輯日期作為第一個位置引數,並且可選擇接收上下文字典中可用的任意數量的關鍵字引數的函數,並傳回要查詢的所需邏輯日期。 execution_delta 或 execution_date_fn 可以傳遞給 ExternalTaskSensor,但不能同時傳遞兩者。
check_existence (bool) – 設定為 True 以檢查外部任務是否存在(當 external_task_id 不是 None 時)或檢查要等待的 DAG 是否存在(當 external_task_id 為 None 時),如果外部任務或 DAG 不存在,則立即停止等待(預設值:False)。
poll_interval (float) – 輪詢週期(秒),用於檢查狀態
deferrable (bool) – 在可延遲模式下執行感測器
- template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][原始碼]¶
- class airflow.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, execution_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[原始碼]¶
基底類別:
airflow.operators.empty.EmptyOperator
使用此 operator 指示不同 DAG 上的任務依賴於此任務。
當選取「遞迴」清除此任務時,Airflow 將清除另一個 DAG 上的任務及其下游任務(以遞迴方式)。 傳遞依賴項會被追蹤,直到達到 recursion_depth。
- 參數
external_dag_id (str) – 包含需要清除的依賴任務的 dag_id。
external_task_id (str) – 需要清除的依賴任務的 task_id。
execution_date (str | datetime.datetime | None) – 需要清除的依賴任務執行的邏輯日期。
recursion_depth (int) – 允許的傳遞依賴項最大層級。 預設值為 10。 這主要用於防止循環依賴。 如有必要,可以增加此數字。 但是,過多層級的傳遞依賴項會使在 Web UI 中清除任務的速度變慢。
- class airflow.sensors.external_task.ExternalTaskSensorLink[原始碼]¶
基底類別:
ExternalDagLink
此外部連結已棄用。