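Google Cloud Transfer Service Operators
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install the API libraries via pip.
pip install 'apache-airflow[google]'
For detailed information, see Installation.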
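CloudDataTransferServiceCreateJobOperator
Creates a transfer job.
The function accepts dates in two formats: as a dict consistent with the Google API, for example { "year": 2019, "month": 2, "day": 11 }, or as a datetime object.
The function accepts times in two formats: as a dict consistent with the Google API, for example { "hours": 12, "minutes": 30, "seconds": 0 }, or as a time object.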
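The relationship between the two accepted formats can be sketched in plain Python. The helpers `to_api_date` and `to_api_time` below are hypothetical names introduced for illustration; they are not part of the operator's API and only show how a `date`/`datetime` or `time` object maps onto the dict form shown above.

```python
from datetime import date, datetime, time


def to_api_date(value):
    """Return a date in the Google API dict form; dicts pass through unchanged."""
    if isinstance(value, dict):
        return value
    # datetime is a subclass of date, so both kinds of object are handled here.
    return {"year": value.year, "month": value.month, "day": value.day}


def to_api_time(value):
    """Return a time of day in the Google API dict form; dicts pass through unchanged."""
    if isinstance(value, dict):
        return value
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


print(to_api_date(date(2019, 2, 11)))
print(to_api_date(datetime(2019, 2, 11, 8, 0)))
print(to_api_time(time(12, 30)))
```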
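If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. For information about configuring AWS, see Amazon Web Services Connection. The AWS connection to use can be selected with the aws_conn_id parameter.
For parameter definitions, see CloudDataTransferServiceCreateJobOperator.
Using the operator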
gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)
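The keys in the bodies above (DESCRIPTION, SCHEDULE, TRANSFER_SPEC, and so on) are imported constants. The sketch below assumes they resolve to the camelCase field names of the Storage Transfer REST API's TransferJob resource, and spells out the GCS-to-GCS body with literal keys. The function name `build_gcs_to_gcs_body` and the project and bucket values are placeholders for illustration, not part of the provider.

```python
import json


def build_gcs_to_gcs_body(project_id, source_bucket, sink_bucket):
    """Build a transfer job body using literal REST field names (assumed mapping)."""
    return {
        "description": "description",
        "status": "ENABLED",
        "projectId": project_id,
        "schedule": {
            # Dates and times are written in the dict form the API accepts.
            "scheduleStartDate": {"year": 2015, "month": 1, "day": 1},
            "scheduleEndDate": {"year": 2030, "month": 1, "day": 1},
            "startTimeOfDay": {"hours": 12, "minutes": 30, "seconds": 0},
        },
        "transferSpec": {
            "gcsDataSource": {"bucketName": source_bucket},
            "gcsDataSink": {"bucketName": sink_bucket},
            "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
        },
    }


body = build_gcs_to_gcs_body("my-project", "src-bucket", "dst-bucket")
print(json.dumps(body, indent=2))
```

Writing the body with literal keys makes it easy to compare against the REST reference, while the constants used in the example above guard against typos in key names.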
Templating
template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceDeleteJobOperator
Deletes a transfer job.
For parameter definitions, see CloudDataTransferServiceDeleteJobOperator.
Using the operator
delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
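The job_name template reads the value that the create task pushed to XCom and takes its name key. The sketch below shows the equivalent lookup in plain Python. `FakeTaskInstance` is a hypothetical, simplified stand-in written for this illustration (it is not an Airflow class), and the job name value is made up.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow task instance, for illustration only."""

    def __init__(self, xcoms):
        # Maps (task_id, key) to the value that task pushed.
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key="return_value"):
        # A task's return value is stored under the "return_value" key by default.
        return self._xcoms[(task_ids, key)]


task_instance = FakeTaskInstance(
    {
        ("create_transfer_job_s3_to_gcs", "return_value"): {
            "name": "transferJobs/123",
            "status": "ENABLED",
        },
    }
)

# Same expression as inside the {{ ... }} template above.
job_name = task_instance.xcom_pull("create_transfer_job_s3_to_gcs")["name"]
print(job_name)
```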
Templating
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceRunJobOperator
Runs a transfer job.
For parameter definitions, see CloudDataTransferServiceRunJobOperator.
Using the operator
run_transfer = CloudDataTransferServiceRunJobOperator(
    task_id="run_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    project_id=PROJECT_ID_TRANSFER,
)
Templating
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceUpdateJobOperator
Updates a transfer job.
For parameter definitions, see CloudDataTransferServiceUpdateJobOperator.
Using the operator
update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)
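In the update body, TRANSFER_JOB_FIELD_MASK names the field of the job that the update should touch, here only description. The sketch below illustrates that idea on plain dicts. `apply_field_mask` is a hypothetical helper written for this example; it is not how the service or the operator implements updates, and it handles only top-level, comma-separated field names.

```python
def apply_field_mask(current, patch, field_mask):
    """Return a copy of current with only the masked top-level fields taken from patch."""
    updated = dict(current)
    for field in field_mask.split(","):
        field = field.strip()
        if field in patch:
            updated[field] = patch[field]
    return updated


current_job = {"description": "description", "status": "ENABLED"}
patch = {"description": "description_updated", "status": "DISABLED"}

# Only "description" is in the mask, so "status" keeps its current value.
result = apply_field_mask(current_job, patch, "description")
print(result)
```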
Templating
template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceCancelOperationOperator
Cancels a transfer operation.
For parameter definitions, see CloudDataTransferServiceCancelOperationOperator.
Using the operator
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceGetOperationOperator
Gets a transfer operation. The result is returned to XCom.
For parameter definitions, see CloudDataTransferServiceGetOperationOperator.
Using the operator
get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation",
    operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
)
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServiceListOperationsOperator
Lists transfer operations. The result is returned to XCom.
For parameter definitions, see CloudDataTransferServiceListOperationsOperator.
Using the operator
list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)
Templating
template_fields: Sequence[str] = (
    "request_filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)
CloudDataTransferServicePauseOperationOperator
Pauses a transfer operation.
For parameter definitions, see CloudDataTransferServicePauseOperationOperator.
Using the operator
pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceResumeOperationOperator
Resumes a transfer operation.
For parameter definitions, see CloudDataTransferServiceResumeOperationOperator.
Using the operator
resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation",
    operation_name="{{task_instance.xcom_pull('get_operation')['name']}}",
)
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
CloudDataTransferServiceJobStatusSensor
Waits until at least one operation belonging to the job has the expected status.
For parameter definitions, see CloudDataTransferServiceJobStatusSensor.
Using the operator
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
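The condition the sensor waits for, at least one operation of the job being in an expected status, can be sketched on plain data. The helper `any_operation_has_status` is a hypothetical name, and the sketch assumes each operation is a dict with its status under metadata, as in the REST API's operation listing. The real sensor may do more, for example fail on unexpected terminal states.

```python
def any_operation_has_status(operations, expected_statuses):
    """Return True if at least one operation's metadata status is an expected one."""
    return any(op["metadata"]["status"] in expected_statuses for op in operations)


# Made-up operations for one job; only the second has finished successfully.
operations = [
    {"name": "transferOperations/1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/2", "metadata": {"status": "SUCCESS"}},
]

print(any_operation_has_status(operations, {"SUCCESS"}))
print(any_operation_has_status(operations, {"PAUSED"}))
```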
Templating
template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)
CloudDataTransferServiceGCSToGCSOperator
Copies data from one GCS bucket to another.
For parameter definitions, see CloudDataTransferServiceGCSToGCSOperator.
Using the operator
transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,
)
Templating
template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)