Google Cloud Storage Operators (雲端儲存運算子)

Cloud Storage 允許在全球範圍內隨時儲存和檢索任何數量的資料。您可以將 Cloud Storage 用於各種情境,包括提供網站內容、儲存資料以進行歸檔和災難復原,或透過直接下載將大型資料物件分發給使用者。

請參閱 Google Transfer Operators (Google 傳輸運算子) 以取得往返 Google Cloud Storage 的專用傳輸運算子列表。

先決條件任務

若要使用這些運算子,您必須執行以下幾項操作

Operators (運算子)

GCSTimeSpanFileTransformOperator

使用 GCSTimeSpanFileTransformOperator 來轉換在特定時間範圍(資料間隔)內修改的檔案。時間範圍由時間範圍的開始和結束時間戳記定義。如果 DAG 沒有排程下一個 DAG 實例,則時間範圍結束為無限,表示運算子會處理所有早於 data_interval_start 的檔案。

tests/system/google/cloud/gcs/example_gcs_transform_timespan.py[source]

gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
    task_id="gcs_timespan_transform_files",
    source_bucket=BUCKET_NAME_SRC,
    source_prefix=SOURCE_PREFIX,
    source_gcp_conn_id=SOURCE_GCP_CONN_ID,
    destination_bucket=BUCKET_NAME_DST,
    destination_prefix=DESTINATION_PREFIX,
    destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
    transform_script=["python", TRANSFORM_SCRIPT_PATH],
)

GCSBucketCreateAclEntryOperator

在指定的儲存桶上建立新的 ACL 項目。

有關參數定義,請參閱 GCSBucketCreateAclEntryOperator

使用運算子

tests/system/google/cloud/gcs/example_gcs_acl.py[source]

gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_BUCKET_ROLE,
    task_id="gcs_bucket_create_acl_entry_task",
)

範本化

template_fields: Sequence[str] = (
    "bucket",
    "entity",
    "role",
    "user_project",
    "impersonation_chain",
)

更多資訊

請參閱 Google Cloud Storage 文件以為儲存桶建立新的 ACL 項目

GCSObjectCreateAclEntryOperator

在指定的物件上建立新的 ACL 項目。

有關參數定義,請參閱 GCSObjectCreateAclEntryOperator

使用運算子

tests/system/google/cloud/gcs/example_gcs_acl.py[source]

gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    object_name=FILE_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_OBJECT_ROLE,
    task_id="gcs_object_create_acl_entry_task",
)

範本化

template_fields: Sequence[str] = (
    "bucket",
    "object_name",
    "entity",
    "generation",
    "role",
    "user_project",
    "impersonation_chain",
)

更多資訊

請參閱 Google Cloud Storage 插入文件以為 ObjectAccess 建立 ACL 項目

刪除儲存桶

刪除儲存桶允許您從 Google Cloud Storage 移除儲存桶物件。這是透過 GCSDeleteBucketOperator 運算子執行的。

tests/system/google/cloud/gcs/example_gcs_upload_download.py[source]

delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)

您可以將 Jinja 範本化bucket_namegcp_conn_idimpersonation_chainuser_project 參數搭配使用,以便動態決定值。

參考文檔

如需更多資訊,請參閱

Sensors (感測器)

GCSObjectExistenceSensor

使用 GCSObjectExistenceSensor 來等待 (輪詢) Google Cloud Storage 中檔案的存在。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_object_exists = GCSObjectExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_task",
)

如果您想要在感測器執行時釋出 worker 插槽,也可以在此運算子中使用可延遲模式。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_object_exists_defered = GCSObjectExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)

GCSObjectsWithPrefixExistenceSensor

使用 GCSObjectsWithPrefixExistenceSensor 來等待 (輪詢) Google Cloud Storage 中具有指定前綴的檔案是否存在。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task",
)

如果您希望此感測器非同步執行,從而更有效率地利用 Airflow 部署中的資源,您可以將 deferrable 參數設定為 True。但是,必須啟用觸發器元件,此功能才能運作。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task_async",
    deferrable=True,
)

GCSUploadSessionCompleteSensor

使用 GCSUploadSessionCompleteSensor 來檢查 Google Cloud Storage 中具有指定前綴的檔案數量是否發生變化。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_complete_task",
)

如果您希望在感測器執行時釋出 worker 插槽,您可以將參數 deferrable 設定為 True。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_async_complete",
    deferrable=True,
)

GCSObjectUpdateSensor

使用 GCSObjectUpdateSensor 來檢查 Google Cloud Storage 中的物件是否已更新。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_update_object_exists = GCSObjectUpdateSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task",
)

如果您希望此感測器非同步執行,從而有效率地利用 Airflow 部署中的資源,您可以將 deferrable 參數設定為 True。但是,必須啟用觸發器元件,此功能才能運作。

tests/system/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_update_object_exists_async = GCSObjectUpdateSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task_async",
    deferrable=True,
)

更多資訊

感測器具有不同的模式,這些模式決定了任務執行時資源的行為。請參閱 Airflow 感測器文件,以取得使用感測器的最佳實務。

參考文檔

如需更多資訊,請參閱

此條目是否對您有幫助?