使用 OpenLineage 整合

OpenLineage 是一個開放框架,用於資料譜系收集和分析。其核心是一個可擴展的規範,系統可以使用該規範與譜系元數據互操作。查看 OpenLineage 文件

使用 OpenLineage 不需要變更使用者 DAG 檔案。 只需要基本設定,以便 OpenLineage 知道將事件發送到哪裡。

快速入門

注意

OpenLineage Provider 提供了多樣化的資料傳輸選項(http、kafka、檔案等),包括建立自訂解決方案的彈性。設定可以透過多種方法管理,並且使用者可以使用大量的設定來微調和增強他們對 OpenLineage 的使用。 有關這些功能的完整說明,請參閱本文檔的後續章節。

此範例是 OpenLineage 設定的基本示範。

  1. 安裝 provider 套件或將其新增至 requirements.txt 檔案。

    pip install apache-airflow-providers-openlineage
    
  2. 提供 Transport 設定,以便 OpenLineage 知道將事件發送到哪裡。在 airflow.cfg 檔案中

    [openlineage]
    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
    

    或使用 AIRFLOW__OPENLINEAGE__TRANSPORT 環境變數

    AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
    
  3. 就這樣! 當 DAG 執行時,OpenLineage 事件應傳送到已設定的後端。

使用方式

當啟用和設定後,整合不需要使用者進一步操作。 它將自動

  • 收集任務輸入/輸出元數據(來源、結構描述等)。

  • 收集任務執行層級元數據(執行時間、狀態、參數等)

  • 收集任務工作層級元數據(擁有者、類型、描述等)

  • 收集任務特定的元數據(bigquery 工作 ID、python 原始碼等) - 取決於 Operator

所有這些資料將作為 OpenLineage 事件傳送到已設定的後端,如 工作階層 中所述。

傳輸設定

設定 OpenLineage Airflow Provider 的主要且推薦的方法是 Airflow 設定(airflow.cfg 檔案)。 所有可能的設定選項和範例值都可以在 設定章節 中找到。

至少,在任何情況下都需要設定一件事是 Transport - 您希望事件最終到達哪裡 - 例如 Marquez

將 Transport 設定為 JSON 字串

Airflow 設定中的 transport 選項用於此目的。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}

AIRFLOW__OPENLINEAGE__TRANSPORT 環境變數是等效的。

AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'

如果您想查看 OpenLineage 事件而不將其發送到任何地方,您可以設定 ConsoleTransport - 事件將最終出現在任務日誌中。

[openlineage]
transport = {"type": "console"}

注意

有關內建傳輸類型、特定傳輸的選項或如何實作自訂傳輸的說明,請參閱 Python client 文件

將 Transport 設定為組態檔

您也可以使用 YAML 檔案(例如 openlineage.yml)設定 OpenLineage Transport。 在 Airflow 設定中提供 YAML 檔案的路徑作為 config_path 選項。

[openlineage]
config_path = '/path/to/openlineage.yml'

AIRFLOW__OPENLINEAGE__CONFIG_PATH 環境變數是等效的。

AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'

組態 YAML 檔案的範例內容

transport:
  type: http
  url: https://backend:5000
  endpoint: events/receive
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e

注意

有關該設定方法的詳細描述以及範例組態檔,可以在 Python client 文件 中找到。

設定優先順序

由於有多種可能的 OpenLineage 設定方式,因此請務必記住不同設定的優先順序。 OpenLineage Airflow Provider 依以下順序尋找設定:

  1. 檢查 airflow.cfgopenlineage 區段下的 config_path(或 AIRFLOW__OPENLINEAGE__CONFIG_PATH 環境變數)

  2. 檢查 airflow.cfgopenlineage 區段下的 transport(或 AIRFLOW__OPENLINEAGE__TRANSPORT 環境變數)

  3. 如果以上所有選項都遺失,則底層使用的 OpenLineage Python client 會按照 文件中描述的順序尋找設定。 請注意,鼓勵使用 Airflow 設定,並且這是唯一面向未來的解決方案。

