最佳實務

建立新的 DAG 是一個三步驟的流程

  • 撰寫 Python 程式碼來建立 DAG 物件,

  • 測試程式碼是否符合您的預期,

  • 設定環境依賴性以執行您的 DAG

本教學將向您介紹這三個步驟的最佳實務。

撰寫 DAG

在 Airflow 中建立新的 DAG 非常簡單。然而,您需要注意許多事項,以確保 DAG 執行或失敗不會產生非預期的結果。

建立自訂運算子/Hook

請參考我們關於 自訂運算子 的指南。

建立任務

您應該將 Airflow 中的任務視為資料庫中的交易。這表示您永遠不應從您的任務中產生不完整的結果。一個例子是在任務結束時,不要在 HDFSS3 中產生不完整的資料。

Airflow 可以在任務失敗時重試。因此,任務在每次重新執行時應產生相同的結果。以下是一些您可以避免產生不同結果的方法 -

  • 在任務重新執行期間不要使用 INSERT,INSERT 語句可能會導致資料庫中出現重複的列。請將其替換為 UPSERT。

  • 在特定的分割區中讀取和寫入。永遠不要在任務中讀取最新的可用資料。有人可能會在重新執行之間更新輸入資料,這會導致不同的輸出。更好的方法是從特定的分割區讀取輸入資料。您可以使用 data_interval_start 作為分割區。在 S3/HDFS 中寫入資料時,您也應該遵循此分割方法。

  • Python datetime now() 函數會給出目前的 datetime 物件。這個函數絕不應在任務內部使用,尤其是在進行關鍵計算時,因為它會在每次執行時導致不同的結果。例如,使用它來產生暫時日誌是可以的。

提示

您應該在 default_args 中定義重複的參數,例如 connection_id 或 S3 路徑,而不是為每個任務宣告它們。default_args 有助於避免諸如印刷錯誤之類的錯誤。此外,大多數連線類型在任務中都有唯一的參數名稱,因此您可以在 default_args 中僅宣告一次連線(例如 gcp_conn_id),並且所有使用此連線類型的運算子都會自動使用它。

刪除任務

從 DAG 中刪除任務時請小心。您將無法在圖形視圖、網格視圖等中看到該任務,這使得從 Webserver 檢查該任務的日誌變得困難。如果這不是您所期望的,請建立一個新的 DAG。

溝通

如果您正在使用 Kubernetes executorCelery executor,Airflow 會在不同的伺服器上執行 DAG 的任務。因此,您不應在本地檔案系統中儲存任何檔案或組態設定,因為下一個任務很可能在沒有存取權限的不同伺服器上執行 — 例如,下載資料檔案的任務,下一個任務會處理該資料檔案。在 Local executor 的情況下,將檔案儲存在磁碟上可能會使重試更加困難,例如,您的任務需要一個組態設定檔,該檔案被 DAG 中的另一個任務刪除。

如果可能,請使用 XCom 在任務之間傳遞小型訊息,而在任務之間傳遞較大資料的好方法是使用遠端儲存空間,例如 S3/HDFS。例如,如果我們有一個任務將處理後的資料儲存在 S3 中,則該任務可以將輸出資料的 S3 路徑推送至 Xcom,而下游任務可以從 XCom 中提取路徑並使用它來讀取資料。

任務也不應在其中儲存任何身份驗證參數,例如密碼或權杖。在任何可能的情況下,請使用 連線 將資料安全地儲存在 Airflow 後端,並使用唯一的連線 ID 檢索它們。

頂層 Python 程式碼

您應該避免撰寫不必要的頂層程式碼來建立運算子和建立它們之間的 DAG 關係。這是因為 Airflow 排程器的設計決策,以及頂層程式碼解析速度對 Airflow 效能和可擴展性的影響。

Airflow 排程器以 min_file_process_interval 秒的最小間隔執行運算子 execute 方法之外的程式碼。這樣做的目的是為了允許 DAG 的動態排程 - 排程和依賴性可能會隨著時間而變化,並影響 DAG 的下一個排程。Airflow 排程器嘗試持續確保 DAG 中的內容正確地反映在排程任務中。

特別是,您不應執行任何資料庫存取、繁重的計算和網路操作。

影響 DAG 載入時間的重要因素之一,Python 開發人員可能會忽略的是,頂層匯入可能需要驚人的長時間,並且它們可能會產生大量的額外負擔,這可以通過將它們轉換為 Python 可調用物件內部的本地匯入來輕鬆避免,例如。

請考慮以下兩個範例。在第一個範例中,DAG 的解析時間將比第二個範例中的功能相同的 DAG 多花 1000 秒,在第二個範例中,expensive_api_call 是從其任務的上下文中執行的。

不避免頂層 DAG 程式碼

import pendulum

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    print("Hello from Airflow!")
    sleep(1000)


my_expensive_response = expensive_api_call()

with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        print(my_expensive_response)

避免頂層 DAG 程式碼

import pendulum

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    sleep(1000)
    return "Hello from Airflow!"


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        my_expensive_response = expensive_api_call()
        print(my_expensive_response)

