airflow.providers.google.cloud.hooks.gcs

此模組包含 Google Cloud Storage hook。

模組內容

類別

GCSHook

使用 Google Cloud 連線與 Google Cloud Storage 互動。

GCSAsyncHook

GCSAsyncHook 在觸發 worker 上執行,繼承自 GoogleBaseAsyncHook。

函數

gcs_object_is_directory(bucket)

如果給定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是目錄或空 bucket,則返回 True。

parse_json_from_gcs(gcp_conn_id, file_uri[, ...])

從 Google Cloud Storage 下載並解析 json 檔案。

屬性

RT

T

FParams

List

DEFAULT_TIMEOUT

PROVIDE_BUCKET

airflow.providers.google.cloud.hooks.gcs.RT[原始碼]
airflow.providers.google.cloud.hooks.gcs.T[原始碼]
airflow.providers.google.cloud.hooks.gcs.FParams[原始碼]
airflow.providers.google.cloud.hooks.gcs.List[原始碼]
airflow.providers.google.cloud.hooks.gcs.DEFAULT_TIMEOUT = 60[原始碼]
airflow.providers.google.cloud.hooks.gcs.PROVIDE_BUCKET: str[原始碼]
class airflow.providers.google.cloud.hooks.gcs.GCSHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[原始碼]

基底類別: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

使用 Google Cloud 連線與 Google Cloud Storage 互動。

get_conn()[原始碼]

返回 Google Cloud Storage 服務物件。

copy(source_bucket, source_object, destination_bucket=None, destination_object=None)[原始碼]

將物件從一個 bucket 複製到另一個 bucket,並在要求時重新命名。

destination_bucket 或 destination_object 可以省略,在這種情況下會使用來源 bucket/object,但不能同時省略。

參數
  • source_bucket (str) – 要複製來源物件的 bucket。

  • source_object (str) – 要複製的物件。

  • destination_bucket (str | None) – 要複製到目標物件的目的地 bucket。可以省略;然後使用相同的 bucket。

  • destination_object (str | None) – 如果給定,物件的(重新命名)路徑。可以省略;然後使用相同的名稱。

rewrite(source_bucket, source_object, destination_bucket, destination_object=None)[原始碼]

類似於 copy;支援超過 5 TB 的檔案,以及在位置和/或儲存類別之間複製。

destination_object 可以省略,在這種情況下會使用 source_object。

參數
  • source_bucket (str) – 要複製來源物件的 bucket。

  • source_object (str) – 要複製的物件。

  • destination_bucket (str) – 要複製到目標物件的目的地。

  • destination_object (str | None) – 如果給定,物件的(重新命名)路徑。可以省略;然後使用相同的名稱。

download(bucket_name: str, object_name: str, filename: None = None, chunk_size: int | None = None, timeout: int | None = DEFAULT_TIMEOUT, num_max_attempts: int | None = 1, user_project: str | None = None) bytes[原始碼]
download(bucket_name: str, object_name: str, filename: str, chunk_size: int | None = None, timeout: int | None = DEFAULT_TIMEOUT, num_max_attempts: int | None = 1, user_project: str | None = None) str

從 Google Cloud Storage 下載檔案。

當未提供 filename 時,運算子會將檔案載入記憶體並返回其內容。當提供 filename 時,它會將檔案寫入指定位置並返回該位置。對於超出可用記憶體大小的檔案,建議寫入檔案。

參數
  • bucket_name – 要從中提取的 bucket。

  • object_name – 要提取的物件。

  • filename – 如果設定,則為應將檔案寫入的本地檔案路徑。

  • chunk_size – Blob 分塊大小。

  • timeout – 請求逾時秒數。

  • num_max_attempts – 下載檔案的嘗試次數。

  • user_project – 要為請求計費的 Google Cloud 專案的識別碼。Requester Pays buckets (請求者付費 bucket) 必需。

download_as_byte_array(bucket_name, object_name, chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1)[原始碼]

從 Google Cloud Storage 下載檔案。

當未提供 filename 時,運算子會將檔案載入記憶體並返回其內容。當提供 filename 時,它會將檔案寫入指定位置並返回該位置。對於超出可用記憶體大小的檔案,建議寫入檔案。

參數
  • bucket_name (str) – 要從中提取的 bucket。

  • object_name (str) – 要提取的物件。

  • chunk_size (int | None) – Blob 分塊大小。

  • timeout (int | None) – 請求逾時秒數。

  • num_max_attempts (int | None) – 下載檔案的嘗試次數。

provide_file(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, dir=None, user_project=None)[原始碼]

