指標配置

Airflow 可以設定為將指標發送到 StatsDOpenTelemetry

設定 - StatsD

若要使用 StatsD,您必須先安裝必要的套件

pip install 'apache-airflow[statsd]'

然後將以下幾行新增至您的設定檔,例如 airflow.cfg

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

如果您想要使用自訂的 StatsD 用戶端,而不是 Airflow 提供的預設用戶端,則必須將以下金鑰新增至設定檔中,並同時加入自訂 StatsD 用戶端的模組路徑。此模組必須在您的 PYTHONPATH 中可用。

[metrics]
statsd_custom_client_path = x.y.customclient

有關 Python 和 Airflow 如何管理模組的詳細資訊,請參閱模組管理

設定 - OpenTelemetry

若要使用 OpenTelemetry,您必須先安裝必要的套件

pip install 'apache-airflow[otel]'

將以下幾行新增至您的設定檔,例如 airflow.cfg

[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000  # The interval between exports, defaults to 60000
otel_ssl_active = False

啟用 Https

若要建立與 OpenTelemetry 收集器的 HTTPS 連線,您需要在 OpenTelemetry 收集器的 config.yml 檔案中設定 SSL 憑證和金鑰。

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        tls:
          cert_file: "/path/to/cert/cert.crt"
          key_file: "/path/to/key/key.pem"

允許/封鎖清單

如果您想要避免發送所有可用的指標,您可以設定允許清單或封鎖清單的前綴,以僅發送或封鎖以清單元素開頭的指標

[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery

重新命名指標

如果您想要將指標重新導向到不同的名稱,您可以在 [metrics] 區段中設定 stat_name_handler 選項。它應該指向一個函數,該函數驗證 stat 名稱,在必要時對 stat 名稱應用變更,並傳回轉換後的 stat 名稱。該函數可能如下所示

def my_custom_stat_name_handler(stat_name: str) -> str:
    return stat_name.lower()[:32]

其他設定選項

注意

有關指標設定選項的詳細列表,請參閱設定參考文件 - [metrics]

指標描述

計數器

名稱

描述

<job_name>_start

已啟動的 <job_name> 工作數量,例如 SchedulerJobLocalTaskJob

<job_name>_end

已結束的 <job_name> 工作數量,例如 SchedulerJobLocalTaskJob

<job_name>_heartbeat_failure

<job_name> 工作的心跳失敗次數,例如 SchedulerJobLocalTaskJob

local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>

在執行 DAG <dag_id> 的任務 <task_id> 時,<return_code>LocalTaskJob 終止次數。

local_task_job.task_exit

在執行 DAG <dag_id> 的任務 <task_id> 時,<return_code>LocalTaskJob 終止次數。具有 job_id、dag_id、task_id 和 return_code 標籤的指標。

operator_failures_<operator_name>

Operator <operator_name> 失敗次數

operator_failures

Operator <operator_name> 失敗次數。具有 operator_name 標籤的指標。

operator_successes_<operator_name>

Operator <operator_name> 成功次數

operator_successes

Operator <operator_name> 成功次數。具有 operator_name 標籤的指標。

ti_failures

整體任務實例失敗次數。具有 dag_id 和 task_id 標籤的指標。

ti_successes

整體任務實例成功次數。具有 dag_id 和 task_id 標籤的指標。

previously_succeeded

先前成功任務實例的數量。具有 dag_id 和 task_id 標籤的指標。

zombies_killed

已終止的殭屍任務。具有 dag_id 和 task_id 標籤的指標。

scheduler_heartbeat

排程器心跳

dag_processing.processes

目前正在執行的 DAG 解析程序之相對數量(即,自上次發送指標以來,當程序已完成時,此增量為負數)。具有 file_path 和 action 標籤的指標。

dag_processing.processor_timeouts

由於耗時過長而被終止的檔案處理器數量。具有 file_path 標籤的指標。

dag_processing.sla_callback_count

收到的 SLA 回呼數量

dag_processing.other_callback_count

收到的非 SLA 回呼數量

dag_processing.file_path_queue_update_count

我們掃描檔案系統並將所有現有 DAG 排入佇列的次數

dag_file_processor_timeouts

(已棄用) 行為與 dag_processing.processor_timeouts 相同

dag_processing.manager_stalls

停滯的 DagFileProcessorManager 數量

dag_file_refresh_error

載入任何 DAG 檔案的失敗次數

scheduler.tasks.killed_externally

外部終止的任務數量。具有 dag_id 和 task_id 標籤的指標。

scheduler.orphaned_tasks.cleared

排程器清除的孤立任務數量

scheduler.orphaned_tasks.adopted

排程器採用的孤立任務數量

scheduler.critical_section_busy

排程器程序嘗試取得關鍵區段鎖定(將任務發送到執行器所需)並發現它被另一個程序鎖定的次數計數。

sla_missed

SLA 錯失次數。具有 dag_id 和 task_id 標籤的指標。

sla_callback_notification_failure

失敗的 SLA 錯失回呼通知嘗試次數。具有 dag_id 和 func_name 標籤的指標。

sla_email_notification_failure

失敗的 SLA 錯失電子郵件通知嘗試次數。具有 dag_id 標籤的指標。

ti.start.<dag_id>.<task_id>

給定 DAG 中已啟動的任務數量。與 <job_name>_start 類似,但用於任務

ti.start

給定 DAG 中已啟動的任務數量。與 <job_name>_start 類似,但用於任務。具有 dag_id 和 task_id 標籤的指標。

ti.finish.<dag_id>.<task_id>.<state>

給定 DAG 中已完成的任務數量。與 <job_name>_end 類似,但用於任務

ti.finish

給定 DAG 中已完成的任務數量。與 <job_name>_end 類似,但用於任務。具有 dag_id 和 task_id 標籤的指標。

dag.callback_exceptions

從 DAG 回呼引發的例外數量。當發生這種情況時,表示 DAG 回呼無法運作。具有 dag_id 標籤的指標

celery.task_timeout_error

將任務發布到 Celery Broker 時引發的 AirflowTaskTimeout 錯誤數量。

celery.execute_command.failure

Celery 任務的非零結束代碼數量。

task_removed_from_dag.<dag_id>

給定 DAG 中已移除的任務數量(即任務不再存在於 DAG 中)。

task_removed_from_dag

給定 DAG 中已移除的任務數量(即任務不再存在於 DAG 中)。具有 dag_id 和 run_type 標籤的指標。

task_restored_to_dag.<dag_id>

給定 DAG 中已還原的任務數量(即先前在資料庫中處於 REMOVED 狀態的任務實例已新增至 DAG 檔案)

task_restored_to_dag.<dag_id>

給定 DAG 中已還原的任務數量(即先前在資料庫中處於 REMOVED 狀態的任務實例已新增至 DAG 檔案)。具有 dag_id 和 run_type 標籤的指標。

task_instance_created_<operator_name>

為給定的 Operator 建立的任務實例數量

task_instance_created

為給定的 Operator 建立的任務實例數量。具有 dag_id 和 run_type 標籤的指標。

triggerer_heartbeat

觸發器心跳

triggers.blocked_main_thread

封鎖主執行緒的觸發器數量(可能是由於未完全非同步)

triggers.failed

在觸發事件之前發生錯誤的觸發器數量

triggers.succeeded

已觸發至少一個事件的觸發器數量

dataset.updates

已更新資料集的數量

dataset.orphaned

由於不再在 DAG 排程參數或任務輸出口中引用,而被標記為孤立的資料集數量

dataset.triggered_dagruns

由資料集更新觸發的 DAG 執行次數

儀表

名稱

描述

dagbag_size

當排程器根據其設定執行掃描時找到的 DAG 數量

dag_processing.import_errors

嘗試解析 DAG 檔案時發生的錯誤數量

dag_processing.total_parse_time

掃描和匯入 dag_processing.file_path_queue_size 個 DAG 檔案所花費的秒數

dag_processing.file_path_queue_size

要在下次掃描中考慮的 DAG 檔案數量

dag_processing.last_run.seconds_ago.<dag_file>

自上次處理 <dag_file> 以來的秒數

dag_processing.last_num_of_db_queries.<dag_file>

每次解析 <dag_file> 時對 Airflow 資料庫的查詢次數

scheduler.tasks.starving

由於資源池中沒有可用槽位而無法排程的任務數量

scheduler.tasks.executable

考慮到資源池限制、DAG 並行性、執行器狀態和優先順序,準備好執行(設定為已排隊)的任務數量。

executor.open_slots.<executor_class_name>

特定執行器上的可用槽位數量。僅在設定多個執行器時發出。

executor.open_slots

執行器上的可用槽位數量

executor.queued_tasks.<executor_class_name>

特定執行器上已排隊的任務數量。僅在設定多個執行器時發出。

executor.queued_tasks

執行器上已排隊的任務數量

executor.running_tasks.<executor_class_name>

特定執行器上正在執行的任務數量。僅在設定多個執行器時發出。

executor.running_tasks

執行器上正在執行的任務數量

pool.open_slots.<pool_name>

資源池中的可用槽位數量

pool.open_slots

資源池中的可用槽位數量。具有 pool_name 標籤的指標。

pool.queued_slots.<pool_name>

資源池中已排隊的槽位數量

pool.queued_slots

資源池中已排隊的槽位數量。具有 pool_name 標籤的指標。

pool.running_slots.<pool_name>

資源池中正在執行的槽位數量

pool.running_slots

資源池中正在執行的槽位數量。具有 pool_name 標籤的指標。

pool.deferred_slots.<pool_name>

資源池中延遲的槽位數量

pool.deferred_slots

資源池中延遲的槽位數量。具有 pool_name 標籤的指標。

pool.scheduled_slots.<pool_name>

資源池中已排程的槽位數量

pool.scheduled_slots

資源池中已排程的槽位數量。具有 pool_name 標籤的指標。

pool.starving_tasks.<pool_name>

資源池中飢餓的任務數量

pool.starving_tasks

資源池中飢餓的任務數量。具有 pool_name 標籤的指標。

task.cpu_usage.<dag_id>.<task_id>

任務使用的 CPU 百分比

task.mem_usage.<dag_id>.<task_id>

任務使用的記憶體百分比

triggers.running.<hostname>

目前為觸發器執行的觸發器數量(由主機名稱描述)

triggers.running

目前為觸發器執行的觸發器數量(由主機名稱描述)。具有 hostname 標籤的指標。

計時器

名稱

描述

dagrun.dependency-check.<dag_id>

檢查 DAG 相依性所花費的毫秒數

dagrun.dependency-check

檢查 DAG 相依性所花費的毫秒數。具有 dag_id 標籤的指標。

dag.<dag_id>.<task_id>.duration

執行任務所花費的毫秒數

task.duration

執行任務所花費的毫秒數。具有 dag_id 和 task-id 標籤的指標。

dag.<dag_id>.<task_id>.scheduled_duration

任務在排程狀態中花費的毫秒數,然後才被排入佇列

task.scheduled_duration

任務在排程狀態中花費的毫秒數,然後才被排入佇列。具有 dag_id 和 task_id 標籤的指標。

dag.<dag_id>.<task_id>.queued_duration

任務在佇列狀態中花費的毫秒數,然後才變成執行中

task.queued_duration

任務在佇列狀態中花費的毫秒數,然後才變成執行中。具有 dag_id 和 task_id 標籤的指標。

dag_processing.last_duration.<dag_file>

載入給定 DAG 檔案所花費的毫秒數

dag_processing.last_duration

載入給定 DAG 檔案所花費的毫秒數。具有 file_name 標籤的指標。

dagrun.duration.success.<dag_id>

DagRun 達到成功狀態所花費的毫秒數

dagrun.duration.success

DagRun 達到成功狀態所花費的毫秒數。具有 dag_id 和 run_type 標籤的指標。

dagrun.duration.failed.<dag_id>

DagRun 達到失敗狀態所花費的毫秒數

dagrun.duration.failed

DagRun 達到失敗狀態所花費的毫秒數。具有 dag_id 和 run_type 標籤的指標。

dagrun.schedule_delay.<dag_id>

排程的 DagRun 開始日期與實際 DagRun 開始日期之間的延遲毫秒數

dagrun.schedule_delay

排程的 DagRun 開始日期與實際 DagRun 開始日期之間的延遲毫秒數。具有 dag_id 標籤的指標。

scheduler.critical_section_duration

在排程器迴圈的關鍵區段中花費的毫秒數 – 一次只有一個排程器可以進入此迴圈

scheduler.critical_section_query_duration

執行關鍵區段任務實例查詢所花費的毫秒數

scheduler.scheduler_loop_duration

執行一個排程器迴圈所花費的毫秒數

dagrun.<dag_id>.first_task_scheduling_delay

第一個任務開始日期與 dagrun 預期開始時間之間經過的毫秒數

dagrun.first_task_scheduling_delay

第一個任務開始日期與 dagrun 預期開始時間之間經過的毫秒數。具有 dag_id 和 run_type 標籤的指標。

collect_db_dags

從資料庫擷取所有序列化 DAG 所花費的毫秒數

kubernetes_executor.clear_not_launched_queued_tasks.duration

在 Kubernetes Executor 中清除未啟動的佇列任務所花費的毫秒數

kubernetes_executor.adopt_task_instances.duration

在 Kubernetes Executor 中採用任務實例所花費的毫秒數

這個條目有幫助嗎?