物件儲存

本教學展示如何使用物件儲存 API 來管理位於物件儲存上的物件,例如 S3、GCS 和 Azure Blob Storage。此 API 作為 Airflow 2.8 的一部分引入。

本教學涵蓋一個簡單的模式,該模式經常在資料工程和資料科學工作流程中使用:存取 Web API、儲存和分析結果。

先決條件

要完成本教學,您需要一些東西

  • DuckDB,一個內建的分析型資料庫,可以透過執行 pip install duckdb 安裝。

  • 一個 S3 儲存桶,以及包含 s3fs 的 Amazon provider。您可以透過執行 pip install apache-airflow-providers-amazon[s3fs] 安裝 provider 套件。或者,您可以透過在 create_object_storage_path 函數中將 URL 更改為適合您 provider 的 URL 來使用不同的儲存 provider,例如將 s3:// 替換為 gs:// 以用於 Google Cloud Storage,並安裝不同的 provider。

  • pandas,您可以透過執行 pip install pandas 安裝。

建立 ObjectStoragePath

ObjectStoragePath 是一個類似路徑的物件,表示物件儲存上的路徑。 它是物件儲存 API 的基本建構區塊。

airflow/example_dags/tutorial_objectstorage.py[source]

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

給定 ObjectStoragePath 的 URL 的使用者名稱部分應該是連線 ID。 指定的連線將用於取得存取後端的正確憑證。 如果省略,將使用後端的預設連線。

連線 ID 也可以透過關鍵字引數傳入

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

當重複使用為另一個目的(例如 Dataset)定義的 URL 時,這非常有用,該 URL 通常不包含使用者名稱部分。 如果同時指定了 URL 的使用者名稱值和明確的關鍵字引數,則明確的關鍵字引數優先於 URL 的使用者名稱值。

在 DAG 的根目錄中實例化 ObjectStoragePath 是安全的。 在使用路徑之前,不會建立連線。 這表示您可以在 DAG 的全域範圍內建立路徑,並在多個任務中使用它。

將資料儲存到物件儲存

ObjectStoragePath 的行為與 pathlib.Path 物件非常相似。 您可以使用它來直接將資料儲存和載入到物件儲存以及從物件儲存載入和儲存資料。 因此,典型的流程可能如下所示

airflow/example_dags/tutorial_objectstorage.py[source]

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

get_air_quality_data 呼叫芬蘭氣象研究所的 API 以取得赫爾辛基地區的空氣品質資料。 它從產生的 json 建立 Pandas DataFrame。 然後,它將資料儲存到物件儲存,並將其即時轉換為 parquet。

物件的索引鍵會從任務的邏輯日期自動產生,因此我們可以每天執行此操作,並且每天都會建立一個新物件。 我們將此索引鍵與基本路徑串連,以建立物件的完整路徑。 最後,在將物件寫入儲存後,我們傳回物件的路徑。 這允許我們在下一個任務中使用該路徑。

分析資料

在理解資料時,您通常想要分析它。 Duck DB 是一個很棒的工具。 它是一個內建的分析型資料庫,可讓您在記憶體中的資料上執行 SQL 查詢。

由於資料已經是 parquet 格式,因此我們可以使用 read_parquet,並且由於 Duck DB 和 ObjectStoragePath 都使用 fsspec,因此我們可以向 Duck DB 註冊 ObjectStoragePath 的後端。 ObjectStoragePath 公開 fs 屬性以執行此操作。 然後,我們可以使用 Duck DB 的 register_filesystem 函數向 Duck DB 註冊後端。

在 Duck DB 中,我們可以從資料建立表格,並對其執行查詢。 查詢以 dataframe 的形式傳回,可用於進一步分析或儲存到物件儲存。

airflow/example_dags/tutorial_objectstorage.py[source]

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

您可能會注意到 analyze 函數不知道物件的原始路徑,但它是作為參數傳入並透過 XCom 取得的。 您不需要重新實例化 Path 物件。 連線詳細資訊也會以透明方式處理。

整合所有內容

最終的 DAG 如下所示,它包裝了所有內容,以便我們可以執行它

airflow/example_dags/tutorial_objectstorage.py[source]


import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.dev.org.tw/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())
    obj_path = get_air_quality_data()
    analyze(obj_path)
tutorial_objectstorage()

這個條目有幫助嗎?