DAG 執行¶
DAG 執行個體是一個物件,代表 DAG 在時間上的實例化。每次執行 DAG 時,都會建立一個 DAG 執行個體,並執行其中的所有任務。DAG 執行個體的狀態取決於任務的狀態。每個 DAG 執行個體都是彼此獨立執行的,這表示您可以同時執行 DAG 的多個執行個體。
DAG 執行狀態¶
DAG 執行個體狀態在 DAG 執行完成時確定。DAG 的執行取決於其包含的任務及其依賴關係。當所有任務都處於終端狀態之一(即,如果不可能轉換到另一種狀態),例如 success
、 failed
或 skipped
時,狀態會分配給 DAG 執行個體。DAG 執行個體的狀態是根據所謂的「葉節點」或簡稱「葉」來分配的。葉節點是沒有子節點的任務。
DAG 執行個體有兩種可能的終端狀態
success
如果所有葉節點的狀態都是success
或skipped
,failed
如果任何葉節點的狀態是failed
或upstream_failed
。
注意
如果您的某些任務定義了特定的觸發規則,請小心。這些可能會導致一些意想不到的行為,例如,如果您有一個葉任務的觸發規則為 “all_done”,則無論其餘任務的狀態如何,它都將被執行,如果它成功,那麼即使中間發生了錯誤,整個 DAG 執行個體也將被標記為 success
。
在 Airflow 2.7 中新增
在 UI 儀表板的「執行中」標籤中,可以顯示目前有 DAG 執行個體正在執行的 DAG。同樣地,最新 DAG 執行個體標記為失敗的 DAG 可以在「失敗」標籤中找到。
資料間隔¶
Airflow 中的每個 DAG 執行個體都有一個指定的「資料間隔」,代表它運作的時間範圍。例如,對於以 @daily
排程的 DAG,其每個資料間隔將在每天午夜 (00:00) 開始,並在午夜 (24:00) 結束。
DAG 執行個體通常在其相關的資料間隔結束後排程,以確保執行個體能夠收集時間段內的所有資料。換句話說,涵蓋 2020-01-01 資料期間的執行個體通常要等到 2020-01-01 結束後,即 2020-01-02 00:00:00 之後才開始執行。
Airflow 中的所有日期都以某種方式與資料間隔概念相關聯。例如,DAG 執行個體的「邏輯日期」(在 Airflow 2.2 之前的版本中也稱為 execution_date
)表示資料間隔的開始,而不是 DAG 實際執行的時間。
同樣地,由於 DAG 及其任務的 start_date
引數指向相同的邏輯日期,因此它標記了 DAG 第一個資料間隔的開始,而不是 DAG 中的任務何時開始執行。換句話說,DAG 執行個體只會在 start_date
之後的一個間隔排程。
提示
如果 cron 表達式或 timedelta 物件不足以表達 DAG 的排程、邏輯日期或資料間隔,請參閱Timetables。有關 logical date
的更多資訊,請參閱執行 DAG 和 execution_date 是什麼意思?
重新執行 DAG¶
在某些情況下,您可能想要再次執行 DAG。其中一種情況是排程的 DAG 執行個體失敗時。
補追執行¶
使用 start_date
、可能還有 end_date
和非資料集排程定義的 Airflow DAG,定義了一系列間隔,排程器將這些間隔轉換為個別的 DAG 執行個體並執行。預設情況下,排程器將為自上次資料間隔以來(或已清除)未執行的任何資料間隔啟動 DAG 執行個體。此概念稱為補追執行。
如果您的 DAG 未編寫為處理其補追執行(即,不限於間隔,而是例如 Now
),那麼您將需要關閉補追執行。這可以透過在 DAG 中設定 catchup=False
或在組態檔中設定 catchup_by_default=False
來完成。關閉後,排程器僅為最新間隔建立 DAG 執行個體。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
import datetime
import pendulum
dag = DAG(
"tutorial",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="A simple tutorial DAG",
schedule="@daily",
catchup=False,
)
在上面的範例中,如果排程器守護進程在 2016-01-02 早上 6 點(或從命令列)選取 DAG,則將建立一個 DAG 執行個體,其資料介於 2016-01-01 和 2016-01-02 之間,而下一個 DAG 執行個體將在 2016-01-03 早上午夜過後建立,其資料間隔介於 2016-01-02 和 2016-01-03 之間。
請注意,使用 datetime.timedelta
物件作為排程可能會導致不同的行為。在這種情況下,建立的單個 DAG 執行個體將涵蓋 2016-01-01 06:00 到 2016-01-02 06:00 之間的資料(一個排程間隔現在結束)。如需 cron 和基於 delta 的排程之間差異的更詳細描述,請參閱時間表比較
如果 dag.catchup
值改為 True
,則排程器將為 2015-12-01 到 2016-01-02 之間(但不包括 2016-01-02,因為該間隔尚未完成)的每個已完成間隔建立 DAG 執行個體,並且排程器將依序執行它們。
當您關閉 DAG 一段時間然後重新啟用它時,也會觸發補追執行。
此行為對於可以輕鬆分割成期間的原子資料集非常有用。如果您的 DAG 在內部執行補追執行,則關閉補追執行非常有用。
回填¶
在某些情況下,您可能想要針對指定的歷史期間執行 DAG,例如,使用 start_date
2019-11-21 建立資料填寫 DAG,但另一位使用者需要一個月前的輸出資料,即 2019-10-21。此過程稱為回填。
即使在停用補追執行的情況下,您也可能想要回填資料。這可以透過 CLI 完成。執行以下命令
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \
dag_id
backfill 命令將針對開始日期和結束日期內的所有間隔,重新執行 dag_id 的所有執行個體。
重新執行任務¶
某些任務可能會在排程的執行期間失敗。在您透過日誌找出錯誤並修正錯誤後,您可以透過清除排程日期的任務來重新執行任務。清除任務執行個體會建立任務執行個體的記錄。目前任務執行個體的 try_number
會遞增,max_tries
設定為 0
,狀態設定為 None
,這會導致任務重新執行。
在樹狀或圖形檢視中點擊失敗的任務,然後點擊清除。執行器將重新執行它。
您可以選擇多個選項來重新執行 -
過去 - DAG 最新資料間隔之前的執行中任務的所有執行個體
未來 - DAG 最新資料間隔之後的執行中任務的所有執行個體
上游 - 目前 DAG 中的上游任務
下游 - 目前 DAG 中的下游任務
遞迴 - 子 DAG 和父 DAG 中的所有任務
失敗 - 僅 DAG 最新執行中的失敗任務
您也可以使用以下命令透過 CLI 清除任務
airflow tasks clear dag_id \
--task-regex task_regex \
--start-date START_DATE \
--end-date END_DATE
對於指定的 dag_id
和時間間隔,此命令會清除與正則表示式匹配的所有任務執行個體。如需更多選項,您可以查看 clear 命令的說明
airflow tasks clear --help
任務執行個體歷史¶
當任務執行個體重試或被清除時,任務執行個體歷史記錄會被保留。您可以透過點擊網格檢視中的任務執行個體來查看此歷史記錄。