回溯相容性

警告

以下變數不應使用,並且將來可能會移除。 考慮使用 Airflow 設定(如上所述)以獲得面向未來的解決方案。

為了與 openlineage-airflow 套件回溯相容,某些環境變數仍然可用

  • OPENLINEAGE_DISABLED 等效於 AIRFLOW__OPENLINEAGE__DISABLED

  • OPENLINEAGE_CONFIG 等效於 AIRFLOW__OPENLINEAGE__CONFIG_PATH

  • OPENLINEAGE_NAMESPACE 等效於 AIRFLOW__OPENLINEAGE__NAMESPACE

  • OPENLINEAGE_EXTRACTORS 等效於設定 AIRFLOW__OPENLINEAGE__EXTRACTORS

  • OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE 等效於 AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE

  • OPENLINEAGE_URL 可用於設定簡單的 http 傳輸。 此方法有一些限制,可能需要使用其他環境變數才能達到所需的輸出。 請參閱 文件

其他選項

命名空間

為此特定實例設定 OpenLineage 命名空間非常有用。 這樣,如果您使用多個 OpenLineage 生產者,則來自它們的事件將在邏輯上分開。 如果未設定,則使用 default 命名空間。 在 Airflow 設定中提供命名空間的名稱作為 namespace 選項。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'

AIRFLOW__OPENLINEAGE__NAMESPACE 環境變數是等效的。

AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'

逾時

為了在任務執行和 OpenLineage 之間增加一層隔離,增加對 OpenLineage 執行不會以時間以外的方式干擾任務執行的保證,OpenLineage 方法在單獨的進程中執行。 程式碼以預設逾時時間 10 秒執行。 您可以透過設定 execution_timeout 值來增加此時間。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
execution_timeout = 60

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT 環境變數是等效的。

AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60

停用

您可以停用傳送 OpenLineage 事件,而無需解除安裝 OpenLineage provider,方法是在 Airflow 設定中將 disabled 選項設定為 true

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true

AIRFLOW__OPENLINEAGE__DISABLED 環境變數是等效的。

AIRFLOW__OPENLINEAGE__DISABLED=true

停用原始碼

幾個 Operator(例如 Python、Bash)預設會將其原始碼包含在其 OpenLineage 事件中。 為了防止這種情況,請在 Airflow 設定中將 disable_source_code 選項設定為 true

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE 環境變數是等效的。

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true

為 Operators 停用

您可以輕鬆地從發出 OpenLineage 事件中排除某些 Operators,方法是將以分號分隔的 Airflow Operators 完整匯入路徑字串傳遞到 Airflow 設定中的 disabled_for_operators 欄位。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS 環境變數是等效的。

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'

完整任務資訊

預設情況下,OpenLineage 整合的 AirflowRunFacet - 附加在每個任務實例事件的 START 事件上 - 不包含完整的序列化任務資訊(給定 operator 的參數),而僅包含選定的參數。

但是,我們允許使用者設定 OpenLineage 整合以包含完整的任務資訊。 透過這樣做,我們不是僅序列化一些已知的屬性,而是排除某些不可序列化的元素並發送其他所有內容。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
include_full_task_info = true

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO 環境變數是等效的。

AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true

警告

透過將此變數設定為 true,OpenLineage 整合不會控制您傳送的事件大小。 它可能會包含大小為 MB 甚至更大的元素,具體取決於您傳遞給任務的資料大小。

自訂 Extractor

若要使用 自訂 Extractor 功能,請透過將以分號分隔的 Airflow Operators 完整匯入路徑字串傳遞到 Airflow 設定中的 extractors 選項來註冊 extractor。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass

AIRFLOW__OPENLINEAGE__EXTRACTORS 環境變數是等效的。

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

自訂 Run Facet

若要注入 自訂 run facet,請透過將以分號分隔的自訂 run facet 函數的完整匯入路徑字串傳遞到 Airflow 設定中的 custom_run_facets 選項來註冊自訂 run facet 函數。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS 環境變數是等效的。

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

