DAGs

DAG (有向無環圖) 是 Airflow 的核心概念,它將 任務 集合在一起,並透過依賴關係和關聯性組織起來,說明它們應該如何執行。

這是一個基本的 DAG 範例

../_images/basic-dag.png

它定義了四個任務 - A、B、C 和 D - 並規定了它們必須執行的順序,以及哪些任務依賴於其他任務。它也會說明 DAG 的執行頻率 - 可能是「從明天開始每 5 分鐘執行一次」,或是「自 2020 年 1 月 1 日起每天執行一次」。

DAG 本身並不關心任務內部發生什麼事;它僅關注如何執行它們 - 執行的順序、重試的次數、是否有逾時等等。

宣告 DAG

有三種宣告 DAG 的方式 - 你可以使用 with 語句 (上下文管理器),它會將其中包含的任何內容隱式地添加到 DAG 中

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 ):
     EmptyOperator(task_id="task")

或者,你可以使用標準的建構子,將 DAG 傳遞給你使用的任何運算子中

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)

或者,你可以使用 @dag 裝飾器來 將函數轉換為 DAG 生成器

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

DAG 沒有 任務 就毫無意義,而這些任務通常以 Operators (運算子)Sensors (感測器)TaskFlow 的形式出現。

任務依賴關係

一個任務/運算子通常不會單獨存在;它依賴於其他任務 (位於它上游的任務),並且其他任務也依賴於它 (位於它下游的任務)。宣告任務之間的這些依賴關係構成了 DAG 結構 (有向無環圖)。

有兩種主要方式來宣告個別的任務依賴關係。建議的方式是使用 >><< 運算子

first_task >> [second_task, third_task]
third_task << fourth_task

或者,你也可以使用更明確的 set_upstreamset_downstream 方法

first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

還有一些宣告更複雜依賴關係的捷徑。如果你想讓一個任務列表依賴於另一個任務列表,你不能使用上述任何一種方法,所以你需要使用 cross_downstream

from airflow.models.baseoperator import cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])

如果你想將依賴關係鏈接在一起,你可以使用 chain

from airflow.models.baseoperator import chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])

Chain 也可以為相同大小的列表執行成對依賴關係 (這與 cross_downstream 建立的交叉依賴關係不同!)

from airflow.models.baseoperator import chain

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)

載入 DAG

Airflow 從 Python 原始碼檔案載入 DAG,它會在配置的 DAG_FOLDER 內尋找這些檔案。它會取得每個檔案,執行它,然後從該檔案載入任何 DAG 物件。

這表示你可以在每個 Python 檔案中定義多個 DAG,甚至可以使用導入將一個非常複雜的 DAG 分散到多個 Python 檔案中。

但請注意,當 Airflow 從 Python 檔案載入 DAG 時,它只會提取頂層的任何物件,這些物件是 DAG 實例。例如,以下 DAG 檔案

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

雖然當存取檔案時,兩個 DAG 建構子都會被呼叫,但只有 dag_1 位於頂層 (在 globals() 中),因此只有它會被添加到 Airflow 中。dag_2 不會被載入。

注意

DAG_FOLDER 內搜尋 DAG 時,Airflow 僅考慮包含字串 airflowdag (不區分大小寫) 的 Python 檔案,作為一種最佳化方式。

若要改為考慮所有 Python 檔案,請停用 DAG_DISCOVERY_SAFE_MODE 配置標誌。

你也可以在 DAG_FOLDER 或其任何子資料夾中提供 .airflowignore 檔案,其中描述了載入器要忽略的檔案模式。它涵蓋了它所在的目錄以及其下的所有子資料夾。請參閱下方的 .airflowignore 以取得檔案語法的詳細資訊。

如果 .airflowignore 無法滿足你的需求,並且你想要更彈性的方式來控制是否需要讓 airflow 解析 Python 檔案,你可以在設定檔中設定 might_contain_dag_callable 來插入你的可呼叫物件。請注意,這個可呼叫物件將取代預設的 Airflow 啟發式方法,即檢查 Python 檔案中是否存在字串 airflowdag (不區分大小寫)。

def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
    # Your logic to check if there are DAGs defined in the file_path
    # Return True if the file_path needs to be parsed, otherwise False

執行 DAG

DAG 將以兩種方式之一執行

  • 當它們被觸發時,無論是手動或透過 API

  • 在定義的排程上,排程在 DAG 中定義