注意
上面顯示的嘗試選擇器僅適用於已重試或清除的任務。
歷史記錄顯示特定執行結束時任務執行個體屬性的值。在日誌頁面上,您還可以查看每個任務執行個體嘗試的日誌。這對於偵錯很有用。

注意
相關的任務執行個體物件(例如 XComs、呈現的範本欄位等)不會保留在歷史記錄中。僅保留任務執行個體屬性,包括日誌。
外部觸發¶
請注意,也可以透過 CLI 手動建立 DAG 執行個體。只需執行命令 -
airflow dags trigger --exec-date logical_date run_id
在排程器外部建立的 DAG 執行個體會與觸發器的時間戳記相關聯,並與排程的 DAG 執行個體一起顯示在 UI 中。可以使用 -e
引數指定在 DAG 內部傳遞的邏輯日期。預設值是 UTC 時區的目前日期。
此外,您也可以使用 Web UI 手動觸發 DAG 執行個體(標籤 DAGs -> 欄位 連結 -> 按鈕 觸發 Dag)
觸發 DAG 時傳遞參數¶
從 CLI、REST API 或 UI 觸發 DAG 時,可以將 DAG 執行個體的組態作為 JSON blob 傳遞。
參數化 DAG 的範例
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
"example_parameterized_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
parameterized_task = BashOperator(
task_id="parameterized_task",
bash_command="echo value: {{ dag_run.conf['conf1'] }}",
dag=dag,
)
注意:dag_run.conf 中的參數只能在運算子的範本欄位中使用。
使用 CLI¶
airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
注意事項¶
可以透過 UI 將任務執行個體標記為失敗。這可以用於停止正在執行的任務執行個體。
可以透過 UI 將任務執行個體標記為成功。這主要是為了修正誤判,或例如,當修復程式已在 Airflow 外部應用時。