任務日誌

Airflow 以允許您在 Airflow UI 中分別查看每個任務的日誌的方式寫入任務日誌。核心 Airflow 提供了一個介面 FileTaskHandler,它將任務日誌寫入檔案,並包含一個機制,以便在任務執行時從工作節點提供這些日誌。Apache Airflow 社群也發布了許多服務的提供者(提供者套件),其中一些提供者提供了擴展 Apache Airflow 日誌記錄功能的處理器。您可以在寫入日誌中查看所有這些提供者。

當使用 S3、GCS、WASB 或 OSS 遠端日誌記錄服務時,您可以在將本機日誌檔案上傳到遠端位置後將其刪除,方法是設定組態

[logging]
remote_logging = True
remote_base_log_folder = schema://path/to/remote/log
delete_local_logs = True

設定日誌記錄

對於預設處理器 FileTaskHandler,您可以使用 base_log_folderairflow.cfg 中指定放置日誌檔案的目錄。預設情況下,日誌放置在 AIRFLOW_HOME 目錄中。

注意

有關設定組態的更多資訊,請參閱設定組態選項

在命名任務的日誌檔案時,遵循預設模式

  • 對於一般任務:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log

  • 對於動態映射任務:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log

這些模式可以透過log_filename_template進行調整。

此外,您可以提供遠端位置來儲存目前的日誌和備份。

從您的程式碼寫入任務日誌

Airflow 使用標準 Python logging 框架來寫入日誌,並且在任務執行期間,根記錄器配置為寫入任務的日誌。

大多數運算子會自動將日誌寫入任務日誌。這是因為它們有一個 log 記錄器,您可以利用它來寫入任務日誌。此記錄器由所有運算子繼承的 LoggingMixin 建立和配置。但由於根記錄器處理,任何將日誌記錄傳播到根目錄的標準記錄器(使用預設設定)也會寫入任務日誌。

因此,如果您想從您的自訂程式碼記錄到任務日誌,您可以執行以下任何操作

  • 使用 BaseOperator 中的 self.log 記錄器進行記錄

  • 使用標準 print 語句列印到 stdout(不建議,但在某些情況下可能很有用)

  • 使用標準記錄器方法,即使用 Python 模組名稱建立記錄器,並使用它寫入任務日誌

這是記錄器直接在 Python 程式碼中使用的常用方式

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")

日誌行的分組

2.9.0 版本新增。

像 CI 管道一樣,Airflow 日誌也可能非常龐大,並且變得難以閱讀。因此,有時將日誌區域的部分分組並提供文字區域的摺疊以隱藏不相關的內容會很有用。因此,Airflow 實作了相容的日誌訊息分組,如 GithubAzure DevOps,以便可以摺疊文字區域。實作的方案是相容的,因此在 CI 中產生輸出的工具可以直接在 Airflow 中利用相同的體驗。

透過新增帶有起始和結束位置的日誌標記(例如,如下所示),可以對日誌訊息進行分組

print("Here is some standard text.")
print("::group::Non important details")
print("bla")
print("debug messages...")
print("::endgroup::")
print("Here is again some standard text.")

在 Web UI 中顯示日誌時,日誌的顯示將會被簡化

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯈ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

如果您點擊日誌文字標籤,將會顯示詳細的日誌內容。

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯆ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - bla
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - debug messages...
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯅⯅⯅ Log group end
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

日誌的交錯

Airflow 的遠端任務日誌記錄處理器大致可以分為兩類:串流處理器(例如 ElasticSearch、AWS Cloudwatch 和 GCP operations logging,前身為 stackdriver)和 Blob 儲存處理器(例如 S3、GCS、WASB)。

對於 Blob 儲存處理器,根據任務的狀態,日誌可能位於許多不同的位置和多個不同的檔案中。因此,我們需要檢查所有位置並交錯我們找到的內容。為此,我們需要能夠解析每行的時間戳記。如果您使用的是自訂格式器,您可能需要透過在 Airflow 設定 [logging] interleave_timestamp_parser 中提供可呼叫的名稱來覆寫預設解析器。