在第一個範例中,每次解析 DAG 檔案時都會執行 expensive_api_call,這將導致 DAG 檔案處理的效能次佳。在第二個範例中,expensive_api_call 僅在任務執行時調用,因此能夠在不遭受任何效能損失的情況下進行解析。為了自己測試一下,請實作第一個 DAG 並查看排程器日誌中印出的「Hello from Airflow!」!

請注意,import 語句也計為頂層程式碼。因此,如果您有一個耗時很長的 import 語句,或者匯入的模組本身在頂層執行程式碼,那麼這也會影響排程器的效能。以下範例說明如何處理耗時的匯入。

# It's ok to import modules that are not expensive to load at top-level of a DAG file
import random
import pendulum

# Expensive imports should be avoided as top level imports, because DAG files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#

...


@task()
def do_stuff_with_pandas_and_torch():
    import pandas
    import torch

    # do some operations using pandas and torch


@task()
def do_stuff_with_tensorflow():
    import tensorflow

    # do some operations using tensorflow

如何檢查我的程式碼是否為「頂層」程式碼

為了理解您的程式碼是否為「頂層」程式碼,您需要了解 Python 解析工作原理的許多複雜性。一般來說,當 Python 解析 python 檔案時,它會執行它看到的程式碼,除了(一般而言)它不執行的內部方法程式碼。

它有許多不明顯的特殊情況 - 例如,頂層程式碼也表示用於確定方法預設值的任何程式碼。

但是,有一種簡單的方法可以檢查您的程式碼是否為「頂層」程式碼。您只需解析您的程式碼,看看程式碼片段是否被執行。

想像一下這段程式碼

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum


def get_task_id():
    return "print_array_task"  # <- is that code going to be executed?


def get_array():
    return [1, 2, 3]  # <- is that code going to be executed?


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

您可以做的是在您要檢查的程式碼中新增一些 print 語句,然後執行 python <my_dag_file>.py

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum


def get_task_id():
    print("Executing 1")
    return "print_array_task"  # <- is that code going to be executed? YES


def get_array():
    print("Executing 2")
    return [1, 2, 3]  # <- is that code going to be executed? NO


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

當您執行該程式碼時,您將看到

root@cf85ab34571e:/opt/airflow# python /files/test_python.py
Executing 1

這表示 get_array 沒有作為頂層程式碼執行,但 get_task_id 是。

動態 DAG 生成

有時手動撰寫 DAG 並不切實際。也許您有很多 DAG 做著類似的事情,只是它們之間的參數有所不同。或者,也許您需要一組 DAG 來載入表格,但不希望每次這些表格更改時都手動更新 DAG。在這些和其他情況下,動態生成 DAG 可能更有用。

避免前一章描述的頂層程式碼中的過度處理在動態 DAG 組態設定的情況下尤其重要,動態 DAG 組態設定基本上可以通過以下方式之一進行設定

  • 透過 環境變數(不要與 Airflow 變數 混淆)

  • 透過外部提供的、生成的 Python 程式碼,在 DAG 資料夾中包含元數據

  • 透過外部提供的、生成的組態設定元數據檔案在 DAG 資料夾中

動態 DAG 生成的一些案例在 動態 DAG 生成 章節中進行了描述。

Airflow 變數

使用 Airflow 變數會產生網路呼叫和資料庫存取,因此應盡可能避免在 DAG 的頂層 Python 程式碼中使用它們,如前一章 頂層 Python 程式碼 中所述。如果必須在頂層 DAG 程式碼中使用 Airflow 變數,則可以通過 啟用實驗性快取 來減輕它們對 DAG 解析的影響,並使用合理的 ttl 進行組態設定。

您可以在運算子的 execute() 方法中自由使用 Airflow 變數,但您也可以通過 Jinja 模板將 Airflow 變數傳遞給現有的運算子,這將延遲讀取值直到任務執行。

執行此操作的模板語法是

{{ var.value.<variable_name> }}

或者,如果您需要從變數中反序列化 json 物件

{{ var.json.<variable_name> }}

在頂層程式碼中,使用 jinja 模板的變數在任務運行之前不會產生請求,而如果未啟用快取,Variable.get() 則會在排程器每次解析 dag 檔案時產生請求。不 啟用快取 而使用 Variable.get() 將導致 dag 檔案處理的效能次佳。在某些情況下,這可能會導致 dag 檔案在完全解析之前逾時。

不良範例

from airflow.models import Variable

foo_var = Variable.get("foo")  # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
    task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)

bash_use_variable_bad_2 = BashOperator(
    task_id="bash_use_variable_bad_2",
    bash_command=f"echo variable foo=${Variable.get('foo')}",  # AVOID THAT
)

bash_use_variable_bad_3 = BashOperator(
    task_id="bash_use_variable_bad_3",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": Variable.get("foo")},  # AVOID THAT
)

良好範例

bash_use_variable_good = BashOperator(
    task_id="bash_use_variable_good",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": "{{ var.value.get('foo') }}"},
)
@task
def my_task():
    var = Variable.get("foo")  # This is ok since my_task is called only during task run, not during DAG scan.
    print(var)

為了安全起見,建議您對任何包含敏感資料的變數使用 Secrets Backend

時間表

避免在時間表程式碼的頂層使用 Airflow 變數/連線或存取 airflow 資料庫。資料庫存取應延遲到 DAG 的執行時間。這表示您不應將變數/連線檢索作為時間表類別初始化的引數,或者在自訂時間表模組的頂層具有變數/連線。

