airflow.providers.standard.sensors.external_task
¶
模組內容¶
類別¶
ExternalTaskSensor 和 ExternalTaskMarker 的運算子連結。 |
|
等待不同的 DAG、任務群組或任務完成特定的邏輯日期。 |
|
使用此運算子來指示不同 DAG 上的任務依賴於此任務。 |
- class airflow.providers.standard.sensors.external_task.ExternalDagLink[source]¶
基底類別:
airflow.models.baseoperatorlink.BaseOperatorLink
ExternalTaskSensor 和 ExternalTaskMarker 的運算子連結。
它允許使用者存取透過 ExternalTaskSensor 等待或由 ExternalTaskMarker 清除的 DAG。
- get_link(operator, *, ti_key)[source]¶
連結到外部系統。
注意:此函數的舊簽名為
(self, operator, dttm: datetime)
。 執行時仍支援該簽名,但已棄用。- 參數
operator (airflow.models.baseoperator.BaseOperator) – 與此連結關聯的 Airflow 運算子物件。
ti_key (airflow.models.taskinstancekey.TaskInstanceKey) – 要返回連結的 TaskInstance ID。
- 返回
外部系統的連結
- 返回類型
- class airflow.providers.standard.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基底類別:
airflow.sensors.base.BaseSensorOperator
等待不同的 DAG、任務群組或任務完成特定的邏輯日期。
如果 external_task_group_id 和 external_task_id 均為
None
(預設值),則感測器會等待 DAG。 external_task_group_id 和 external_task_id 的值不能同時設定。預設情況下,ExternalTaskSensor 將等待外部任務成功,屆時它也會成功。但是,預設情況下,如果外部任務失敗,它不會失敗,而是會繼續檢查狀態,直到感測器逾時 (因此讓您有時間重試外部任務,而無需清除感測器)。
預設情況下,如果外部任務跳過,ExternalTaskSensor 將不會跳過。 若要變更此行為,只需設定
skipped_states=[TaskInstanceState.SKIPPED]
即可。 請注意,如果您正在監控多個任務,並且一個任務進入錯誤狀態,而另一個任務進入跳過狀態,則外部任務將對其首先看到的狀態做出反應。 如果兩者同時發生,則失敗狀態優先。可以透過設定導致感測器失敗的狀態來更改預設行為,例如,透過設定
allowed_states=[DagRunState.FAILED]
和failed_states=[DagRunState.SUCCESS]
,您將翻轉行為,以獲得一個感測器,當外部任務失敗時變為綠色,而當外部任務成功時立即變為紅色!請注意,在檢查 failed_states 時,會遵循
soft_fail
。 因此,如果外部任務進入失敗狀態且soft_fail == True
,則感測器將 _skip_ 而不是失敗。 因此,如果設定soft_fail=True
和failed_states=[DagRunState.SKIPPED]
,將導致感測器在外部任務跳過時跳過。 然而,這是一個牽強的範例 — 如果您想要此行為,請考慮使用skipped_states
。 使用skipped_states
允許感測器在目標失敗時跳過,但仍會在逾時時進入失敗狀態。 如上所述使用soft_fail == True
將導致感測器在目標失敗時跳過,並且在逾時時也跳過。- 參數
external_dag_id (str) – 包含您要等待的任務的 dag_id。 (可使用範本)
external_task_id (str | None) – 包含您要等待的任務的 task_id。 (可使用範本)
external_task_ids (collections.abc.Collection[str] | None) – 您要等待的 task_id 清單。 (可使用範本) 如果
None
(預設值),則感測器會等待 DAG。 external_task_id 或 external_task_ids 可以傳遞至 ExternalTaskSensor,但不能同時傳遞兩者。external_task_group_id (str | None) – 包含您要等待的任務的 task_group_id。 (可使用範本)
allowed_states (collections.abc.Iterable[str] | None) – 允許狀態的可迭代物件,預設值為
['success']
skipped_states (collections.abc.Iterable[str] | None) – 使此任務標記為跳過的狀態的可迭代物件,預設值為
None
failed_states (collections.abc.Iterable[str] | None) – 失敗或不允許狀態的可迭代物件,預設值為
None
execution_delta (datetime.timedelta | None) – 與先前執行的時間差以查看,預設值與目前任務或 DAG 的邏輯日期相同。 對於昨天,請使用 [正值!] datetime.timedelta(days=1)。 execution_delta 或 execution_date_fn 可以傳遞至 ExternalTaskSensor,但不能同時傳遞兩者。
execution_date_fn (Callable | None) – 接收目前執行的邏輯日期作為第一個位置引數,以及選擇性地接收內容字典中可用的任意數量的關鍵字引數的函數,並返回要查詢的所需邏輯日期。 execution_delta 或 execution_date_fn 可以傳遞至 ExternalTaskSensor,但不能同時傳遞兩者。
check_existence (bool) – 設定為 True 以檢查外部任務是否存在 (當 external_task_id 不是 None 時) 或檢查要等待的 DAG 是否存在 (當 external_task_id 為 None 時),如果外部任務或 DAG 不存在,則立即停止等待 (預設值:False)。
poll_interval (float) – 檢查狀態的輪詢週期 (秒)
deferrable (bool) – 在可延遲模式下執行感測器
- template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][source]¶
- class airflow.providers.standard.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, logical_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[source]¶
基底類別:
airflow.operators.empty.EmptyOperator
使用此運算子來指示不同 DAG 上的任務依賴於此任務。
當選取「遞迴」清除此任務時,Airflow 將以遞迴方式清除另一個 DAG 上的任務及其下游任務。 傳遞依賴項會追蹤到達到 recursion_depth 為止。
- 參數
external_dag_id (str) – 包含需要清除的相依任務的 dag_id。
external_task_id (str) – 需要清除的相依任務的 task_id。
logical_date (str | datetime.datetime | None) – 需要清除的相依任務執行的邏輯日期。
recursion_depth (int) – 允許的傳遞依賴項最大層級。 預設值為 10。 這主要用於防止循環依賴項。 如果需要,可以增加此數字。 但是,過多層級的傳遞依賴項會使在 Web UI 中清除任務的速度變慢。