將檔案下載到臨時目錄並返回檔案控制代碼。

您可以使用傳遞 bucket_name 和 object_name 參數或僅 object_url 參數來使用此方法。

參數
  • bucket_name (str) – 要從中提取的 bucket。

  • object_name (str | None) – 要提取的物件。

  • object_url (str | None) – 檔案參考 URL。必須以 “gs://” 開頭

  • dir (str | None) – 要將檔案下載到的臨時子目錄。(傳遞給 NamedTemporaryFile)

  • user_project (str | None) – 要為請求計費的 Google Cloud 專案的識別碼。Requester Pays buckets (請求者付費 bucket) 必需。

返回

檔案控制代碼

返回類型

collections.abc.Generator[IO[bytes], None, None]

provide_file_and_upload(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, user_project=None)[原始碼]

建立臨時檔案,返回檔案控制代碼,並在關閉時上傳檔案內容。

您可以使用傳遞 bucket_name 和 object_name 參數或僅 object_url 參數來使用此方法。

參數
  • bucket_name (str) – 要從中提取的 bucket。

  • object_name (str | None) – 要提取的物件。

  • object_url (str | None) – 檔案參考 URL。必須以 “gs://” 開頭

  • user_project (str | None) – 要為請求計費的 Google Cloud 專案的識別碼。Requester Pays buckets (請求者付費 bucket) 必需。

返回

檔案控制代碼

返回類型

collections.abc.Generator[IO[bytes], None, None]

upload(bucket_name, object_name, filename=None, data=None, mime_type=None, gzip=False, encoding='utf-8', chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1, metadata=None, cache_control=None, user_project=None)[原始碼]

將本地檔案或檔案資料作為字串或位元組上傳到 Google Cloud Storage。

參數
  • bucket_name (str) – 要上傳到的 bucket。

  • object_name (str) – 上傳檔案時要設定的物件名稱。

  • filename (str | None) – 要上傳的檔案的本地檔案路徑。

  • data (str | bytes | None) – 要上傳的檔案資料,以字串或位元組表示。

  • mime_type (str | None) – 上傳檔案時設定的檔案 MIME 類型。

  • gzip (bool) – 是否壓縮本地檔案或檔案資料以進行上傳的選項

  • encoding (str) – 如果以字串形式提供檔案資料,則為位元組編碼

  • chunk_size (int | None) – Blob 分塊大小。

  • timeout (int | None) – 請求逾時秒數。

  • num_max_attempts (int) – 嘗試上傳檔案的次數。

  • metadata (dict | None) – 要與檔案一起上傳的 metadata。

  • cache_control (str | None) – Cache-Control metadata 欄位。

  • user_project (str | None) – 要為請求計費的 Google Cloud 專案的識別碼。Requester Pays buckets (請求者付費 bucket) 必需。

exists(bucket_name, object_name, retry=DEFAULT_RETRY)[原始碼]

檢查 Google Cloud Storage 中是否存在檔案。

參數
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的 blob_name 名稱。

  • retry (google.api_core.retry.Retry) – (可選) 如何重試 RPC

get_blob_update_time(bucket_name, object_name)[原始碼]

取得 Google Cloud Storage 中檔案的更新時間。

參數
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要從 Google Cloud Storage bucket 取得更新時間的 blob 名稱。

is_updated_after(bucket_name, object_name, ts)[原始碼]

檢查 blob_name 在 Google Cloud Storage 中是否已更新。

參數
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • ts (datetime.datetime) – 要對照檢查的時間戳記。

is_updated_between(bucket_name, object_name, min_ts, max_ts)[原始碼]

檢查 blob_name 在 Google Cloud Storage 中是否已更新。

參數
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • min_ts (datetime.datetime) – 要對照檢查的最小時間戳記。

  • max_ts (datetime.datetime) – 要對照檢查的最大時間戳記。

is_updated_before(bucket_name, object_name, ts)[原始碼]

檢查 blob_name 在 Google Cloud Storage 中是否在給定時間之前更新。

參數
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • ts (datetime.datetime) – 要對照檢查的時間戳記。

is_older_than(bucket_name, object_name, seconds)[原始碼]

檢查物件是否比給定時間舊。

參數
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • seconds (int) – 要對照檢查的時間(秒)。

delete(bucket_name, object_name)[source]

從儲存貯體中刪除物件。

參數
  • bucket_name (str) – 儲存貯體的名稱,物件位於此儲存貯體中

  • object_name (str) – 要刪除的物件名稱

delete_bucket(bucket_name, force=False, user_project=None)[source]