DAG 並不需要排程,但定義排程非常常見。你可以透過 schedule 參數來定義它,如下所示

with DAG("my_daily_dag", schedule="@daily"):
    ...

schedule 參數有各種有效值

with DAG("my_daily_dag", schedule="0 0 * * *"):
    ...

with DAG("my_one_time_dag", schedule="@once"):
    ...

with DAG("my_continuous_dag", schedule="@continuous"):
    ...

提示

如需有關不同排程類型的更多資訊,請參閱 撰寫與排程

每次執行 DAG 時,你都在建立該 DAG 的新實例,Airflow 將其稱為 DAG Run (DAG 執行)。DAG Run 可以為同一個 DAG 平行執行,並且每個 DAG Run 都有一個定義的資料間隔,用於識別任務應運作的資料期間。

舉例說明為何這很有用,假設你正在編寫一個 DAG 來處理每日的實驗資料集。它已被重寫,並且你想要在過去 3 個月的資料上執行它 - 沒問題,因為 Airflow 可以回填 DAG,並為過去 3 個月中的每一天執行 DAG 的副本,全部一次完成。

這些 DAG Run 都將在同一個實際日期啟動,但每個 DAG Run 都將有一個資料間隔,涵蓋該 3 個月期間的單一天,而該資料間隔是 DAG 內部的所有任務、運算子和感測器在執行時所關注的。

就像 DAG 每次執行時都會實例化為 DAG Run 一樣,DAG 內指定的任務也會與之一起實例化為 任務實例

DAG 執行將有一個開始日期 (start date),表示它何時開始,以及一個結束日期 (end date),表示它何時結束。這個期間描述了 DAG 實際「執行」的時間。除了 DAG 執行的開始和結束日期之外,還有另一個日期稱為邏輯日期 (正式名稱為執行日期),它描述了 DAG 執行排程或觸發的預期時間。之所以稱為邏輯日期,是因為它具有抽象的性質,具有多重含義,具體取決於 DAG 執行本身的上下文。

例如,如果 DAG 執行是由使用者手動觸發的,則其邏輯日期將是 DAG 執行被觸發的日期和時間,並且該值應等於 DAG 執行的開始日期。但是,當 DAG 正在自動排程時,並設定了特定的排程間隔,則邏輯日期將指示它標記資料間隔開始的時間,而 DAG 執行的開始日期將是邏輯日期 + 排程間隔。

提示

如需有關 logical date 的更多資訊,請參閱 資料間隔execution_date 代表什麼意思?

DAG 指派

請注意,每個運算子/任務都必須指派給 DAG 才能執行。Airflow 有幾種方法可以計算 DAG,而無需你明確傳遞它

  • 如果你在 with DAG 區塊內宣告你的運算子

  • 如果你在 @dag 裝飾器內宣告你的運算子

  • 如果你將你的運算子放在具有 DAG 的運算子的上游或下游

否則,你必須使用 dag= 將其傳遞給每個運算子。

預設參數

通常,DAG 內部的許多運算子需要相同的預設參數集 (例如它們的 retries)。與其為每個運算子個別指定,不如在建立 DAG 時將 default_args 傳遞給 DAG,它會自動將它們應用於與其關聯的任何運算子

import pendulum

with DAG(
    dag_id="my_dag",
    start_date=pendulum.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 2},
):
    op = BashOperator(task_id="hello_world", bash_command="Hello World!")
    print(op.retries)  # 2

DAG 裝飾器

2.0 版本新增功能。

除了使用上下文管理器或 DAG() 建構子宣告單個 DAG 的更傳統方式之外,你還可以裝飾一個帶有 @dag 的函數,以將其轉換為 DAG 生成器函數

airflow/example_dags/example_dag_decorator.py[原始碼]

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_dag_decorator(email: str = "example@example.com"):
    """
    DAG to send server IP to email.

    :param email: Email to send IP to. Defaults to example@example.com.
    """
    get_ip = GetRequestOperator(task_id="get_ip", url="http://httpbin.org/get")

    @task(multiple_outputs=True)
    def prepare_email(raw_json: dict[str, Any]) -> dict[str, str]:
        external_ip = raw_json["origin"]
        return {
            "subject": f"Server connected from {external_ip}",
            "body": f"Seems like today your server executing Airflow is connected from IP {external_ip}<br>",
        }

    email_info = prepare_email(get_ip.output)

    EmailOperator(
        task_id="send_email", to=email, subject=email_info["subject"], html_content=email_info["body"]
    )


