在 Operators 中實作 OpenLineage¶
OpenLineage 透過支援直接修改 Airflow Operators,讓您輕鬆地將血統資訊新增至資料管線。當可以修改 Operator 時,新增血統擷取可以像在其中新增一個方法一樣簡單。詳情請參閱OpenLineage 方法。
可能有些 Operators 您無法修改(例如第三方供應商),但仍然希望從中擷取血統資訊。為了處理這種情況,OpenLineage 允許您為任何 Operator 提供自訂的 Extractor。詳情請參閱自訂 Extractor。
如果以上所有方法都無法實作,作為後備方案,還有手動註解血統資訊的方法。Airflow 允許 Operators 透過指定 Operators 的輸入和輸出(透過 inlets 和 outlets)來追蹤血統資訊。詳情請參閱手動註解血統資訊。
擷取優先順序¶
由於有多種可能的方法可以為 Operator 實作 OpenLineage 支援,因此務必記住 OpenLineage 尋找血統資料的順序
Extractor - 檢查是否為 Operator 類別名稱指定了自訂的 Extractor。使用者註冊的任何自訂 Extractor 都將優先於 Airflow Provider 原始碼中定義的預設 Extractors(例如 BashExtractor)。
OpenLineage 方法 - 如果沒有為 Operator 類別名稱明確指定 Extractor,則會使用 DefaultExtractor,它會在 Operator 中尋找 OpenLineage 方法。
Inlets 和 Outlets - 如果 Operator 中未定義 OpenLineage 方法,則會檢查 inlets 和 outlets。
如果以上所有選項都遺失,則不會從 Operator 擷取任何血統資料。您仍然會收到豐富了諸如一般 Airflow facets、正確的事件時間和類型等資訊的 OpenLineage 事件,但輸入/輸出將為空,並且會遺失 Operator 特定的 facets。
OpenLineage 方法¶
當處理您自己的 Operators 時,建議使用此方法,您可以直接實作 OpenLineage 方法。當處理您無法修改的 Operators(例如第三方供應商),但仍然希望從中擷取血統資訊時,請參閱自訂 Extractor。
OpenLineage 定義了一些在 Operators 中實作的方法。這些方法被稱為 OpenLineage 方法。
def get_openlineage_facets_on_start() -> OperatorLineage: ...
def get_openlineage_facets_on_complete(ti: TaskInstance) -> OperatorLineage: ...
def get_openlineage_facets_on_failure(ti: TaskInstance) -> OperatorLineage: ...
當任務實例狀態變更為以下狀態時,會分別呼叫 OpenLineage 方法
RUNNING ->
get_openlineage_facets_on_start()
SUCCESS ->
get_openlineage_facets_on_complete()
FAILED ->
get_openlineage_facets_on_failure()
至少必須實作以下方法之一:get_openlineage_facets_on_start()
或 get_openlineage_facets_on_complete()
。有關在缺少其他方法時呼叫哪些方法的更多詳細資訊,請參閱如何正確實作 OpenLineage 方法?。
提供者不是傳回完整的 OpenLineage 事件,而是定義了要由 Operators 傳回的 OperatorLineage
結構
@define
class OperatorLineage:
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
OpenLineage 整合本身會負責豐富它,使其包含諸如一般 Airflow facets、正確的事件時間和類型等資訊,並建立適當的 OpenLineage RunEvent。
如何正確實作 OpenLineage 方法?¶
在 Operators 中實作 OpenLineage 時,有幾件事值得注意。
首先,不要在頂層匯入 OpenLineage 相關的物件,而是在 OL 方法本身中匯入。這允許使用者即使沒有安裝 OpenLineage provider 也能使用您的 provider。
第二個重點是確保您的 provider 傳回符合 OpenLineage 標準的資料集名稱。這允許 OpenLineage 消費者正確地比對從不同來源收集的有關資料集的資訊。命名慣例在OpenLineage 命名文件中有所描述。
第三,OpenLineage 實作不應浪費不使用它的使用者的時間。這表示不要在 execute
方法中執行未被其使用的繁重處理或網路呼叫。更好的選擇是將相關資訊儲存在 Operator 屬性中 - 然後在 OpenLineage 方法中使用它。一個好的例子是 BigQueryExecuteQueryOperator
。它儲存了已執行的查詢的 job_ids
。get_openlineage_facets_on_complete
然後可以呼叫 BigQuery API,要求這些表格的血統資訊,並將其轉換為 OpenLineage 格式。
第四,沒有必要實作所有方法。如果在呼叫 execute
之前已知所有資料集,並且沒有相關的執行階段資料,則可能沒有必要實作 get_openlineage_facets_on_complete
- get_openlineage_facets_on_start
方法可以提供所有資料。反之亦然,如果在執行之前一切都是未知的,則可能沒有必要編寫 _on_start
方法。同樣地,如果沒有相關的失敗資料 - 或者失敗條件未知,則實作 get_openlineage_facets_on_failure
可能不值得。總體而言:如果沒有 on_failure
方法,則會改為呼叫 on_complete
方法。如果沒有 on_failure
和 on_complete
方法,則會改為呼叫 on_start
方法(在任務開始和任務完成時都會呼叫)。如果沒有 on_start
方法,血統資訊將不會包含在 START 事件中,並且 on_complete
方法將在任務完成時呼叫。
如何測試 OpenLineage 方法?¶
在 Operators 中單元測試 OpenLineage 整合與測試 Operators 本身非常相似。這些測試的目標是確保 get_openlineage_*
方法傳回具有相關欄位填寫的正確 OperatorLineage
資料結構。建議模擬任何外部呼叫。測試的作者需要記住呼叫不同 OL 方法的條件是不同的。get_openlineage_facets_on_start
在 execute
之前呼叫,因此,不得依賴在那裡設定的值。
有關如何在本地端疑難排解 OpenLineage 的詳細資訊,請參閱疑難排解。
目前沒有用於系統測試 OpenLineage 整合的現有框架,但可以實現的最簡單方法是將發出的事件(例如使用 FileTransport
)與預期的事件進行比較。OpenLineage 系統測試作者的目標是提供預期的事件金鑰字典。事件金鑰識別從特定 Operator 和方法發送的事件:它們具有結構 <dag_id>.<task_id>.event.<event_type>
;始終可以透過這種方式識別從特定任務發送的特定事件。提供的事件結構不必包含結果事件中的所有欄位。只能比較測試作者提供的欄位;這允許僅檢查特定測試關心的欄位。它還允許跳過(半)隨機產生的欄位,例如 runId
或 eventTime
,或在 Airflow 中的 OpenLineage 環境中始終相同的欄位,例如 producer
。
範例¶
這是針對 GcsToGcsOperator 正確實作的 get_openlineage_facets_on_complete
方法的範例。由於在 execute
方法中進行了一些處理,並且沒有相關的失敗資料,因此實作此單一方法就足夠了。
def get_openlineage_facets_on_complete(self, task_instance):
"""
Implementing _on_complete because execute method does preprocessing on internals.
This means we won't have to normalize self.source_object and self.source_objects,
destination bucket and so on.
"""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[
Dataset(namespace=f"gs://{self.source_bucket}", name=source)
for source in sorted(self.resolved_source_objects)
],
outputs=[
Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
for target in sorted(self.resolved_target_objects)
],
)
如需更多已實作 OpenLineage 方法的範例,請查看支援的類別的原始碼。
自訂 Extractors¶
當處理您無法修改的 Operators(例如第三方供應商),但仍然希望從中擷取血統資訊時,建議使用此方法。如果您想從自己的 Operators 中擷取血統資訊,您可能更喜歡直接實作 OpenLineage 方法,如OpenLineage 方法中所述。
此方法的工作原理是偵測您的 DAG 正在使用的 Airflow Operators,並使用對應的 Extractors 類別從中擷取血統資料。
介面¶
自訂 Extractors 必須衍生自 BaseExtractor
並實作至少兩種方法:_execute_extraction
和 get_operator_classnames
。
BaseExtractor 定義了兩種方法:extract
和 extract_on_complete
,它們被呼叫並用於提供實際的血統資料。區別在於 extract
在 Operator 的 execute
方法之前呼叫,而 extract_on_complete
在之後呼叫。預設情況下,extract
呼叫在自訂 Extractor 中實作的 _execute_extraction
方法,而 extract_on_complete
呼叫 extract
方法。如果您想提供任務執行後可用的其他資訊,您可以覆寫 extract_on_complete
方法。這可以用於擷取 Operator 在其自身屬性上設定的任何其他資訊。一個好的例子是 SnowflakeOperator
,它在執行後設定 query_ids
。
get_operator_classnames
是一個類別方法,用於提供您的 Extractor 可以從中取得血統資訊的 Operators 列表。
例如
@classmethod
def get_operator_classnames(cls) -> List[str]:
return ['PostgresOperator']
如果 Operator 的名稱與列表上的名稱之一相符,則 Extractor 將被實例化 - Operator 在 Extractor 的 self.operator
屬性中提供 - 並且 extract
和 extract_on_complete
方法都將被呼叫。
這兩種方法都傳回 OperatorLineage
結構
@define
class OperatorLineage:
"""Structure returned from lineage extraction."""
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, RunFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
輸入和輸出是純 OpenLineage 資料集列表 (openlineage.client.event_v2.Dataset)。
run_facets
和 job_facets
是可選 RunFacets 和 JobFacets 的字典,它們將附加到 job - 例如,如果您的 Operator 正在執行 SQL,您可能想要附加 SqlJobFacet
。
若要了解有關 OpenLineage 中 facets 的更多資訊,請參閱自訂 Facets。
註冊自訂 Extractor¶
OpenLineage 整合不知道您已提供 Extractor,除非您註冊它。
這可以使用 Airflow 組態中的 extractors
選項來完成。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
AIRFLOW__OPENLINEAGE__EXTRACTORS
環境變數是等效的。
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
或者,您可以使用空白字元分隔它們。如果您將它們作為某些 YAML 檔案的一部分提供,這很有用。
AIRFLOW__OPENLINEAGE__EXTRACTORS: >-
full.path.to.FirstExtractor;
full.path.to.SecondExtractor
請記住確保排程器和 worker 可以匯入該路徑。
偵錯自訂 Extractor¶
與自訂 Extractors 相關聯的兩個常見問題。
第一個是為 Airflow 組態中的 extractors
選項提供的路徑錯誤。路徑需要與您從程式碼中使用的路徑完全相同。如果路徑錯誤或無法從 worker 匯入,則外掛程式將無法載入 Extractors,並且不會為該 Operator 發出正確的 OpenLineage 事件。
第二個問題,可能更隱蔽,是從 Airflow 匯入。由於 OpenLineage 程式碼在 Airflow worker 本身啟動時實例化,因此從 Airflow 進行的任何匯入都可能是不易察覺的循環。這會導致 OpenLineage 擷取失敗。
為了避免此問題,請僅在本地端(在 _execute_extraction
或 extract_on_complete
方法中)從 Airflow 匯入。如果您需要匯入以進行類型檢查,請將它們置於 typing.TYPE_CHECKING 後面。
測試自訂 Extractor¶
與所有程式碼一樣,應測試自訂 Extractors。本節將提供有關編寫測試的最重要資料結構的一些資訊,以及有關疑難排解的一些注意事項。我們假設事先了解編寫自訂 Extractors 的知識。若要了解有關 Operators 和 Extractors 如何在底層協同工作的更多資訊,請查看自訂 Extractors。
在測試 Extractor 時,我們首先要驗證是否正在建立 OperatorLineage
物件,特別是驗證物件是否使用正確的輸入和輸出資料集以及相關的 facets 建立。這是在 OpenLineage 中透過 pytest 完成的,並針對連線和物件進行適當的模擬和修補。查看範例測試。
測試每個 facet 也很重要,因為如果 facets 錯誤,UI 中的資料或圖形可能會錯誤地呈現。例如,如果在 Extractor 中錯誤地建立了 facet 名稱,則 Operator 的任務將不會顯示在血統圖中,從而在管線可觀察性中造成間隙。
即使進行了單元測試,Extractor 也可能仍然無法按預期運作。判斷資料是否未正確傳遞的最簡單方法是 UI 元素是否未正確顯示在「血統」標籤中。
有關如何在本地端疑難排解 OpenLineage 的詳細資訊,請參閱疑難排解。
範例¶
這是一個簡單 Extractor 的範例,適用於在 BigQuery 中執行匯出查詢並將結果儲存到 S3 檔案的 Operator。在呼叫 Operator 的 execute
方法之前已知某些資訊,並且我們已經可以在 _execute_extraction
方法中擷取一些血統資訊。在呼叫 Operator 的 execute
方法之後,在 extract_on_complete
中,我們可以簡單地將一些額外的 Facets(例如包含 Bigquery Job ID 的 Facets)附加到我們之前準備的內容。這樣,我們就可以從 Operator 取得所有可能的資訊。
請注意,這只是一個範例。有一些 OpenLineage 內建功能可以促進不同的流程,例如使用 SQL 解析器從 SQL 查詢中擷取欄位層級的血統資訊和輸入/輸出。
from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.common.compat.openlineage.facet import (
Dataset,
ExternalQueryRunFacet,
SQLJobFacet,
)
class ExampleOperator(BaseOperator):
def __init__(self, query, bq_table_reference, s3_path) -> None:
self.bq_table_reference = bq_table_reference
self.s3_path = s3_path
self.s3_file_name = s3_file_name
self._job_id = None
def execute(self, context) -> Any:
self._job_id = run_query(query=self.query)
class ExampleExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls):
return ["ExampleOperator"]
def _execute_extraction(self) -> OperatorLineage:
"""Define what we know before Operator's extract is called."""
return OperatorLineage(
inputs=[Dataset(namespace="bigquery", name=self.operator.bq_table_reference)],
outputs=[Dataset(namespace=self.operator.s3_path, name=self.operator.s3_file_name)],
job_facets={
"sql": SQLJobFacet(
query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
)
},
)
def extract_on_complete(self, task_instance) -> OperatorLineage:
"""Add what we received after Operator's extract call."""
lineage_metadata = self.extract()
lineage_metadata.run_facets = {
"parent": ExternalQueryRunFacet(externalQueryId=task_instance.task._job_id, source="bigquery")
}
return lineage_metadata
如需更多 OpenLineage Extractors 的範例,請查看 BashExtractor 或 PythonExtractor 的原始碼。
手動註解血統資訊¶
不建議經常使用此方法,僅在非常特殊的情況下,當無法從 Operator 本身擷取某些血統資訊時才建議使用。如果您想從自己的 Operators 中擷取血統資訊,您可能更喜歡直接實作 OpenLineage 方法,如OpenLineage 方法中所述。當處理您無法修改的 Operators(例如第三方供應商),但仍然希望從中擷取血統資訊時,請參閱自訂 Extractors。
Airflow 允許 Operators 透過指定 Operators 的輸入和輸出(透過 inlets 和 outlets)來追蹤血統資訊。預設情況下,如果 OpenLineage 無法從 OpenLineage 方法或 Extractors 中找到任何成功的擷取,則會將 inlets 和 outlets 用作輸入/輸出資料集。
Airflow 支援 inlets 和 outlets 為 Table、Column、File 或 User 實體,OpenLineage 也是如此。
範例¶
Airflow DAG 內部的 Operator 可以使用 inlets 和 outlets 進行註解,如下例所示
"""Example DAG demonstrating the usage of the extraction via Inlets and Outlets."""
import pendulum
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.lineage.entities import Table, File, Column, User
t1 = Table(
cluster="c1",
database="d1",
name="t1",
owners=[User(email="jdoe@ok.com", first_name="Joe", last_name="Doe")],
)
t2 = Table(
cluster="c1",
database="d1",
name="t2",
columns=[
Column(name="col1", description="desc1", data_type="type1"),
Column(name="col2", description="desc2", data_type="type2"),
],
owners=[
User(email="mike@company.com", first_name="Mike", last_name="Smith"),
User(email="theo@company.com", first_name="Theo"),
User(email="smith@company.com", last_name="Smith"),
User(email="jane@company.com"),
],
)
t3 = Table(
cluster="c1",
database="d1",
name="t3",
columns=[
Column(name="col3", description="desc3", data_type="type3"),
Column(name="col4", description="desc4", data_type="type4"),
],
)
t4 = Table(cluster="c1", database="d1", name="t4")
f1 = File(url="s3://bucket/dir/file1")
with DAG(
dag_id="example_operator",
schedule="@once",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
task1 = BashOperator(
task_id="task_1_with_inlet_outlet",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
inlets=[t1, t2],
outlets=[t3],
)
task2 = BashOperator(
task_id="task_2_with_inlet_outlet",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
inlets=[t3, f1],
outlets=[t4],
)
task1 >> task2
if __name__ == "__main__":
dag.cli()
從 Airflow Table 實體到 OpenLineage Dataset 的轉換方式如下:- 表格實體的 CLUSTER
變成 OpenLineage Dataset 的命名空間 - 資料集的名稱由 {{DATABASE}}.{{NAME}}
形成,其中 DATABASE
和 NAME
是 Airflow Table 實體指定的屬性。
自訂 Facets¶
若要了解有關 OpenLineage 中 facets 的更多資訊,請參閱facet 文件。另請查看可用的 facets 和有關使用 facets 擴充的部落格文章。
OpenLineage 規格可能不包含您編寫 extractor 所需的所有 facets,在這種情況下,您將必須製作自己的自訂 facets。
您也可以使用 custom_run_facets
Airflow 組態將您自己的自訂 facets 注入血統事件的 run facet 中。
要採取的步驟,
編寫一個傳回自訂 facets 的函數。您可以根據需要編寫任意數量的自訂 facet 函數。
使用
custom_run_facets
Airflow 組態註冊函數。
Airflow OpenLineage 監聽器將在血統事件產生期間自動執行這些函數,並將它們的傳回值附加到血統事件中的 run facet。
編寫自訂 facet 函數¶
輸入引數: 函數應接受兩個輸入引數:
TaskInstance
和TaskInstanceState
。函數主體: 執行產生自訂 facets 所需的邏輯。自訂 facets 必須繼承自
RunFacet
,以便自動為 facet 新增_producer
和_schemaURL
。傳回值: 要新增至血統事件的自訂 facets。傳回類型應為
dict[str, RunFacet]
或None
。如果您不想為某些準則新增自訂 facets,您可以選擇傳回None
。
自訂 facet 函數範例
import attrs
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.openlineage.facet import RunFacet
@attrs.define
class MyCustomRunFacet(RunFacet):
"""Define a custom facet."""
name: str
jobState: str
uniqueName: str
displayName: str
dagId: str
taskId: str
cluster: str
custom_metadata: dict
def get_my_custom_facet(
task_instance: TaskInstance, ti_state: TaskInstanceState
) -> dict[str, RunFacet] | None:
operator_name = task_instance.task.operator_name
custom_metadata = {}
if operator_name == "BashOperator":
return None
if ti_state == TaskInstanceState.FAILED:
custom_metadata["custom_key_failed"] = "custom_value"
job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
return {
"additional_run_facet": MyCustomRunFacet(
name="test-lineage-namespace",
jobState=task_instance.state,
uniqueName=job_unique_name,
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
dagId=task_instance.dag_id,
taskId=task_instance.task_id,
cluster="TEST",
custom_metadata=custom_metadata,
)
}
註冊自訂 facet 函數¶
使用 custom_run_facets
Airflow 組態透過傳遞以分號分隔的函數完整匯入路徑字串來註冊自訂 run facet 函數。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS
環境變數是等效的。
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
注意
自訂 facet 函數在 TaskInstance 的 START 和 COMPLETE/FAIL 時都會執行,並新增至對應的 OpenLineage 事件。
在 TaskInstance 狀態上建立條件時,您應該使用提供的第二個引數 (
TaskInstanceState
),其中將包含任務應處於的狀態。這可能與 ti.current_state() 不同,因為 OpenLineage 監聽器可能會在 Airflow 資料庫中更新 TaskInstance 的狀態之前被呼叫。當多次註冊單個函數的路徑時,它仍然只會執行一次。
當多個註冊的函數傳回重複的自訂 facet 金鑰時,隨機函數結果的結果將新增至血統事件。請避免使用重複的 facet 金鑰,因為它可能會產生非預期的行為。
Job Hierarchy¶
Apache Airflow 具有固有的 job hierarchy:DAGs,大型且獨立可排程的單元,包含較小的、可執行的任務。
OpenLineage 在其 Job Hierarchy 模型中反映了這種結構。
在 DAG 排程時,會發出 START 事件。
隨後,依照 Airflow 的任務順序,每個任務都會觸發
TaskInstance 開始時的 START 事件。
完成時的 COMPLETE/FAILED 事件。
最後,在 DAG 終止時,會發出完成事件(COMPLETE 或 FAILED)。
TaskInstance 事件的 ParentRunFacet 參考了原始 DAG 執行。
疑難排解¶
在本地端測試程式碼時,可以使用Marquez來檢查正在發出或未發出的資料。使用 Marquez 將讓您判斷錯誤是由 Extractor 還是 API 引起的。如果資料如預期從 Extractor 發出,但未傳遞到 UI,則 Extractor 沒有問題,應在 OpenLineage 中開啟一個問題。但是,如果資料未正確發出,則可能需要更多單元測試來涵蓋 Extractor 行為。Marquez 可以協助您精確找出哪些 facets 未正確形成,以便您知道在哪裡新增測試覆蓋率。
偵錯設定¶
為了進行偵錯,請確保Airflow 日誌記錄層級設定為 DEBUG
,並且為 OpenLineage 整合啟用debug_mode。這將增加 Airflow 日誌中的詳細程度,並在 OpenLineage 事件中包含額外的環境資訊。
在尋求偵錯方面的協助時,請務必嘗試提供以下資訊
Airflow 排程器日誌,日誌記錄層級設定為 DEBUG
Airflow worker 日誌(任務日誌),日誌記錄層級設定為 DEBUG
啟用 debug_mode 的 OpenLineage 事件
在哪裡可以了解更多資訊?¶
造訪我們的GitHub 儲存庫。
觀看多個關於 OpenLineage 的演講。