airflow.providers.opensearch.log.os_task_handler
¶
模組內容¶
類別¶
OpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。 |
屬性¶
- class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]¶
繼承自:
airflow.utils.log.file_task_handler.FileTaskHandler
,airflow.utils.log.logging_mixin.ExternalLoggingMixin
,airflow.utils.log.logging_mixin.LoggingMixin
OpensearchTaskHandler 是一個 Python 日誌處理器,用於讀取和寫入日誌到 OpenSearch。
如同 ElasticsearchTaskHandler,Airflow 本身不處理日誌的索引。相反地,日誌會刷新到本地檔案,可能需要額外的軟體(例如 Filebeat、Logstash)將日誌傳送到 OpenSearch。這個處理器接著啟用從 OpenSearch 提取和顯示日誌的功能。
為了有效率地查詢和排序 Elasticsearch 結果,此處理器假設每個日誌訊息都有一個名為 log_id 的欄位,由任務執行個體的主鍵組成:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日誌訊息會根據 offset 排序,offset 是一個唯一的整數,表示日誌訊息的順序。此處的時間戳記不可靠,因為多個日誌訊息可能具有相同時間戳記。
- 參數
base_log_folder (str) – 本機儲存日誌的基礎資料夾。
end_of_log_mark (str) – 標記日誌結束的字串。
write_stdout (bool) – 是否同時將日誌寫入 stdout。
json_format (bool) – 是否將日誌格式化為 JSON。
json_fields (str) – 要包含在 JSON 日誌輸出中的欄位,以逗號分隔的列表。
host (str) – OpenSearch 主機名稱。
port (int) – OpenSearch 埠號。
username (str) – 用於 OpenSearch 驗證的使用者名稱。
password (str) – 用於 OpenSearch 驗證的密碼。
host_field (str) – 日誌中主機的欄位名稱(預設為 “host”)。
offset_field (str) – 日誌偏移量的欄位名稱(預設為 “offset”)。
index_patterns (str) – 用於儲存日誌的索引模式或範本。
index_patterns_callable (str) – 可調用物件,根據上下文動態產生索引模式。
os_kwargs (dict | None | Literal[default_os_kwargs]) – 額外的 OpenSearch 用戶端選項。可以設定為 “default_os_kwargs” 以載入 Airflow 設定中的預設組態。
- set_context(ti, *, identifier=None)[source]¶
為 airflow 任務處理器提供 task_instance 上下文。
- 參數
ti (airflow.models.taskinstance.TaskInstance) – 任務執行個體物件
identifier (str | None) – 如果設定,則識別從與任務執行個體相關的異常情況中繼日誌的 Airflow 組件