使用 OpenLineage 整合¶
OpenLineage 是一個開放框架,用於資料譜系收集和分析。其核心是一個可擴展的規範,系統可以使用該規範與譜系元數據互操作。查看 OpenLineage 文件。
使用 OpenLineage 不需要變更使用者 DAG 檔案。 只需要基本設定,以便 OpenLineage 知道將事件發送到哪裡。
快速入門¶
注意
OpenLineage Provider 提供了多樣化的資料傳輸選項(http、kafka、檔案等),包括建立自訂解決方案的彈性。設定可以透過多種方法管理,並且使用者可以使用大量的設定來微調和增強他們對 OpenLineage 的使用。 有關這些功能的完整說明,請參閱本文檔的後續章節。
此範例是 OpenLineage 設定的基本示範。
安裝 provider 套件或將其新增至
requirements.txt
檔案。pip install apache-airflow-providers-openlineage
提供
Transport
設定,以便 OpenLineage 知道將事件發送到哪裡。在airflow.cfg
檔案中[openlineage] transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
或使用
AIRFLOW__OPENLINEAGE__TRANSPORT
環境變數AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
就這樣! 當 DAG 執行時,OpenLineage 事件應傳送到已設定的後端。
使用方式¶
當啟用和設定後,整合不需要使用者進一步操作。 它將自動
收集任務輸入/輸出元數據(來源、結構描述等)。
收集任務執行層級元數據(執行時間、狀態、參數等)
收集任務工作層級元數據(擁有者、類型、描述等)
收集任務特定的元數據(bigquery 工作 ID、python 原始碼等) - 取決於 Operator
所有這些資料將作為 OpenLineage 事件傳送到已設定的後端,如 工作階層 中所述。
傳輸設定¶
設定 OpenLineage Airflow Provider 的主要且推薦的方法是 Airflow 設定(airflow.cfg
檔案)。 所有可能的設定選項和範例值都可以在 設定章節 中找到。
至少,在任何情況下都需要設定一件事是 Transport
- 您希望事件最終到達哪裡 - 例如 Marquez。
將 Transport 設定為 JSON 字串¶
Airflow 設定中的 transport
選項用於此目的。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
AIRFLOW__OPENLINEAGE__TRANSPORT
環境變數是等效的。
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
如果您想查看 OpenLineage 事件而不將其發送到任何地方,您可以設定 ConsoleTransport
- 事件將最終出現在任務日誌中。
[openlineage]
transport = {"type": "console"}
注意
有關內建傳輸類型、特定傳輸的選項或如何實作自訂傳輸的說明,請參閱 Python client 文件。
將 Transport 設定為組態檔¶
您也可以使用 YAML 檔案(例如 openlineage.yml
)設定 OpenLineage Transport
。 在 Airflow 設定中提供 YAML 檔案的路徑作為 config_path
選項。
[openlineage]
config_path = '/path/to/openlineage.yml'
AIRFLOW__OPENLINEAGE__CONFIG_PATH
環境變數是等效的。
AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'
組態 YAML 檔案的範例內容
transport:
type: http
url: https://backend:5000
endpoint: events/receive
auth:
type: api_key
apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
注意
有關該設定方法的詳細描述以及範例組態檔,可以在 Python client 文件 中找到。
設定優先順序¶
由於有多種可能的 OpenLineage 設定方式,因此請務必記住不同設定的優先順序。 OpenLineage Airflow Provider 依以下順序尋找設定:
檢查
airflow.cfg
中openlineage
區段下的config_path
(或 AIRFLOW__OPENLINEAGE__CONFIG_PATH 環境變數)檢查
airflow.cfg
中openlineage
區段下的transport
(或 AIRFLOW__OPENLINEAGE__TRANSPORT 環境變數)如果以上所有選項都遺失,則底層使用的 OpenLineage Python client 會按照 此 文件中描述的順序尋找設定。 請注意,鼓勵使用 Airflow 設定,並且這是唯一面向未來的解決方案。
回溯相容性¶
警告
以下變數不應使用,並且將來可能會移除。 考慮使用 Airflow 設定(如上所述)以獲得面向未來的解決方案。
為了與 openlineage-airflow
套件回溯相容,某些環境變數仍然可用
OPENLINEAGE_DISABLED
等效於AIRFLOW__OPENLINEAGE__DISABLED
。OPENLINEAGE_CONFIG
等效於AIRFLOW__OPENLINEAGE__CONFIG_PATH
。OPENLINEAGE_NAMESPACE
等效於AIRFLOW__OPENLINEAGE__NAMESPACE
。OPENLINEAGE_EXTRACTORS
等效於設定AIRFLOW__OPENLINEAGE__EXTRACTORS
。OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE
等效於AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE
。OPENLINEAGE_URL
可用於設定簡單的 http 傳輸。 此方法有一些限制,可能需要使用其他環境變數才能達到所需的輸出。 請參閱 文件。
其他選項¶
命名空間¶
為此特定實例設定 OpenLineage 命名空間非常有用。 這樣,如果您使用多個 OpenLineage 生產者,則來自它們的事件將在邏輯上分開。 如果未設定,則使用 default
命名空間。 在 Airflow 設定中提供命名空間的名稱作為 namespace
選項。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'
AIRFLOW__OPENLINEAGE__NAMESPACE
環境變數是等效的。
AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
逾時¶
為了在任務執行和 OpenLineage 之間增加一層隔離,增加對 OpenLineage 執行不會以時間以外的方式干擾任務執行的保證,OpenLineage 方法在單獨的進程中執行。 程式碼以預設逾時時間 10 秒執行。 您可以透過設定 execution_timeout
值來增加此時間。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
execution_timeout = 60
AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT
環境變數是等效的。
AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60
停用¶
您可以停用傳送 OpenLineage 事件,而無需解除安裝 OpenLineage provider,方法是在 Airflow 設定中將 disabled
選項設定為 true
。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true
AIRFLOW__OPENLINEAGE__DISABLED
環境變數是等效的。
AIRFLOW__OPENLINEAGE__DISABLED=true
停用原始碼¶
幾個 Operator(例如 Python、Bash)預設會將其原始碼包含在其 OpenLineage 事件中。 為了防止這種情況,請在 Airflow 設定中將 disable_source_code
選項設定為 true
。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true
AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE
環境變數是等效的。
AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true
為 Operators 停用¶
您可以輕鬆地從發出 OpenLineage 事件中排除某些 Operators,方法是將以分號分隔的 Airflow Operators 完整匯入路徑字串傳遞到 Airflow 設定中的 disabled_for_operators
欄位。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS
環境變數是等效的。
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'
完整任務資訊¶
預設情況下,OpenLineage 整合的 AirflowRunFacet - 附加在每個任務實例事件的 START 事件上 - 不包含完整的序列化任務資訊(給定 operator 的參數),而僅包含選定的參數。
但是,我們允許使用者設定 OpenLineage 整合以包含完整的任務資訊。 透過這樣做,我們不是僅序列化一些已知的屬性,而是排除某些不可序列化的元素並發送其他所有內容。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
include_full_task_info = true
AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO
環境變數是等效的。
AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true
警告
透過將此變數設定為 true,OpenLineage 整合不會控制您傳送的事件大小。 它可能會包含大小為 MB 甚至更大的元素,具體取決於您傳遞給任務的資料大小。
自訂 Extractor¶
若要使用 自訂 Extractor 功能,請透過將以分號分隔的 Airflow Operators 完整匯入路徑字串傳遞到 Airflow 設定中的 extractors
選項來註冊 extractor。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
AIRFLOW__OPENLINEAGE__EXTRACTORS
環境變數是等效的。
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
自訂 Run Facet¶
若要注入 自訂 run facet,請透過將以分號分隔的自訂 run facet 函數的完整匯入路徑字串傳遞到 Airflow 設定中的 custom_run_facets
選項來註冊自訂 run facet 函數。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS
環境變數是等效的。
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
偵錯模式¶
您可以透過在 Airflow 設定中將 debug_mode
選項設定為 true
,啟用在 OpenLineage 事件中傳送額外資訊,這些資訊對於偵錯和重現您的環境設定可能很有用。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true
AIRFLOW__OPENLINEAGE__DEBUG_MODE
環境變數是等效的。
AIRFLOW__OPENLINEAGE__DEBUG_MODE=true
警告
透過將此變數設定為 true,OpenLineage 整合可能會記錄和發出大量詳細資訊。 僅應在偵錯目的時暫時啟用它。
在 DAG/任務層級啟用 OpenLineage¶
可以使用 selective_enable
策略有選擇地為特定 DAG 和任務啟用 OpenLineage。 若要啟用此策略,請在 Airflow 組態檔的 [openlineage] 區段中將 selective_enable
選項設定為 True
[openlineage]
selective_enable = True
AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE
環境變數是等效的。
AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
雖然 selective_enable
啟用選擇性控制,但 disabled
選項 仍然具有優先權。 如果您在設定中將 disabled
設定為 True,則無論 selective_enable
設定如何,OpenLineage 都將針對所有 DAG 和任務停用。
一旦啟用 selective_enable
策略,您可以使用 enable_lineage
和 disable_lineage
函數選擇為個別 DAG 和任務啟用 OpenLineage。
在 DAG 上啟用 Lineage
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with enable_lineage(DAG(...)):
# Tasks within this DAG will have lineage tracking enabled
MyOperator(...)
AnotherOperator(...)
在任務上啟用 Lineage
雖然在 DAG 上啟用 lineage 會隱含地為該 DAG 內的所有任務啟用它,但您仍然可以有選擇地為特定任務停用它
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with DAG(...) as dag:
t1 = MyOperator(...)
t2 = AnotherOperator(...)
# Enable lineage for the entire DAG
enable_lineage(dag)
# Disable lineage for task t1
disable_lineage(t1)
在 DAG 層級啟用 lineage 會自動為該 DAG 內的所有任務啟用它,除非明確地為每個任務停用。
在任務層級啟用 lineage 會隱含地在其 DAG 上啟用 lineage。 這是因為每個發出任務都會傳送 ParentRunFacet,這需要在某些 OpenLineage 後端系統中啟用 DAG 層級 lineage。 在啟用任務層級 lineage 的同時停用 DAG 層級 lineage 可能會導致錯誤或不一致。
將父工作資訊傳遞到 Spark 工作¶
OpenLineage 整合可以自動將 Airflow 的資訊(命名空間、工作名稱、執行 ID)作為父工作資訊(spark.openlineage.parentJobNamespace
、spark.openlineage.parentJobName
、spark.openlineage.parentRunId
)注入到 Spark 應用程式屬性中,適用於 支援的 Operators。 它允許 Spark 整合自動在應用程式層級 OpenLineage 事件中包含 parentRunFacet
,在來自不同整合的任務之間建立父子關係。 請參閱 從 Airflow 排程。
警告
如果在 Spark 工作組態中手動指定了上述任何屬性,則整合將避免注入父工作屬性,以確保手動提供的值得以保留。
您可以透過在 Airflow 設定中將 spark_inject_parent_job_info
選項設定為 true
來啟用此自動化。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_parent_job_info = true
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO
環境變數是等效的。
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
為自訂 Operators 新增支援¶
如果您想為特定 Operator 新增 OpenLineage 涵蓋範圍,請查看 在 Operators 中實作 OpenLineage
我可以在哪裡了解更多資訊?¶
查看 OpenLineage 網站。
造訪我們的 GitHub 倉庫。
觀看多個關於 OpenLineage 的 演講。