Google Cloud Transfer Service Operators

Prerequisite Tasks

To use these operators, you must complete a few prerequisite steps.

CloudDataTransferServiceCreateJobOperator

Creates a transfer job.

This operator accepts dates in two formats.

This operator accepts times in two formats:

  • Consistent with the Google API:

    { "hours": 12, "minutes": 30, "seconds": 0 }

  • As a time object
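As a minimal sketch of the two accepted forms, the snippet below writes the same schedule values once as dictionaries in the Google API layout and once as standard-library `date` and `time` objects. The helpers `to_api_date` and `to_api_time` are hypothetical and only illustrate how the two forms correspond; they are not part of the provider.

```python
from datetime import date, time


def to_api_date(value: date) -> dict:
    """Hypothetical helper: express a date in the Google API dictionary layout."""
    return {"year": value.year, "month": value.month, "day": value.day}


def to_api_time(value: time) -> dict:
    """Hypothetical helper: express a time in the Google API dictionary layout."""
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


# Form 1: dictionaries consistent with the Google API.
start_date_as_dict = {"year": 2019, "month": 2, "day": 11}
start_time_as_dict = {"hours": 12, "minutes": 30, "seconds": 0}

# Form 2: standard-library objects describing the same values.
start_date_as_object = date(2019, 2, 11)
start_time_as_object = time(12, 30, 0)

print(to_api_date(start_date_as_object))  # {'year': 2019, 'month': 2, 'day': 11}
print(to_api_time(start_time_as_object))  # {'hours': 12, 'minutes': 30, 'seconds': 0}
```

Either form can be placed in the schedule section of the job body; the object form is often more convenient when the values are computed at DAG parse time.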

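If you want to create a transfer job that copies data from AWS S3, you must configure a connection. For information about AWS configuration, see: Amazon Web Services Connection. The AWS connection to use can be specified with the aws_conn_id parameter.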
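One possible way to provide that connection is through an environment variable, which Airflow reads when the variable is named AIRFLOW_CONN_ followed by the connection ID in upper case. The sketch below assumes the connection ID is `aws_default` and uses placeholder credentials; both are assumptions for illustration, and a secrets backend or the Airflow UI may suit a real deployment better.

```python
import os
from urllib.parse import quote

# Placeholder credentials for illustration only; never hard-code real keys.
aws_access_key_id = "AKIAEXAMPLEKEY"
aws_secret_access_key = "example/secret+key"

# Assumed connection ID; pass the same value as aws_conn_id to the operator.
conn_id = "aws_default"

# Special characters in the key and secret must be URL-encoded in the URI.
conn_uri = (
    f"aws://{quote(aws_access_key_id, safe='')}"
    f":{quote(aws_secret_access_key, safe='')}@"
)

# Airflow looks up connections in AIRFLOW_CONN_<CONN_ID in upper case>.
os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"] = conn_uri
print(conn_uri)
```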

For parameter definitions, see CloudDataTransferServiceCreateJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py[source]

gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
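For readers who want to see what such a body looks like without the provider constants, here is a sketch of the same structure written with the field names of the Storage Transfer REST API. It assumes that each constant above (DESCRIPTION, SCHEDULE, TRANSFER_SPEC, and so on) stands for the corresponding REST field name; the project and bucket names are placeholders.

```python
import json

# Assumption: the provider constants stand for the REST field names used here.
# Project and bucket names are placeholders.
gcs_to_gcs_transfer_body_literal = {
    "description": "description",
    "status": "ENABLED",
    "projectId": "my-project",
    "schedule": {
        "scheduleStartDate": {"year": 2015, "month": 1, "day": 1},
        "scheduleEndDate": {"year": 2030, "month": 1, "day": 1},
        "startTimeOfDay": {"hours": 12, "minutes": 30, "seconds": 0},
    },
    "transferSpec": {
        "gcsDataSource": {"bucketName": "source-bucket"},
        "gcsDataSink": {"bucketName": "destination-bucket"},
        "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
    },
}

# Using the dictionary form for dates and times keeps the body JSON-serializable.
print(json.dumps(gcs_to_gcs_transfer_body_literal, indent=2))
```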

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)

Templating

template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)

CloudDataTransferServiceDeleteJobOperator

Deletes a transfer job.

For parameter definitions, see CloudDataTransferServiceDeleteJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Templating

template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

CloudDataTransferServiceRunJobOperator

Runs a transfer job.

For parameter definitions, see CloudDataTransferServiceRunJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py[source]

run_transfer = CloudDataTransferServiceRunJobOperator(
    task_id="run_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    project_id=PROJECT_ID_TRANSFER,
)

Templating

template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

CloudDataTransferServiceUpdateJobOperator

Updates a transfer job.

For parameter definitions, see CloudDataTransferServiceUpdateJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py[source]

update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
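The field mask in the update body limits the update to the listed fields, so in the body above only the description changes. The following is a simplified local model of that behavior, not the service's implementation: `apply_field_mask` is a hypothetical helper and it handles top-level fields only.

```python
from copy import deepcopy


def apply_field_mask(current: dict, patch: dict, field_mask: str) -> dict:
    """Hypothetical model: copy only the masked top-level fields from patch."""
    updated = deepcopy(current)
    for field in field_mask.split(","):
        field = field.strip()
        if field in patch:
            updated[field] = deepcopy(patch[field])
    return updated


existing_job = {"description": "description", "status": "ENABLED"}
requested_job = {"description": "description_updated", "status": "DISABLED"}

# Only "description" is in the mask, so "status" keeps its existing value.
result = apply_field_mask(existing_job, requested_job, "description")
print(result)  # {'description': 'description_updated', 'status': 'ENABLED'}
```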

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py[source]

update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)

Templating

template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)

CloudDataTransferServiceCancelOperationOperator

Cancels a transfer operation.

For parameter definitions, see CloudDataTransferServiceCancelOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
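The operation_name argument above is written as two adjacent string literals. Python joins adjacent literals into a single string, so the operator receives one complete Jinja template. A small standalone check of that behavior:

```python
# Adjacent string literals are concatenated by Python at compile time.
operation_name_template = (
    "{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}"
)

print(operation_name_template)
```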

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

CloudDataTransferServiceGetOperationOperator

Gets a transfer operation. The result is returned to XCom.

For parameter definitions, see CloudDataTransferServiceGetOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)

CloudDataTransferServiceListOperationsOperator

Lists transfer operations. The result is returned to XCom.

For parameter definitions, see CloudDataTransferServiceListOperationsOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)

Templating

template_fields: Sequence[str] = (
    "request_filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)

CloudDataTransferServicePauseOperationOperator

Pauses a transfer operation.

For parameter definitions, see CloudDataTransferServicePauseOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

CloudDataTransferServiceResumeOperationOperator

Resumes a transfer operation.

For parameter definitions, see CloudDataTransferServiceResumeOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

CloudDataTransferServiceJobStatusSensor

Waits until at least one operation belonging to the job has the expected status.

For parameter definitions, see CloudDataTransferServiceJobStatusSensor.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py[source]

wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
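The condition the sensor waits for can be sketched as a plain function over a list of operations. This is an illustrative model rather than the provider's code: `any_operation_has_status` is a hypothetical helper, and the sample data assumes each operation carries its status under `metadata`, as in the Storage Transfer API's operation resource.

```python
from __future__ import annotations


def any_operation_has_status(operations: list[dict], expected_statuses: set[str]) -> bool:
    """Hypothetical check: True when at least one operation has an expected status."""
    return any(
        operation.get("metadata", {}).get("status") in expected_statuses
        for operation in operations
    )


# Sample operations for one job; names are placeholders.
operations = [
    {"name": "transferOperations/example-1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/example-2", "metadata": {"status": "SUCCESS"}},
]

print(any_operation_has_status(operations, {"SUCCESS"}))  # True
print(any_operation_has_status(operations, {"FAILED"}))   # False
```

Because a single matching operation is enough, a job with earlier failed runs can still satisfy a wait for SUCCESS once any one of its operations succeeds.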

Templating

template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)

CloudDataTransferServiceGCSToGCSOperator

Copies data from one GCS bucket to another.

For parameter definitions, see CloudDataTransferServiceGCSToGCSOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcs_to_gcs.py[source]

transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,
)

Templating

template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)

Reference

For more information, see
