排程器¶
Airflow 排程器監控所有任務和 DAG,然後在其依賴項完成後觸發任務實例。在幕後,排程器啟動一個子進程,該子進程監控並與指定 DAG 目錄中的所有 DAG 保持同步。預設情況下,排程器每分鐘收集 DAG 解析結果,並檢查是否可以觸發任何活動任務。
Airflow 排程器設計為在 Airflow 生產環境中作為持久性服務運行。要啟動它,您只需執行 airflow scheduler
命令即可。它使用 airflow.cfg
中指定的組態。
排程器使用已配置的 Executor 來執行已準備好的任務。
要啟動排程器,只需執行命令
airflow scheduler
一旦排程器成功運行,您的 DAG 將開始執行。
注意
第一個 DAG 運行是根據 DAG 中任務的最小 start_date
建立的。後續的 DAG 運行會根據 DAG 的 時間表 建立。
對於具有 cron 或 timedelta 排程的 DAG,排程器在它涵蓋的期間結束之前不會觸發您的任務。例如,schedule
設定為 @daily
的作業會在一天結束後運行。此技術確保該期間所需的任何資料在 DAG 執行之前都已完全可用。在 UI 中,Airflow 似乎讓您的任務延遲一天運行
注意
如果您以一天的 schedule
運行 DAG,則資料間隔從 2019-11-21
開始的運行會在 2019-11-21T23:59
之後觸發。
讓我們重複一遍,排程器會在間隔結束時,在開始日期之後的 schedule
運行您的作業。
您應該參考 DAG 運行 以取得有關排程 DAG 的詳細資訊。
注意
排程器設計用於高吞吐量。這是一個經過深思熟慮的設計決策,旨在盡快排程任務。排程器檢查池中有多少可用插槽,並在一次迭代中最多排程該數量的任務實例。這表示只有在等待排程的任務多於佇列插槽時,任務優先權才會生效。因此,如果低優先權任務與高優先權任務共享同一個批次,則可能會在後者之前排程前者。如需更多資訊,您可以參考 此 GitHub 討論。
DAG 檔案處理¶
您可以讓 Airflow 排程器負責啟動將 DAG 資料夾中包含的 Python 檔案轉換為 DAG 物件的進程,這些物件包含要排程的任務。
請參考 DAG 檔案處理 以取得有關如何實現此目標的詳細資訊
使用未來日期觸發 DAG¶
如果您想使用「外部觸發器」來運行未來日期的資料間隔,請在 airflow.cfg
的 scheduler
區段中設定 allow_trigger_in_future = True
。這僅在您的 DAG 定義為 schedule=None
時才有效。當設定為 False
(預設值)時,如果您手動觸發具有未來日期資料間隔的運行,排程器將不會執行它,直到其 data_interval_start
在過去。
執行多個排程器¶
Airflow 支援同時運行多個排程器 – 既為了效能原因,也為了彈性。
概觀¶
高可用性 (HA) 排程器旨在利用現有的元資料資料庫。這主要是為了操作簡單性而完成的:每個組件都必須與此資料庫對話,並且通過不在排程器之間使用直接通信或共識演算法(Raft、Paxos 等)或另一個共識工具(例如 Apache Zookeeper 或 Consul),我們將「操作介面」保持在最低限度。
排程器現在使用序列化的 DAG 表示來做出其排程決策,排程迴圈的粗略輪廓是
檢查是否有任何 DAG 需要新的 DagRun,並建立它們
檢查一批 DagRun 的可排程 TaskInstance 或已完成的 DagRun
選擇可排程的 TaskInstance,並在尊重池限制和其他並行限制的同時,將它們排入佇列以供執行
但是,這確實對資料庫提出了一些要求。
資料庫需求¶
簡而言之,PostgreSQL 12+ 或 MySQL 8.0+ 的使用者都已準備就緒 – 您可以開始運行任意多個排程器副本 – 無需進一步設定或配置選項。如果您使用的是不同的資料庫,請繼續閱讀。
為了保持效能和吞吐量,排程迴圈有一個部分在記憶體中執行許多計算(因為對於每個 TaskInstance 都必須往返資料庫會太慢),因此我們需要確保一次只有一個排程器在此關鍵區段中 - 否則限制將無法正確地被尊重。為了實現這一點,我們使用資料庫列級鎖定(使用 SELECT ... FOR UPDATE
)。
此關鍵區段是 TaskInstance 從已排程狀態轉換並排入執行器佇列的地方,同時確保各種並行和池限制得到尊重。通過請求池表每一列的列級寫入鎖定來獲得關鍵區段(大致相當於 SELECT * FROM slot_pool FOR UPDATE NOWAIT
,但確切的查詢略有不同)。
以下資料庫完全支援並提供「最佳」體驗
PostgreSQL 12+
MySQL 8.0+
警告
MariaDB 直到 10.6.0 版本才實作 SKIP LOCKED
或 NOWAIT
SQL 子句。沒有這些功能,不支持運行多個排程器,並且已報告死鎖錯誤。MariaDB 10.6.0 及更高版本可能適用於多個排程器,但尚未經過測試。
注意
Microsoft SQL Server 尚未針對 HA 進行測試。
微調您的排程器效能¶
哪些因素影響排程器效能¶
排程器負責兩個操作
持續解析 DAG 檔案並與資料庫中的 DAG 同步
持續排程任務以供執行
這兩個任務由排程器並行執行,並在不同的進程中彼此獨立運行。為了微調您的排程器,您需要考慮許多因素
- 您擁有的部署類型
您有哪種檔案系統來共享 DAG(影響持續讀取 DAG 的效能)
檔案系統的速度有多快(在許多分佈式雲檔案系統的情況下,您可以額外付費以獲得更高的吞吐量/更快的檔案系統
您有多少記憶體用於處理
您有多少可用的 CPU
您有多少可用的網路吞吐量
- 您的 DAG 結構的邏輯和定義
您有多少 DAG 檔案
您的檔案中有多少 DAG
DAG 檔案有多大(請記住 DAG 解析器需要每 n 秒讀取和解析檔案)
它們有多複雜(即它們可以多快被解析,它們有多少任務和依賴項)
解析您的 DAG 檔案是否涉及在頂層匯入大量程式庫或繁重處理(提示!不應該這樣。請參閱 頂層 Python 程式碼)
- 排程器配置
您有多少個排程器
您的排程器中有多少個解析進程
排程器在重新解析同一個 DAG 之間等待多長時間(它持續發生)
排程器在一個迴圈中處理多少個任務實例
每個迴圈應該建立/排程多少個新的 DAG 運行
排程器應該多久執行一次清理並檢查孤立任務/採用它們
為了執行微調,最好了解排程器在底層是如何運作的。您可以查看 Airflow Summit 2021 演講 深入探討 Airflow 排程器演講 以執行微調。
如何進行排程器微調¶
Airflow 為您提供了許多「旋鈕」來微調效能,但這是一個獨立的任務,取決於您的特定部署、您的 DAG 結構、硬體可用性和期望,以決定轉動哪些旋鈕以獲得最佳效果。管理部署的部分工作是決定您要優化什麼。有些使用者可以接受新 DAG 解析延遲 30 秒,但以較低的 CPU 使用率為代價,而另一些使用者則希望 DAG 在 DAG 資料夾中出現時幾乎立即被解析,但以較高的 CPU 使用率為代價。
Airflow 讓您可以靈活地決定,但您應該找出對您而言最重要的效能方面,並決定您想要朝哪個方向轉動哪些旋鈕。
通常,對於微調,您的方法應該與任何效能改進和優化相同(我們不會推薦任何特定工具 - 只需使用您通常用於觀察和監控系統的工具)
使用您通常用於監控系統的正確工具集監控您的系統非常重要。本文檔不會詳細介紹您可以使用的特定指標和工具,它只是描述您應該監控哪種類型的資源,但您應該遵循監控的最佳實務來獲取正確的資料。
決定哪個效能方面對您最重要(您想要改進什麼)
觀察您的系統以查看您的瓶頸在哪裡:CPU、記憶體、I/O 是常見的限制因素
根據您的期望和觀察 - 決定您的下一個改進,然後回到效能、瓶頸的觀察。效能改進是一個迭代過程。
哪些資源可能限制排程器效能¶
您應該注意以下幾個資源使用領域
檔案系統效能。Airflow 排程器嚴重依賴解析(有時很多)Python 檔案,這些檔案通常位於共享檔案系統上。Airflow 排程器持續讀取和重新解析這些檔案。相同檔案必須提供給工作人員,因此它們通常儲存在分佈式檔案系統中。您可以為這些檔案系統使用各種參數並微調其效能,但這超出了本文檔的範圍。您應該觀察檔案系統的統計資訊和使用情況,以確定問題是否來自檔案系統效能。例如,有傳聞證據表明,當使用 EFS 時,增加 EFS 效能的 IOPS(並支付更多費用)可以顯著提高 Airflow DAG 解析的穩定性和速度。
如果檔案系統效能成為您的瓶頸,另一種解決方案是轉向分發 DAG 的替代機制。將 DAG 嵌入到您的映像和 GitSync 分發中都具有以下特性:檔案在本地可用於排程器,並且它不必使用分佈式檔案系統來讀取檔案,檔案在本地可用於排程器,並且通常盡可能快,特別是如果您的機器使用快速 SSD 磁碟進行本機儲存。這些分發機制具有其他特性,可能使其不是您的最佳選擇,但如果您的效能問題來自於分佈式檔案系統效能,它們可能是最佳方法。
當您想要提高效能並並行處理更多事物時,資料庫連線和資料庫使用量可能會成為問題。Airflow 以「資料庫連線飢渴」而聞名 – 您擁有的 DAG 越多,您想要並行處理的越多,將開啟的資料庫連線就越多。這通常對於 MySQL 來說不是問題,因為其處理連線的模型是基於線程的,但對於 Postgres 來說,這可能是一個問題,因為連線處理是基於進程的。普遍認為,如果您有即使是中等規模的基於 Postgres 的 Airflow 安裝,最佳解決方案是使用 PGBouncer 作為資料庫的代理。Helm Chart for Apache Airflow 開箱即用地支援 PGBouncer。
CPU 使用率對於 FileProcessor 最為重要 – 這些是解析和執行 Python DAG 檔案的進程。由於排程器持續觸發這種解析,當您有很多 DAG 時,處理可能會佔用大量 CPU。您可以通過增加 min_file_process_interval 來緩解它,但這是提到的權衡之一,結果是此類檔案的更改將被較慢地拾取,並且您將看到提交檔案與在 Airflow UI 中獲得它們並由排程器執行之間存在延遲。優化 DAG 的建構方式,避免外部資料來源是提高 CPU 使用率的最佳方法。如果您有更多可用的 CPU,您可以增加處理線程數 parsing_processes。此外,Airflow 排程器幾乎可以隨著多個實例線性擴展,因此如果您的排程器效能受 CPU 限制,您也可以新增更多排程器。
當您嘗試從 Airflow 獲得更高效能時,Airflow 可能會使用相當大量的記憶體。通常,通過增加處理負載的進程數來在 Airflow 中實現更高的效能,並且每個進程都需要載入整個 Python 解譯器、匯入大量類別、臨時記憶體儲存。Airflow 通過使用分叉和寫時複製記憶體來優化很多,但如果在分叉後匯入新類別,這可能會導致額外的記憶體壓力。您需要觀察您的系統是否正在使用超過其擁有的記憶體 – 這會導致使用交換磁碟,從而顯著降低效能。請注意,在
2.1.4
之前的版本中,Airflow 排程器產生大量日誌檔案使用的Page Cache
記憶體(當日誌檔案未被刪除時)。這通常是無害的,因為記憶體只是快取,並且可以隨時被系統回收,但是,在2.1.4
及更高版本中,寫入日誌將不會產生過多的Page Cache
記憶體。無論如何 - 確保在查看記憶體使用情況時,注意您正在觀察的記憶體類型。通常您應該查看working memory``(名稱 可能會 因您的部署而異) 而不是 ``total memory used
。
您可以做些什麼來提升排程器效能¶
當您知道您的資源使用情況時,您可以考慮的改進可能是
改進邏輯、解析效率並降低頂層 DAG Python 程式碼的複雜性。它會持續解析,因此優化該程式碼可能會帶來巨大的改進,特別是當您嘗試在解析 DAG 時接觸某些外部資料庫等時(應不惜一切代價避免這種情況)。頂層 Python 程式碼 解釋了編寫頂層 Python 程式碼的最佳實務。降低 DAG 複雜性 文件提供了一些您可能希望在降低程式碼複雜性時查看的領域。
提高資源利用率。當您的系統中有似乎未充分利用的空閒容量時(再次強調 CPU、記憶體 I/O、網路是主要候選者) - 您可以採取諸如增加排程器數量、解析進程或減少間隔以進行更頻繁的操作等措施,這可能會以更高的資源利用率為代價帶來效能提升。
增加硬體容量(例如,如果您發現 CPU 限制了您或您用於 DAG 檔案系統的 I/O 已達到其限制)。通常,排程器效能的問題僅僅是因為您的系統不夠「有能力」,這可能是唯一的方法。例如,如果您看到您正在使用機器上的所有 CPU,您可能希望在新機器上新增另一個排程器 - 在大多數情況下,當您新增第二個或第三個排程器時,排程容量會線性增長(除非共享資料庫或檔案系統是瓶頸)。
試驗「排程器可調參數」的不同值。通常,您可能通過簡單地將一個效能方面換成另一個效能方面來獲得更好的效果。例如,如果您想降低 CPU 使用率,您可以增加檔案處理間隔(但結果是新 DAG 將以更大的延遲出現)。通常,效能調優是一門平衡不同方面的藝術。
有時您會稍微更改排程器行為(例如,更改解析排序順序),以便為您的特定部署獲得更好的微調結果。
排程器配置選項¶
以下配置設定可用於控制排程器的各個方面。但是,您也可以查看 配置參考 的 [scheduler]
區段中提供的其他與效能無關的排程器配置參數。
max_dagruns_to_create_per_loop
這會更改每個排程器在建立 DAG 運行時鎖定的 DAG 數量。降低此設定的一個可能原因是,如果您有巨大的 DAG(每個 DAG 的任務數超過 10k),並且正在運行多個排程器,您不希望一個排程器完成所有工作。
max_dagruns_per_loop_to_schedule
在排程和排隊任務時,排程器應該檢查(和鎖定)多少個 DagRun。增加此限制將允許較小 DAG 的更高吞吐量,但可能會降低較大(例如超過 500 個任務)DAG 的吞吐量。當使用多個排程器時,將此設定設定得太高也可能導致一個排程器佔用所有 DAG 運行,而沒有為其他排程器留下任何工作。
-
排程器是否應該在相關查詢中發出
SELECT ... FOR UPDATE
。如果將其設定為 False,則您不應一次運行多個排程器。 -
池使用情況統計資訊應多久(以秒為單位)發送到 StatsD(如果已啟用 statsd_on)。這是一個相對昂貴的查詢來計算此值,因此應將其設定為與您的 StatsD 彙總週期相同的週期。
-
排程器應多久(以秒為單位)檢查孤立任務或已失效的 SchedulerJob。
此設定控制如何注意到已失效的排程器以及它「監督」的任務被另一個排程器拾取。任務將保持運行,因此在一段時間內未檢測到此情況沒有任何危害。
當 SchedulerJob 被檢測為「已失效」(由 scheduler_health_check_threshold 確定)時,由此失效進程啟動的任何正在運行或已排隊的任務將被「採用」並改由這個排程器監控。
dag_dir_list_interval 多久(以秒為單位)掃描 DAG 目錄以查找新檔案。
file_parsing_sort_mode 排程器將列出和排序 DAG 檔案以決定解析順序。
max_tis_per_query 排程主迴圈中查詢的批次大小。這不應大於
core.parallelism
。如果此值太高,則 SQL 查詢效能可能會受到查詢謂詞的複雜性和/或過度鎖定的影響。此外,您可能會達到資料庫允許的最大查詢長度。將此設定為 0 以使用
core.parallelism
的值。min_file_process_interval DAG 檔案重新解析後的秒數。DAG 檔案每 min_file_process_interval 秒數解析一次。對 DAG 的更新將在此間隔後反映出來。保持此數字較低會增加 CPU 使用率。
parsing_processes 排程器可以並行運行多個進程來解析 DAG 檔案。這定義了將運行多少個進程。
scheduler_idle_sleep_time 控制排程器在迴圈之間休眠多長時間,但如果迴圈中沒有任何事情要做。即,如果它排程了某些內容,那麼它將立即開始下一個迴圈迭代。此參數名稱不佳(歷史原因),並且將在未來重命名,並棄用當前名稱。
schedule_after_task_execution 任務監管進程是否應執行「迷你排程器」以嘗試排程同一 DAG 的更多任務。保持此設定開啟將意味著同一 DAG 中的任務執行得更快,但在某些情況下可能會使其他 DAG 陷入困境。