AWS DataSync

AWS DataSync 是一項資料傳輸服務,可簡化、自動化和加速在內部部署儲存系統與 AWS 儲存服務之間透過網際網路或 AWS Direct Connect 移動和複製資料。

先決條件任務

若要使用這些運算子,您必須執行幾件事

通用參數

aws_conn_id

參考 Amazon Web Services 連線 ID。如果此參數設定為 None,則會使用預設的 boto3 行為,而不會查閱連線。否則,請使用儲存在連線中的憑證。預設值: aws_default

region_name

AWS 區域名稱。如果此參數設定為 None 或省略,則將使用來自 AWS 連線額外參數region_name。否則,請使用指定的值,而不是連線值。預設值: None

verify

是否驗證 SSL 憑證。

  • False - 不驗證 SSL 憑證。

  • path/to/cert/bundle.pem - 要使用的 CA 憑證套件的檔案名稱。如果您想要使用與 botocore 使用的 CA 憑證套件不同的套件,則可以指定此引數。

如果此參數設定為 None 或省略,則將使用來自 AWS 連線額外參數verify。否則,請使用指定的值,而不是連線值。預設值: None

botocore_config

提供的字典用於建構 botocore.config.Config。此組態可用於設定 避免節流例外、逾時等。

範例,有關參數的更多詳細資訊,請參閱 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此參數設定為 None 或省略,則將使用來自 AWS 連線額外參數config_kwargs。否則,請使用指定的值,而不是連線值。預設值: None

注意

指定空字典 {} 將覆寫 botocore.config.Config 的連線組態

運算子

與 AWS DataSync 任務互動

您可以使用 DataSyncOperator 來尋找、建立、更新、執行和刪除 AWS DataSync 任務。

一旦 DataSyncOperator 識別要執行的正確 TaskArn(因為您已指定它,或因為已找到它),它將會被執行。每當執行 AWS DataSync 任務時,它都會建立一個 AWS DataSync TaskExecution,由 TaskExecutionArn 識別。

TaskExecutionArn 將被監控直到完成(成功/失敗),並且其狀態將定期寫入 Airflow 任務日誌。

DataSyncOperator 支援將額外的 kwargs 選擇性地傳遞到基礎的 boto3.start_task_execution() API。這是透過 task_execution_kwargs 參數完成的。這對於例如限制頻寬或篩選包含的檔案很有用,請參閱 boto3 Datasync 文件 以取得更多詳細資訊。

執行任務

若要執行特定任務,您可以將 task_arn 傳遞給運算子。

tests/system/amazon/aws/example_datasync.py

# Execute a specific task
execute_task_by_arn = DataSyncOperator(
    task_id="execute_task_by_arn",
    task_arn=created_task_arn,
)

搜尋並執行任務

若要搜尋任務,您可以將 source_location_uridestination_location_uri 指定給運算子。如果找到一個任務,則將執行此任務。如果找到多個任務,運算子將引發例外。為了避免這種情況,您可以將 allow_random_task_choice 設定為 True 以從候選任務中隨機選擇。

tests/system/amazon/aws/example_datasync.py

# Search and execute a task
execute_task_by_locations = DataSyncOperator(
    task_id="execute_task_by_locations",
    source_location_uri=f"s3://{s3_bucket_source}/test",
    destination_location_uri=f"s3://{s3_bucket_destination}/test",
    # Only transfer files from /test/subdir folder
    task_execution_kwargs={
        "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
    },
)

建立並執行任務

在搜尋任務時,如果未找到任務,您可以選擇在執行任務之前建立一個任務。為了做到這一點,您需要提供額外的參數 create_task_kwargscreate_source_location_kwargscreate_destination_location_kwargs

這些額外參數為運算子提供了一種自動建立任務和/或位置的方式,如果沒有找到合適的現有任務。如果這些參數保留為預設值 (None),則不會嘗試建立。

此外,由於 delete_task_after_execution 設定為 True,因此任務將在成功完成後從 AWS DataSync 中刪除。

tests/system/amazon/aws/example_datasync.py

# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
    task_id="create_and_execute_task",
    source_location_uri=f"s3://{s3_bucket_source}/test_create",
    destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
    create_task_kwargs={"Name": "Created by Airflow"},
    create_source_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    create_destination_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    delete_task_after_execution=False,
)

在建立任務時,DataSyncOperator 將嘗試尋找並使用現有的 LocationArn,而不是建立新的 LocationArn。如果多個 LocationArn 符合指定的 URI,那麼我們需要選擇一個使用。在這種情況下,運算子的行為方式與它從多個任務中選擇單個任務的方式類似

運算子將引發例外。為了避免這種情況,您可以將 allow_random_location_choice 設定為 True 以從候選位置中隨機選擇。

此條目是否有幫助?