airflow.providers.google.cloud.operators.gcs

此模組包含 Google Cloud Storage 儲存桶運算子。

模組內容

類別

GCSCreateBucketOperator

建立新的儲存桶。

GCSListObjectsOperator

列出儲存桶中所有物件,並依據指定的字串前綴和分隔符號在名稱或 match_glob 中篩選。

GCSDeleteObjectsOperator

從 Google Cloud Storage 儲存桶中刪除物件列表中的物件,或刪除符合前綴的所有物件。

GCSBucketCreateAclEntryOperator

在指定的儲存桶上建立新的 ACL 項目。

GCSObjectCreateAclEntryOperator

在指定的物件上建立新的 ACL 項目。

GCSFileTransformOperator

將資料從來源 GCS 位置複製到本機檔案系統上的暫存位置。

GCSTimeSpanFileTransformOperator

複製在時間範圍內修改的物件,執行轉換,並將結果上傳到儲存桶。

GCSDeleteBucketOperator

從 Google Cloud Storage 刪除儲存桶。

GCSSynchronizeBucketsOperator

同步 Google Cloud Services 中儲存桶或儲存桶目錄的內容。

class airflow.providers.google.cloud.operators.gcs.GCSCreateBucketOperator(*, bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

建立新的儲存桶。

Google Cloud Storage 使用扁平命名空間,因此您無法建立與已在使用中的名稱相同的儲存桶。

另請參閱

如需更多資訊,請參閱儲存桶命名指南: https://cloud.google.com/storage/docs/bucketnaming.html#requirements

參數
  • bucket_name (str) – 儲存桶的名稱。(已套用範本)

  • resource (dict | None) – 包含建立儲存桶參數的可選 dict。如需可用參數的資訊,請參閱 Cloud Storage API 文件: https://cloud.google.com/storage/docs/json_api/v1/buckets/insert

  • storage_class (str) –

    這定義了儲存桶中物件的儲存方式,並決定了 SLA 和儲存成本 (已套用範本)。值包括

    • MULTI_REGIONAL

    • REGIONAL

    • STANDARD

    • NEARLINE

    • COLDLINE.

    如果在建立儲存桶時未指定此值,則預設為 STANDARD。

  • location (str) –

    儲存桶的位置。(已套用範本) 儲存桶中物件的物件資料位於此區域內的實體儲存空間中。預設為 US。

  • project_id (str) – Google Cloud 專案的 ID。(已套用範本)

  • labels (dict | None) – 使用者提供的標籤,以鍵/值對形式呈現。

  • gcp_conn_id (str) – (可選) 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

以下運算子將在 EU 區域中建立一個具有 MULTI_REGIONAL 儲存類別的新儲存桶 test-bucket

CreateBucket = GCSCreateBucketOperator(
    task_id="CreateNewBucket",
    bucket_name="test-bucket",
    storage_class="MULTI_REGIONAL",
    location="EU",
    labels={"env": "dev", "team": "airflow"},
    gcp_conn_id="airflow-conn-id",
)
template_fields: collections.abc.Sequence[str] = ('bucket_name', 'storage_class', 'location', 'project_id', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.gcs.GCSListObjectsOperator(*, bucket, prefix=None, delimiter=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, match_glob=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

列出儲存桶中所有物件,並依據指定的字串前綴和分隔符號在名稱或 match_glob 中篩選。

此運算子會傳回一個 Python 列表,其中包含物件名稱,下游任務可以在 XCom 中使用這些物件名稱。

參數
  • bucket (str) – 要在其中尋找物件的 Google Cloud Storage 儲存桶。(已套用範本)

  • prefix (str | list[str] | None) – 字串或字串列表,用於篩選名稱以其開頭的物件。(已套用範本)

  • delimiter (str | None) – (已棄用) 您要用來篩選物件的分隔符號。(已套用範本) 例如,若要列出 GCS 中目錄中的 CSV 檔案,您會使用 delimiter=’.csv’。

  • gcp_conn_id (str) – (可選) 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • match_glob (str | None) – (可選) 根據字串給定的 glob 模式篩選物件 (例如, '**/*/.json')

範例:

以下運算子會列出 data 儲存桶中 sales/sales-2017 資料夾中的所有 Avro 檔案。

GCS_Files = GCSListOperator(
    task_id="GCS_Files",
    bucket="data",
    prefix="sales/sales-2017/",
    match_glob="**/*/.avro",
    gcp_conn_id=google_cloud_conn_id,
)
template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.gcs.GCSDeleteObjectsOperator(*, bucket_name, objects=None, prefix=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

從 Google Cloud Storage 儲存桶中刪除物件列表中的物件,或刪除符合前綴的所有物件。

參數
  • bucket_name (str) – 要從中刪除的 GCS 儲存桶

  • objects (list[str] | None) – 要刪除的物件列表。這些應為儲存桶中物件的名稱,不包含 gs://bucket/

  • prefix (str | list[str] | None) – 字串或字串列表,用於篩選名稱以其開頭的物件。(已套用範本)

  • gcp_conn_id (str) – (可選) 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('bucket_name', 'prefix', 'objects', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

get_openlineage_facets_on_start()[source]
class airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator(*, bucket, entity, role, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的儲存桶上建立新的 ACL 項目。

另請參閱

如需如何使用此運算子的更多資訊,請參閱指南: GCSBucketCreateAclEntryOperator

參數
  • bucket (str) – 儲存桶的名稱。

  • entity (str) – 持有權限的實體,以下列形式之一:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers

  • role (str) – 實體的存取權限。可接受的值為:“OWNER”、“READER”、“WRITER”。

  • user_project (str | None) – (可選) 此請求要計費的專案。對於請求者付費儲存桶為必填。

  • gcp_conn_id (str) – (可選) 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'entity', 'role', 'user_project', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.gcs.GCSObjectCreateAclEntryOperator(*, bucket, object_name, entity, role, generation=None, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的物件上建立新的 ACL 項目。

另請參閱

如需如何使用此運算子的更多資訊,請參閱指南: GCSObjectCreateAclEntryOperator

參數
  • bucket (str) – 儲存桶的名稱。

  • object_name (str) – 物件的名稱。如需關於如何對物件名稱進行 URL 編碼以使其路徑安全的資訊,請參閱: https://cloud.google.com/storage/docs/json_api/#encoding

  • entity (str) – 持有權限的實體,以下列形式之一:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers

  • role (str) – 實體的存取權限。可接受的值為:“OWNER”、“READER”。

  • generation (int | None) – 可選。如果存在,則選取此物件的特定修訂版本。

  • user_project (str | None) – (可選) 此請求要計費的專案。對於請求者付費儲存桶為必填。

  • gcp_conn_id (str) – (可選) 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'object_name', 'entity', 'generation', 'role', 'user_project', 'impersonation_chain')[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.gcs.GCSFileTransformOperator(*, source_bucket, source_object, transform_script, destination_bucket=None, destination_object=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

將資料從來源 GCS 位置複製到本機檔案系統上的暫存位置。

對此檔案執行轉換 (由轉換腳本指定),並將輸出上傳到目的地儲存桶。如果未指定輸出儲存桶,則會覆寫原始檔案。

來源檔案和目的地檔案在本機檔案系統中的位置作為第一個和第二個引數提供給轉換腳本。轉換腳本預期會從來源讀取資料、轉換資料,並將輸出寫入本機目的地檔案。

參數
  • source_bucket (str) – 定位 source_object 的儲存桶。(已套用範本)

  • source_object (str) – 要從 GCS 擷取的金鑰。(已套用範本)

  • destination_bucket (str | None) – 用於在轉換後上傳金鑰的儲存桶。如果未提供,將使用 source_bucket。(已套用範本)

  • destination_object (str | None) – 要寫入 GCS 的金鑰。如果未提供,將使用 source_object。(已套用範本)

  • transform_script (str | list[str]) – 可執行轉換腳本的位置,或傳遞至子流程的引數列表,例如 [‘python’, ‘script.py’, 10]。(已套用範本)

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_object', 'destination_bucket', 'destination_object',...[source]
execute(context)[source]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

get_openlineage_facets_on_start()[source]
class airflow.providers.google.cloud.operators.gcs.GCSTimeSpanFileTransformOperator(*, source_bucket, source_prefix, source_gcp_conn_id, destination_bucket, destination_prefix, destination_gcp_conn_id, transform_script, source_impersonation_chain=None, destination_impersonation_chain=None, chunk_size=None, download_continue_on_fail=False, download_num_attempts=1, upload_continue_on_fail=False, upload_num_attempts=1, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

複製在時間範圍內修改的物件,執行轉換,並將結果上傳到儲存桶。

決定在特定時間範圍內,於 GCS 來源位置新增或修改的物件清單,將這些物件複製到本地檔案系統的暫存位置,依照轉換腳本的指定對檔案執行轉換,並將輸出上傳到目的地儲存貯體。

另請參閱

如需更多關於如何使用此運算子的資訊,請參閱指南:GCSTimeSpanFileTransformOperator

來源檔案和目的地檔案在本地檔案系統中的位置,會作為轉換腳本的第一個和第二個參數提供。時間範圍則以 UTC ISO 8601 字串格式,作為第三個和第四個參數傳遞給轉換腳本。

轉換腳本應從來源讀取資料、進行轉換,並將輸出寫入本地目的地檔案。

參數
  • source_bucket (str) – 要從中提取資料的儲存貯體。(已套用樣板)

  • source_prefix (str) – 前綴字串,用於篩選名稱以此前綴開頭的物件。可以內插邏輯日期和時間組件。(已套用樣板)

  • source_gcp_conn_id (str) – 用於連線至 Google Cloud,以下載要處理檔案的連線 ID。

  • source_impersonation_chain (str | collections.abc.Sequence[str] | None) – 選擇性的服務帳戶,用於使用短期憑證模擬身分(以下載要處理的檔案),或是取得清單中最後一個帳戶的 access_token 所需的串聯帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊接在前的身分,清單中的第一個帳戶則將此角色授予原始帳戶(已套用樣板)。

  • destination_bucket (str) – 要將資料寫入的儲存貯體。(已套用樣板)

  • destination_prefix (str) – 上傳位置的前綴字串。可以內插邏輯日期和時間組件。(已套用樣板)

  • destination_gcp_conn_id (str) – 用於連線至 Google Cloud,以上傳已處理檔案的連線 ID。

  • destination_impersonation_chain (str | collections.abc.Sequence[str] | None) – 選擇性的服務帳戶,用於使用短期憑證模擬身分(以上傳已處理的檔案),或是取得清單中最後一個帳戶的 access_token 所需的串聯帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊接在前的身分,清單中的第一個帳戶則將此角色授予原始帳戶(已套用樣板)。

  • transform_script (str | list[str]) – 可執行轉換腳本的位置,或傳遞至子流程的引數列表,例如 [‘python’, ‘script.py’, 10]。(已套用範本)

  • chunk_size (int | None) – 下載或上傳資料時的區塊大小(以位元組為單位)。這必須是 256 KB 的倍數(根據 Google Cloud Storage API 規範)。

  • download_continue_on_fail (bool | None) – 設定為 true 時,如果下載失敗,任務不會產生錯誤,但仍會繼續執行。

  • upload_chunk_size – 上傳資料時的區塊大小(以位元組為單位)。這必須是 256 KB 的倍數(根據 Google Cloud Storage API 規範)。

  • upload_continue_on_fail (bool | None) – 設定為 true 時,如果上傳失敗,任務不會產生錯誤,但仍會繼續執行。

  • upload_num_attempts (int) – 嘗試上傳單一檔案的次數。

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_prefix', 'destination_bucket', 'destination_prefix',...[原始碼]
static interpolate_prefix(prefix, dt)[原始碼]

使用日期時間內插前綴。

參數
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

get_openlineage_facets_on_complete(task_instance)[原始碼]

實作 on_complete,因為 execute() 會解析物件前綴。

class airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator(*, bucket_name, force=True, gcp_conn_id='google_cloud_default', impersonation_chain=None, user_project=None, **kwargs)[原始碼]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

從 Google Cloud Storage 刪除儲存桶。

另請參閱

如需更多關於如何使用此運算子的資訊,請參閱指南:刪除儲存貯體

參數
  • bucket_name (str) – 將被刪除的儲存貯體名稱

  • force (bool) – false 不允許刪除非空的儲存貯體,設定 force=True 允許刪除非空的儲存貯體

  • gcp_conn_id (str) – 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

  • user_project (str | None) – (選用) 要為此請求計費的專案識別碼。請求者付費儲存貯體為必要參數。

template_fields: collections.abc.Sequence[str] = ('bucket_name', 'gcp_conn_id', 'impersonation_chain', 'user_project')[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

class airflow.providers.google.cloud.operators.gcs.GCSSynchronizeBucketsOperator(*, source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, delete_extra_files=False, allow_overwrite=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[原始碼]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

同步 Google Cloud Services 中儲存貯體或儲存貯體目錄的內容。

參數 source_objectdestination_object 描述根同步目錄。如果未傳遞,則將同步整個儲存貯體。它們應指向目錄。

注意

不支援同步個別檔案。僅能同步整個目錄。

另請參閱

如需更多關於如何使用此運算子的資訊,請參閱指南:GCSSynchronizeBucketsOperator

參數
  • source_bucket (str) – 包含來源物件的儲存貯體名稱。

  • destination_bucket (str) – 包含目的地物件的儲存貯體名稱。

  • source_object (str | None) – 來源儲存貯體中的根同步目錄。

  • destination_object (str | None) – 目的地儲存貯體中的根同步目錄。

  • recursive (bool) – 若為 True,則會考慮子目錄

  • allow_overwrite (bool) – 若為 True,則在找到不符的檔案時,將覆寫檔案。預設情況下,不允許覆寫檔案

  • delete_extra_files (bool) –

    若為 True,則從來源中刪除在目的地中找不到的額外檔案。預設情況下,不會刪除額外檔案。

    注意

    如果您指定錯誤的來源/目的地組合,此選項可能會快速刪除資料。

  • gcp_conn_id (str) – (可選) 用於連線至 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳戶,用於使用短期憑證模擬身分,或用於取得列表中最後一個帳戶的 access_token 的鏈結帳戶列表,該帳戶將在請求中被模擬身分。 如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。 如果設定為序列,則列表中的身分必須將「服務帳戶權杖建立者」IAM 角色授予緊鄰的前一個身分,列表中的第一個帳戶將此角色授予原始帳戶 (已套用範本)。

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'destination_bucket', 'source_object', 'destination_object', 'recursive',...[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文資訊。

此條目是否有幫助?