不良範例

from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something=Variable.get("something"), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)

良好範例

from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something="something", **kwargs):
        self._something = Variable.get(something)
        super().__init__(*args, **kwargs)

變更後觸發 DAG

避免在變更 DAG 或您在 DAG 資料夾中變更的任何其他附帶檔案後立即觸發 DAG。

您應該給系統足夠的時間來處理變更的檔案。這需要幾個步驟。首先,檔案必須分發到排程器 - 通常通過分散式檔案系統或 Git-Sync,然後排程器必須解析 Python 檔案並將它們儲存在資料庫中。根據您的組態設定、分散式檔案系統的速度、檔案數量、DAG 數量、檔案中的變更數量、檔案大小、排程器數量、CPU 速度,這可能需要幾秒鐘到幾分鐘,在極端情況下需要很多分鐘。您應該等待您的 DAG 出現在 UI 中才能觸發它。

如果您看到更新它與準備好觸發之間存在長時間延遲,您可以查看以下組態設定參數並根據您的需求進行微調(通過點擊連結查看每個參數的詳細資訊)

使用觸發規則的觀察者模式範例

觀察者模式是我們如何調用具有「觀察」其他任務狀態的任務的 DAG。其主要目的是在任何其他任務失敗時使 DAG Run 失敗。這種需求來自 Airflow 系統測試,這些測試是具有不同任務的 DAG(類似於包含步驟的測試)。

通常,當任何任務失敗時,所有其他任務都不會執行,並且整個 DAG Run 也會獲得失敗狀態。但是當我們使用觸發規則時,我們可以中斷運行任務的正常流程,並且整個 DAG 可能表示我們期望的不同狀態。例如,我們可以有一個拆解任務(觸發規則設定為 TriggerRule.ALL_DONE),無論其他任務的狀態如何都將執行該任務(例如,清理資源)。在這種情況下,DAG 將始終運行此任務,並且 DAG Run 將獲得此特定任務的狀態,因此我們可能會遺失有關失敗任務的資訊。如果我們想要確保具有拆解任務的 DAG 在任何任務失敗時都會失敗,我們需要使用觀察者模式。觀察者任務是一個在觸發時始終會失敗的任務,但它只需要在任何其他任務失敗時觸發。它需要將觸發規則設定為 TriggerRule.ONE_FAILED,並且它還需要是 DAG 中所有其他任務的下游任務。正因為如此,如果每個其他任務都通過,則觀察者將被略過,但是當某些任務失敗時,觀察者任務將被執行並失敗,從而也使 DAG Run 失敗。

注意

請注意,觸發規則僅依賴於直接上游(父)任務,例如 TriggerRule.ONE_FAILED 將忽略任何失敗的(或 upstream_failed)不是參數化任務直接父任務的任務。

通過一個範例更容易掌握這個概念。假設我們有以下 DAG

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    raise AirflowException("Failing task because one or more upstream tasks failed.")


