AWS DataSync¶
AWS DataSync 是一項資料傳輸服務,可簡化、自動化和加速在內部部署儲存系統與 AWS 儲存服務之間透過網際網路或 AWS Direct Connect 移動和複製資料。
先決條件任務¶
若要使用這些運算子,您必須執行幾件事
使用 AWS Console 或 AWS CLI 建立必要的資源。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 的安裝
設定連線.
通用參數¶
- aws_conn_id
參考 Amazon Web Services 連線 ID。如果此參數設定為
None
,則會使用預設的 boto3 行為,而不會查閱連線。否則,請使用儲存在連線中的憑證。預設值:aws_default
- region_name
AWS 區域名稱。如果此參數設定為
None
或省略,則將使用來自 AWS 連線額外參數 的 region_name。否則,請使用指定的值,而不是連線值。預設值:None
- verify
是否驗證 SSL 憑證。
False
- 不驗證 SSL 憑證。path/to/cert/bundle.pem - 要使用的 CA 憑證套件的檔案名稱。如果您想要使用與 botocore 使用的 CA 憑證套件不同的套件,則可以指定此引數。
如果此參數設定為
None
或省略,則將使用來自 AWS 連線額外參數 的 verify。否則,請使用指定的值,而不是連線值。預設值:None
- botocore_config
提供的字典用於建構 botocore.config.Config。此組態可用於設定 避免節流例外、逾時等。
範例,有關參數的更多詳細資訊,請參閱 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此參數設定為
None
或省略,則將使用來自 AWS 連線額外參數 的 config_kwargs。否則,請使用指定的值,而不是連線值。預設值:None
注意
指定空字典
{}
將覆寫 botocore.config.Config 的連線組態
運算子¶
與 AWS DataSync 任務互動¶
您可以使用 DataSyncOperator
來尋找、建立、更新、執行和刪除 AWS DataSync 任務。
一旦 DataSyncOperator
識別要執行的正確 TaskArn(因為您已指定它,或因為已找到它),它將會被執行。每當執行 AWS DataSync 任務時,它都會建立一個 AWS DataSync TaskExecution,由 TaskExecutionArn 識別。
TaskExecutionArn 將被監控直到完成(成功/失敗),並且其狀態將定期寫入 Airflow 任務日誌。
DataSyncOperator
支援將額外的 kwargs 選擇性地傳遞到基礎的 boto3.start_task_execution()
API。這是透過 task_execution_kwargs
參數完成的。這對於例如限制頻寬或篩選包含的檔案很有用,請參閱 boto3 Datasync 文件 以取得更多詳細資訊。
執行任務¶
若要執行特定任務,您可以將 task_arn
傳遞給運算子。
tests/system/amazon/aws/example_datasync.py
# Execute a specific task
execute_task_by_arn = DataSyncOperator(
task_id="execute_task_by_arn",
task_arn=created_task_arn,
)
搜尋並執行任務¶
若要搜尋任務,您可以將 source_location_uri
和 destination_location_uri
指定給運算子。如果找到一個任務,則將執行此任務。如果找到多個任務,運算子將引發例外。為了避免這種情況,您可以將 allow_random_task_choice
設定為 True
以從候選任務中隨機選擇。
tests/system/amazon/aws/example_datasync.py
# Search and execute a task
execute_task_by_locations = DataSyncOperator(
task_id="execute_task_by_locations",
source_location_uri=f"s3://{s3_bucket_source}/test",
destination_location_uri=f"s3://{s3_bucket_destination}/test",
# Only transfer files from /test/subdir folder
task_execution_kwargs={
"Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
},
)
建立並執行任務¶
在搜尋任務時,如果未找到任務,您可以選擇在執行任務之前建立一個任務。為了做到這一點,您需要提供額外的參數 create_task_kwargs
、create_source_location_kwargs
和 create_destination_location_kwargs
。
這些額外參數為運算子提供了一種自動建立任務和/或位置的方式,如果沒有找到合適的現有任務。如果這些參數保留為預設值 (None),則不會嘗試建立。
此外,由於 delete_task_after_execution
設定為 True
,因此任務將在成功完成後從 AWS DataSync 中刪除。
tests/system/amazon/aws/example_datasync.py
# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
task_id="create_and_execute_task",
source_location_uri=f"s3://{s3_bucket_source}/test_create",
destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
create_task_kwargs={"Name": "Created by Airflow"},
create_source_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
create_destination_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
delete_task_after_execution=False,
)
在建立任務時,DataSyncOperator
將嘗試尋找並使用現有的 LocationArn,而不是建立新的 LocationArn。如果多個 LocationArn 符合指定的 URI,那麼我們需要選擇一個使用。在這種情況下,運算子的行為方式與它從多個任務中選擇單個任務的方式類似
運算子將引發例外。為了避免這種情況,您可以將 allow_random_location_choice
設定為 True
以從候選位置中隨機選擇。