DAG 執行

DAG 執行個體是一個物件,代表 DAG 在時間上的實例化。每次執行 DAG 時,都會建立一個 DAG 執行個體,並執行其中的所有任務。DAG 執行個體的狀態取決於任務的狀態。每個 DAG 執行個體都是彼此獨立執行的,這表示您可以同時執行 DAG 的多個執行個體。

DAG 執行狀態

DAG 執行個體狀態在 DAG 執行完成時確定。DAG 的執行取決於其包含的任務及其依賴關係。當所有任務都處於終端狀態之一(即,如果不可能轉換到另一種狀態),例如 successfailedskipped 時,狀態會分配給 DAG 執行個體。DAG 執行個體的狀態是根據所謂的「葉節點」或簡稱「葉」來分配的。葉節點是沒有子節點的任務。

DAG 執行個體有兩種可能的終端狀態

  • success 如果所有葉節點的狀態都是 successskipped

  • failed 如果任何葉節點的狀態是 failedupstream_failed

注意

如果您的某些任務定義了特定的觸發規則,請小心。這些可能會導致一些意想不到的行為,例如,如果您有一個葉任務的觸發規則為 “all_done”,則無論其餘任務的狀態如何,它都將被執行,如果它成功,那麼即使中間發生了錯誤,整個 DAG 執行個體也將被標記為 success

在 Airflow 2.7 中新增

在 UI 儀表板的「執行中」標籤中,可以顯示目前有 DAG 執行個體正在執行的 DAG。同樣地,最新 DAG 執行個體標記為失敗的 DAG 可以在「失敗」標籤中找到。

資料間隔

Airflow 中的每個 DAG 執行個體都有一個指定的「資料間隔」,代表它運作的時間範圍。例如,對於以 @daily 排程的 DAG,其每個資料間隔將在每天午夜 (00:00) 開始,並在午夜 (24:00) 結束。

DAG 執行個體通常在其相關的資料間隔結束後排程,以確保執行個體能夠收集時間段內的所有資料。換句話說,涵蓋 2020-01-01 資料期間的執行個體通常要等到 2020-01-01 結束後,即 2020-01-02 00:00:00 之後才開始執行。

Airflow 中的所有日期都以某種方式與資料間隔概念相關聯。例如,DAG 執行個體的「邏輯日期」(在 Airflow 2.2 之前的版本中也稱為 execution_date)表示資料間隔的開始,而不是 DAG 實際執行的時間。

同樣地,由於 DAG 及其任務的 start_date 引數指向相同的邏輯日期,因此它標記了 DAG 第一個資料間隔的開始,而不是 DAG 中的任務何時開始執行。換句話說,DAG 執行個體只會在 start_date 之後的一個間隔排程。

提示

如果 cron 表達式或 timedelta 物件不足以表達 DAG 的排程、邏輯日期或資料間隔,請參閱Timetables。有關 logical date 的更多資訊,請參閱執行 DAGexecution_date 是什麼意思?

重新執行 DAG

在某些情況下,您可能想要再次執行 DAG。其中一種情況是排程的 DAG 執行個體失敗時。

補追執行

使用 start_date、可能還有 end_date 和非資料集排程定義的 Airflow DAG,定義了一系列間隔,排程器將這些間隔轉換為個別的 DAG 執行個體並執行。預設情況下,排程器將為自上次資料間隔以來(或已清除)未執行的任何資料間隔啟動 DAG 執行個體。此概念稱為補追執行。

如果您的 DAG 未編寫為處理其補追執行(即,不限於間隔,而是例如 Now),那麼您將需要關閉補追執行。這可以透過在 DAG 中設定 catchup=False 或在組態檔中設定 catchup_by_default=False 來完成。關閉後,排程器僅為最新間隔建立 DAG 執行個體。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

import datetime
import pendulum

