指標配置¶
Airflow 可以設定為將指標發送到 StatsD 或 OpenTelemetry。
設定 - StatsD¶
若要使用 StatsD,您必須先安裝必要的套件
pip install 'apache-airflow[statsd]'
然後將以下幾行新增至您的設定檔,例如 airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
如果您想要使用自訂的 StatsD 用戶端,而不是 Airflow 提供的預設用戶端,則必須將以下金鑰新增至設定檔中,並同時加入自訂 StatsD 用戶端的模組路徑。此模組必須在您的 PYTHONPATH
中可用。
[metrics]
statsd_custom_client_path = x.y.customclient
有關 Python 和 Airflow 如何管理模組的詳細資訊,請參閱模組管理。
設定 - OpenTelemetry¶
若要使用 OpenTelemetry,您必須先安裝必要的套件
pip install 'apache-airflow[otel]'
將以下幾行新增至您的設定檔,例如 airflow.cfg
[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000 # The interval between exports, defaults to 60000
otel_ssl_active = False
啟用 Https¶
若要建立與 OpenTelemetry 收集器的 HTTPS 連線,您需要在 OpenTelemetry 收集器的 config.yml
檔案中設定 SSL 憑證和金鑰。
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318
tls:
cert_file: "/path/to/cert/cert.crt"
key_file: "/path/to/key/key.pem"
允許/封鎖清單¶
如果您想要避免發送所有可用的指標,您可以設定允許清單或封鎖清單的前綴,以僅發送或封鎖以清單元素開頭的指標
[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery
重新命名指標¶
如果您想要將指標重新導向到不同的名稱,您可以在 [metrics]
區段中設定 stat_name_handler
選項。它應該指向一個函數,該函數驗證 stat 名稱,在必要時對 stat 名稱應用變更,並傳回轉換後的 stat 名稱。該函數可能如下所示
def my_custom_stat_name_handler(stat_name: str) -> str:
return stat_name.lower()[:32]
指標描述¶
計數器¶
名稱 |
描述 |
---|---|
|
已啟動的 |
|
已結束的 |
|
|
|
在執行 DAG |
|
在執行 DAG |
|
Operator |
|
Operator |
|
Operator |
|
Operator |
|
整體任務實例失敗次數。具有 dag_id 和 task_id 標籤的指標。 |
|
整體任務實例成功次數。具有 dag_id 和 task_id 標籤的指標。 |
|
先前成功任務實例的數量。具有 dag_id 和 task_id 標籤的指標。 |
|
已終止的殭屍任務。具有 dag_id 和 task_id 標籤的指標。 |
|
排程器心跳 |
|
目前正在執行的 DAG 解析程序之相對數量(即,自上次發送指標以來,當程序已完成時,此增量為負數)。具有 file_path 和 action 標籤的指標。 |
|
由於耗時過長而被終止的檔案處理器數量。具有 file_path 標籤的指標。 |
|
收到的 SLA 回呼數量 |
|
收到的非 SLA 回呼數量 |
|
我們掃描檔案系統並將所有現有 DAG 排入佇列的次數 |
|
(已棄用) 行為與 |
|
停滯的 DagFileProcessorManager 數量 |
|
載入任何 DAG 檔案的失敗次數 |
|
外部終止的任務數量。具有 dag_id 和 task_id 標籤的指標。 |
|
排程器清除的孤立任務數量 |
|
排程器採用的孤立任務數量 |
|
排程器程序嘗試取得關鍵區段鎖定(將任務發送到執行器所需)並發現它被另一個程序鎖定的次數計數。 |
|
SLA 錯失次數。具有 dag_id 和 task_id 標籤的指標。 |
|
失敗的 SLA 錯失回呼通知嘗試次數。具有 dag_id 和 func_name 標籤的指標。 |
|
失敗的 SLA 錯失電子郵件通知嘗試次數。具有 dag_id 標籤的指標。 |
|
給定 DAG 中已啟動的任務數量。與 <job_name>_start 類似,但用於任務 |
|
給定 DAG 中已啟動的任務數量。與 <job_name>_start 類似,但用於任務。具有 dag_id 和 task_id 標籤的指標。 |
|
給定 DAG 中已完成的任務數量。與 <job_name>_end 類似,但用於任務 |
|
給定 DAG 中已完成的任務數量。與 <job_name>_end 類似,但用於任務。具有 dag_id 和 task_id 標籤的指標。 |
|
從 DAG 回呼引發的例外數量。當發生這種情況時,表示 DAG 回呼無法運作。具有 dag_id 標籤的指標 |
|
將任務發布到 Celery Broker 時引發的 AirflowTaskTimeout 錯誤數量。 |
|
Celery 任務的非零結束代碼數量。 |
|
給定 DAG 中已移除的任務數量(即任務不再存在於 DAG 中)。 |
|
給定 DAG 中已移除的任務數量(即任務不再存在於 DAG 中)。具有 dag_id 和 run_type 標籤的指標。 |
|
給定 DAG 中已還原的任務數量(即先前在資料庫中處於 REMOVED 狀態的任務實例已新增至 DAG 檔案) |
|
給定 DAG 中已還原的任務數量(即先前在資料庫中處於 REMOVED 狀態的任務實例已新增至 DAG 檔案)。具有 dag_id 和 run_type 標籤的指標。 |
|
為給定的 Operator 建立的任務實例數量 |
|
為給定的 Operator 建立的任務實例數量。具有 dag_id 和 run_type 標籤的指標。 |
|
觸發器心跳 |
|
封鎖主執行緒的觸發器數量(可能是由於未完全非同步) |
|
在觸發事件之前發生錯誤的觸發器數量 |
|
已觸發至少一個事件的觸發器數量 |
|
已更新資料集的數量 |
|
由於不再在 DAG 排程參數或任務輸出口中引用,而被標記為孤立的資料集數量 |
|
由資料集更新觸發的 DAG 執行次數 |
儀表¶
名稱 |
描述 |
---|---|
|
當排程器根據其設定執行掃描時找到的 DAG 數量 |
|
嘗試解析 DAG 檔案時發生的錯誤數量 |
|
掃描和匯入 |
|
要在下次掃描中考慮的 DAG 檔案數量 |
|
自上次處理 |
|
每次解析 |
|
由於資源池中沒有可用槽位而無法排程的任務數量 |
|
考慮到資源池限制、DAG 並行性、執行器狀態和優先順序,準備好執行(設定為已排隊)的任務數量。 |
|
特定執行器上的可用槽位數量。僅在設定多個執行器時發出。 |
|
執行器上的可用槽位數量 |
|
特定執行器上已排隊的任務數量。僅在設定多個執行器時發出。 |
|
執行器上已排隊的任務數量 |
|
特定執行器上正在執行的任務數量。僅在設定多個執行器時發出。 |
|
執行器上正在執行的任務數量 |
|
資源池中的可用槽位數量 |
|
資源池中的可用槽位數量。具有 pool_name 標籤的指標。 |
|
資源池中已排隊的槽位數量 |
|
資源池中已排隊的槽位數量。具有 pool_name 標籤的指標。 |
|
資源池中正在執行的槽位數量 |
|
資源池中正在執行的槽位數量。具有 pool_name 標籤的指標。 |
|
資源池中延遲的槽位數量 |
|
資源池中延遲的槽位數量。具有 pool_name 標籤的指標。 |
|
資源池中已排程的槽位數量 |
|
資源池中已排程的槽位數量。具有 pool_name 標籤的指標。 |
|
資源池中飢餓的任務數量 |
|
資源池中飢餓的任務數量。具有 pool_name 標籤的指標。 |
|
任務使用的 CPU 百分比 |
|
任務使用的記憶體百分比 |
|
目前為觸發器執行的觸發器數量(由主機名稱描述) |
|
目前為觸發器執行的觸發器數量(由主機名稱描述)。具有 hostname 標籤的指標。 |
計時器¶
名稱 |
描述 |
---|---|
|
檢查 DAG 相依性所花費的毫秒數 |
|
檢查 DAG 相依性所花費的毫秒數。具有 dag_id 標籤的指標。 |
|
執行任務所花費的毫秒數 |
|
執行任務所花費的毫秒數。具有 dag_id 和 task-id 標籤的指標。 |
|
任務在排程狀態中花費的毫秒數,然後才被排入佇列 |
|
任務在排程狀態中花費的毫秒數,然後才被排入佇列。具有 dag_id 和 task_id 標籤的指標。 |
|
任務在佇列狀態中花費的毫秒數,然後才變成執行中 |
|
任務在佇列狀態中花費的毫秒數,然後才變成執行中。具有 dag_id 和 task_id 標籤的指標。 |
|
載入給定 DAG 檔案所花費的毫秒數 |
|
載入給定 DAG 檔案所花費的毫秒數。具有 file_name 標籤的指標。 |
|
DagRun 達到成功狀態所花費的毫秒數 |
|
DagRun 達到成功狀態所花費的毫秒數。具有 dag_id 和 run_type 標籤的指標。 |
|
DagRun 達到失敗狀態所花費的毫秒數 |
|
DagRun 達到失敗狀態所花費的毫秒數。具有 dag_id 和 run_type 標籤的指標。 |
|
排程的 DagRun 開始日期與實際 DagRun 開始日期之間的延遲毫秒數 |
|
排程的 DagRun 開始日期與實際 DagRun 開始日期之間的延遲毫秒數。具有 dag_id 標籤的指標。 |
|
在排程器迴圈的關鍵區段中花費的毫秒數 – 一次只有一個排程器可以進入此迴圈 |
|
執行關鍵區段任務實例查詢所花費的毫秒數 |
|
執行一個排程器迴圈所花費的毫秒數 |
|
第一個任務開始日期與 dagrun 預期開始時間之間經過的毫秒數 |
|
第一個任務開始日期與 dagrun 預期開始時間之間經過的毫秒數。具有 dag_id 和 run_type 標籤的指標。 |
|
從資料庫擷取所有序列化 DAG 所花費的毫秒數 |
|
在 Kubernetes Executor 中清除未啟動的佇列任務所花費的毫秒數 |
|
在 Kubernetes Executor 中採用任務實例所花費的毫秒數 |