從 Google Cloud Storage 刪除儲存貯體物件。

參數
  • bucket_name (str) – 將被刪除的儲存貯體名稱

  • force (bool) – false 不允許刪除非空的儲存貯體,設定 force=True 允許刪除非空的儲存貯體

  • user_project (str | None) – 要為請求計費的 Google Cloud 專案的識別碼。Requester Pays buckets (請求者付費 bucket) 必需。

list(bucket_name, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None, user_project=None)[source]

列出儲存貯體中具有單一字首或多個字首的所有物件。

參數
  • bucket_name (str) – 儲存貯體名稱

  • versions (bool | None) – 若為 true,則列出物件的所有版本

  • max_results (int | None) – 單頁回應中要傳回的項目最大計數

  • prefix (str | List[str] | None) – 字串或字串列表,用於篩選名稱以其開頭的物件

  • delimiter (str | None) – (已棄用) 根據分隔符號篩選物件 (例如 ‘.csv’)

  • match_glob (str | None) – (選用) 根據字串給定的 glob 模式篩選物件 (例如,'**/*/.json')。

  • user_project (str | None) – 要為請求計費的 Google Cloud 專案的識別碼。Requester Pays buckets (請求者付費 bucket) 必需。

返回

符合篩選條件的物件名稱串流

list_by_timespan(bucket_name, timespan_start, timespan_end, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None)[source]

列出儲存貯體中具有給定字串字首的所有物件,這些物件在時間範圍內已更新。

參數
  • bucket_name (str) – 儲存貯體名稱

  • timespan_start (datetime.datetime) – 將傳回在此日期時間 (UTC) 或之後更新的物件

  • timespan_end (datetime.datetime) – 將傳回在此日期時間 (UTC) 之前更新的物件

  • versions (bool | None) – 若為 true,則列出物件的所有版本

  • max_results (int | None) – 單頁回應中要傳回的項目最大計數

  • prefix (str | None) – 字首字串,用於篩選名稱以此字首開頭的物件

  • delimiter (str | None) – (已棄用) 根據分隔符號篩選物件 (例如 ‘.csv’)

  • match_glob (str | None) – (選用) 根據字串給定的 glob 模式篩選物件 (例如,'**/*/.json')。

返回

符合篩選條件的物件名稱串流

返回類型

List[str]

get_size(bucket_name, object_name)[source]

取得 Google Cloud Storage 中檔案的大小。

參數
  • bucket_name (str) – Blob 名稱所在的 Google Cloud Storage 儲存貯體。

  • object_name (str) – 要在 Google Cloud Storage 儲存貯體名稱中檢查的物件名稱。

get_crc32c(bucket_name, object_name)[source]

取得 Google Cloud Storage 中物件的 CRC32c 檢查碼。

參數
  • bucket_name (str) – Blob 名稱所在的 Google Cloud Storage 儲存貯體。

  • object_name (str) – 要在 Google Cloud Storage 儲存貯體名稱中檢查的物件名稱。

get_md5hash(bucket_name, object_name)[source]

取得 Google Cloud Storage 中物件的 MD5 雜湊值。

參數
  • bucket_name (str) – Blob 名稱所在的 Google Cloud Storage 儲存貯體。

  • object_name (str) – 要在 Google Cloud Storage 儲存貯體名稱中檢查的物件名稱。

get_metadata(bucket_name, object_name)[source]

取得 Google Cloud Storage 中物件的 metadata。

參數
  • bucket_name (str) – 物件所在的 Google Cloud Storage 儲存貯體的名稱。

  • object_name (str) – 包含所需 metadata 的物件名稱

返回

與物件相關聯的 metadata

返回類型

dict | None

create_bucket(bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None)[source]

建立新的儲存貯體。

Google Cloud Storage 使用扁平命名空間,因此您無法建立名稱已在使用的儲存貯體。

另請參閱

如需更多資訊,請參閱儲存貯體命名指南: https://cloud.google.com/storage/docs/bucketnaming.html#requirements

參數
  • bucket_name (str) – 儲存貯體的名稱。

  • resource (dict | None) – 包含建立儲存貯體參數的選用字典。如需可用參數的資訊,請參閱 Cloud Storage API 文件: https://cloud.google.com/storage/docs/json_api/v1/buckets/insert

  • storage_class (str) –

    這定義了儲存貯體中物件的儲存方式,並決定了 SLA 和儲存成本。值包括

    • MULTI_REGIONAL

    • REGIONAL

    • STANDARD

    • NEARLINE

    • COLDLINE.

    如果在建立儲存貯體時未指定此值,則預設為 STANDARD。

  • location (str) –

    儲存貯體的位置。儲存貯體中物件的物件資料位於此區域內的實體儲存空間中。預設為 US。

  • project_id (str) – Google Cloud 專案的 ID。

  • labels (dict | None) – 使用者提供的標籤,以鍵/值組表示。

