在 Google Cloud Storage 中傳輸資料

Google Cloud Storage (GCS) 用於儲存來自各種應用程式的大量資料。請注意,在 GCS 術語中,檔案稱為物件,因此本指南中「物件」和「檔案」這兩個術語可以互換使用。有多個運算子的目的是複製資料,作為 Google Cloud 服務的一部分。此頁面說明如何使用這些運算子。另請參閱 Google Cloud Storage 運算子,以了解用於管理 Google Cloud Storage 儲存區的運算子。

Cloud Storage Transfer Service

有許多運算子可以管理 Google Cloud Data Transfer Service。如果您想建立新的資料傳輸任務,請使用運算子 CloudDataTransferServiceCreateJobOperator 您也可以使用此服務的先前運算子 - CloudDataTransferServiceGCSToGCSOperator

這些運算子不會在本機控制複製過程,而是使用 Google 資源,這使得它們能夠更快、更經濟地執行此任務。當 Airflow 未託管在 Google Cloud 中時,經濟效益尤其顯著,因為這些運算子減少了輸出流量。

如果啟用指定是否應在物件傳輸到接收器後從來源刪除物件的選項,這些運算子會修改來源物件。

當您使用 Google Cloud Data Transfer Service 時,您可以指定是否允許覆寫接收器中已存在的物件、是否應刪除僅存在於接收器中的物件,以及是否應在物件傳輸到接收器後從來源刪除物件。

來源物件可以使用包含和排除前綴來指定,也可以根據檔案修改日期來指定。

如果您需要有關如何使用的資訊,請查看指南:Google Cloud Transfer Service 運算子

Google Cloud Storage 的專用傳輸運算子

請參閱 Google 傳輸運算子,以取得往返 Google Cloud Storage 的專用傳輸運算子列表。

本機傳輸

有兩個運算子用於複製資料,整個過程都在本機控制。

在下一節中將對它們進行描述。

先決條件任務

若要使用這些運算子,您必須執行以下幾項操作

運算子

GCSToGCSOperator

GCSToGCSOperator 允許您在 GCS 內複製一個或多個檔案。檔案可以在兩個不同的儲存區之間或在一個儲存區內複製。複製始終在不考慮目標儲存區初始狀態的情況下進行。

僅當檔案移動選項處於啟用狀態時,此運算子才會刪除來源儲存區中的物件。在兩個不同儲存區之間複製檔案時,此運算子永遠不會刪除目標儲存區中的資料。

當您使用此運算子時,您可以指定是否應在物件傳輸到接收器後從來源刪除物件。來源物件可以使用單一萬用字元來指定,也可以根據檔案修改日期來指定。

可以透過使用 match_glob 欄位 來完成根據路徑篩選物件。您應避免在來源物件的路徑中使用 delimiter 欄位或萬用字元,因為這兩種做法都已棄用。此外,可以根據檔案的建立日期 (is_older_than) 或修改日期 (last_modified_timemaximum_modified_time) 達成篩選。

此運算子的預設運作方式可以比作 cp 命令。當檔案移動選項處於啟用狀態時,此運算子的功能類似於 mv 命令。

以下是使用 GCSToGCSOperator 複製單一檔案、使用萬用字元複製多個檔案、複製多個檔案、移動單一檔案和移動多個檔案的範例。

複製單一檔案

以下範例會將單一檔案 OBJECT_1BUCKET_1_SRC GCS 儲存區複製到 BUCKET_1_DST 儲存區。請注意,如果旗標 exact_match=False,則 source_object 將被視為在 BUCKET_1_SRC GCS 儲存區中搜尋物件的前綴。這就是為什麼如果找到任何物件,它們也會被複製。為了防止這種情況發生,請使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
    exact_match=True,
)

複製多個檔案

有多種方法可以複製多個檔案,以下列出各種範例。

如先前所述,delimiter 欄位以及在來源物件中使用萬用字元 (*) 都已棄用。因此,不建議使用它們 - 而是使用 match_glob,如下所示

以下範例會將與 data/ 資料夾中的 glob 模式相符的檔案從 BUCKET_1_SRC GCS 儲存區複製到 BUCKET_1_DST 儲存區中的 backup/ 資料夾。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

copy_files_with_match_glob = GCSToGCSOperator(
    task_id="copy_files_with_match_glob",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    match_glob="**/*.txt",
)

以下範例會將 subdir/ 資料夾中的所有檔案 (即 subdir/a.csv、subdir/b.csv、subdir/c.csv) 從 BUCKET_1_SRC GCS 儲存區複製到 BUCKET_1_DST 儲存區中的 backup/ 資料夾。(即 backup/a.csv、backup/b.csv、backup/c.csv)

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

copy_files = GCSToGCSOperator(
    task_id="copy_files",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + "subdir/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[
        PREFIX + OBJECT_1,
        PREFIX + OBJECT_2,
    ],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

最後,可以省略 source_object 引數,而是為 source_objects 引數提供列表來複製檔案。在此範例中,OBJECT_1OBJECT_2 將從 BUCKET_1_SRC 複製到 BUCKET_1_DST。提供大小為 1 的列表的功能與為 source_object 引數提供值的功能相同。

移動單一檔案

move 引數提供 True 會導致運算子在複製完成後刪除 source_object。請注意,如果旗標 exact_match=False,則 source_object 將被視為在 BUCKET_1_SRC GCS 儲存區中搜尋物件的前綴。這就是為什麼如果找到任何物件,它們也會被複製。為了防止這種情況發生,請使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup_" + OBJECT_1,
    exact_match=True,
    move_object=True,
)

移動多個檔案

可以透過為 move 引數提供 True 來移動多個檔案。關於萬用字元和 delimiter 引數的相同規則適用於移動和複製。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

GCSSynchronizeBucketsOperator

GCSSynchronizeBucketsOperator 運算子會檢查目標儲存區的初始狀態,然後將其與來源儲存區進行比較。基於此,它會建立一個操作計劃,描述應從目標儲存區刪除哪些物件、應覆寫哪些物件以及應複製哪些物件。

此運算子永遠不會修改來源儲存區中的資料。

當您使用此運算子時,您可以指定是否允許覆寫接收器中已存在的物件、是否應刪除僅存在於接收器中的物件、是否要處理子目錄或要處理哪個子目錄。

此運算子的運作方式可以比作 rsync 命令。

基本同步

以下範例將確保 BUCKET_1_SRC 中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則不會覆寫它們。它不會刪除 BUCKET_1_DST 中不在 BUCKET_1_SRC 中的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)

完整儲存區同步

此範例將確保 BUCKET_1_SRC 中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則會覆寫它們。它會刪除 BUCKET_1_DST 中不在 BUCKET_1_SRC 中的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)

同步到子目錄

以下範例將確保 BUCKET_1_SRC 中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST 中的 subdir 資料夾中。如果 BUCKET_1_DST/subdir 中已存在同名檔案,則不會覆寫它們,並且不會刪除 BUCKET_1_DST/subdir 中不在 BUCKET_1_SRC 中的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="subdir/",
)

從子目錄同步

此範例將確保 BUCKET_1_SRC/subdir 中的所有檔案 (包括任何子目錄中的檔案) 也位於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則不會覆寫它們,並且不會刪除 BUCKET_1_DST 中不在 BUCKET_1_SRC/subdir 中的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[原始碼]

sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
)

此條目是否有幫助?