Google Cloud Storage Operators (雲端儲存運算子)¶
Cloud Storage 允許在全球範圍內隨時儲存和檢索任何數量的資料。您可以將 Cloud Storage 用於各種情境,包括提供網站內容、儲存資料以進行歸檔和災難復原,或透過直接下載將大型資料物件分發給使用者。
請參閱 Google Transfer Operators (Google 傳輸運算子) 以取得往返 Google Cloud Storage 的專用傳輸運算子列表。
先決條件任務¶
若要使用這些運算子,您必須執行以下幾項操作
使用 Cloud Console (雲端主控台) 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件所述。
啟用 API,如 Cloud Console 文件所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
Operators (運算子)¶
GCSTimeSpanFileTransformOperator¶
使用 GCSTimeSpanFileTransformOperator
來轉換在特定時間範圍(資料間隔)內修改的檔案。時間範圍由時間範圍的開始和結束時間戳記定義。如果 DAG 沒有排程下一個 DAG 實例,則時間範圍結束為無限,表示運算子會處理所有早於 data_interval_start
的檔案。
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
task_id="gcs_timespan_transform_files",
source_bucket=BUCKET_NAME_SRC,
source_prefix=SOURCE_PREFIX,
source_gcp_conn_id=SOURCE_GCP_CONN_ID,
destination_bucket=BUCKET_NAME_DST,
destination_prefix=DESTINATION_PREFIX,
destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
transform_script=["python", TRANSFORM_SCRIPT_PATH],
)
GCSBucketCreateAclEntryOperator¶
在指定的儲存桶上建立新的 ACL 項目。
有關參數定義,請參閱 GCSBucketCreateAclEntryOperator
使用運算子¶
gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
bucket=BUCKET_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_BUCKET_ROLE,
task_id="gcs_bucket_create_acl_entry_task",
)
範本化¶
template_fields: Sequence[str] = (
"bucket",
"entity",
"role",
"user_project",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud Storage 文件以為儲存桶建立新的 ACL 項目。
GCSObjectCreateAclEntryOperator¶
在指定的物件上建立新的 ACL 項目。
有關參數定義,請參閱 GCSObjectCreateAclEntryOperator
使用運算子¶
gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
bucket=BUCKET_NAME,
object_name=FILE_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_OBJECT_ROLE,
task_id="gcs_object_create_acl_entry_task",
)
範本化¶
template_fields: Sequence[str] = (
"bucket",
"object_name",
"entity",
"generation",
"role",
"user_project",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud Storage 插入文件以為 ObjectAccess 建立 ACL 項目。
刪除儲存桶¶
刪除儲存桶允許您從 Google Cloud Storage 移除儲存桶物件。這是透過 GCSDeleteBucketOperator
運算子執行的。
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)
您可以將 Jinja 範本化 與 bucket_name
、gcp_conn_id
、impersonation_chain
、user_project
參數搭配使用,以便動態決定值。
Sensors (感測器)¶
GCSObjectExistenceSensor¶
使用 GCSObjectExistenceSensor
來等待 (輪詢) Google Cloud Storage 中檔案的存在。
gcs_object_exists = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task",
)
如果您想要在感測器執行時釋出 worker 插槽,也可以在此運算子中使用可延遲模式。
gcs_object_exists_defered = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)
GCSObjectsWithPrefixExistenceSensor¶
使用 GCSObjectsWithPrefixExistenceSensor
來等待 (輪詢) Google Cloud Storage 中具有指定前綴的檔案是否存在。
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task",
)
如果您希望此感測器非同步執行,從而更有效率地利用 Airflow 部署中的資源,您可以將 deferrable
參數設定為 True。但是,必須啟用觸發器元件,此功能才能運作。
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task_async",
deferrable=True,
)
GCSUploadSessionCompleteSensor¶
使用 GCSUploadSessionCompleteSensor
來檢查 Google Cloud Storage 中具有指定前綴的檔案數量是否發生變化。
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_complete_task",
)
如果您希望在感測器執行時釋出 worker 插槽,您可以將參數 deferrable
設定為 True。
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_async_complete",
deferrable=True,
)
GCSObjectUpdateSensor¶
使用 GCSObjectUpdateSensor
來檢查 Google Cloud Storage 中的物件是否已更新。
gcs_update_object_exists = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task",
)
如果您希望此感測器非同步執行,從而有效率地利用 Airflow 部署中的資源,您可以將 deferrable
參數設定為 True。但是,必須啟用觸發器元件,此功能才能運作。
gcs_update_object_exists_async = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task_async",
deferrable=True,
)
更多資訊¶
感測器具有不同的模式,這些模式決定了任務執行時資源的行為。請參閱 Airflow 感測器文件,以取得使用感測器的最佳實務。