airflow.providers.openlineage.plugins.adapter

模組內容

類別

OpenLineageAdapter

將 Airflow 元數據翻譯成 OpenLineage 事件,而不是從 Airflow 程式碼建立它們。

class airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter(client=None, secrets_masker=None)[原始碼]

基底類別: airflow.utils.log.logging_mixin.LoggingMixin

將 Airflow 元數據翻譯成 OpenLineage 事件,而不是從 Airflow 程式碼建立它們。

get_or_create_openlineage_client()[原始碼]
get_openlineage_config()[原始碼]
static build_dag_run_id(dag_id, logical_date, clear_number)[原始碼]
static build_task_instance_run_id(dag_id, task_id, try_number, logical_date, map_index)[原始碼]
emit(event)[原始碼]

發送 OpenLineage 事件。

參數

event (openlineage.client.event_v2.RunEvent) – 要發送的事件。

回傳值

已編輯的事件。

start_task(run_id, job_name, job_description, event_time, parent_job_name, parent_run_id, code_location, nominal_start_time, nominal_end_time, owners, task, run_facets=None)[原始碼]

發送 START 類型的 openlineage 事件。

參數
  • run_id (str) – dag 執行中任務的全域唯一識別碼

  • job_name (str) – dag 中任務的全域唯一識別碼

  • job_description (str) – 使用者提供的任務描述

  • event_time (str) –

  • parent_job_name (str | None) – 父任務的名稱 (通常是 DAG,但也可能是任務群組)

  • parent_run_id (str | None) – 產生此任務的任務識別碼

  • code_location (str | None) – DAG 檔案的路徑或 URL

  • nominal_start_time (str | None) – dag 執行的排程時間

  • nominal_end_time (str | None) – dag 執行排程之後的時間

  • owners (list[str]) – DAG 的擁有者清單

  • task (airflow.providers.openlineage.extractors.OperatorLineage | None) – 包含從運算子提取的資訊的中繼資料容器

  • run_facets (dict[str, openlineage.client.facet_v2.RunFacet] | None) – 自訂執行 facet

complete_task(run_id, job_name, parent_job_name, parent_run_id, end_time, task, run_facets=None)[原始碼]

發送 COMPLETE 類型的 openlineage 事件。

參數
  • run_id (str) – dag 執行中任務的全域唯一識別碼

  • job_name (str) – dag 之間任務的全域唯一識別碼

  • parent_job_name (str | None) – 父任務的名稱 (通常是 DAG,但也可能是任務群組)

  • parent_run_id (str | None) – 產生此任務的任務識別碼

  • end_time (str) – 任務完成時間

  • task (airflow.providers.openlineage.extractors.OperatorLineage) – 包含從運算子提取的資訊的中繼資料容器

  • run_facets (dict[str, openlineage.client.facet_v2.RunFacet] | None) – 額外執行 facet

fail_task(run_id, job_name, parent_job_name, parent_run_id, end_time, task, error=None, run_facets=None)[原始碼]

發送 FAIL 類型的 openlineage 事件。

參數
  • run_id (str) – dag 執行中任務的全域唯一識別碼

  • job_name (str) – dag 之間任務的全域唯一識別碼

  • parent_job_name (str | None) – 父任務的名稱 (通常是 DAG,但也可能是任務群組)

  • parent_run_id (str | None) – 產生此任務的任務識別碼

  • end_time (str) – 任務完成時間

  • task (airflow.providers.openlineage.extractors.OperatorLineage) – 包含從運算子提取的資訊的中繼資料容器

  • run_facets (dict[str, openlineage.client.facet_v2.RunFacet] | None) – 自訂執行 facet

  • error (str | BaseException | None) – 錯誤

  • run_facets – 額外執行 facet

dag_started(dag_id, logical_date, start_date, nominal_start_time, nominal_end_time, owners, run_facets, clear_number, description=None, job_facets=None)[原始碼]
dag_success(dag_id, run_id, end_date, logical_date, clear_number, dag_run_state, task_ids)[原始碼]
dag_failed(dag_id, run_id, end_date, logical_date, clear_number, dag_run_state, task_ids, msg)[原始碼]

這個條目是否有幫助?