偵錯模式

您可以透過在 Airflow 設定中將 debug_mode 選項設定為 true,啟用在 OpenLineage 事件中傳送額外資訊,這些資訊對於偵錯和重現您的環境設定可能很有用。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true

AIRFLOW__OPENLINEAGE__DEBUG_MODE 環境變數是等效的。

AIRFLOW__OPENLINEAGE__DEBUG_MODE=true

警告

透過將此變數設定為 true,OpenLineage 整合可能會記錄和發出大量詳細資訊。 僅應在偵錯目的時暫時啟用它。

在 DAG/任務層級啟用 OpenLineage

可以使用 selective_enable 策略有選擇地為特定 DAG 和任務啟用 OpenLineage。 若要啟用此策略,請在 Airflow 組態檔的 [openlineage] 區段中將 selective_enable 選項設定為 True

[openlineage]
selective_enable = True

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE 環境變數是等效的。

AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true

雖然 selective_enable 啟用選擇性控制,但 disabled 選項 仍然具有優先權。 如果您在設定中將 disabled 設定為 True,則無論 selective_enable 設定如何,OpenLineage 都將針對所有 DAG 和任務停用。

一旦啟用 selective_enable 策略,您可以使用 enable_lineagedisable_lineage 函數選擇為個別 DAG 和任務啟用 OpenLineage。

  1. 在 DAG 上啟用 Lineage

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
    # Tasks within this DAG will have lineage tracking enabled
    MyOperator(...)

    AnotherOperator(...)
  1. 在任務上啟用 Lineage

雖然在 DAG 上啟用 lineage 會隱含地為該 DAG 內的所有任務啟用它,但您仍然可以有選擇地為特定任務停用它

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

# Enable lineage for the entire DAG
enable_lineage(dag)

# Disable lineage for task t1
disable_lineage(t1)

在 DAG 層級啟用 lineage 會自動為該 DAG 內的所有任務啟用它,除非明確地為每個任務停用。

在任務層級啟用 lineage 會隱含地在其 DAG 上啟用 lineage。 這是因為每個發出任務都會傳送 ParentRunFacet,這需要在某些 OpenLineage 後端系統中啟用 DAG 層級 lineage。 在啟用任務層級 lineage 的同時停用 DAG 層級 lineage 可能會導致錯誤或不一致。

將父工作資訊傳遞到 Spark 工作

OpenLineage 整合可以自動將 Airflow 的資訊(命名空間、工作名稱、執行 ID)作為父工作資訊(spark.openlineage.parentJobNamespacespark.openlineage.parentJobNamespark.openlineage.parentRunId)注入到 Spark 應用程式屬性中,適用於 支援的 Operators。 它允許 Spark 整合自動在應用程式層級 OpenLineage 事件中包含 parentRunFacet,在來自不同整合的任務之間建立父子關係。 請參閱 從 Airflow 排程

警告

如果在 Spark 工作組態中手動指定了上述任何屬性,則整合將避免注入父工作屬性,以確保手動提供的值得以保留。

您可以透過在 Airflow 設定中將 spark_inject_parent_job_info 選項設定為 true 來啟用此自動化。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_parent_job_info = true

AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO 環境變數是等效的。

AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true

疑難排解

有關如何疑難排解 OpenLineage 的詳細資訊,請參閱 疑難排解

為自訂 Operators 新增支援

如果您想為特定 Operator 新增 OpenLineage 涵蓋範圍,請查看 在 Operators 中實作 OpenLineage

我可以在哪裡了解更多資訊?

意見回饋

您可以在 slack 上與我們聯繫並留下您的意見回饋!

如何貢獻

我們歡迎您的貢獻! OpenLineage 是一個正在積極開發中的開放原始碼專案,我們很樂意得到您的幫助!

聽起來很有趣嗎? 查看我們的 新貢獻者指南 以開始。

此條目是否有幫助?