常見問題¶
排程 / DAG 檔案解析¶
為什麼任務沒有被排程?¶
您的任務可能沒有被排程有很多原因。以下是一些常見的原因
您的腳本是否「編譯」成功?Airflow 引擎是否可以解析它並找到您的 DAG 物件?為了測試這一點,您可以執行
airflow dags list
並確認您的 DAG 出現在列表中。您也可以執行airflow tasks list foo_dag_id --tree
並確認您的任務如預期出現在列表中。如果您使用 CeleryExecutor,您可能需要確認這在排程器運行的位置以及 Worker 運行的位置都有效。包含您 DAG 的檔案內容中是否包含字串 “airflow” 和 “DAG”?在搜尋 DAG 目錄時,Airflow 會忽略不包含 “airflow” 和 “DAG” 的檔案,以防止 DagBag 解析從匯入與使用者 DAG 同位置的所有 Python 檔案。
您的
start_date
是否設定正確?對於基於時間的 DAG,任務只有在開始日期之後的第一個排程間隔過去後才會被觸發。您的
schedule
參數是否設定正確?預設值為一天 (datetime.timedelta(1)
)。您必須直接為您實例化的 DAG 物件指定不同的schedule
,而不是作為default_param
,因為任務實例不會覆寫其父 DAG 的schedule
。您的
start_date
是否超出您在 UI 中可以看到的範圍?如果您將start_date
設定為例如 3 個月前,您將無法在 UI 的主要視圖中看到它,但您應該可以在選單 -> 瀏覽 -> 任務實例
中看到它。任務的依賴項是否已滿足?任務直接上游的任務實例需要處於
success
狀態。此外,如果您已設定depends_on_past=True
,則先前的任務實例需要成功或被跳過(除非它是該任務的首次執行)。此外,如果wait_for_downstream=True
,請確保您了解其含義 - 前一個任務實例的所有直接下游任務都必須成功或被跳過。您可以從任務的任務實例詳細資訊
頁面查看這些屬性是如何設定的。您需要的 DagRun 是否已建立且處於活動狀態?DagRun 代表整個 DAG 的特定執行,並具有狀態(執行中、成功、失敗、...)。排程器會隨著時間推移建立新的 DagRun,但永遠不會回溯時間來建立新的 DagRun。排程器僅評估
running
DagRun 以查看它可以觸發哪些任務實例。請注意,清除任務實例(從 UI 或 CLI)會將 DagRun 的狀態設定回執行中。您可以透過點擊 DAG 的排程標籤來批量查看 DagRun 列表並更改狀態。您的 DAG 的
concurrency
參數是否已達到上限?concurrency
定義了 DAG 允許擁有的running
任務實例數量,超過此數量後,任務將被排入佇列。您的 DAG 的
max_active_runs
參數是否已達到上限?max_active_runs
定義了允許存在的 DAGrunning
並行實例數量。
您可能還想閱讀關於 排程器 的資訊,並確保您完全了解排程器週期。
如何提升 DAG 效能?¶
有一些 Airflow 組態允許更大的排程容量和頻率
DAG 具有可提高效率的組態
max_active_tasks
:覆寫 max_active_tasks_per_dag。max_active_runs
:覆寫 max_active_runs_per_dag。
運算子或任務也具有可提高效率和排程優先順序的組態
max_active_tis_per_dag
:此參數控制每個任務的跨dag_runs
的並行執行任務實例數。pool
:請參閱 Pools。priority_weight
:請參閱 Priority Weights。queue
:僅適用於 CeleryExecutor 部署,請參閱 Queues。
如何減少 DAG 排程延遲 / 任務延遲?¶
Airflow 2.0 具有開箱即用的低 DAG 排程延遲(特別是與 Airflow 1.10.x 相比),但是,如果您需要更高的吞吐量,您可以啟動多個排程器。
如何控制不同 DAG 檔案的 DAG 檔案解析逾時時間?¶
(僅適用於 Airflow >= 2.3.0)
您可以在 airflow_local_settings.py
中新增一個 get_dagbag_import_timeout
函數,該函數在解析 DAG 檔案之前被呼叫。您可以根據 DAG 檔案傳回不同的逾時值。當傳回值小於或等於 0 時,表示在 DAG 解析期間沒有逾時。
def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
"""
This setting allows to dynamically control the DAG file parsing timeout.
It is useful when there are a few DAG files requiring longer parsing times, while others do not.
You can control them separately instead of having one value for all DAG files.
If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
"""
if "slow" in dag_file_path:
return 90
if "no-timeout" in dag_file_path:
return 0
return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")
有關如何組態本地設定的詳細資訊,請參閱 組態本地設定。
當有很多 (>1000) DAG 檔案時,如何加速新檔案的解析?¶
(僅適用於 Airflow >= 2.1.1)
將 file_parsing_sort_mode 更改為 modified_time
,將 min_file_process_interval 提高到 600
(10 分鐘)、6000
(100 分鐘) 或更高的值。
如果檔案最近被修改,DAG 解析器將跳過 min_file_process_interval
檢查。
這可能不適用於從單獨檔案匯入/建立 DAG 的情況。範例:dag_file.py
匯入 dag_loader.py
,其中 DAG 檔案的實際邏輯如下所示。在這種情況下,如果 dag_loader.py
已更新,但 dag_file.py
未更新,則更改將在達到 min_file_process_interval
後才會反映,因為 DAG 解析器將尋找 dag_file.py
檔案的修改時間。
from dag_loader import create_dag
globals()[dag.dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
from airflow import DAG
from airflow.decorators import task
import pendulum
def create_dag(dag_id, schedule, dag_number, default_args):
dag = DAG(
dag_id,
schedule=schedule,
default_args=default_args,
pendulum.datetime(2021, 9, 13, tz="UTC"),
)
with dag:
@task()
def hello_world():
print("Hello World")
print(f"This is DAG: {dag_number}")
hello_world()
return dag
DAG 建構¶
start_date
到底是什麼?¶
start_date
部分是 DagRun 時代之前的遺留問題,但在許多方面仍然相關。在建立新的 DAG 時,您可能希望為您的任務設定全域 start_date
。這可以透過直接在 DAG()
物件中宣告您的 start_date
來完成。DAG 的第一個 DagRun 將根據 start_date
之後的第一個完整 data_interval
建立。例如,對於 start_date=datetime(2024, 1, 1)
和 schedule="0 0 3 * *"
的 DAG,第一個 DAG 執行將在 2024-02-03 午夜觸發,data_interval_start=datetime(2024, 1, 3)
和 data_interval_end=datetime(2024, 2, 3)
。從那時起,排程器根據您的 schedule
和對應的任務實例(在滿足您的依賴項時)建立新的 DagRun。當向您的 DAG 引入新任務時,您需要特別注意 start_date
,並且可能需要重新啟動非活動的 DagRun 以使新任務正確加入。
我們建議不要使用動態值作為 start_date
,尤其是 datetime.now()
,因為它可能會非常令人困惑。任務會在週期結束後觸發,理論上,@hourly
DAG 永遠不會在現在之後一小時執行,因為 now()
會不斷移動。
以前,我們也建議使用相對於 DAG 的 schedule
的四捨五入 start_date
。這表示 @hourly
將在 00:00
分:秒,@daily
工作在午夜,@monthly
工作在每月的第一天。現在不再需要這樣做。Airflow 現在將透過使用 start_date
作為開始查看的時刻,自動對齊 start_date
和 schedule
。
您可以使用任何感測器或 TimeDeltaSensor
來延遲排程間隔內任務的執行。雖然 schedule
允許指定 datetime.timedelta
物件,但我們建議改用巨集或 Cron 表達式,因為它強制執行四捨五入排程的想法。
當使用 depends_on_past=True
時,特別注意 start_date
非常重要,因為過去的依賴項不僅僅在為任務指定的 start_date
的特定排程上強制執行。引入新的 depends_on_past=True
時,密切關注 DagRun 活動狀態也很重要,除非您計劃為新任務執行回填。
還需要注意的是,在回填 CLI 命令的上下文中,任務的 start_date
會被回填的 start_date
命令覆寫。這允許對具有 depends_on_past=True
的任務進行回填以實際啟動。如果不是這種情況,回填將無法啟動。
使用時區¶
建立時區感知日期時間(例如 DAG 的 start_date
)非常簡單。只需確保使用 pendulum
提供時區感知日期即可。不要嘗試使用標準函式庫 timezone,因為已知它們有局限性,並且我們有意禁止在 DAG 中使用它們。
execution_date
代表什麼意思?¶
執行日期 或 execution_date
是邏輯日期 的歷史名稱,通常也是 DAG 執行所代表的資料間隔的開始時間。
Airflow 的開發是為了滿足 ETL 需求。在 ETL 世界中,您通常會摘要資料。因此,如果您想要摘要 2016-02-19
的資料,您會在 2016-02-20
午夜 UTC 執行,這會在 2016-02-19
的所有資料都可用之後立即執行。從 2016-02-19
午夜到 2016-02-20
午夜之間的間隔稱為資料間隔,並且由於它代表 2016-02-19
日期的資料,因此該日期也稱為執行的邏輯日期,或此 DAG 執行執行的日期,因此稱為執行日期。
為了向後相容性,日期時間值 execution_date
仍然作為 模板變數,在 Jinja 模板化欄位和 Airflow 的 Python API 中具有各種格式。它也包含在提供給運算子執行函式的上下文字典中。
class MyOperator(BaseOperator):
def execute(self, context):
logging.info(context["execution_date"])
但是,如果可能,您應該始終使用 data_interval_start
或 data_interval_end
,因為這些名稱在語義上更正確,並且不太容易引起誤解。
請注意,ds
(data_interval_start
的 YYYY-MM-DD 格式)指的是日期 *字串*,而不是日期 *開始*,這可能會讓某些人感到困惑。
如何動態建立 DAG?¶
Airflow 在您的 DAGS_FOLDER
中尋找在其全域命名空間中包含 DAG
物件的模組,並將其在 DagBag
中找到的物件新增進去。了解這一點後,我們所需要的就是一種在全域命名空間中動態指派變數的方法。這在 Python 中很容易使用標準函式庫的 globals()
函式完成,該函式的行為類似於簡單的字典。
def create_dag(dag_id):
"""
A function returning a DAG object.
"""
return DAG(dag_id)
for i in range(10):
dag_id = f"foo_{i}"
globals()[dag_id] = DAG(dag_id)
# or better, call a function that returns a DAG object!
other_dag_id = f"bar_{i}"
globals()[other_dag_id] = create_dag(other_dag_id)
即使 Airflow 支援每個 Python 檔案多個 DAG 定義(動態產生或其他方式),也不建議這樣做,因為 Airflow 希望 DAG 之間在故障和部署方面有更好的隔離,而同一個檔案中的多個 DAG 與此相悖。
是否允許頂層 Python 程式碼?¶
雖然不建議在定義 Airflow 建構之外編寫任何程式碼,但 Airflow 確實支援任何任意 Python 程式碼,只要它不破壞 DAG 檔案處理器或將檔案處理時間延長到超過 dagbag_import_timeout 值即可。
一個常見的範例是在建立動態 DAG 時違反時間限制,這通常需要從另一個服務(例如資料庫)查詢資料。同時,請求的服務被 DAG 檔案處理器請求資料以處理檔案的要求淹沒。這些意外的互動可能會導致服務惡化,並最終導致 DAG 檔案處理失敗。
有關更多資訊,請參閱 DAG 編寫最佳實務。
巨集會在另一個 Jinja 模板中解析嗎?¶
無法在另一個 Jinja 模板中呈現 巨集 或任何 Jinja 模板。這通常在 user_defined_macros
中嘗試。
dag = DAG(
# ...
user_defined_macros={"my_custom_macro": "day={{ ds }}"}
)
bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)
對於 data_interval_start
為 2020-01-01 00:00:00 的 DAG 執行,這將會回顯 “day={{ ds }}” 而不是 “day=2020-01-01”。
bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag)
透過直接在 template_field 中使用 ds 巨集,呈現的值會產生 “day=2020-01-01”。
為什麼 next_ds
或 prev_ds
可能不包含預期的值?¶
在排程 DAG 時,
next_ds
next_ds_nodash
prev_ds
prev_ds_nodash
是使用logical_date
和 DAG 的排程(如果適用)計算的。如果您將schedule
設定為None
或@once
,則next_ds
、next_ds_nodash
、prev_ds
、prev_ds_nodash
值將設定為None
。手動觸發 DAG 時,排程將被忽略,並且
prev_ds == next_ds == ds
。
任務執行互動¶
TemplateNotFound
代表什麼意思?¶
TemplateNotFound
錯誤通常是由於將路徑傳遞給觸發 Jinja 模板化的運算子時,與使用者期望不符所致。常見的情況是 BashOperators。
另一個常被忽略的事實是,檔案是相對於管線檔案所在的位置解析的。您可以將其他目錄新增至 DAG 物件的 template_searchpath
,以允許其他非相對位置。
如何根據另一個任務的失敗觸發任務?¶
對於透過依賴關係相關聯的任務,如果任務執行取決於其所有上游任務的失敗,您可以將 trigger_rule
設定為 TriggerRule.ALL_FAILED
,如果僅取決於其中一個上游任務的失敗,則設定為 TriggerRule.ONE_FAILED
。
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule
@task()
def a_func():
raise AirflowException
@task(
trigger_rule=TriggerRule.ALL_FAILED,
)
def b_func():
pass
@dag(schedule="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))
def my_dag():
a = a_func()
b = b_func()
a >> b
dag = my_dag()
有關更多資訊,請參閱 觸發規則。
如果任務沒有依賴關係關聯,您將需要 建立自訂運算子。
Airflow UI¶
為什麼我的任務失敗了,但在 UI 中沒有日誌?¶
日誌通常在任務達到終端狀態時提供。有時,任務的正常生命週期會被中斷,並且任務的 Worker 無法寫入任務的日誌。這通常發生在以下兩個原因之一:
殭屍任務.
在佇列中卡住後失敗的任務(Airflow 2.6.0+)。在佇列中停留時間超過 scheduler.task_queued_timeout 的任務將被標記為失敗,並且 Airflow UI 中將不會有任務日誌。
為每個任務設定重試次數可以大大降低這些問題影響工作流程的可能性。
如何停止每個 Web 伺服器多次發生的同步權限?¶
將 airflow.cfg
中 [fab] update_fab_perms
組態的值設定為 False
。
如何減少 Airflow UI 頁面載入時間?¶
如果您的 DAG 載入時間過長,您可以將 airflow.cfg
中的 default_dag_run_display_number
組態的值減小。此可組態項控制 UI 中顯示的 DAG 執行次數,預設值為 25
。
為什麼暫停 DAG 切換按鈕變成紅色?¶
如果由於任何原因暫停或取消暫停 DAG 失敗,DAG 切換按鈕將還原為其先前的狀態並變成紅色。如果您觀察到此行為,請嘗試再次暫停 DAG,或者如果問題再次發生,請檢查控制台或伺服器日誌。
MySQL 和 MySQL 變體資料庫¶
“MySQL Server has gone away” 代表什麼意思?¶
您可能偶爾會遇到帶有訊息 “MySQL Server has gone away” 的 OperationalError
。這是由於連線池將連線保持開啟太長時間,並且您被給予了一個已過期的舊連線。為了確保連線有效,您可以設定 sql_alchemy_pool_recycle (已棄用) 以確保連線在這麼多秒後失效並建立新的連線。
Airflow 是否支援擴充 ASCII 或 Unicode 字元?¶
如果您打算在 Airflow 中使用擴充 ASCII 或 Unicode 字元,則必須為 MySQL 資料庫提供正確的連線字串,因為它們明確定義了字元集。
sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8
您將遇到 WTForms
模板化和其他 Airflow 模組拋出的 UnicodeDecodeError
,如下所示。
'ascii' codec can't decode byte 0xae in position 506: ordinal not in range(128)
如何修復例外:全域變數 explicit_defaults_for_timestamp
需要開啟 (1)?¶
這表示您的 MySQL 伺服器中停用了 explicit_defaults_for_timestamp
,您需要透過以下方式啟用它
在您的
my.cnf
檔案中,mysqld
區段下,設定explicit_defaults_for_timestamp = 1
。重新啟動 Mysql 伺服器。
Airflow 是否收集任何遙測資料?¶
Airflow 整合了 Scarf,以在運作期間收集基本使用情況資料。這些資料協助 Airflow 維護者更好地理解 Airflow 的使用方式。從這些資料中獲得的見解有助於優先處理修補程式、小版本發布和安全性修正。此外,這些資訊有助於支援與開發藍圖相關的關鍵決策。
部署可以選擇退出資料收集,透過將 [usage_data_collection] enabled 選項設定為 False
,或 SCARF_ANALYTICS=false
環境變數。個別使用者可以透過 Scarf「請勿追蹤」文件中記錄的各種方式輕鬆退出分析。
收集的遙測資料僅限於以下項目
Airflow 版本
Python 版本
作業系統與機器架構
執行器
Metadata 資料庫類型及其版本