返回

如果成功,則會傳回儲存貯體的 id

返回類型

str

insert_bucket_acl(bucket_name, entity, role, user_project=None)[source]

在指定的 bucket_name 上建立新的 ACL 項目。

請參閱: https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert

參數
  • bucket_name (str) – 儲存貯體名稱。

  • entity (str) – 持有權限的實體,格式如下:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers。請參閱: https://cloud.google.com/storage/docs/access-control/lists#scopes

  • role (str) – 實體的存取權限。可接受的值為:「OWNER」、「READER」、「WRITER」。

  • user_project (str | None) – (選用) 此請求要計費的專案。請求者付費儲存貯體為必要項目。

insert_object_acl(bucket_name, object_name, entity, role, generation=None, user_project=None)[source]

在指定的物件上建立新的 ACL 項目。

請參閱: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert

參數
  • bucket_name (str) – 儲存貯體名稱。

  • object_name (str) – 物件的名稱。如需瞭解如何對物件名稱進行網址編碼以確保路徑安全,請參閱: https://cloud.google.com/storage/docs/json_api/#encoding

  • entity (str) – 持有權限的實體,格式如下:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers 請參閱: https://cloud.google.com/storage/docs/access-control/lists#scopes

  • role (str) – 實體的存取權限。可接受的值為:「OWNER」、「READER」。

  • generation (int | None) – 選用項目。如果存在,則選取此物件的特定修訂版本。

  • user_project (str | None) – (選用) 此請求要計費的專案。請求者付費儲存貯體為必要項目。

compose(bucket_name, source_objects, destination_object)[source]

將現有物件的清單組合成同一個儲存貯體名稱中的新物件。

目前僅支援在單一操作中串連最多 32 個物件

https://cloud.google.com/storage/docs/json_api/v1/objects/compose

參數
  • bucket_name (str) – 包含來源物件的儲存貯體名稱。這也是儲存組合的目的地物件的同一個儲存貯體。

  • source_objects (List[str]) – 將組合成單一物件的來源物件清單。

  • destination_object (str) – 如果已指定物件的路徑。

sync(source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, allow_overwrite=False, delete_extra_files=False)[source]

同步儲存貯體的內容。

參數 source_objectdestination_object 描述根同步目錄。如果未傳遞這些參數,則會同步整個儲存貯體。如果已傳遞這些參數,則應指向目錄。

注意

不支援同步個別檔案。只能同步整個目錄。

參數
  • source_bucket (str) – 包含來源物件的儲存貯體名稱。

  • destination_bucket (str) – 包含目的地物件的儲存貯體名稱。

  • source_object (str | None) – 來源儲存貯體中的根同步目錄。

  • destination_object (str | None) – 目的地儲存貯體中的根同步目錄。

  • recursive (bool) – 若為 True,則會考量子目錄

  • recursive – 若為 True,則會考量子目錄

  • allow_overwrite (bool) – 若為 True,則在找到不符的檔案時會覆寫檔案。預設情況下,不允許覆寫檔案

  • delete_extra_files (bool) –

    若為 True,則從來源中刪除在目的地中找不到的其他檔案。預設情況下,不會刪除額外檔案。

    注意

    如果您指定錯誤的來源/目的地組合,此選項可能會快速刪除資料。

返回

none

返回類型

None

airflow.providers.google.cloud.hooks.gcs.gcs_object_is_directory(bucket)[source]

如果給定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是目錄或空 bucket,則返回 True。

airflow.providers.google.cloud.hooks.gcs.parse_json_from_gcs(gcp_conn_id, file_uri, impersonation_chain=None)[source]

從 Google Cloud Storage 下載並解析 json 檔案。

參數
  • gcp_conn_id (str) – Airflow Google Cloud 連線 ID。

  • file_uri (str) – json 檔案的完整路徑,範例: gs://test-bucket/dir1/dir2/file

class airflow.providers.google.cloud.hooks.gcs.GCSAsyncHook(**kwargs)[source]

基底類別: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

GCSAsyncHook 在觸發 worker 上執行,繼承自 GoogleBaseAsyncHook。

sync_hook_class[source]
async get_storage_client(session)[source]

返回 Google Cloud Storage 服務物件。

此條目是否有幫助?