example_dag = example_dag_decorator()

除了是一種乾淨地建立 DAG 的新方法之外,裝飾器還會將你在函數中擁有的任何參數設定為 DAG 參數,讓你可以在 觸發 DAG 時設定這些參數。然後,你可以從 Python 程式碼或從 Jinja 範本 內的 {{ context.params }} 存取這些參數。

注意

Airflow 只會載入 出現在 DAG 檔案頂層 的 DAG。這表示你不能只宣告一個帶有 @dag 的函數 - 你還必須在你的 DAG 檔案中至少呼叫它一次並將其指派給頂層物件,如你在上面的範例中所見。

控制流程

預設情況下,DAG 只會在它依賴的所有任務都成功時才執行任務。但是,有幾種修改此行為的方式

  • 分支 - 根據條件選擇要移動到哪個任務

  • 觸發規則 - 設定 DAG 執行任務的條件

  • 設定與拆解 - 定義設定和拆解關係

  • 僅限最新 - 一種特殊的分支形式,僅在針對目前執行的 DAG 上執行

  • 依賴於過去 - 任務可以依賴於它們自己從先前的執行

分支

你可以使用分支來告知 DAG 不要執行所有依賴任務,而是選擇一條或多條路徑來往下走。這就是 @task.branch 裝飾器的用武之地。

@task.branch 裝飾器很像 @task,只是它期望裝飾的函數返回一個任務 ID (或 ID 列表)。將會追蹤指定的任務,而所有其他路徑都會被略過。它也可以返回 None 以略過所有下游任務。

Python 函數返回的 task_id 必須直接參照 @task.branch 裝飾任務的下游任務。

注意

當一個任務既是分支運算子的下游任務,是一個或多個選定任務的下游任務時,它將不會被略過

../_images/branch_note.png

分支任務的路徑為 branch_ajoinbranch_b。由於 joinbranch_a 的下游任務,因此即使它沒有作為分支決策的一部分返回,它仍將執行。

@task.branch 也可以與 XComs 一起使用,允許分支上下文根據上游任務動態決定要追蹤哪個分支。例如

@task.branch(task_id="branch_task")
def branch_func(ti=None):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "continue_task"
    elif xcom_value >= 3:
        return "stop_task"
    else:
        return None


start_op = BashOperator(
    task_id="start_task",
    bash_command="echo 5",
    do_xcom_push=True,
    dag=dag,
)

branch_op = branch_func()

continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

如果你希望使用分支功能實作你自己的運算子,你可以繼承自 BaseBranchOperator,它的行為與 @task.branch 裝飾器類似,但期望你提供 choose_branch 方法的實作。

注意

建議使用 @task.branch 裝飾器,而不是直接在 DAG 中實例化 BranchPythonOperator。後者通常只應被子類化以實作自訂運算子。

@task.branch 的可呼叫物件一樣,此方法可以返回下游任務的 ID,或任務 ID 列表,這些任務將被執行,而所有其他任務將被略過。它也可以返回 None 以略過所有下游任務

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run an extra branch on the first day of the month
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None

與常規 Python 程式碼的 @task.branch 裝飾器類似,也有使用虛擬環境的分支裝飾器,稱為 @task.branch_virtualenv 或外部 Python 程式碼,稱為 @task.branch_external_python

僅限最新

Airflow 的 DAG Run 通常是針對與目前日期不同的日期執行的 - 例如,為上個月的每一天執行一個 DAG 副本,以回填一些資料。

但是,在某些情況下,你希望讓 DAG 的某些 (或全部) 部分針對先前的日期執行;在這種情況下,你可以使用 LatestOnlyOperator

這個特殊的運算子會略過其自身下游的所有任務,如果你不是在「最新」的 DAG 執行上 (如果現在的實際時間介於其 execution_time 和下一個排程的 execution_time 之間,並且它不是外部觸發的執行)。

這是一個範例

airflow/example_dags/example_latest_only_with_trigger.py[原始碼]

import datetime

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="latest_only_with_trigger",
    schedule=datetime.timedelta(hours=4),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]

在這個 DAG 的情況下

  • task1 直接位於 latest_only 的下游,並且除了最新的執行之外,所有執行都會被略過。

  • task2 完全獨立於 latest_only,並且會在所有排程期間執行

  • task3 位於 task1task2 的下游,並且由於預設的 觸發規則all_success,因此會收到來自 task1 的級聯略過。

  • task4 位於 task1task2 的下游,但它不會被略過,因為它的 trigger_rule 設定為 all_done