對於串流處理器,無論任務階段或執行位置如何,所有日誌訊息都可以使用相同的識別碼傳送到日誌記錄服務,因此一般來說,不需要檢查多個來源並交錯。

疑難排解

如果您想檢查目前設定的任務處理器,您可以使用 airflow info 命令,如下例所示。

$ airflow info

Apache Airflow
version                | 2.9.0.dev0
executor               | LocalExecutor
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn       | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder            | /files/dags
plugins_folder         | /root/airflow/plugins
base_log_folder        | /root/airflow/logs
remote_base_log_folder |

[skipping the remaining outputs for brevity]

上面的 airflow info 輸出被截斷,僅顯示與日誌記錄組態相關的部分。您也可以執行 airflow config list 以檢查日誌記錄組態選項是否具有有效值。

進階設定

您可以設定進階功能 - 包括新增您自己的自訂任務日誌處理器(以及所有 airflow 組件的日誌處理器),以及為每個運算子、Hook 和任務建立自訂日誌處理器。

從工作節點和觸發器伺服日誌

大多數任務處理器會在任務完成後傳送日誌。為了即時查看日誌,Airflow 會啟動 HTTP 伺服器以在以下情況下伺服日誌

  • 如果使用 SequentialExecutorLocalExecutor,則在 airflow scheduler 執行時。

  • 如果使用 CeleryExecutor,則在 airflow worker 執行時。

在觸發器中,除非服務以 --skip-serve-logs 選項啟動,否則會伺服日誌。

伺服器在 [logging] 區段中的 worker_log_server_port 選項以及觸發器的 triggerer_log_server_port 選項指定的埠上執行。預設值分別為 8793 和 8794。Webserver 和工作節點之間的通訊使用 [webserver] 區段中 secret_key 選項指定的金鑰簽署。您必須確保金鑰匹配,以便通訊可以順利進行。

我們使用 Gunicorn 作為 WSGI 伺服器。其組態選項可以使用 GUNICORN_CMD_ARGS 環境變數覆寫。有關詳細資訊,請參閱 Gunicorn 設定

實作自訂檔案任務處理器

注意

這是一個進階主題,大多數使用者應該能夠直接使用 寫入日誌 中的現有處理器。

在我們的提供者中,我們有各種健全的選項,包括所有主要的雲端提供者。但是,如果您需要使用不同的服務實作日誌記錄,並且您決定實作自訂 FileTaskHandler,則需要注意一些設定,尤其是在觸發器日誌記錄的上下文中。

觸發器需要改變日誌記錄的設定方式。與任務相反,許多觸發器在同一個進程中執行,並且對於觸發器,由於它們在 asyncio 中執行,我們必須注意不要透過日誌記錄處理器引入封鎖呼叫。並且由於處理器行為的變化(有些寫入檔案,有些上傳到 Blob 儲存,有些在訊息到達時透過網路傳送訊息,有些在執行緒中執行此操作),我們需要某種方法讓觸發器知道如何使用它們。

為了實現這一點,我們在處理器上設定了一些屬性,可以是實例或類別。繼承不適用於這些參數,因為 FileTaskHandler 的子類別可能在相關特性上與其不同。這些參數描述如下

  • trigger_should_wrap:控制此處理器是否應由 TriggerHandlerWrapper 包裝。當處理器的每個實例建立一個檔案處理器,並將所有訊息寫入其中時,這是必要的。

  • trigger_should_queue:控制觸發器是否應在事件迴圈和處理器之間放置 QueueListener,以確保處理器中的封鎖 IO 不會中斷事件迴圈。

  • trigger_send_end_marker:控制是否應在觸發器完成時將 END 訊號傳送到記錄器。它用於告知包裝器關閉並移除特定於剛完成的觸發器的個別檔案處理器。

  • trigger_supported:如果 trigger_should_wraptrigger_should_queue 不是 True,我們通常假設處理器不支援觸發器。但是如果在這種情況下,處理器將 trigger_supported 設定為 True,那麼我們仍然會在觸發器啟動時將處理器移動到根目錄,以便它處理觸發器訊息。基本上,對於「原生」支援觸發器的處理器,這應該為 True。其中一個範例是 StackdriverTaskHandler。

這個條目有幫助嗎?