物件儲存¶
版本 2.8.0 新增。
這是一個 實驗性功能。
所有主要的雲端供應商都提供物件儲存中的持久性資料儲存。這些並非傳統的「POSIX」檔案系統。為了儲存數百 PB 的資料而沒有任何單點故障,物件儲存以更簡單的物件名稱 => 資料模型取代了傳統的檔案系統目錄樹。為了實現遠端存取,物件上的操作通常以(較慢的)HTTP REST 操作形式提供。
Airflow 在物件儲存之上提供了通用抽象,例如 s3、gcs 和 Azure Blob 儲存。此抽象讓您可以在 DAG 中使用各種物件儲存系統,而無需變更程式碼來處理每個不同的物件儲存系統。此外,它還允許您使用大多數標準 Python 模組,例如 shutil
,這些模組可以與類檔案物件一起運作。
對特定物件儲存系統的支援取決於您已安裝的供應商。例如,如果您已安裝 apache-airflow-providers-google
供應商,您將能夠將 gcs
方案用於物件儲存。Airflow 開箱即用,提供對 file
方案的支援。
注意
對 s3 的支援需要您安裝 apache-airflow-providers-amazon[s3fs]
。這是因為它依賴於 aiobotocore
,預設情況下未安裝它,因為它可能會與 botocore
產生相依性挑戰。
雲端物件儲存並非真實的檔案系統¶
物件儲存並非真實的檔案系統,儘管它們可能看起來像。它們不支援真實檔案系統的所有操作。主要差異在於
不保證原子性重新命名操作。這表示如果您將檔案從一個位置移動到另一個位置,它將被複製,然後被刪除。如果複製失敗,您將遺失該檔案。
目錄是模擬的,並且可能使它們的運作速度變慢。例如,列出目錄可能需要列出 bucket 中的所有物件,並依前綴進行篩選。
在檔案中搜尋可能需要大量的呼叫開銷,從而降低效能,或者可能根本不支援。
Airflow 依賴 fsspec 在不同的物件儲存系統之間提供一致的體驗。它實作了本機檔案快取以加速存取。但是,在設計 DAG 時,您應該注意物件儲存的限制。
基本使用¶
若要使用物件儲存,您需要使用您要互動的物件 URI 來實例化 Path(請參閱下文)物件。例如,若要指向 s3 中的 bucket,您可以執行以下操作
from airflow.io.path import ObjectStoragePath
base = ObjectStoragePath("s3://aws_default@my-bucket/")
URI 的使用者名稱部分代表 Airflow 連線 ID,並且是選用的。它可以作為單獨的關鍵字引數傳遞
# Equivalent to the previous example.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
列出檔案物件
@task
def list_files() -> list[ObjectStoragePath]:
files = [f for f in base.iterdir() if f.is_file()]
return files
在目錄樹狀結構內導航
base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"
# prints ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)
開啟檔案
@task
def read_file(path: ObjectStoragePath) -> str:
with path.open() as f:
return f.read()
利用 XCOM,您可以在任務之間傳遞路徑
@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
return path / "new_file.txt"
@task
def write_file(path: ObjectStoragePath, content: str):
with path.open("wb") as f:
f.write(content)
new_file = create(base)
write = write_file(new_file, b"data")
read >> write
設定¶
在其基本使用中,物件儲存抽象不需要太多設定,並且依賴於標準 Airflow 連線機制。這表示您可以使用 conn_id
引數來指定要使用的連線。連線的任何設定都會向下推送到底層實作。例如,如果您使用 s3,您可以指定 aws_access_key_id
和 aws_secret_access_key
,但也可以新增額外的引數,例如 endpoint_url
以指定自訂端點。
替代後端¶
可以為方案或協定設定替代後端。這可以透過將 backend
附加到方案來完成。例如,若要為 dbfs
方案啟用 databricks 後端,您可以執行以下操作
from airflow.io.path import ObjectStoragePath
from airflow.io.store import attach
from fsspec.implementations.dbfs import DBFSFileSystem
attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")
注意
為了在任務之間重複使用註冊,請確保將後端附加到 DAG 的最上層。否則,後端將無法在多個任務之間使用。
路徑 API¶
物件儲存抽象實作為 路徑 API。並建立在 Universal Pathlib 之上。這表示您可以使用與本機檔案系統幾乎相同的 API 來與物件儲存互動。在本節中,我們僅列出兩個 API 之間的差異。超出標準路徑 API 的擴充操作,例如複製和移動,將在下一節中列出。有關每個操作的詳細資訊(例如它們接受哪些引數),請參閱 ObjectStoragePath
類別的文件。
mkdir¶
在指定的路徑或 bucket/容器內建立目錄項目。對於沒有真實目錄的系統,它可能僅為此實例建立目錄項目,而不會影響真實的檔案系統。
如果 parents
為 True
,則會根據需要建立此路徑的任何遺失的父目錄。
touch¶
在此給定路徑建立檔案,或更新時間戳記。如果 truncate
為 True
,則檔案會被截斷,這是預設值。如果檔案已存在,則如果 exists_ok
為 true(並且其修改時間更新為目前時間),則函數會成功,否則會引發 FileExistsError
。
stat¶
傳回類似 stat_result
的物件,該物件支援以下屬性:st_size
、st_mtime
、st_mode
,但也像字典一樣運作,可以提供有關物件的其他中繼資料。例如,對於 s3,它將傳回額外的金鑰,例如:['ETag', 'ContentType']
。如果您的程式碼需要在不同的物件儲存之間可移植,請勿依賴擴充的中繼資料。
擴充功能¶
以下操作不是標準路徑 API 的一部分,但物件儲存抽象支援它們。
bucket¶
傳回 bucket 名稱。
checksum¶
傳回檔案的 checksum。
container¶
bucket 的別名
fs¶
用於存取已實例化檔案系統的便利屬性
key¶
傳回物件金鑰。
namespace¶
傳回物件的命名空間。通常這是協定,例如 s3://
以及 bucket 名稱。
path¶
用於檔案系統實例的 fsspec
相容路徑
protocol¶
filesystem_spec 協定。
read_block¶
從此給定路徑的檔案讀取位元組區塊。
從檔案的偏移量開始,讀取長度位元組。如果設定了分隔符,則我們確保讀取在分隔符邊界處開始和停止,這些邊界遵循位置偏移量和偏移量 + 長度。如果偏移量為零,則我們從零開始。傳回的位元組字串將包含結束分隔符字串。
如果偏移量 + 長度超出 eof,則讀取至 eof。
sign¶
建立代表給定路徑的簽署 URL。某些實作允許產生臨時 URL,作為委派憑證的一種方式。
size¶
傳回給定路徑檔案的大小(以位元組為單位)。
storage_options¶
用於實例化底層檔案系統的儲存選項。
ukey¶
檔案屬性的雜湊值,用於判斷檔案是否已變更。
複製與移動¶
本文檔說明了 copy
和 move
操作的預期行為,特別是對於跨物件儲存(例如,檔案 -> s3)的行為。每個方法都會將檔案或目錄從 source
位置複製或移動到 target
位置。預期行為與 fsspec
指定的行為相同。對於跨物件儲存目錄複製,Airflow 需要走訪目錄樹狀結構並個別複製每個檔案。這是透過將每個檔案從來源串流到目標來完成的。
外部整合¶
許多其他專案(例如 DuckDB、Apache Iceberg 等)都可以使用物件儲存抽象。通常,這是透過傳遞底層 fsspec
實作來完成的。為此,ObjectStoragePath
公開了 fs
屬性。例如,以下程式碼適用於 duckdb
,以便使用來自 Airflow 的連線詳細資訊來連線到 s3 和 parquet 檔案(由 ObjectStoragePath
指示)被讀取
import duckdb
from airflow.io.path import ObjectStoragePath
path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")