Amazon S3

Amazon Simple Storage Service (Amazon S3) 是網際網路的儲存空間。您可以使用 Amazon S3 隨時隨地儲存和檢索任何數量的資料。

先決條件任務

若要使用這些運算子,您必須執行以下幾項操作

運算子

建立 Amazon S3 儲存貯體

若要建立 Amazon S3 儲存貯體,您可以使用 S3CreateBucketOperator

tests/system/amazon/aws/example_s3.py

create_bucket = S3CreateBucketOperator(
    task_id="create_bucket",
    bucket_name=bucket_name,
)

刪除 Amazon S3 儲存貯體

若要刪除 Amazon S3 儲存貯體,您可以使用 S3DeleteBucketOperator

tests/system/amazon/aws/example_s3.py

delete_bucket = S3DeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name=bucket_name,
    force_delete=True,
)

設定 Amazon S3 儲存貯體的標籤

若要設定 Amazon S3 儲存貯體的標籤,您可以使用 S3PutBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

put_tagging = S3PutBucketTaggingOperator(
    task_id="put_tagging",
    bucket_name=bucket_name,
    key=TAG_KEY,
    value=TAG_VALUE,
)

取得 Amazon S3 儲存貯體的標籤

若要取得與 Amazon S3 儲存貯體關聯的標籤集,您可以使用 S3GetBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

get_tagging = S3GetBucketTaggingOperator(
    task_id="get_tagging",
    bucket_name=bucket_name,
)

刪除 Amazon S3 儲存貯體的標籤

若要刪除 Amazon S3 儲存貯體的標籤,您可以使用 S3DeleteBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

delete_tagging = S3DeleteBucketTaggingOperator(
    task_id="delete_tagging",
    bucket_name=bucket_name,
)

建立 Amazon S3 物件

若要建立新的(或取代)Amazon S3 物件,您可以使用 S3CreateObjectOperator

tests/system/amazon/aws/example_s3.py

create_object = S3CreateObjectOperator(
    task_id="create_object",
    s3_bucket=bucket_name,
    s3_key=key,
    data=DATA,
    replace=True,
)

複製 Amazon S3 物件

若要將 Amazon S3 物件從一個儲存貯體複製到另一個儲存貯體,您可以使用 S3CopyObjectOperator。此處使用的 Amazon S3 連線需要有權存取來源和目的地儲存貯體/金鑰。

tests/system/amazon/aws/example_s3.py

copy_object = S3CopyObjectOperator(
    task_id="copy_object",
    source_bucket_name=bucket_name,
    dest_bucket_name=bucket_name_2,
    source_bucket_key=key,
    dest_bucket_key=key_2,
)

刪除 Amazon S3 物件

若要刪除一個或多個 Amazon S3 物件,您可以使用 S3DeleteObjectsOperator

tests/system/amazon/aws/example_s3.py

delete_objects = S3DeleteObjectsOperator(
    task_id="delete_objects",
    bucket=bucket_name_2,
    keys=key_2,
)

轉換 Amazon S3 物件

若要轉換來自一個 Amazon S3 物件的資料並將其儲存到另一個物件,您可以使用 S3FileTransformOperator。您也可以套用選用的 Amazon S3 Select 表達式,以使用 select_expression 選取您要從 source_s3_key 檢索的資料。

tests/system/amazon/aws/example_s3.py

file_transform = S3FileTransformOperator(
    task_id="file_transform",
    source_s3_key=f"s3://{bucket_name}/{key}",
    dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
    # Use `cp` command as transform script as an example
    transform_script="cp",
    replace=True,
)

列出 Amazon S3 字首

若要列出 Amazon S3 儲存貯體內的所有 Amazon S3 字首,您可以使用 S3ListPrefixesOperator。請參閱此處以取得關於 Amazon S3 字首的更多資訊。

tests/system/amazon/aws/example_s3.py

list_prefixes = S3ListPrefixesOperator(
    task_id="list_prefixes",
    bucket=bucket_name,
    prefix=PREFIX,
    delimiter=DELIMITER,
)

列出 Amazon S3 物件

若要列出 Amazon S3 儲存貯體內的所有 Amazon S3 物件,您可以使用 S3ListOperator。您可以指定 prefix 來篩選名稱以此字首開頭的物件。

tests/system/amazon/aws/example_s3.py

list_keys = S3ListOperator(
    task_id="list_keys",
    bucket=bucket_name,
    prefix=PREFIX,
)

感測器

等待 Amazon S3 金鑰

若要等待一個或多個金鑰出現在 Amazon S3 儲存貯體中,您可以使用 S3KeySensor。對於每個金鑰,它會呼叫 head_object API(或 list_objects_v2 API,如果 wildcard_matchTrue)以檢查它是否存在。請記住,尤其是在用於檢查大量金鑰時,它會為每個金鑰進行一次 API 呼叫。

檢查一個檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="sensor_one_key",
    bucket_name=bucket_name,
    bucket_key=key,
)

檢查多個檔案

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="sensor_two_keys",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
)

檢查帶有正規表示式的檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
    task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)

若要使用額外的自訂檢查,您可以定義一個函數,該函數接收符合的 S3 物件屬性列表並傳回布林值

  • True:符合特定條件

  • False:不符合條件

對於在 bucket_key 中作為參數傳遞的每個金鑰,都會呼叫此函數。此函數的參數是一個物件列表的原因是,當 wildcard_matchTrue 時,多個檔案可以符合一個金鑰。符合的 S3 物件屬性列表僅包含大小,格式如下

[{"Size": int}]

tests/system/amazon/aws/example_s3.py

def check_fn(files: list, **kwargs) -> bool:
    """
    Example of custom check: check if all files are bigger than ``20 bytes``

    :param files: List of S3 object attributes.
    :return: true if the criteria is met
    """
    return all(f.get("Size", 0) > 20 for f in files)

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="sensor_key_with_function",
    bucket_name=bucket_name,
    bucket_key=key,
    check_fn=check_fn,
)

您也可以透過將參數 deferrable 設定為 True,在可延遲模式下執行此運算子。這將有效利用 Airflow 工作人員,因為輪詢工作狀態是非同步地在觸發器上進行。請注意,這將需要您的 Airflow 部署上提供觸發器。

檢查一個檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
    task_id="sensor_one_key_deferrable",
    bucket_name=bucket_name,
    bucket_key=key,
    deferrable=True,
)

檢查多個檔案

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
    task_id="sensor_two_keys_deferrable",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
    deferrable=True,
)

檢查帶有正規表示式的檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
    task_id="sensor_key_with_regex_deferrable",
    bucket_name=bucket_name,
    bucket_key=key_regex_pattern,
    use_regex=True,
    deferrable=True,
)

等待 Amazon S3 字首變更

若要檢查 Amazon S3 儲存貯體中特定字首的物件數量變更,並等待直到非活動期間過去且物件數量沒有增加,您可以使用 S3KeysUnchangedSensor。請注意,此感測器在重新排程模式下將無法正確運作,因為 Amazon S3 儲存貯體中列出的物件狀態將在重新排程的調用之間遺失。

tests/system/amazon/aws/example_s3.py

sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="sensor_keys_unchanged",
    bucket_name=bucket_name_2,
    prefix=PREFIX,
    inactivity_period=10,  # inactivity_period in seconds
)

您也可以透過將參數 deferrable 設定為 True,在可延遲模式下執行此感測器。這將有效利用 Airflow 工作人員,因為輪詢工作狀態是非同步地在觸發器上進行。請注意,這將需要您的 Airflow 部署上提供觸發器。

此條目是否有幫助?