with DAG(
    dag_id="watcher_example",
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", retries=0)
    passing_task = BashOperator(task_id="passing_task", bash_command="echo passing_task")
    teardown = BashOperator(
        task_id="teardown",
        bash_command="echo teardown",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    failing_task >> passing_task >> teardown
    list(dag.tasks) >> watcher()

此 DAG 在執行後的視覺表示如下所示

_images/watcher.png

我們有幾個任務用於不同的目的

  • failing_task 總是失敗,

  • passing_task 總是成功(如果執行),

  • teardown 始終被觸發(無論其他任務的狀態如何),並且它應該始終成功,

  • watcher 是每個其他任務的下游任務,即,當任何任務失敗時它將被觸發,從而使整個 DAG Run 失敗,因為它是葉任務。

重要的是要注意,如果沒有 watcher 任務,整個 DAG Run 將獲得 success 狀態,因為唯一失敗的任務不是葉任務,並且 teardown 任務將以 success 結束。如果我們希望 watcher 監控所有任務的狀態,我們需要使其單獨依賴於所有任務。正因為如此,如果任何任務失敗,我們可以使 DAG Run 失敗。請注意,觀察者任務的觸發規則設定為 "one_failed"。另一方面,如果沒有 teardown 任務,則不需要 watcher 任務,因為 failing_task 將其 failed 狀態傳播到下游任務 passed_task,並且整個 DAG Run 也將獲得 failed 狀態。

在叢集政策中使用 AirflowClusterPolicySkipDag 例外來略過特定的 DAG

版本 2.7 新增功能。

Airflow DAG 通常可以通過 git-sync 使用 Git 儲存庫的特定分支進行部署和更新。但是,當您必須出於某些操作原因運行多個 Airflow 叢集時,維護多個 Git 分支非常麻煩。特別是,當您需要定期同步兩個獨立的分支(例如 prodbeta)時,您會遇到一些困難,並使用適當的分支策略。

  • cherry-pick 對於維護 Git 儲存庫來說太麻煩了。

  • hard-reset 不是 GitOps 的建議方式

因此,您可以考慮將多個 Airflow 叢集與相同的 Git 分支(例如 main)連線,並使用不同的環境變數和不同的連線組態設定以及相同的 connection_id 來維護它們。如果需要,您還可以在叢集政策中引發 AirflowClusterPolicySkipDag 例外,以僅在特定 Airflow 部署上將特定 DAG 載入到 DagBag 中。

def dag_policy(dag: DAG):
    """Skipping the DAG with `only_for_beta` tag."""

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )

上面的範例顯示了 dag_policy 程式碼片段,以根據 DAG 的標籤略過 DAG。

降低 DAG 複雜度

雖然 Airflow 擅長處理具有大量任務及其之間依賴關係的大量 DAG,但當您有很多複雜的 DAG 時,它們的複雜性可能會影響排程效能。保持 Airflow 實例高效能且充分利用的方法之一是,您應該盡可能簡化和最佳化您的 DAG - 您必須記住,DAG 解析過程和建立只是執行 Python 程式碼,這取決於您使其盡可能高效能。沒有使您的 DAG「不那麼複雜」的魔術配方 - 因為這是 Python 程式碼,所以是 DAG 撰寫者控制其程式碼的複雜性。

沒有 DAG 複雜度的「指標」,特別是,沒有指標可以告訴您您的 DAG 是否「足夠簡單」。但是,與任何 Python 程式碼一樣,當您的 DAG 程式碼經過最佳化時,您肯定可以判斷它是否「更簡單」或「更快」。如果您想最佳化您的 DAG,您可以採取以下操作

  • 使您的 DAG 載入更快。這是一個單一的改進建議,可以通過各種方式實作,但這是對排程器效能影響最大的建議。每當您有機會使您的 DAG 載入更快時 - 如果您的目標是提高效能,請執行它。查看 頂層 Python 程式碼 以獲得有關如何執行此操作的一些提示。另請參閱 DAG 載入器測試,了解如何評估您的 DAG 載入時間。

  • 使您的 DAG 生成更簡單的結構。每個任務依賴性都會增加排程和執行的額外處理負擔。具有簡單線性結構 A -> B -> C 的 DAG 將比具有深度巢狀樹狀結構且依賴任務數量呈指數增長的 DAG 在任務排程中遇到更少的延遲。如果您可以使您的 DAG 更線性 - 在執行中的單個點上,任務之間只有盡可能少的潛在候選者可以運行,這可能會提高整體排程效能。

  • 減少每個檔案的 DAG 數量。雖然 Airflow 2 針對在一個檔案中包含多個 DAG 的情況進行了最佳化,但系統的某些部分有時會使其效能降低,或者比將這些 DAG 分散到多個檔案中引入更多的延遲。例如,僅一個檔案只能由一個 FileProcessor 解析這一事實,就使其可擴展性降低。如果您從一個檔案生成了許多 DAG,如果您觀察到在 Airflow 的 UI 中反映 DAG 檔案中的變更需要很長時間,請考慮拆分它們。

  • 撰寫高效的 Python 程式碼。必須在每個檔案的 DAG 數量較少(如上所述)與總體上撰寫較少程式碼之間取得平衡。建立描述 DAG 的 Python 檔案應遵循最佳程式設計實務,而不應視為組態設定。如果您的 DAG 共用相似的程式碼,您不應一遍又一遍地將它們複製到大量幾乎相同的原始程式碼檔案中,因為這將導致不必要地重複匯入相同的資源。相反,您應該旨在最大限度地減少所有 DAG 中重複的程式碼,以便應用程式可以高效運行並且可以輕鬆除錯。請參閱 動態 DAG 生成,了解如何使用相似的程式碼建立多個 DAG。

測試 DAG

Airflow 使用者應將 DAG 視為生產級程式碼,並且 DAG 應具有各種相關的測試,以確保它們產生預期的結果。您可以為 DAG 撰寫各種各樣的測試。讓我們看一下其中的一些。

DAG 載入器測試

此測試應確保您的 DAG 不包含在載入時引發錯誤的程式碼片段。使用者無需撰寫額外的程式碼即可運行此測試。

python your-dag-file.py

運行上述命令而沒有任何錯誤可確保您的 DAG 不包含任何未安裝的依賴項、語法錯誤等。確保您在與您的排程器環境相對應的環境中載入您的 DAG - 具有相同的依賴項、環境變數、從 DAG 引用的通用程式碼。

如果您想嘗試最佳化 DAG 載入時間,這也是檢查您的 DAG 在最佳化後是否載入更快的絕佳方法。只需運行 DAG 並測量它所需的時間,但同樣您必須確保您的 DAG 在相同的依賴項、環境變數、通用程式碼下運行。

有很多方法可以測量處理時間,Linux 環境中的一種方法是使用內建的 time 命令。確保連續運行幾次以考慮快取效應。比較最佳化之前和之後的結果(在相同的條件下 - 使用相同的機器、環境等),以便評估最佳化的影響。

time python airflow/example_dags/example_python_operator.py

結果

real    0m0.699s
user    0m0.590s
sys     0m0.108s

重要的指標是「實際時間」 - 它告訴您處理 DAG 花費了多長時間。請注意,當以這種方式載入檔案時,您正在啟動一個新的直譯器,因此存在 Airflow 解析 DAG 時不存在的初始載入時間。您可以通過運行來評估初始化時間

time python -c ''

結果

real    0m0.073s
user    0m0.037s
sys     0m0.039s

在這種情況下,初始直譯器啟動時間約為 0.07 秒,約佔上面 example_python_operator.py 解析所需時間的 10%,因此範例 DAG 的實際解析時間約為 0.62 秒。

您可以查看 測試 DAG,以獲取有關如何測試個別運算子的詳細資訊。

單元測試

單元測試確保您的 DAG 中沒有不正確的程式碼。您可以為您的任務和您的 DAG 撰寫單元測試。

載入 DAG 的單元測試

import pytest

from airflow.models import DagBag


@pytest.fixture()
def dagbag():
    return DagBag()


def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="hello_world")
    assert dagbag.import_errors == {}
    assert dag is not None
    assert len(dag.tasks) == 1