../_images/latest_only_with_trigger.png

依賴於過去

你也可以說,只有當前一個 DAG Run 中任務的先前執行成功時,任務才能執行。若要使用此功能,你只需要將任務上的 depends_on_past 參數設定為 True

請注意,如果你在 DAG 生命週期的最開始執行 DAG - 具體來說,是它的首次自動化執行 - 則任務仍然會執行,因為沒有先前的執行可以依賴。

觸發規則

預設情況下,Airflow 會等待任務的所有上游 (直接父項) 任務 成功,然後才會執行該任務。

但是,這只是預設行為,你可以使用任務的 trigger_rule 參數來控制它。trigger_rule 的選項如下

  • all_success (預設): 所有上游任務都已成功

  • all_failed: 所有上游任務都處於 failedupstream_failed 狀態

  • all_done: 所有上游任務都已完成執行

  • all_skipped: 所有上游任務都處於 skipped 狀態

  • one_failed: 至少有一個上游任務失敗 (不等待所有上游任務完成)

  • one_success: 至少有一個上游任務成功 (不等待所有上游任務完成)

  • one_done: 至少有一個上游任務成功或失敗

  • none_failed: 所有上游任務都未 failedupstream_failed - 也就是說,所有上游任務都已成功或被略過

  • none_failed_min_one_success: 所有上游任務都未 failedupstream_failed,並且至少有一個上游任務成功。

  • none_skipped: 沒有上游任務處於 skipped 狀態 - 也就是說,所有上游任務都處於 successfailedupstream_failed 狀態

  • always: 完全沒有依賴關係,隨時執行此任務

如果你願意,你也可以將此與 依賴於過去 功能結合使用。

注意

務必注意觸發規則和略過任務之間的交互作用,尤其是作為分支操作一部分被略過的任務。你幾乎永遠不希望在分支操作的下游使用 all_success 或 all_failed

略過的任務將級聯通過觸發規則 all_successall_failed,並導致它們也被略過。考慮以下 DAG

# dags/branch_without_trigger.py
import pendulum

from airflow.decorators import task
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="branch_without_trigger",
    schedule="@once",
    start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)

run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)


@task.branch(task_id="branching")
def do_branching():
    return "branch_a"


branching = do_branching()

branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)

branch_false = EmptyOperator(task_id="branch_false", dag=dag)

join = EmptyOperator(task_id="join", dag=dag)

run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join

join 位於 follow_branch_abranch_false 的下游。join 任務將顯示為已略過,因為其 trigger_rule 預設設定為 all_success,並且分支操作導致的略過級聯向下略過標記為 all_success 的任務。

../_images/branch_without_trigger.png

透過在 join 任務中將 trigger_rule 設定為 none_failed_min_one_success,我們可以改為獲得預期的行為

../_images/branch_with_trigger.png

設定與拆解

在資料工作流程中,常見的做法是建立資源 (例如計算資源)、使用它來完成一些工作,然後將其拆解。Airflow 提供設定和拆解任務來支援這種需求。

請參閱主要文章 設定與拆解,以取得有關如何使用此功能的詳細資訊。

動態 DAG

由於 DAG 是由 Python 程式碼定義的,因此無需使其純粹是宣告式的;你可以自由使用迴圈、函數等等來定義你的 DAG。

例如,這是一個使用 for 迴圈來定義一些任務的 DAG

 with DAG("loop_example", ...):
     first = EmptyOperator(task_id="first")
     last = EmptyOperator(task_id="last")

     options = ["branch_a", "branch_b", "branch_c", "branch_d"]
     for option in options:
         t = EmptyOperator(task_id=option)
         first >> t >> last

一般而言,我們建議你嘗試保持 DAG 任務的拓撲 (佈局) 相對穩定;動態 DAG 通常更適合用於動態載入配置選項或變更運算子選項。

DAG 可視化

如果你想查看 DAG 的視覺化表示,你有兩個選項

  • 你可以載入 Airflow UI,導航到你的 DAG,然後選擇「Graph (圖形)」

  • 你可以執行 airflow dags show,它會將其呈現為圖像檔案

我們通常建議你使用「Graph (圖形)」視圖,因為它還會顯示你選擇的任何 DAG 執行中所有 任務實例 的狀態。

