airflow.providers.elasticsearch.log.es_task_handler

模組內容

類別

ElasticsearchTaskHandler

ElasticsearchTaskHandler 是一個 Python 日誌處理器,用於從 Elasticsearch 讀取日誌。

屬性

LOG_LINE_DEFAULTS

EsLogMsgType

USE_PER_RUN_LOG_ID

VALID_ES_CONFIG_KEYS

airflow.providers.elasticsearch.log.es_task_handler.LOG_LINE_DEFAULTS[原始碼]
airflow.providers.elasticsearch.log.es_task_handler.EsLogMsgType[原始碼]
airflow.providers.elasticsearch.log.es_task_handler.USE_PER_RUN_LOG_ID[原始碼]
airflow.providers.elasticsearch.log.es_task_handler.VALID_ES_CONFIG_KEYS[原始碼]
airflow.providers.elasticsearch.log.es_task_handler.get_es_kwargs_from_config()[原始碼]
class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host_field='host', offset_field='offset', host='https://127.0.0.1:9200', frontend='localhost:5601', index_patterns=conf.get('elasticsearch', 'index_patterns'), index_patterns_callable=conf.get('elasticsearch', 'index_patterns_callable', fallback=''), es_kwargs='default_es_kwargs', **kwargs)[原始碼]

Bases: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

ElasticsearchTaskHandler 是一個 Python 日誌處理器,用於從 Elasticsearch 讀取日誌。

請注意,Airflow 不處理將日誌索引到 Elasticsearch 中。相反地,Airflow 將日誌刷新到本地檔案中。需要額外的軟體設定才能將日誌索引到 Elasticsearch 中,例如使用 Filebeat 和 Logstash。

為了有效率地查詢和排序 Elasticsearch 結果,此處理器假設每個日誌訊息都有一個名為 log_id 的欄位,其中包含任務執行個體的主鍵:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日誌訊息會根據 offset 排序,offset 是一個唯一的整數,表示日誌訊息的順序。此處的時間戳記不可靠,因為多個日誌訊息可能具有相同時間戳記。

參數
  • base_log_folder (str) – 用於本地儲存日誌的基礎資料夾

  • log_id_template – 日誌 ID 模板

  • host (str) – Elasticsearch 主機名稱

property log_name: str[原始碼]

日誌名稱。

是否支援外部連結。

PAGE = 0[原始碼]
MAX_LINE_PER_PAGE = 1000[原始碼]
LOG_NAME = 'Elasticsearch'[原始碼]
trigger_should_wrap = True[原始碼]
static format_url(host)[原始碼]

格式化給定的主機字串,以確保它以 ‘http’ 開頭,並檢查它是否代表有效的 URL。

Params host

要格式化和檢查的主機字串。

emit(record)[原始碼]

執行任何必要的操作以實際記錄指定的日誌記錄。

此版本旨在由子類別實作,因此會引發 NotImplementedError。

set_context(ti, *, identifier=None)[原始碼]

向 Airflow 任務處理器提供 task_instance 上下文。

參數
close()[原始碼]

清理處理器使用的任何資源。

此版本從處理器的內部映射 _handlers 中移除處理器,該映射用於按名稱查找處理器。子類別應確保從覆寫的 close() 方法中呼叫此方法。

get_external_log_url(task_instance, try_number)[原始碼]

為外部日誌收集服務建立位址。

參數
Returns

外部日誌收集服務的 URL

Return type

str

這個頁面內容對您有幫助嗎?