單元測試 DAG 結構: 這是一個範例測試,想要驗證程式碼生成的 DAG 的結構是否與 dict 物件一致

def assert_dag_dict_equal(source, dag):
    assert dag.task_dict.keys() == source.keys()
    for task_id, downstream_list in source.items():
        assert dag.has_task(task_id)
        task = dag.get_task(task_id)
        assert task.downstream_task_ids == set(downstream_list)


def test_dag():
    assert_dag_dict_equal(
        {
            "DummyInstruction_0": ["DummyInstruction_1"],
            "DummyInstruction_1": ["DummyInstruction_2"],
            "DummyInstruction_2": ["DummyInstruction_3"],
            "DummyInstruction_3": [],
        },
        dag,
    )

自訂運算子的單元測試

import datetime

import pendulum
import pytest

from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)

TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"


@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        schedule="@daily",
        start_date=DATA_INTERVAL_START,
    ) as dag:
        MyCustomOperator(
            task_id=TEST_TASK_ID,
            prefix="s3://bucket/some/prefix",
        )
    return dag


def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to tasks results.

自我檢查

您還可以在 DAG 中實作檢查,以確保任務產生預期的結果。例如,如果您有一個將資料推送至 S3 的任務,您可以在下一個任務中實作檢查。例如,檢查可以確保在 S3 中建立分割區,並執行一些簡單的檢查以確定資料是否正確。

同樣地,如果您有一個在 Kubernetes 或 Mesos 中啟動微服務的任務,您應該使用 airflow.providers.http.sensors.http.HttpSensor 檢查服務是否已啟動。

task = PushToS3(...)
check = S3KeySensor(
    task_id="check_parquet_exists",
    bucket_key="s3://bucket/key/foo.parquet",
    poke_interval=0,
    timeout=0,
)
task >> check

預演環境

如果可能,請保留一個預演環境,以在部署到生產環境之前測試完整的 DAG 運行。確保您的 DAG 已參數化以變更變數,例如,S3 操作的輸出路徑或用於讀取組態設定的資料庫。不要在 DAG 內部硬式編碼值,然後根據環境手動變更它們。

您可以使用環境變數來參數化 DAG。

import os

dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/")

模擬變數與連線

當您為使用變數或連線的程式碼撰寫測試時,您必須確保它們在您運行測試時存在。顯而易見的解決方案是將這些物件儲存到資料庫中,以便在您的程式碼執行時可以讀取它們。但是,讀取和寫入物件到資料庫會增加額外的時間負擔。為了加快測試執行速度,值得模擬這些物件的存在,而無需將它們儲存到資料庫中。為此,您可以使用模擬 os.environ 的環境變數,使用 unittest.mock.patch.dict()

對於變數,請使用 AIRFLOW_VAR_{KEY}

with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
    assert "env-value" == Variable.get("key")

對於連線,請使用 AIRFLOW_CONN_{CONN_ID}

