airflow.providers.google.cloud.transfers.s3_to_gcs

模組內容

類別

S3ToGCSOperator

將 S3 金鑰(可能是前綴)與 Google Cloud Storage 目標路徑同步。

class airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator(*, bucket, prefix='', apply_gcs_prefix=False, delimiter='', aws_conn_id='aws_default', verify=None, gcp_conn_id='google_cloud_default', dest_gcs=None, replace=False, gzip=False, google_impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=10, **kwargs)[原始碼]

Bases: airflow.providers.amazon.aws.operators.s3.S3ListOperator

將 S3 金鑰(可能是前綴)與 Google Cloud Storage 目標路徑同步。

參見

有關如何使用此運算子的更多資訊,請參閱指南:將資料從 Amazon S3 傳輸到 Google Cloud Storage

參數
  • bucket – 要尋找物件的 S3 儲存桶。(已套用範本)

  • prefix – 前綴字串,用於篩選名稱以此前綴開頭的物件。(已套用範本)

  • apply_gcs_prefix

    (可選) 是否將來源物件的路徑替換為給定的 GCS 目標路徑。如果 apply_gcs_prefix 為 False(預設值),則來自 S3 的物件將複製到 GCS 儲存桶中的給定 GSC 路徑,且來源路徑將放置在其中。例如,<s3_bucket><s3_prefix><content> => <gcs_prefix><s3_prefix><content>

    如果 apply_gcs_prefix 為 True,則來自 S3 的物件將複製到 GCS 儲存桶中的給定 GCS 路徑,且來源路徑將被省略。例如:<s3_bucket><s3_prefix><content> => <gcs_prefix><content>

  • delimiter – 劃分金鑰階層結構的分隔符號。(已套用範本)

  • aws_conn_id – 來源 S3 連線

  • verify

    是否驗證 S3 連線的 SSL 憑證。預設情況下,會驗證 SSL 憑證。您可以提供以下值

    • False:不驗證 SSL 憑證。仍將使用 SSL

      (除非 use_ssl 為 False),但不會驗證 SSL 憑證。

    • path/to/cert/bundle.pem:要使用的 CA 憑證套件的檔案名稱。

      如果您想使用與 botocore 使用的 CA 憑證套件不同的套件,則可以指定此引數。

  • gcp_conn_id – (可選) 用於連線至 Google Cloud 的連線 ID。

  • dest_gcs – 您要儲存檔案的目的地 Google Cloud Storage 儲存桶和前綴。(已套用範本)

  • replace – 是否要替換現有的目標檔案。

  • gzip – 壓縮檔案以上傳的選項。在可延遲模式下會忽略此參數。

  • google_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的 Google 服務帳戶,用於使用短期憑證模擬身分,或是取得清單中最後一個帳戶的 access_token 所需的鏈結帳戶清單,該帳戶將在請求中被模擬身分。如果設定為字串,則該帳戶必須授予原始帳戶「服務帳戶權杖建立者」IAM 角色。如果設定為序列,則清單中的身分必須將「服務帳戶權杖建立者」IAM 角色授予直接前一個身分,清單中的第一個帳戶將此角色授予原始帳戶(已套用範本)。

  • deferrable – 在可延遲模式下執行運算子

  • poll_interval (int) – 輪詢任務完成之間的時間間隔,以秒為單位。僅在可延遲模式下執行時才會考慮此值。必須大於 0。

範例:

s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="data/customers-201804",
    gcp_conn_id="google_cloud_default",
    dest_gcs="gs://my.gcs.bucket/some/customers/",
    replace=False,
    gzip=True,
    dag=my_dag,
)

請注意,bucketprefixdelimiterdest_gcs 已套用範本,因此您可以在其中使用變數(如果需要)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'dest_gcs', 'google_impersonation_chain')[原始碼]
ui_color = '#e09411'[原始碼]
transfer_job_max_files_number = 1000[原始碼]
execute(context)[原始碼]

在建立運算子時衍生。

Context 是與呈現 jinja 範本時使用的字典相同的字典。

請參閱 get_template_context 以取得更多上下文。

exclude_existing_objects(s3_objects, gcs_hook)[原始碼]

從列表中排除已存在於 GCS 儲存桶中的物件。

s3_to_gcs_object(s3_object)[原始碼]

根據運算子的邏輯將 S3 路徑轉換為 GCS 路徑。

如果 apply_gcs_prefix == True 則 <s3_prefix><content> => <gcs_prefix><content> 如果 apply_gcs_prefix == False 則 <s3_prefix><content> => <gcs_prefix><s3_prefix><content>

gcs_to_s3_object(gcs_object)[原始碼]

根據運算子的邏輯將 GCS 路徑轉換為 S3 路徑。

如果 apply_gcs_prefix == True 則 <gcs_prefix><content> => <s3_prefix><content> 如果 apply_gcs_prefix == False 則 <gcs_prefix><s3_prefix><content> => <s3_prefix><content>

transfer_files(s3_objects, gcs_hook, s3_hook)[原始碼]
transfer_files_async(files, gcs_hook, s3_hook)[原始碼]

提交 Google Cloud Storage Transfer Service 任務,以將檔案從 AWS S3 複製到 GCS。

submit_transfer_jobs(files, gcs_hook, s3_hook)[原始碼]
execute_complete(context, event)[原始碼]

立即返回並依賴觸發器擲回成功事件。觸發器的回呼。

依賴觸發器擲回例外,否則假定執行成功。

get_transfer_hook()[原始碼]
get_openlineage_facets_on_start()[原始碼]

此條目是否有幫助?