當然,隨著你開發 DAG,它們將變得越來越複雜,因此我們提供了一些方法來修改這些 DAG 視圖,使其更易於理解。

TaskGroups

TaskGroup 可用於在「Graph (圖形)」視圖中將任務組織成階層式群組。它對於建立重複模式和減少視覺混亂很有用。

SubDAGs 不同,TaskGroups 純粹是 UI 介面上的分組概念。TaskGroups 中的任務與原始 DAG 位於相同的 DAG 上,並遵循所有 DAG 設定和 Pool 配置。

../_images/task_group.gif

依賴關係可以使用 >><< 運算符應用於 TaskGroup 中的所有任務。例如,以下程式碼將 task1task2 放入 TaskGroup group1 中,然後將這兩個任務都放在 task3 的上游。

 from airflow.decorators import task_group


 @task_group()
 def group1():
     task1 = EmptyOperator(task_id="task1")
     task2 = EmptyOperator(task_id="task2")


 task3 = EmptyOperator(task_id="task3")

 group1() >> task3

TaskGroup 也支援像 DAG 一樣的 default_args,它會覆寫 DAG 層級的 default_args

import datetime

from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dag1",
    start_date=datetime.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 1},
):

    @task_group(default_args={"retries": 3})
    def group1():
        """This docstring will become the tooltip for the TaskGroup."""
        task1 = EmptyOperator(task_id="task1")
        task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
        print(task1.retries)  # 3
        print(task2.retries)  # 2

如果您想查看 TaskGroup 更進階的用法,可以查看 Airflow 隨附的 example_task_group_decorator.py 範例 DAG。

注意

預設情況下,子任務/TaskGroup 的 ID 會加上其父 TaskGroup 的 group_id 作為前綴。這有助於確保整個 DAG 中 group_id 和 task_id 的唯一性。

若要停用前綴,請在建立 TaskGroup 時傳遞 prefix_group_id=False,但請注意,您現在將負責確保每個任務和群組都有自己唯一的 ID。

注意

當使用 @task_group 裝飾器時,裝飾函數的 docstring 將用作 TaskGroup 在 UI 中的工具提示,除非明確提供 tooltip 值。

邊緣標籤

除了將任務分組外,您還可以在圖形檢視中標記不同任務之間的 *依賴邊緣* - 這對於 DAG 的分支區域特別有用,因此您可以標記某些分支可能運行的條件。

若要新增標籤,您可以直接將它們與 >><< 運算符內聯使用。

from airflow.utils.edgemodifier import Label

my_task >> Label("When empty") >> other_task

或者,您可以將 Label 物件傳遞給 set_upstream/set_downstream

from airflow.utils.edgemodifier import Label

my_task.set_downstream(other_task, Label("When empty"))

這是一個範例 DAG,說明如何標記不同的分支。

../_images/edge_label_example.png

airflow/example_dags/example_branch_labels.py[原始碼]


