Lineage(血緣關係)¶
注意
Lineage(血緣關係)支援仍處於實驗階段,隨時可能變更。
Airflow 可以協助追蹤資料的來源、資料的處理過程以及資料隨時間的流向。這有助於建立稽核追蹤和資料治理,同時也能夠除錯資料流。
Airflow 透過任務的 inlets(輸入)和 outlets(輸出)來追蹤資料。讓我們從一個範例開始,看看它是如何運作的。
import datetime
import pendulum
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
dag = DAG(
dag_id="example_lineage",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="0 0 * * *",
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)
f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
outlets.append(f_out)
run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)
Inlets(輸入)可以是(上游)任務 ID 的清單,也可以靜態定義為 attr annotated 物件,例如 File
物件。Outlets(輸出)只能是 attr annotated 物件。兩者都會在執行時渲染。然而,任務的 outlets(輸出),如果它們是另一個任務的 inlets(輸入),則不會為下游任務重新渲染。
注意
如果 operator(運算子)支援,它可以自動新增 inlets(輸入)和 outlets(輸出)。
在範例 DAG 任務 run_this
( task_id=run_me_first
) 中,是一個 BashOperator,它接受 3 個 inlets(輸入): CAT1
、 CAT2
、 CAT3
,這些輸入是從清單中產生的。請注意, data_interval_start
是一個範本化的欄位,將在任務執行時渲染。
注意
在幕後,Airflow 會準備 lineage(血緣關係)metadata(元數據)作為任務 pre_execute
方法的一部分。當任務完成執行後,會呼叫 post_execute
,並且 lineage(血緣關係)metadata(元數據)會被推送到 XCOM 中。因此,如果您正在建立自己的 operators(運算子)來覆寫這個方法,請確保使用 prepare_lineage
和 apply_lineage
分別裝飾您的方法。
速記符號¶
速記符號也可用,它的運作方式幾乎與 unix 命令列管道、輸入和輸出相同。請注意, operator(運算子) precedence(優先順序) 仍然適用。此外,只有當左側定義了 outlets(輸出)(例如透過使用 add_outlets(..)
)或具有現成的 lineage(血緣關係)支援 operator.supports_lineage == True
時, |
運算子才會運作。
f_in > run_this | (run_this_last > outlets)
Hook Lineage(Hook 血緣關係)¶
Airflow 提供了一個強大的功能,不僅可以追蹤任務之間的資料 lineage(血緣關係),還可以追蹤任務中使用的 hooks(鉤子)的資料 lineage(血緣關係)。此功能可協助您了解資料如何在 Airflow pipelines(管線)中流動。
一個全域的 HookLineageCollector
實例作為收集 lineage(血緣關係)資訊的中心樞紐。Hooks(鉤子)可以將它們互動的 datasets(資料集)的詳細資訊發送到這個 collector(收集器)。然後,collector(收集器)使用這些資料來建構符合 AIP-60 標準的 Datasets(資料集),這是一種描述 datasets(資料集)的標準格式。
from airflow.lineage.hook_lineage import get_hook_lineage_collector
class CustomHook(BaseHook):
def run(self):
# run actual code
collector = get_hook_lineage_collector()
collector.add_input_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/in"})
collector.add_output_dataset(self, dataset_kwargs={"scheme": "file", "path": "/tmp/out"})
由 HookLineageCollector
收集的 lineage(血緣關係)資料可以使用 HookLineageReader
的實例來存取,該實例已在 Airflow plugin(外掛程式)中註冊。
from airflow.lineage.hook_lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin
class CustomHookLineageReader(HookLineageReader):
def get_inputs(self):
return self.lineage_collector.collected_datasets.inputs
class HookLineageCollectionPlugin(AirflowPlugin):
name = "HookLineageCollectionPlugin"
hook_lineage_readers = [CustomHookLineageReader]
如果 Airflow 中沒有註冊 HookLineageReader
,則會改為使用預設的 NoOpCollector
。這個 collector(收集器)不會建立符合 AIP-60 標準的 datasets(資料集)或收集 lineage(血緣關係)資訊。
Lineage Backend(血緣關係後端)¶
可以透過在 config(配置)中提供 LineageBackend 的實例,將 lineage(血緣關係)metrics(指標)推送到自訂的 backend(後端)。
[lineage]
backend = my.lineage.CustomBackend
backend(後端)應該繼承自 airflow.lineage.LineageBackend
。
from airflow.lineage.backend import LineageBackend
class CustomBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
...
# Send the info to some external service