dag = DAG(
    "tutorial",
    default_args={
        "depends_on_past": True,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="A simple tutorial DAG",
    schedule="@daily",
    catchup=False,
)

在上面的範例中,如果排程器守護進程在 2016-01-02 早上 6 點(或從命令列)選取 DAG,則將建立一個 DAG 執行個體,其資料介於 2016-01-01 和 2016-01-02 之間,而下一個 DAG 執行個體將在 2016-01-03 早上午夜過後建立,其資料間隔介於 2016-01-02 和 2016-01-03 之間。

請注意,使用 datetime.timedelta 物件作為排程可能會導致不同的行為。在這種情況下,建立的單個 DAG 執行個體將涵蓋 2016-01-01 06:00 到 2016-01-02 06:00 之間的資料(一個排程間隔現在結束)。如需 cron 和基於 delta 的排程之間差異的更詳細描述,請參閱時間表比較

如果 dag.catchup 值改為 True,則排程器將為 2015-12-01 到 2016-01-02 之間(但不包括 2016-01-02,因為該間隔尚未完成)的每個已完成間隔建立 DAG 執行個體,並且排程器將依序執行它們。

當您關閉 DAG 一段時間然後重新啟用它時,也會觸發補追執行。

此行為對於可以輕鬆分割成期間的原子資料集非常有用。如果您的 DAG 在內部執行補追執行,則關閉補追執行非常有用。

回填

在某些情況下,您可能想要針對指定的歷史期間執行 DAG,例如,使用 start_date 2019-11-21 建立資料填寫 DAG,但另一位使用者需要一個月前的輸出資料,即 2019-10-21。此過程稱為回填。

即使在停用補追執行的情況下,您也可能想要回填資料。這可以透過 CLI 完成。執行以下命令

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id

backfill 命令將針對開始日期和結束日期內的所有間隔,重新執行 dag_id 的所有執行個體。

重新執行任務

某些任務可能會在排程的執行期間失敗。在您透過日誌找出錯誤並修正錯誤後,您可以透過清除排程日期的任務來重新執行任務。清除任務執行個體會建立任務執行個體的記錄。目前任務執行個體的 try_number 會遞增,max_tries 設定為 0,狀態設定為 None,這會導致任務重新執行。

在樹狀或圖形檢視中點擊失敗的任務,然後點擊清除。執行器將重新執行它。

您可以選擇多個選項來重新執行 -

  • 過去 - DAG 最新資料間隔之前的執行中任務的所有執行個體

  • 未來 - DAG 最新資料間隔之後的執行中任務的所有執行個體

  • 上游 - 目前 DAG 中的上游任務

  • 下游 - 目前 DAG 中的下游任務

  • 遞迴 - 子 DAG 和父 DAG 中的所有任務

  • 失敗 - 僅 DAG 最新執行中的失敗任務

您也可以使用以下命令透過 CLI 清除任務

airflow tasks clear dag_id \
    --task-regex task_regex \
    --start-date START_DATE \
    --end-date END_DATE

對於指定的 dag_id 和時間間隔,此命令會清除與正則表示式匹配的所有任務執行個體。如需更多選項,您可以查看 clear 命令的說明

airflow tasks clear --help

任務執行個體歷史

當任務執行個體重試或被清除時,任務執行個體歷史記錄會被保留。您可以透過點擊網格檢視中的任務執行個體來查看此歷史記錄。

../_images/task_instance_history.png

注意

上面顯示的嘗試選擇器僅適用於已重試或清除的任務。

歷史記錄顯示特定執行結束時任務執行個體屬性的值。在日誌頁面上,您還可以查看每個任務執行個體嘗試的日誌。這對於偵錯很有用。

../_images/task_instance_history_log.png

注意

相關的任務執行個體物件(例如 XComs、呈現的範本欄位等)不會保留在歷史記錄中。僅保留任務執行個體屬性,包括日誌。

外部觸發

請注意,也可以透過 CLI 手動建立 DAG 執行個體。只需執行命令 -

airflow dags trigger --exec-date logical_date run_id

在排程器外部建立的 DAG 執行個體會與觸發器的時間戳記相關聯,並與排程的 DAG 執行個體一起顯示在 UI 中。可以使用 -e 引數指定在 DAG 內部傳遞的邏輯日期。預設值是 UTC 時區的目前日期。

此外,您也可以使用 Web UI 手動觸發 DAG 執行個體(標籤 DAGs -> 欄位 連結 -> 按鈕 觸發 Dag

觸發 DAG 時傳遞參數

從 CLI、REST API 或 UI 觸發 DAG 時,可以將 DAG 執行個體的組態作為 JSON blob 傳遞。

參數化 DAG 的範例

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    "example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)

注意:dag_run.conf 中的參數只能在運算子的範本欄位中使用。

使用 CLI

airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag

使用 UI

在 UI 中,觸發 DAG 的參數可以透過 params 文件中描述的 參數 定義更好地表示。透過定義的參數,會呈現用於值輸入的適當表單。

如果 DAG 未定義參數,則通常會跳過表單,透過組態選項 show_trigger_form_if_no_params,可以強制顯示僅字典輸入的傳統表單以傳遞組態選項。

../_images/example_passing_conf.png

請考慮將此類用法轉換為 params,因為這是一種更方便的方式,並且還允許驗證使用者輸入。

注意事項

  • 可以透過 UI 將任務執行個體標記為失敗。這可以用於停止正在執行的任務執行個體。

  • 可以透過 UI 將任務執行個體標記為成功。這主要是為了修正誤判,或例如,當修復程式已在 Airflow 外部應用時。

這個條目有幫助嗎?