with DAG(
    "example_branch_labels",
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    ingest = EmptyOperator(task_id="ingest")
    analyse = EmptyOperator(task_id="analyze")
    check = EmptyOperator(task_id="check_integrity")
    describe = EmptyOperator(task_id="describe_integrity")
    error = EmptyOperator(task_id="email_error")
    save = EmptyOperator(task_id="save")
    report = EmptyOperator(task_id="report")

    ingest >> analyse >> check
    check >> Label("No errors") >> save >> report
    check >> Label("Errors found") >> describe >> error >> report

DAG 和任務文件

可以將文件或註解新增到您的 DAG 和任務物件中,這些物件在 Web 介面中可見(DAG 的「圖形」和「樹狀」檢視,任務的「任務實例詳細資訊」)。

有一組特殊的任務屬性,如果定義了,會呈現為豐富內容。

屬性

呈現為

doc

等寬字型

doc_json

json

doc_yaml

yaml

doc_md

markdown

doc_rst

reStructuredText

請注意,對於 DAG,只有 doc_md 屬性會被解析。對於 DAG,它可以包含字串或 Markdown 檔案的參考。Markdown 檔案透過以 .md 結尾的字串來識別。如果提供相對路徑,它將從 Airflow Scheduler 或 DAG 解析器啟動時的相對路徑載入。如果 Markdown 檔案不存在,則傳遞的檔案名稱將用作文字,不會顯示例外狀況。請注意,Markdown 檔案是在 DAG 解析期間載入的,Markdown 內容的變更需要一個 DAG 解析週期才能顯示變更。

如果您的任務是從配置文件動態建立的,這尤其有用,因為它允許您在 Airflow 中公開導致相關任務的配置。

"""
### My great DAG
"""
import pendulum

dag = DAG(
    "my_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
)
dag.doc_md = __doc__

t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""

SubDAGs

注意

SubDAG 已被棄用,因此 TaskGroup 始終是首選。

有時,您會發現您經常將完全相同的任務集添加到每個 DAG,或者您想將大量任務分組到一個單一的邏輯單元中。這就是 SubDAGs 的用途。

例如,這是一個 DAG,在兩個區段中有很多平行任務。

../_images/subdag_before.png

我們可以將所有平行的 task-* 運算符組合到一個單一的 SubDAG 中,以便產生的 DAG 類似於以下內容。

../_images/subdag_after.png

請注意,SubDAG 運算符應包含一個工廠方法,該方法返回一個 DAG 物件。這將防止 SubDAG 在主 UI 中被視為單獨的 DAG - 請記住,如果 Airflow 在 Python 檔案的頂層看到 DAG,它會 將其載入為自己的 DAG。例如

airflow/example_dags/subdags/subdag.py[原始碼]

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator


def subdag(parent_dag_name, child_dag_name, args) -> DAG:
    """
    Generate a DAG to be used as a subdag.

    :param str parent_dag_name: Id of the parent DAG
    :param str child_dag_name: Id of the child DAG
    :param dict args: Default arguments to provide to the subdag
    :return: DAG to use as a subdag
    """
    dag_subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        schedule="@daily",
    )

    for i in range(5):
        EmptyOperator(
            task_id=f"{child_dag_name}-task-{i + 1}",
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag


然後可以在您的主 DAG 檔案中引用此 SubDAG。

airflow/example_dags/example_subdag_operator.py[原始碼]

import datetime

from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "example_subdag_operator"

with DAG(
    dag_id=DAG_NAME,
    default_args={"retries": 2},
    start_date=datetime.datetime(2022, 1, 1),
    schedule="@once",
    tags=["example"],
) as dag:
    start = EmptyOperator(
        task_id="start",
    )

    section_1 = SubDagOperator(
        task_id="section-1",
        subdag=subdag(DAG_NAME, "section-1", dag.default_args),
    )

    some_other_task = EmptyOperator(
        task_id="some-other-task",
    )

    section_2 = SubDagOperator(
        task_id="section-2",
        subdag=subdag(DAG_NAME, "section-2", dag.default_args),
    )

    end = EmptyOperator(
        task_id="end",
    )

    start >> section_1 >> some_other_task >> section_2 >> end

您可以從主 DAG 的圖形檢視放大 SubDagOperator,以顯示 SubDAG 中包含的任務。

../_images/subdag_zoom.png

使用 SubDAGs 的其他一些提示

  • 按照慣例,SubDAG 的 dag_id 應以其父 DAG 的名稱和一個點(parent.child)作為前綴。

  • 您應該透過將引數傳遞給 SubDAG 運算符(如上所示)來在主 DAG 和 SubDAG 之間共享引數。

  • SubDAGs 必須具有排程並啟用。如果 SubDAG 的排程設定為 None@once,則 SubDAG 將在沒有執行任何操作的情況下成功。

  • 清除 SubDagOperator 也會清除其中任務的狀態。

  • SubDagOperator 標記為成功不會影響其中任務的狀態。

  • 避免在 SubDAG 中的任務中使用 Depends On Past,因為這可能會造成混淆。

  • 您可以為 SubDAG 指定執行器。如果您想在進程內運行 SubDAG 並有效地將其平行處理限制為一個,則通常使用 SequentialExecutor。使用 LocalExecutor 可能會有問題,因為它可能會過度訂閱您的工作人員,在單個插槽中運行多個任務。

有關示範,請參閱 airflow/example_dags

注意

SubDagOperator *不遵守* 平行處理,因此 SubdagOperator 可能會消耗超出您可能設定的任何限制的資源。

TaskGroups 與 SubDAGs 的比較

SubDAGs 雖然與 TaskGroups 的用途相似,但由於其實現方式,會引入效能和功能問題。

  • SubDagOperator 會啟動 BackfillJob,這會忽略現有的平行處理配置,可能會過度訂閱工作人員環境。

  • SubDAGs 有自己的 DAG 屬性。當 SubDAG DAG 屬性與其父 DAG 不一致時,可能會發生意外行為。

  • 因為 SubDAGs 作為成熟的 DAG 存在,所以無法在一個視圖中看到「完整」的 DAG。

  • SubDAGs 引入了各種邊緣案例和注意事項。這可能會擾亂使用者體驗和期望。

另一方面,TaskGroups 是一個更好的選擇,因為它純粹是 UI 介面上的分組概念。TaskGroup 內的所有任務的行為仍然與 TaskGroup 外的任何其他任務相同。

您可以看到這兩個結構之間的核心差異。

TaskGroup

SubDAG

作為同一個 DAG 的一部分重複模式

作為單獨的 DAG 重複模式

DAG 的一組視圖和統計資訊

父 DAG 和子 DAG 之間單獨的視圖和統計資訊

一組 DAG 配置

多組 DAG 配置

透過現有的 SchedulerJob 遵循平行處理配置

由於新產生的 BackfillJob 而不遵循平行處理配置

使用上下文管理器進行簡單的結構宣告

具有命名限制的複雜 DAG 工廠

打包 DAGs

雖然較簡單的 DAG 通常只在單個 Python 檔案中,但更複雜的 DAG 可能會分佈在多個檔案中,並且具有應與它們一起發布的依賴項(「vendored」),這並不少見。

您可以將所有內容都放在 DAG_FOLDER 中,採用標準的檔案系統佈局,或者您可以將 DAG 及其所有 Python 檔案打包為單個 zip 檔案。例如,您可以將兩個 DAG 以及它們需要的依賴項作為一個 zip 檔案發布,內容如下:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

請注意,打包的 DAG 帶有一些注意事項

  • 如果您啟用 Pickle 用於序列化,則無法使用它們。

  • 它們不能包含編譯後的函式庫(例如 libz.so),只能包含純 Python。

  • 它們將被插入到 Python 的 sys.path 中,並且可以被 Airflow 進程中的任何其他程式碼導入,因此請確保包名稱不會與系統上已安裝的其他包衝突。

一般來說,如果您有一組複雜的編譯依賴項和模組,您最好使用 Python virtualenv 系統,並使用 pip 在目標系統上安裝必要的包。

.airflowignore

.airflowignore 檔案指定 Airflow 應有意忽略的 DAG_FOLDERPLUGINS_FOLDER 中的目錄或檔案。Airflow 支援檔案中模式的兩種語法風格,由 DAG_IGNORE_FILE_SYNTAX 配置參數指定(在 Airflow 2.3 中新增):regexpglob

注意

預設的 DAG_IGNORE_FILE_SYNTAXregexp,以確保向後相容性。

對於 regexp 模式語法(預設),.airflowignore 中的每一行都指定一個正則表達式模式,名稱(不是 DAG id)與任何模式匹配的目錄或檔案將被忽略(在底層,使用 Pattern.search() 來匹配模式)。使用 # 字元表示註解;以 # 開頭的行上的所有字元都將被忽略。

與 Airflow 中的大多數 regexp 匹配一樣,regexp 引擎是 re2,它明確不支持許多高級功能,請查看其 文件 以獲取更多資訊。

使用 glob 語法,模式的工作方式就像 .gitignore 檔案中的模式一樣。

  • * 字元將匹配任意數量的字元,除了 / 之外。

  • ? 字元將匹配任何單個字元,除了 / 之外。

  • 範圍表示法,例如 [a-zA-Z],可用於匹配範圍內的一個字元。

  • 可以透過以 ! 作為前綴來否定模式。模式按順序評估,因此否定可以覆蓋同一個檔案中先前定義的模式或父目錄中定義的模式。

  • 雙星號 (**) 可用於跨目錄匹配。例如,**/__pycache__/ 將忽略每個子目錄中無限深度的 __pycache__ 目錄。

  • 如果模式的開頭或中間(或兩者都有)有 /,則模式相對於特定 .airflowignore 檔案本身的目錄層級。否則,模式也可能在 .airflowignore 層級以下的任何層級匹配。

.airflowignore 檔案應放在您的 DAG_FOLDER 中。例如,您可以使用 regexp 語法準備一個 .airflowignore 檔案,內容如下:

project_a
tenant_[\d]

或者,等效地,使用 glob 語法。

**/*project_a*
tenant_[0-9]*

然後,在您的 DAG_FOLDER 中,諸如 project_a_dag_1.pyTESTING_project_a.pytenant_1.pyproject_a/dag_1.pytenant_1/dag_1.py 之類的檔案將被忽略(如果目錄名稱與任何模式匹配,則 Airflow 根本不會掃描此目錄及其所有子資料夾。這提高了 DAG 尋找的效率)。

.airflowignore 檔案的範圍是它所在的目錄及其所有子資料夾。您還可以為 DAG_FOLDER 中的子資料夾準備 .airflowignore 檔案,它將僅適用於該子資料夾。

DAG 依賴項

在 Airflow 2.1 中新增。

雖然 DAG 中任務之間的依賴關係是透過上游和下游關係明確定義的,但 DAG 之間的依賴關係有點複雜。一般來說,一個 DAG 可以依賴另一個 DAG 有兩種方式:

另一個困難之處在於,一個 DAG 可以等待或觸發另一個 DAG 的多次運行,且資料間隔不同。「DAG 依賴項」視圖 選單 -> 瀏覽 -> DAG 依賴項 有助於視覺化 DAG 之間的依賴關係。依賴關係由排程器在 DAG 序列化期間計算,Web 伺服器使用它們來建立依賴關係圖。

依賴項偵測器是可配置的,因此您可以實作自己的邏輯,而不是 DependencyDetector 中的預設邏輯。

DAG 暫停、停用和刪除

DAG 在「未運行」時有多種狀態。DAG 可以暫停、停用,最後可以刪除 DAG 的所有元數據。

當 DAG 出現在 DAGS_FOLDER 中,且排程器將其儲存在資料庫中,但使用者選擇透過 UI 停用它時,可以透過 UI 暫停 DAG。「暫停」和「取消暫停」操作可透過 UI 和 API 獲得。排程器不會排程已暫停的 DAG,但您可以透過 UI 觸發它們以進行手動運行。在 UI 中,您可以看到已暫停的 DAG(在「已暫停」標籤中)。取消暫停的 DAG 可以在「活動」標籤中找到。當 DAG 暫停時,任何正在運行的任務都允許完成,並且所有下游任務都將置於「已排程」狀態。當 DAG 取消暫停時,任何「已排程」的任務都將根據 DAG 邏輯開始運行。沒有「已排程」任務的 DAG 將根據其排程開始運行。

可以透過從 DAGS_FOLDER 中移除 DAG 來停用 DAG(不要將其與 UI 中的「活動」標籤混淆)。當排程器解析 DAGS_FOLDER 並錯過之前看到並儲存在資料庫中的 DAG 時,它會將其設定為停用。停用的 DAG 會保留 DAG 的元數據和歷史記錄,當 DAG 重新新增到 DAGS_FOLDER 時,它將再次啟動,並且歷史記錄將可見。您無法透過 UI 或 API 啟動/停用 DAG,這只能透過從 DAGS_FOLDER 中移除檔案來完成。再次強調 - 當 DAG 被排程器停用時,DAG 歷史運行的資料不會遺失。請注意,Airflow UI 中的「活動」標籤指的是既非「已啟動」又非「未暫停」的 DAG,因此這最初可能會有點令人困惑。

您無法在 UI 中看到已停用的 DAG - 有時您可以看到歷史運行記錄,但是當您嘗試查看有關這些記錄的資訊時,您會看到 DAG 遺失的錯誤訊息。

您也可以使用 UI 或 API 從元數據資料庫中刪除 DAG 元數據,但這並不總是導致 DAG 從 UI 中消失 - 這最初可能也有點令人困惑。如果當您刪除元數據時 DAG 仍在 DAGS_FOLDER 中,則 DAG 將重新出現,因為排程器將解析資料夾,只會移除 DAG 的歷史運行資訊。

這一切都意味著,如果您想實際刪除 DAG 及其所有歷史元數據,您需要執行三個步驟:

  • 暫停 DAG

  • 透過 UI 或 API 從資料庫中刪除歷史元數據

  • DAGS_FOLDER 中刪除 DAG 檔案,並等待它變為非活動狀態。

DAG 自動暫停(實驗性功能)

DAG 也可以配置為自動暫停。有一個 Airflow 配置允許在 DAG 連續失敗 N 次後自動停用 DAG。

我們也可以從 DAG 引數中提供和覆蓋這些配置。

這篇文章對您有幫助嗎?