基本概念

本教學課程將引導您了解一些基本的 Airflow 概念、物件及其用法,同時撰寫您的第一個 DAG。

範例管線定義

以下是一個基本管線定義的範例。如果看起來很複雜,請別擔心,下面會逐行解釋。

airflow/example_dags/tutorial.py[原始碼]


import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

這是一個 DAG 定義檔

一件需要理解的事情(一開始可能對大家來說不是那麼直觀)是這個 Airflow Python 腳本實際上只是一個組態檔,以程式碼形式指定 DAG 的結構。這裡定義的實際任務將在與此腳本上下文不同的上下文中執行。不同的任務在不同的時間點在不同的 worker 上執行,這表示此腳本不能用於任務之間的跨通訊。請注意,為了這個目的,我們有一個更進階的功能叫做 XComs

人們有時會將 DAG 定義檔視為他們可以進行實際資料處理的地方 - 事實並非如此!腳本的目的是定義一個 DAG 物件。它需要快速評估(秒,而不是分鐘),因為排程器會定期執行它以反映任何變更。

導入模組

Airflow 管線只是一個 Python 腳本,恰好定義了一個 Airflow DAG 物件。讓我們先導入我們需要的函式庫。

airflow/example_dags/tutorial.py[原始碼]

import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

請參閱 模組管理 以了解有關 Python 和 Airflow 如何管理模組的詳細資訊。

預設參數

我們即將建立一個 DAG 和一些任務,我們可以選擇顯式地將一組參數傳遞給每個任務的建構函式(這將變得冗餘),或者(更好!)我們可以定義一個預設參數字典,我們可以在建立任務時使用。

airflow/example_dags/tutorial.py[原始碼]

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'on_skipped_callback': another_function, #or list of functions
    # 'trigger_rule': 'all_success'
},

有關 BaseOperator 的參數及其作用的更多資訊,請參閱 airflow.models.baseoperator.BaseOperator 文件。

另請注意,您可以輕鬆定義不同的參數集,以服務不同的目的。一個例子是在生產和開發環境之間使用不同的設定。

實例化 DAG

我們需要一個 DAG 物件來將我們的任務巢狀放入其中。在這裡,我們傳遞一個字串,定義 dag_id,它作為您的 DAG 的唯一識別符。我們也傳遞了我們剛剛定義的預設參數字典,並為 DAG 定義了 1 天的 schedule

airflow/example_dags/tutorial.py[原始碼]

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

運算子

運算子定義了 Airflow 要完成的工作單元。使用運算子是在 Airflow 中定義工作的經典方法。對於某些使用案例,最好使用 TaskFlow API 在 Pythonic 上下文中定義工作,如 使用 TaskFlow 中所述。目前,使用運算子有助於在我們的 DAG 程式碼中視覺化任務相依性。

所有運算子都繼承自 BaseOperator,其中包含在 Airflow 中執行工作所需的所有參數。從這裡開始,每個運算子都包含其完成的工作類型獨有的參數。一些最受歡迎的運算子是 PythonOperator、BashOperator 和 KubernetesPodOperator。

Airflow 根據您傳遞給運算子的參數完成工作。在本教學課程中,我們使用 BashOperator 執行一些 bash 腳本。

任務

若要在 DAG 中使用運算子,您必須將其例項化為任務。任務決定如何在 DAG 的上下文中執行運算子的工作。

在以下範例中,我們將 BashOperator 例項化為兩個不同的任務,以便執行兩個不同的 bash 腳本。每個例項化的第一個參數 task_id 作為任務的唯一識別符。

airflow/example_dags/tutorial.py[原始碼]

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)

請注意,我們如何將運算子特定參數(bash_command)和所有運算子通用的參數(retries),從 BaseOperator 繼承而來,混合傳遞給運算子的建構函式。這比為每個建構函式呼叫傳遞每個參數更簡單。此外,請注意,在第二個任務中,我們使用 3 覆寫了 retries 參數。

任務的優先順序規則如下

  1. 顯式傳遞的參數

  2. 存在於 default_args 字典中的值

  3. 運算子的預設值(如果存在)

注意

任務必須包含或繼承參數 task_idowner,否則 Airflow 將引發例外。全新安裝的 Airflow 將為 owner 設定預設值 ‘airflow’,因此您真正需要擔心的只是確保 task_id 有值。

使用 Jinja 模板

Airflow 利用 Jinja 模板 的強大功能,並為管線作者提供一組內建參數和巨集。Airflow 也為管線作者提供鉤子,以定義他們自己的參數、巨集和模板。

本教學課程僅略微觸及您可以在 Airflow 中使用模板執行的操作,但本節的目標是讓您了解此功能的存在,讓您熟悉雙大括號,並指向最常見的模板變數:{{ ds }}(今天的「日期戳記」)。

airflow/example_dags/tutorial.py[原始碼]

templated_command = textwrap.dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)

請注意,templated_command{% %} 區塊中包含程式碼邏輯,參考參數,例如 {{ ds }},並呼叫一個函式,如 {{ macros.ds_add(ds, 7)}}

檔案也可以傳遞給 bash_command 參數,例如 bash_command='templated_command.sh',其中檔案位置相對於包含管線檔案的目錄(在本例中為 tutorial.py)。這可能因許多原因而令人嚮往,例如分離您的腳本邏輯和管線程式碼、允許在以不同語言組成的檔案中進行適當的程式碼醒目提示,以及管線結構中的一般靈活性。也可以將您的 template_searchpath 定義為指向 DAG 建構函式呼叫中的任何資料夾位置。

