偵錯 Airflow DAG

使用 dag.test() 測試 DAG

若要在 IDE 中偵錯 DAG,您可以在您的 DAG 檔案中設定 dag.test 命令,並在單一序列化的 Python 處理程序中執行您的 DAG。

此方法可以與任何支援的資料庫 (包括本機 SQLite 資料庫) 一起使用,並且會快速失敗,因為所有任務都在單一處理程序中執行。

若要設定 dag.test,請將這兩行程式碼新增到您的 DAG 檔案底部

if __name__ == "__main__":
    dag.test()

就是這樣!您可以新增選用的引數來微調測試,否則您可以根據需要執行或偵錯 DAG。以下是一些引數範例

  • 如果您想要測試特定引數的 DAG 執行,可以使用 execution_date

  • 如果您想要使用執行器測試 DAG,可以使用 use_executor。預設情況下,dag.test 在沒有執行器的情況下執行 DAG,它只會在本機執行所有任務。透過提供此引數,DAG 將使用 Airflow 環境中設定的執行器執行。

有條件地跳過任務

如果您不希望在本機環境中執行某些任務子集 (例如,相依性檢查感測器或清理步驟),您可以自動將它們標記為成功,並在 mark_success_pattern 引數中提供符合其 task_id 的模式。

在以下範例中,測試 DAG 不會等待任一上游 DAG 完成。相反地,測試資料是手動擷取的。清理步驟也會被跳過,使得中繼 CSV 檔案可用於檢查。

with DAG("example_dag", default_args=default_args) as dag:
    sensor = ExternalTaskSensor(task_id="wait_for_ingestion_dag", external_dag_id="ingest_raw_data")
    sensor2 = ExternalTaskSensor(task_id="wait_for_dim_dag", external_dag_id="ingest_dim")
    collect_stats = PythonOperator(task_id="extract_stats_csv", python_callable=extract_stats_csv)
    # ... run other tasks
    cleanup = PythonOperator(task_id="cleanup", python_callable=Path.unlink, op_args=[collect_stats.output])

    [sensor, sensor2] >> collect_stats >> cleanup

if __name__ == "__main__":
    ingest_testing_data()
    run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
    print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")

與 DebugExecutor 的比較

dag.test 命令相較於現已棄用的 DebugExecutor 類別,具有以下優勢

  1. 它完全不需要執行執行器。任務一次執行一個,沒有執行器或排程器日誌。

  2. 它比使用 DebugExecutor 執行程式碼快得多,因為它不需要經過排程器迴圈。

  3. 它不會執行回填。

在命令列上偵錯 Airflow DAG

透過與上述章節中提到的相同的兩行新增程式碼,您現在也可以輕鬆地使用 pdb 偵錯 DAG。執行 python -m pdb <DAG 檔案的路徑>.py 以在命令列上獲得互動式偵錯體驗。

root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>

Debug Executor (已棄用)

DebugExecutor 旨在作為偵錯工具,並且可以從 IDE 使用。它是一個單一處理程序執行器,會將 TaskInstance 排入佇列,並透過執行 _run_raw_task 方法來執行它們。

由於其特性,執行器可以與 SQLite 資料庫一起使用。當與感測器一起使用時,執行器會將感測器模式變更為 reschedule 以避免封鎖 DAG 的執行。

此外,DebugExecutor 可以用於快速失敗模式,這會使所有其他正在執行或排程的任務立即失敗。若要啟用此選項,請設定 AIRFLOW__DEBUG__FAIL_FAST=True 或調整您的 airflow.cfg 中的 fail_fast 選項。如需設定組態的更多資訊,請參閱 設定組態選項

IDE 設定步驟

  1. 在您的 DAG 檔案末尾新增 main 區塊,使其可執行。

它將執行回填作業

if __name__ == "__main__":
    from airflow.utils.state import State

    dag.clear()
    dag.run()
  1. 在您的 IDE 的執行組態中設定 AIRFLOW__CORE__EXECUTOR=DebugExecutor。在此步驟中,您也應該設定 DAG 所需的所有環境變數。

  2. 執行/偵錯 DAG 檔案。

這個條目是否有幫助?