在 Google Cloud Storage 中傳輸資料¶
Google Cloud Storage (GCS) 用於儲存來自各種應用程式的大量資料。請注意,在 GCS 術語中,檔案稱為物件,因此本指南中「物件」和「檔案」這兩個術語可以互換使用。有多個運算子的目的是複製資料,作為 Google Cloud 服務的一部分。此頁面說明如何使用這些運算子。另請參閱 Google Cloud Storage 運算子,以了解用於管理 Google Cloud Storage 儲存區的運算子。
Cloud Storage Transfer Service¶
有許多運算子可以管理 Google Cloud Data Transfer Service。如果您想建立新的資料傳輸任務,請使用運算子 CloudDataTransferServiceCreateJobOperator
您也可以使用此服務的先前運算子 - CloudDataTransferServiceGCSToGCSOperator
這些運算子不會在本機控制複製過程,而是使用 Google 資源,這使得它們能夠更快、更經濟地執行此任務。當 Airflow 未託管在 Google Cloud 中時,經濟效益尤其顯著,因為這些運算子減少了輸出流量。
如果啟用指定是否應在物件傳輸到接收器後從來源刪除物件的選項,這些運算子會修改來源物件。
當您使用 Google Cloud Data Transfer Service 時,您可以指定是否允許覆寫接收器中已存在的物件、是否應刪除僅存在於接收器中的物件,以及是否應在物件傳輸到接收器後從來源刪除物件。
來源物件可以使用包含和排除前綴來指定,也可以根據檔案修改日期來指定。
如果您需要有關如何使用的資訊,請查看指南:Google Cloud Transfer Service 運算子
Google Cloud Storage 的專用傳輸運算子¶
請參閱 Google 傳輸運算子,以取得往返 Google Cloud Storage 的專用傳輸運算子列表。
本機傳輸¶
有兩個運算子用於複製資料,整個過程都在本機控制。
在下一節中將對它們進行描述。
先決條件任務¶
若要使用這些運算子,您必須執行以下幾項操作
使用 Cloud Console 選取或建立 Cloud Platform 專案。
為您的專案啟用計費,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
運算子¶
GCSToGCSOperator¶
GCSToGCSOperator
允許您在 GCS 內複製一個或多個檔案。檔案可以在兩個不同的儲存區之間或在一個儲存區內複製。複製始終在不考慮目標儲存區初始狀態的情況下進行。
僅當檔案移動選項處於啟用狀態時,此運算子才會刪除來源儲存區中的物件。在兩個不同儲存區之間複製檔案時,此運算子永遠不會刪除目標儲存區中的資料。
當您使用此運算子時,您可以指定是否應在物件傳輸到接收器後從來源刪除物件。來源物件可以使用單一萬用字元來指定,也可以根據檔案修改日期來指定。
可以透過使用 match_glob 欄位 來完成根據路徑篩選物件。您應避免在來源物件的路徑中使用 delimiter
欄位或萬用字元,因為這兩種做法都已棄用。此外,可以根據檔案的建立日期 (is_older_than
) 或修改日期 (last_modified_time
和 maximum_modified_time
) 達成篩選。
此運算子的預設運作方式可以比作 cp
命令。當檔案移動選項處於啟用狀態時,此運算子的功能類似於 mv
命令。
以下是使用 GCSToGCSOperator 複製單一檔案、使用萬用字元複製多個檔案、複製多個檔案、移動單一檔案和移動多個檔案的範例。
複製單一檔案¶
以下範例會將單一檔案 OBJECT_1
從 BUCKET_1_SRC
GCS 儲存區複製到 BUCKET_1_DST
儲存區。請注意,如果旗標 exact_match=False
,則 source_object
將被視為在 BUCKET_1_SRC
GCS 儲存區中搜尋物件的前綴。這就是為什麼如果找到任何物件,它們也會被複製。為了防止這種情況發生,請使用 exact_match=False
。
copy_single_file = GCSToGCSOperator(
task_id="copy_single_gcs_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used
exact_match=True,
)
複製多個檔案¶
有多種方法可以複製多個檔案,以下列出各種範例。
如先前所述,delimiter
欄位以及在來源物件中使用萬用字元 (*
) 都已棄用。因此,不建議使用它們 - 而是使用 match_glob
,如下所示
以下範例會將與 data/
資料夾中的 glob 模式相符的檔案從 BUCKET_1_SRC
GCS 儲存區複製到 BUCKET_1_DST
儲存區中的 backup/
資料夾。
copy_files_with_match_glob = GCSToGCSOperator(
task_id="copy_files_with_match_glob",
source_bucket=BUCKET_NAME_SRC,
source_object="data/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
match_glob="**/*.txt",
)
以下範例會將 subdir/
資料夾中的所有檔案 (即 subdir/a.csv、subdir/b.csv、subdir/c.csv) 從 BUCKET_1_SRC
GCS 儲存區複製到 BUCKET_1_DST
儲存區中的 backup/
資料夾。(即 backup/a.csv、backup/b.csv、backup/c.csv)
copy_files = GCSToGCSOperator(
task_id="copy_files",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + "subdir/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
copy_files_with_list = GCSToGCSOperator(
task_id="copy_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[
PREFIX + OBJECT_1,
PREFIX + OBJECT_2,
], # Instead of files each element could be a wildcard expression
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
最後,可以省略 source_object
引數,而是為 source_objects
引數提供列表來複製檔案。在此範例中,OBJECT_1
和 OBJECT_2
將從 BUCKET_1_SRC
複製到 BUCKET_1_DST
。提供大小為 1 的列表的功能與為 source_object
引數提供值的功能相同。
移動單一檔案¶
為 move
引數提供 True
會導致運算子在複製完成後刪除 source_object
。請注意,如果旗標 exact_match=False
,則 source_object
將被視為在 BUCKET_1_SRC
GCS 儲存區中搜尋物件的前綴。這就是為什麼如果找到任何物件,它們也會被複製。為了防止這種情況發生,請使用 exact_match=False
。
move_single_file = GCSToGCSOperator(
task_id="move_single_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
exact_match=True,
move_object=True,
)
移動多個檔案¶
可以透過為 move
引數提供 True
來移動多個檔案。關於萬用字元和 delimiter
引數的相同規則適用於移動和複製。
move_files_with_list = GCSToGCSOperator(
task_id="move_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
GCSSynchronizeBucketsOperator¶
GCSSynchronizeBucketsOperator
運算子會檢查目標儲存區的初始狀態,然後將其與來源儲存區進行比較。基於此,它會建立一個操作計劃,描述應從目標儲存區刪除哪些物件、應覆寫哪些物件以及應複製哪些物件。
此運算子永遠不會修改來源儲存區中的資料。
當您使用此運算子時,您可以指定是否允許覆寫接收器中已存在的物件、是否應刪除僅存在於接收器中的物件、是否要處理子目錄或要處理哪個子目錄。
此運算子的運作方式可以比作 rsync
命令。
基本同步¶
以下範例將確保 BUCKET_1_SRC
中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名檔案,則不會覆寫它們。它不會刪除 BUCKET_1_DST
中不在 BUCKET_1_SRC
中的任何檔案。
sync_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)
完整儲存區同步¶
此範例將確保 BUCKET_1_SRC
中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名檔案,則會覆寫它們。它會刪除 BUCKET_1_DST
中不在 BUCKET_1_SRC
中的任何檔案。
sync_full_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_full_bucket",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
delete_extra_files=True,
allow_overwrite=True,
)
同步到子目錄¶
以下範例將確保 BUCKET_1_SRC
中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST
中的 subdir
資料夾中。如果 BUCKET_1_DST/subdir
中已存在同名檔案,則不會覆寫它們,並且不會刪除 BUCKET_1_DST/subdir
中不在 BUCKET_1_SRC
中的任何檔案。
sync_to_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_to_subdirectory",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
destination_object="subdir/",
)
從子目錄同步¶
此範例將確保 BUCKET_1_SRC/subdir
中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名檔案,則不會覆寫它們,並且不會刪除 BUCKET_1_DST
中不在 BUCKET_1_SRC/subdir
中的任何檔案。
sync_from_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_from_subdirectory",
source_bucket=BUCKET_NAME_SRC,
source_object="subdir/",
destination_bucket=BUCKET_NAME_DST,
)