使用相同的 DAG 建構函式呼叫,可以定義 user_defined_macros,讓您可以指定自己的變數。例如,將 dict(foo='bar') 傳遞給此參數可讓您在模板中使用 {{ foo }}。此外,指定 user_defined_filters 可讓您註冊自己的篩選器。例如,將 dict(hello=lambda name: 'Hello %s' % name) 傳遞給此參數可讓您在模板中使用 {{ 'world' | hello }}。有關自訂篩選器的更多資訊,請查看 Jinja 文件

有關可以在模板中參考的變數和巨集的更多資訊,請務必閱讀 模板參考

新增 DAG 和任務文件

我們可以為 DAG 或每個單一任務新增文件。DAG 文件目前僅支援 markdown,而任務文件支援純文字、markdown、reStructuredText、json 和 yaml。DAG 文件可以寫成 DAG 檔案開頭的文件字串(推薦),或檔案中的任何其他位置。您可以在下面找到一些關於如何實作任務和 DAG 文件以及螢幕截圖的範例

airflow/example_dags/tutorial.py[原始碼]

t1.doc_md = textwrap.dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)

dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
"""  # otherwise, type it like this
../_images/task_doc.png ../_images/dag_doc.png

設定相依性

我們有任務 t1t2t3 彼此相依。以下是一些您可以定義它們之間相依性的方法

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

請注意,當執行您的腳本時,Airflow 會在 DAG 中發現循環或相依性被參考多次時引發例外。

使用時區

建立時區感知 DAG 非常簡單。只需確保使用 pendulum 提供時區感知日期。請勿嘗試使用標準函式庫 timezone,因為已知它們有局限性,並且我們刻意禁止在 DAG 中使用它們。

回顧

好的,所以我們有一個非常基本的 DAG。此時,您的程式碼應如下所示

airflow/example_dags/tutorial.py[原始碼]


import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

測試

執行腳本

是時候執行一些測試了。首先,讓我們確保管線已成功解析。

假設我們將上一步的程式碼儲存在 airflow.cfg 中引用的 DAG 資料夾中的 tutorial.py 中。DAG 的預設位置是 ~/airflow/dags

python ~/airflow/dags/tutorial.py

如果腳本沒有引發例外,則表示您沒有犯下任何可怕的錯誤,並且您的 Airflow 環境在某種程度上是健全的。

命令列元數據驗證

讓我們執行一些命令來進一步驗證此腳本。

# initialize the database tables
airflow db migrate

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree

測試

讓我們透過針對特定日期執行實際的任務實例進行測試。在此上下文中指定的日期稱為邏輯日期(基於歷史原因也稱為執行日期),它模擬排程器針對特定日期和時間執行您的任務或 DAG,即使它實際上現在(或在其相依性滿足後立即)執行。

我們說排程器針對特定日期和時間(而不是)執行您的任務。這是因為 DAG 的每次執行在概念上代表的不是特定日期和時間,而是兩個時間之間的間隔,稱為 資料間隔。DAG 執行的邏輯日期是其資料間隔的開始。

# command layout: command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01

現在回想一下我們之前對模板做了什麼?看看這個模板是如何透過執行此命令來呈現和執行的

# testing templated
airflow tasks test tutorial templated 2015-06-01

這應該會導致顯示事件的詳細日誌,並最終執行您的 bash 命令並印出結果。

請注意,airflow tasks test 命令在本地執行任務實例,將其日誌輸出到 stdout(螢幕上),不理會相依性,並且不與資料庫通訊狀態(執行中、成功、失敗、…)。它僅允許測試單個任務實例。

同樣適用於 airflow dags test,但在 DAG 層級。它執行給定 DAG ID 的單個 DAG 執行。雖然它確實考慮了任務相依性,但沒有狀態註冊在資料庫中。如果您的一個任務期望在某個位置有資料,並且該資料可用,則它對於在本地測試 DAG 的完整執行非常方便。

回填

一切看起來都運作良好,所以讓我們執行回填。backfill 將尊重您的相依性,將日誌發送到檔案並與資料庫通訊以記錄狀態。如果您啟用了 webserver,您將能夠追蹤進度。airflow webserver 將啟動一個 web 伺服器,如果您有興趣以視覺方式追蹤回填進度。

請注意,如果您使用 depends_on_past=True,則個別任務實例將相依於其先前的任務實例的成功(也就是說,根據邏輯日期)。邏輯日期等於 start_date 的任務實例將忽略此相依性,因為不會為它們建立先前的任務實例。

當使用 depends_on_past=True 時,您可能還想考慮 wait_for_downstream=True。雖然 depends_on_past=True 會導致任務實例相依於其先前 task_instance 的成功,但 wait_for_downstream=True 也會導致任務實例等待先前任務實例的所有直接下游任務實例成功。

此上下文中的日期範圍是 start_date 和可選的 end_date,用於使用來自此 DAG 的任務實例填充執行排程。

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07

下一步?

就這樣!您已經撰寫、測試和回填了您的第一個 Airflow 管線。將您的程式碼合併到針對它運行排程器的儲存庫中,應該會導致每天觸發和運行。

以下是您接下來可能想做的一些事情

另請參閱

  • 繼續進行本教學課程的下一步:使用 TaskFlow

  • 跳到 核心概念 章節,以詳細說明 Airflow 概念,例如 DAG、任務、運算子等等

這篇文章有幫助嗎?