airflow.providers.opensearch.log.os_task_handler

模組內容

類別

OpensearchTaskHandler

OpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。

屬性

USE_PER_RUN_LOG_ID

OsLogMsgType

LOG_LINE_DEFAULTS

airflow.providers.opensearch.log.os_task_handler.USE_PER_RUN_LOG_ID[source]
airflow.providers.opensearch.log.os_task_handler.OsLogMsgType[source]
airflow.providers.opensearch.log.os_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.opensearch.log.os_task_handler.get_os_kwargs_from_config()[source]
class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]

繼承自: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

OpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。

如同 ElasticsearchTaskHandler,Airflow 本身不處理日誌的索引。相反地,日誌會刷新到本地檔案,可能需要額外的軟體(例如 Filebeat、Logstash)將日誌傳送到 OpenSearch。這個處理器接著啟用從 OpenSearch 提取和顯示日誌的功能。

為了有效率地查詢和排序 Elasticsearch 結果,此處理器假設每個日誌訊息都有一個名為 log_id 的欄位,由任務執行個體的主鍵組成:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日誌訊息會根據 offset 排序,offset 是一個唯一的整數,表示日誌訊息的順序。此處的時間戳記不可靠,因為多個日誌訊息可能具有相同時間戳記。

參數
  • base_log_folder (str) – 本機儲存日誌的基礎資料夾。

  • end_of_log_mark (str) – 標記日誌結束的字串。

  • write_stdout (bool) – 是否同時將日誌寫入 stdout。

  • json_format (bool) – 是否將日誌格式化為 JSON。

  • json_fields (str) – 要包含在 JSON 日誌輸出中的欄位,以逗號分隔的列表。

  • host (str) – OpenSearch 主機名稱。

  • port (int) – OpenSearch 埠號。

  • username (str) – 用於 OpenSearch 驗證的使用者名稱。

  • password (str) – 用於 OpenSearch 驗證的密碼。

  • host_field (str) – 日誌中主機的欄位名稱(預設為 “host”)。

  • offset_field (str) – 日誌偏移量的欄位名稱(預設為 “offset”)。

  • index_patterns (str) – 用於儲存日誌的索引模式或範本。

  • index_patterns_callable (str) – 可調用物件,根據上下文動態產生索引模式。

  • os_kwargs (dict | None | Literal[default_os_kwargs]) – 額外的 OpenSearch 用戶端選項。可以設定為 “default_os_kwargs” 以載入 Airflow 設定中的預設組態。

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Opensearch'[source]
trigger_should_wrap = True[source]
set_context(ti, *, identifier=None)[source]

為 airflow 任務處理器提供 task_instance 上下文。

參數
emit(record)[source]

執行任何必要操作以實際記錄指定的日誌記錄。

此版本旨在由子類別實作,因此會引發 NotImplementedError。

close()[source]

整理處理器使用的任何資源。

此版本從處理器的內部地圖 _handlers 中移除處理器,該地圖用於依名稱查找處理器。子類別應確保從覆寫的 close() 方法中呼叫此方法。

這個條目有幫助嗎?