conn = Connection(
    conn_type="gcpssh",
    login="cat",
    host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
    assert "cat" == Connection.get_connection_from_secrets("my_conn").login

Metadata DB 維護

隨著時間的推移,隨著更多 DAG 和任務運行以及事件日誌的累積,元數據資料庫將增加其儲存空間。

您可以使用 Airflow CLI 通過命令 airflow db clean 清除舊資料。

有關更多詳細資訊,請參閱 db clean 用法

升級與降級

備份您的資料庫

在進行任何修改資料庫的操作之前,備份元數據資料庫始終是一個明智的想法。

停用排程器

您可能會考慮在執行此類維護時停用 Airflow 叢集。

一種方法是將參數 [scheduler] > use_job_schedule 設定為 False,並等待任何正在運行的 DAG 完成;在此之後,除非外部觸發,否則不會建立新的 DAG 運行。

一個更佳的方式(雖然有點手動)是使用 dags pause 指令。您需要追蹤在您開始此操作之前哪些 DAG 被暫停,以便您知道維護完成後要取消暫停哪些 DAG。首先執行 airflow dags list 並儲存未暫停 DAG 的列表。然後使用相同的列表來執行 dags pause (在維護之前針對每個 DAG)和 dags unpause (在維護之後)。這樣做的好處是,您可以嘗試在升級後僅取消暫停一或兩個 DAG(也許是專用的 測試 DAG),以確保一切運作正常,然後再重新開啟所有項目。

新增「整合測試」DAG

新增幾個「整合測試」DAG 會很有幫助,這些 DAG 使用您生態系統中的所有常見服務(例如 S3、Snowflake、Vault),但使用測試資源或「開發」帳戶。這些測試 DAG 可以是您在升級後首先開啟的 DAG,因為如果它們失敗了,那也沒關係,您可以還原到備份而不會產生負面後果。但是,如果它們成功了,它們應該證明您的叢集能夠使用您需要使用的程式庫和服務來執行任務。

例如,如果您使用外部密鑰後端,請確保您有一個任務可以檢索連線。如果您使用 KubernetesPodOperator,請新增一個執行 sleep 30; echo "hello" 的任務。如果您需要寫入 s3,請在測試任務中執行此操作。如果您需要存取資料庫,請新增一個執行 select 1 (從伺服器)的任務。

升級前清理資料

有些資料庫遷移可能很耗時。如果您的元數據資料庫非常大,請考慮在執行升級之前使用 db clean 指令清理一些舊資料。請謹慎使用。

處理衝突/複雜的 Python 依賴項

Airflow 有許多 Python 依賴項,有時 Airflow 依賴項會與您的任務程式碼預期的依賴項衝突。由於預設情況下,Airflow 環境只是一組 Python 依賴項和單一 Python 環境,因此通常也可能發生某些任務需要的依賴項與其他任務不同,並且這些任務之間的依賴項基本上會發生衝突的情況。

如果您使用預先定義的 Airflow Operators 與外部服務通訊,則沒有太多選擇,但通常這些 operators 會具有與基本 Airflow 依賴項不衝突的依賴項。Airflow 使用約束機制,這表示您擁有一組「固定的」依賴項,社群保證可以使用這些依賴項安裝 Airflow(包括所有社群提供者),而不會觸發衝突。但是,您可以獨立升級提供者,並且它們的約束不會限制您,因此發生依賴項衝突的機會較低(您仍然需要測試這些依賴項)。因此,當您使用預先定義的 operators 時,發生依賴項衝突問題的機率很小,甚至沒有問題。

但是,當您以更「現代的方式」使用 Airflow 時,也就是當您使用 TaskFlow Api 並且您的大多數 operators 是使用自訂 Python 程式碼編寫的,或者當您想要編寫自己的 Custom Operator 時,您可能會遇到您的自訂程式碼所需的依賴項與 Airflow 的依賴項衝突,甚至您的幾個 Custom Operators 的依賴項之間也產生衝突的情況。

有許多策略可以用來減輕這個問題。雖然處理自訂 operators 中的依賴項衝突很困難,但當使用 airflow.operators.python.PythonVirtualenvOperatorairflow.operators.python.ExternalPythonOperator 時,實際上會容易得多——可以直接使用經典的「operator」方法,或者使用以 @task.virtualenv@task.external_python 修飾器裝飾的任務(如果您使用 TaskFlow)。

讓我們從最容易實作的策略開始(有一些限制和額外負擔),我們將逐步介紹那些需要在您的 Airflow 部署中進行一些變更的策略。

使用 PythonVirtualenvOperator

這是最簡單使用且限制最多的策略。PythonVirtualenvOperator 允許您動態建立一個 virtualenv,您的 Python 可調用函數將在其中執行。在 使用 TaskFlow 中描述的現代 TaskFlow 方法中,也可以透過使用 @task.virtualenv 修飾器裝飾您的可調用物件來完成(建議使用 operator 的方式)。每個 airflow.operators.python.PythonVirtualenvOperator 任務都可以擁有自己獨立的 Python virtualenv(每次執行任務時動態建立),並且可以指定需要為該任務執行而安裝的細緻要求集。

此 operator 負責

  • 根據您的環境建立 virtualenv

  • 序列化您的 Python 可調用物件並將其傳遞給 virtualenv Python 解釋器執行

  • 執行它並檢索可調用物件的結果,並在指定的情況下透過 xcom 推送它

此 operator 的優點是

  • 無需預先準備 venv。它將在任務執行前動態建立,並在任務完成後移除,因此沒有任何特殊之處(除了在您的 airflow 依賴項中具有 virtualenv 套件)可以利用多個虛擬環境

  • 您可以在相同的 worker 上執行具有不同依賴項集的任務——因此記憶體資源可以重複使用(但請參閱下文關於建立 venvs 所涉及的 CPU 額外負擔)。

  • 在較大的安裝中,DAG 作者不需要請求任何人為您建立 venvs。作為 DAG 作者,您只需要安裝 virtualenv 依賴項,並且您可以根據需要指定和修改環境。

  • 部署需求沒有變更——無論您使用 Local virtualenv、Docker 或 Kubernetes,任務都可以在不向您的部署新增任何內容的情況下運作。

  • DAG 作者無需學習更多關於容器、Kubernetes 的知識。只需要 Python 需求的知識即可透過這種方式編寫 DAG。

此 operator 引入了一些限制和額外負擔

  • 您的 python 可調用物件必須是可序列化的。有許多 python 物件無法使用標準 pickle 程式庫進行序列化。您可以使用 dill 程式庫來減輕其中一些限制,但即使是該程式庫也無法解決所有序列化限制。

  • Airflow 環境中不可用的所有依賴項都必須在您使用的可調用物件中本地匯入,並且您的 DAG 的頂層 Python 程式碼不應匯入/使用這些程式庫。

  • 虛擬環境在相同的作業系統中執行,因此它們不能具有衝突的系統級依賴項(aptyum 可安裝套件)。只有 Python 依賴項可以獨立安裝在這些環境中。

  • 此 operator 為執行每個任務增加了 CPU、網路和經過時間的額外負擔——Airflow 必須為每個任務從頭開始重新建立虛擬環境

  • worker 需要存取 PyPI 或私有儲存庫才能安裝依賴項

  • 動態建立虛擬環境容易發生暫時性故障(例如,當您的儲存庫不可用或網路連線到儲存庫時出現問題)

  • 很容易陷入「過於」動態的環境——由於您安裝的依賴項可能會升級,並且它們的傳遞依賴項可能會獲得獨立升級,您最終可能會遇到這樣的情況:您的任務將停止運作,因為有人發布了新版本的依賴項,或者您可能會成為「供應鏈」攻擊的受害者,其中新版本的依賴項可能變得惡意

  • 任務僅透過在不同的環境中執行來彼此隔離。這使得正在執行的任務仍然可能彼此干擾——例如,在同一 worker 上執行的後續任務可能會受到先前任務建立/修改檔案等的影響。

您可以在 Taskflow Virtualenv 範例 中查看使用 airflow.operators.python.PythonVirtualenvOperator 的詳細範例

使用 ExternalPythonOperator

2.4 版本新增功能。

使用 airflow.operators.python.ExternalPythonOperator` 會更複雜一些,但額外負擔、安全性、穩定性問題會顯著減少。在 使用 TaskFlow 中描述的現代 TaskFlow 方法中,也可以透過使用 @task.external_python 修飾器裝飾您的可調用物件來完成(建議使用 operator 的方式)。但是,這需要您擁有預先存在的、不可變的 Python 環境,該環境是預先準備好的。與 airflow.operators.python.PythonVirtualenvOperator 不同,您無法將新的依賴項新增到此類預先存在的環境中。您需要的所有依賴項都應預先新增到您的環境中,並且在您的 Airflow 在分散式環境中執行的情況下,在所有 worker 中都可用。

這樣做可以避免重新建立虛擬環境的額外負擔和問題,但它們必須與 Airflow 安裝一起準備和部署。通常需要管理 Airflow 安裝的人員參與,而在較大的安裝中,這些人員通常與 DAG 作者(DevOps/系統管理員)不同。

可以透過各種方式準備這些虛擬環境——如果您使用 LocalExecutor,它們只需要安裝在執行排程器的機器上;如果您使用分散式 Celery 虛擬環境安裝,則應該有一個管線在多台機器上安裝這些虛擬環境;最後,如果您使用 Docker 映像(例如透過 Kubernetes),則應該將虛擬環境建立新增到您的自訂映像建置管線中。

此 operator 的優點是

  • 執行任務時沒有設定額外負擔。當您開始執行任務時,虛擬環境已準備就緒。

  • 您可以在相同的 worker 上執行具有不同依賴項集的任務——因此所有資源都可以重複使用。

  • worker 無需存取 PyPI 或私有儲存庫。因網路連線導致暫時性錯誤的機率較小。

  • 依賴項可以由管理員和您的安全團隊預先審查,不會動態新增意外的新程式碼。這對安全性與穩定性都有好處。

  • 對您的部署影響有限——您無需切換到 Docker 容器或 Kubernetes 即可充分利用此 operator。

  • DAG 作者無需學習更多關於容器、Kubernetes 的知識。只需要 Python、需求的知識即可透過這種方式編寫 DAG。

缺點

  • 您的環境需要預先準備好虛擬環境。這通常表示您無法即時變更它,新增新的或變更需求至少需要重新部署 Airflow,並且當您處理新版本時,迭代時間可能會更長。

  • 您的 python 可調用物件必須是可序列化的。有許多 python 物件無法使用標準 pickle 程式庫進行序列化。您可以使用 dill 程式庫來減輕其中一些限制,但即使是該程式庫也無法解決所有序列化限制。

  • Airflow 環境中不可用的所有依賴項都必須在您使用的可調用物件中本地匯入,並且您的 DAG 的頂層 Python 程式碼不應匯入/使用這些程式庫。

  • 虛擬環境在相同的作業系統中執行,因此它們不能具有衝突的系統級依賴項(aptyum 可安裝套件)。只有 Python 依賴項可以獨立安裝在這些環境中

  • 任務僅透過在不同的環境中執行來彼此隔離。這使得正在執行的任務仍然可能彼此干擾——例如,在同一 worker 上執行的後續任務可能會受到先前任務建立/修改檔案等影響

您可以將 PythonVirtualenvOperatorExternalPythonOperator 視為對應物——它們使從開發階段轉移到生產階段更加順暢。作為 DAG 作者,您通常會使用依賴項進行迭代,並使用 PythonVirtualenvOperator 開發您的 DAG(因此使用 @task.virtualenv 修飾器裝飾您的任務),而在迭代和變更之後,您可能希望在生產中將其變更為切換到 ExternalPythonOperator (和 @task.external_python ),在您的 DevOps/系統管理員團隊將您的新依賴項部署到生產環境中預先存在的 virtualenv 中之後。這樣做的好處是,您可以隨時將修飾器切換回來,並繼續使用 PythonVirtualenvOperator 「動態地」開發它。

您可以在 Taskflow External Python 範例 中查看使用 airflow.operators.python.ExternalPythonOperator 的詳細範例

使用 DockerOperator 或 Kubernetes Pod Operator

另一種策略是使用 airflow.providers.docker.operators.docker.DockerOperator airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator。這些 operators 需要 Airflow 能夠存取 Docker 引擎或 Kubernetes 叢集。

與 Python operators 的情況類似,如果您想使用這些 operators 執行您的可調用 Python 程式碼,taskflow 修飾器會很方便。

但是,這會更複雜——如果您想使用這種方法,您需要了解 Docker/Kubernetes Pods 的運作方式,但任務之間是完全隔離的,而且您甚至不限於執行 Python 程式碼。您可以使用任何您想要的程式設計語言編寫您的任務。此外,您的依賴項完全獨立於 Airflow 的依賴項(包括系統級依賴項),因此如果您的任務需要非常不同的環境,這是最佳選擇。

2.2 版本新增功能。

從 Airflow 2.2 版本開始,您可以使用 @task.docker 修飾器來使用 DockerOperator 執行您的函數。

2.4 版本新增功能。

從 Airflow 2.2 版本開始,您可以使用 @task.kubernetes 修飾器來使用 KubernetesPodOperator 執行您的函數。

使用這些 operators 的優點是

  • 您可以執行具有不同 Python 和系統級依賴項集的任務,甚至可以使用完全不同的語言或甚至不同的處理器架構(x86 與 arm)編寫的任務。

  • 用於執行任務的環境享有容器的最佳化和不可變性,其中一組相似但不同的依賴項可以有效地重複使用映像的許多快取層,因此該環境針對您擁有多個相似但不同的環境的情況進行了最佳化。

  • 依賴項可以由管理員和您的安全團隊預先審查,不會動態新增意外的新程式碼。這對安全性與穩定性都有好處。

  • 任務之間的完全隔離。除了使用標準 Airflow XCom 機制外,它們不能以其他方式相互影響。

缺點

  • 啟動任務會產生額外負擔。通常不如動態建立虛擬環境時那麼大,但仍然很顯著(尤其是對於 KubernetesPodOperator)。

  • 在 TaskFlow 修飾器的情況下,需要序列化整個要調用的方法並將其傳送到 Docker 容器或 Kubernetes Pod,並且對方法的大小存在系統級限制。在遠端序列化、傳送以及最終反序列化方法也會增加額外負擔。

  • 來自多個所需程序的資源額外負擔。在這些 operators 的情況下執行任務至少需要兩個程序——一個程序(在 Docker 容器或 Kubernetes Pod 中執行)執行任務,以及 Airflow worker 中的一個監管程序,該程序將作業提交到 Docker/Kubernetes 並監控執行。

  • 您的環境需要預先準備好容器映像。這通常表示您無法即時變更它們。新增系統依賴項、修改或變更 Python 需求需要重新建置和發布映像(通常在您的私有登錄檔中)。當您處理新的依賴項時,迭代時間通常會更長,並且如果開發人員變更依賴項,則需要迭代的開發人員建置和使用他們自己的映像。適當的部署管線對於能夠可靠地維護您的部署至關重要。

  • 如果您想透過修飾器執行您的 python 可調用物件,則它必須是可序列化的,在這種情況下,Airflow 環境中不可用的所有依賴項都必須在您使用的可調用物件中本地匯入,並且您的 DAG 的頂層 Python 程式碼不應匯入/使用這些程式庫。

  • 您需要了解更多關於 Docker 容器或 Kubernetes 如何運作的詳細資訊。這兩者提供的抽象是「洩漏的」,因此您需要更多地了解資源、網路、容器等,才能編寫使用這些 operators 的 DAG。

您可以在 Taskflow Docker 範例 中查看使用 airflow.operators.providers.Docker 的詳細範例,以及 airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator Taskflow Kubernetes 範例

使用多個 Docker 映像和 Celery 佇列

有可能(雖然這需要深入了解 Airflow 部署)使用多個獨立的 Docker 映像執行 Airflow 任務。這可以透過將不同的任務分配到不同的佇列,並將您的 Celery worker 配置為針對不同的佇列使用不同的映像來實現。然而,這(至少目前)需要大量手動部署配置以及 Airflow、Celery 和 Kubernetes 如何運作的內在知識。此外,它為執行任務引入了相當多的額外負擔——資源重複使用的機會更少,並且在不影響效能和穩定性的情況下,更難以針對資源成本微調此類部署。

使其更有用的一種可能方法是 AIP-46 Runtime isolation for Airflow tasks and DAG parsingAIP-43 DAG Processor Separation 的完成。在這些 AIP 實作之前,使用這種方法的好處很少,因此不建議使用。

然而,當這些 AIP 實作後,這將開啟更具多租戶方法的可能性,多個團隊將能夠擁有完全隔離的依賴項集,這些依賴項將在 DAG 的完整生命週期(從剖析到執行)中使用。

此條目是否有幫助?