Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about the service, visit the Dataplex product documentation.

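Create a Task

Before you create a Dataplex task, you need to define its body. For more information about the available fields to pass when creating a task, visit the Dataplex create task API.

A simple task configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex.py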

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
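If you build task bodies for several scripts, a small helper can keep them in the same shape as `EXAMPLE_TASK_BODY`. The sketch below is hypothetical: `build_spark_task_body` is not part of the provider. It uses only the three keys from the example above, and the `.py` check is a sanity check added for illustration. The trigger type, service account, and bucket values are placeholders.

```python
def build_spark_task_body(trigger_type: str, service_account: str, script_uri: str) -> dict:
    """Return a task body shaped like EXAMPLE_TASK_BODY (hypothetical helper)."""
    # Illustrative sanity check: the driver is expected to be a Python file.
    if not script_uri.endswith(".py"):
        raise ValueError(f"expected a Python script URI, got {script_uri!r}")
    return {
        # "type_" (with the trailing underscore) mirrors the field name used in the example
        "trigger_spec": {"type_": trigger_type},
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": script_uri},
    }


body = build_spark_task_body(
    "ON_DEMAND", "runner@my-project.iam.gserviceaccount.com", "gs://my-bucket/job.py"
)
print(body["spark"]["python_script_file"])  # gs://my-bucket/job.py
```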

With this configuration, you can use DataplexCreateTaskOperator to create the task either synchronously or asynchronously (by setting asynchronous=True):

tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task, you can use DataplexDeleteTaskOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)
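The task operators take the project, region, lake, and task IDs as separate arguments. In the Dataplex API, these parts combine into a single resource name of the form `projects/{project}/locations/{location}/lakes/{lake}/tasks/{task}`. The helper below is hypothetical and only assembles that string, which can be handy when cross-checking a task in the API or the console:

```python
def dataplex_task_name(project_id: str, region: str, lake_id: str, task_id: str) -> str:
    """Build the Dataplex task resource name from its parts (illustrative helper)."""
    return f"projects/{project_id}/locations/{region}/lakes/{lake_id}/tasks/{task_id}"


# The asynchronous example uses f"{DATAPLEX_TASK_ID}-1" as its task ID.
print(dataplex_task_name("my-project", "us-central1", "my-lake", "my-task-1"))
# projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task-1
```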

List tasks

To list tasks, you can use DataplexListTasksOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
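Downstream tasks often need only the short task IDs from a listing. The sketch below assumes that each listed item is a dict whose `name` key holds the full resource name (for example `projects/p/locations/r/lakes/l/tasks/t`). That shape and the `task_ids_from_listing` helper are assumptions for illustration, so check the actual XCom value in your environment.

```python
def task_ids_from_listing(tasks: list) -> list:
    """Return the short task IDs, i.e. the last path segment of each "name"."""
    return [task["name"].rsplit("/", 1)[-1] for task in tasks]


listing = [
    {"name": "projects/p/locations/us-central1/lakes/my-lake/tasks/task-a"},
    {"name": "projects/p/locations/us-central1/lakes/my-lake/tasks/task-b"},
]
print(task_ids_from_listing(listing))  # ['task-a', 'task-b']
```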

Get a task

To get a task, you can use DataplexGetTaskOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task that was created asynchronously, you can use DataplexTaskStateSensor:

tests/system/google/cloud/dataplex/example_dataplex.py

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
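Conceptually, a sensor checks a condition repeatedly until it holds or a timeout expires. The stdlib-only sketch below illustrates that polling loop. It is not Airflow's implementation: `fake_states` stands in for real Dataplex API calls, and `CREATING` and `ACTIVE` are assumed to be lifecycle states reported for a task.

```python
import time
from typing import Callable


def wait_for_state(
    get_state: Callable[[], str], target: str, timeout: float = 600.0, interval: float = 30.0
) -> int:
    """Poll get_state() until it returns target and return the number of polls made."""
    deadline = time.monotonic() + timeout
    polls = 0
    while True:
        polls += 1
        if get_state() == target:
            return polls
        if time.monotonic() >= deadline:
            raise TimeoutError(f"state did not reach {target!r} within {timeout}s")
        time.sleep(interval)


# Simulated API responses: the task is still being created for the first two checks.
fake_states = iter(["CREATING", "CREATING", "ACTIVE"])
print(wait_for_state(lambda: next(fake_states), "ACTIVE", interval=0))  # 3
```

The demo sets `interval=0` so that it runs instantly. A real sensor waits between checks to avoid hammering the API.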

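Create a Lake

Before you create a Dataplex lake, you need to define its body.

For more information about the available fields to pass when creating a lake, visit the Dataplex create lake API.

A simple lake configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex.py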

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}
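Lake IDs, and likewise zone, asset, and task IDs, must follow naming rules. As we understand the Dataplex API reference, an ID may contain only lowercase letters, digits, and hyphens, must start with a letter, must end with a letter or digit, and must be 1 to 63 characters long. The regular expression below encodes that assumption. Verify it against the current API documentation before you rely on it.

```python
import re

# Assumed rule: starts with a lowercase letter, ends with a letter or digit,
# only lowercase letters/digits/hyphens in between, 1-63 characters in total.
_DATAPLEX_ID = re.compile(r"[a-z](?:[a-z0-9-]{0,61}[a-z0-9])?")


def is_valid_dataplex_id(resource_id: str) -> bool:
    """Return True if resource_id matches the assumed Dataplex ID rules."""
    return _DATAPLEX_ID.fullmatch(resource_id) is not None


print(is_valid_dataplex_id("my-lake-1"))  # True
print(is_valid_dataplex_id("My_Lake"))    # False (uppercase and underscore)
print(is_valid_dataplex_id("lake-"))      # False (ends with a hyphen)
```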

With this configuration, you can create the lake with DataplexCreateLakeOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a lake

To delete a lake, you can use DataplexDeleteLakeOperator:

tests/system/google/cloud/dataplex/example_dataplex.py

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)
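The delete operators in these examples use `trigger_rule=TriggerRule.ALL_DONE`. With that rule, cleanup runs once all upstream tasks have finished, whether they succeeded or failed. Under the default rule, `all_success`, cleanup would not run after an upstream failure, and the resources would be left behind. The simplified model below contrasts the two rules. It is not Airflow's scheduler code, and it ignores skipped and other states.

```python
FINISHED = {"success", "failed"}


def should_run(trigger_rule: str, upstream_states: list) -> bool:
    """Simplified decision for two trigger rules; only success/failed/running are modeled."""
    if any(state not in FINISHED for state in upstream_states):
        return False  # some upstream task is still running
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        return True  # every upstream task finished, regardless of outcome
    raise ValueError(f"rule not modeled in this sketch: {trigger_rule}")


states = ["success", "failed"]  # e.g. the lake was created, but a later step failed
print(should_run("all_success", states))  # False: the lake would be left behind
print(should_run("all_done", states))     # True: delete_lake still runs
```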

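Create or update a Data Quality scan

Before you create a Dataplex Data Quality scan, you need to define its body. For more information about the available fields to pass when creating a Data Quality scan, visit the Dataplex create data quality API.

A simple Data Quality scan configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py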

from google.cloud import dataplex_v1
from google.cloud.dataplex_v1 import DataQualitySpec

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)
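You can also describe the same rule with plain dictionaries, which is convenient when rules are generated from configuration. The `range_rule` helper below is hypothetical and reproduces the `range_expectation` rule from the example above. It assumes that `DataQualitySpec` accepts a mapping of this shape, as the example's own `DataQualitySpec({...})` call suggests.

```python
def range_rule(column: str, min_value: str, max_value: str, dimension: str = "VALIDITY") -> dict:
    """Build a range_expectation rule dict shaped like the one in the example."""
    # The bounds are strings, as in the example above.
    return {
        "range_expectation": {"min_value": min_value, "max_value": max_value},
        "column": column,
        "dimension": dimension,
    }


spec = {"rules": [range_rule("value", "0", "10000")]}
print(spec["rules"][0]["range_expectation"])  # {'min_value': '0', 'max_value': '10000'}
```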

With this configuration, you can create or update a Data Quality scan with DataplexCreateOrUpdateDataQualityScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)
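As its name suggests, this operator covers both the first creation of a scan and later changes to its definition. The in-memory sketch below shows only the create-or-update (upsert) semantics, keyed by `data_scan_id`. The `store` dict stands in for the Dataplex service and does not reflect how the operator calls the API.

```python
def create_or_update(store: dict, data_scan_id: str, body: dict) -> str:
    """Create the scan if it is missing, otherwise replace its definition."""
    action = "updated" if data_scan_id in store else "created"
    store[data_scan_id] = body
    return action


scans: dict = {}
print(create_or_update(scans, "my-scan", {"rules": []}))              # created
print(create_or_update(scans, "my-scan", {"rules": ["range rule"]}))  # updated
print(scans["my-scan"])  # {'rules': ['range rule']}
```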

Get a Data Quality scan

To get a Data Quality scan, you can use DataplexGetDataQualityScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
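Unlike tasks, data scans are not nested under a lake. In the Dataplex API, a scan's resource name has the form `projects/{project}/locations/{location}/dataScans/{data_scan_id}`, which matches the fact that the scan operators take only `project_id`, `region`, and `data_scan_id`. The helper below is hypothetical and builds that name:

```python
def data_scan_name(project_id: str, region: str, data_scan_id: str) -> str:
    """Return the Dataplex data scan resource name (illustrative helper)."""
    return f"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"


print(data_scan_name("my-project", "us-central1", "my-scan"))
# projects/my-project/locations/us-central1/dataScans/my-scan
```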

Delete a Data Quality scan

To delete a Data Quality scan, you can use DataplexDeleteDataQualityScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Quality scan

You can run a Dataplex Data Quality scan in asynchronous mode and check its status later with a sensor. To start the run, use DataplexRunDataQualityScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether the running Dataplex Data Quality scan job succeeded, you can use DataplexDataQualityJobStatusSensor:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
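A job status sensor maps the scan job's state to one of three outcomes: keep waiting, succeed, or fail. The sketch below assumes the job state names `PENDING`, `RUNNING`, `CANCELING`, `SUCCEEDED`, `FAILED`, and `CANCELLED`. The mapping is an illustration, not the sensor's actual code.

```python
TERMINAL_FAILURES = {"FAILED", "CANCELLED"}


def check_job_state(state: str) -> bool:
    """Return True when done, False to keep poking; raise on a failed or cancelled job."""
    if state == "SUCCEEDED":
        return True
    if state in TERMINAL_FAILURES:
        raise RuntimeError(f"data scan job ended in state {state}")
    return False  # PENDING, RUNNING, CANCELING, ...: check again later


print([check_job_state(s) for s in ("PENDING", "RUNNING", "SUCCEEDED")])  # [False, False, True]
```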

You can also use the operator in deferrable mode for this action:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)
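In deferrable mode, the operator releases its worker slot while the scan runs, and an asynchronous trigger in the triggerer process takes over the waiting. The asyncio sketch below shows only the idea of non-blocking polling. `fetch_state` is a hypothetical stand-in for an API call, and the provider's real trigger classes differ.

```python
import asyncio
from typing import Awaitable, Callable


async def poll_until_done(
    fetch_state: Callable[[], Awaitable[str]], interval: float = 10.0
) -> str:
    """Await fetch_state() until a terminal state appears, sleeping without blocking."""
    while True:
        state = await fetch_state()
        if state in {"SUCCEEDED", "FAILED", "CANCELLED"}:
            return state
        await asyncio.sleep(interval)  # yields control to other coroutines


async def main() -> str:
    responses = iter(["PENDING", "RUNNING", "SUCCEEDED"])

    async def fake_fetch() -> str:
        return next(responses)

    return await poll_until_done(fake_fetch, interval=0)


print(asyncio.run(main()))  # SUCCEEDED
```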

Get a Data Quality scan job

To get a Data Quality scan job, you can use DataplexGetDataQualityScanResultOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
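Once you have the job, you can pull out the rules that failed. The sketch below assumes that the job is a dict with a `data_quality_result` entry. That entry is assumed to hold an overall `passed` flag and a `rules` list whose items carry `passed` and a nested `rule` with a `column`. Treat this shape and the `failed_columns` helper as assumptions, and check them against the actual XCom value.

```python
def failed_columns(job: dict) -> list:
    """Return the columns whose data quality rules did not pass."""
    result = job.get("data_quality_result", {})
    return [
        rule_result["rule"].get("column", "<table>")
        for rule_result in result.get("rules", [])
        if not rule_result.get("passed", False)
    ]


job = {
    "data_quality_result": {
        "passed": False,
        "rules": [
            {"rule": {"column": "value"}, "passed": False},
            {"rule": {"column": "id"}, "passed": True},
        ],
    }
}
print(failed_columns(job))  # ['value']
```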

You can also use the operator in deferrable mode for this action:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

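Create a zone

Before you create a Dataplex zone, you need to define its body.

For more information about the available fields to pass when creating a zone, visit the Dataplex create zone API.

A simple zone configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py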

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}
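A zone body pairs a zone type with a location type. Assuming that the accepted values are `RAW` or `CURATED` for `type_` and `SINGLE_REGION` or `MULTI_REGION` for `location_type` (check the create zone API for the current list), a hypothetical pre-flight check could look like this:

```python
ZONE_TYPES = {"RAW", "CURATED"}
LOCATION_TYPES = {"SINGLE_REGION", "MULTI_REGION"}


def validate_zone_body(body: dict) -> list:
    """Return a list of problems found in a zone body (empty if none)."""
    problems = []
    if body.get("type_") not in ZONE_TYPES:
        problems.append(f"unexpected type_: {body.get('type_')!r}")
    location_type = body.get("resource_spec", {}).get("location_type")
    if location_type not in LOCATION_TYPES:
        problems.append(f"unexpected location_type: {location_type!r}")
    return problems


print(validate_zone_body({"type_": "RAW", "resource_spec": {"location_type": "SINGLE_REGION"}}))  # []
```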

With this configuration, you can create a zone with DataplexCreateZoneOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)

Delete a zone

To delete a zone, you can use DataplexDeleteZoneOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

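Create an asset

Before you create a Dataplex asset, you need to define its body.

For more information about the available fields to pass when creating an asset, visit the Dataplex create asset API.

A simple asset configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py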

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}
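An asset's `resource_spec.name` is a relative resource name whose form depends on the resource type. The hypothetical helper below assumes `projects/{project}/datasets/{dataset}` for `BIGQUERY_DATASET`, as in the example, and `projects/{project}/buckets/{bucket}` for `STORAGE_BUCKET`:

```python
RESOURCE_NAME_TEMPLATES = {
    "BIGQUERY_DATASET": "projects/{project}/datasets/{resource}",
    "STORAGE_BUCKET": "projects/{project}/buckets/{resource}",
}


def build_asset_body(project_id: str, resource_type: str, resource: str, discovery: bool = True) -> dict:
    """Build an asset body shaped like EXAMPLE_ASSET for the given resource type."""
    template = RESOURCE_NAME_TEMPLATES[resource_type]  # KeyError for unsupported types
    return {
        "resource_spec": {
            "name": template.format(project=project_id, resource=resource),
            "type_": resource_type,
        },
        "discovery_spec": {"enabled": discovery},
    }


print(build_asset_body("my-project", "STORAGE_BUCKET", "my-bucket")["resource_spec"]["name"])
# projects/my-project/buckets/my-bucket
```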

With this configuration, you can create an asset with DataplexCreateAssetOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an asset

To delete an asset, you can use DataplexDeleteAssetOperator:

tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
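Assets live in zones, and zones live in lakes. As we understand the API, a zone or lake cannot be deleted while it still contains child resources, so cleanup should run from the leaves up: assets, then zones, then the lake. The sketch below derives that order with `graphlib` from the standard library (Python 3.9+). The resource names are placeholders.

```python
from graphlib import TopologicalSorter

# Each resource maps to the children that must be deleted before it.
children = {
    "lake:my-lake": {"zone:my-zone"},
    "zone:my-zone": {"asset:my-asset"},
}

deletion_order = list(TopologicalSorter(children).static_order())
print(deletion_order)  # ['asset:my-asset', 'zone:my-zone', 'lake:my-lake']
```

In a DAG, this order suggests chaining delete_asset >> delete_zone >> delete_lake.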

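Create or update a Data Profile scan

Before you create a Dataplex Data Profile scan, you need to define its body. For more information about the available fields to pass when creating a Data Profile scan, visit the Dataplex create data profile API.

A simple Data Profile scan configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dp.py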

from google.cloud import dataplex_v1
from google.cloud.dataplex_v1 import DataProfileSpec

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
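`DataProfileSpec({})` profiles the table with default settings. If you prefer a plain dict body, a hypothetical helper could build one. It assumes that dict bodies are accepted and that `data_profile_spec` supports an optional `sampling_percent`; this sketch accepts values in (0, 100]. Verify both assumptions against the API reference.

```python
from typing import Optional


def build_profile_scan_body(
    project_id: str, dataset: str, table: str, sampling_percent: Optional[float] = None
) -> dict:
    """Build a data profile scan body that points at a BigQuery table."""
    spec = {}
    if sampling_percent is not None:
        # Range accepted by this sketch; check the API for the exact bounds.
        if not 0 < sampling_percent <= 100:
            raise ValueError("sampling_percent must be in (0, 100]")
        spec["sampling_percent"] = sampling_percent
    return {
        # Same BigQuery resource format as EXAMPLE_DATA_SCAN.data.resource.
        "data": {
            "resource": f"//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"
        },
        "data_profile_spec": spec,
    }


body = build_profile_scan_body("my-project", "my_dataset", "my_table", sampling_percent=10.0)
print(body["data_profile_spec"])  # {'sampling_percent': 10.0}
```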

With this configuration, you can create or update a Data Profile scan with DataplexCreateOrUpdateDataProfileScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Profile scan

To get a Data Profile scan, you can use DataplexGetDataProfileScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Profile scan

To delete a Data Profile scan, you can use DataplexDeleteDataProfileScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Profile scan

You can run a Dataplex Data Profile scan in asynchronous mode and check its status later with a sensor. To start the run, use DataplexRunDataProfileScanOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether the running Dataplex Data Profile scan job succeeded, you can use DataplexDataProfileJobStatusSensor:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
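The sensor above receives the run operator's return value through XCom (`xcom_pull('run_data_scan_async')`) as its `job_id`. Scan jobs have resource names of the form `projects/{project}/locations/{location}/dataScans/{data_scan_id}/jobs/{job_id}`. Assuming the sensor expects the short job ID, the hypothetical helper below extracts it from a full job name:

```python
def job_id_from_name(job_name: str) -> str:
    """Return the trailing job ID from a data scan job resource name."""
    parts = job_name.split("/")
    if len(parts) != 8 or parts[-2] != "jobs":
        raise ValueError(f"not a data scan job name: {job_name!r}")
    return parts[-1]


name = "projects/my-project/locations/us-central1/dataScans/my-scan/jobs/1234-abcd"
print(job_id_from_name(name))  # 1234-abcd
```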

You can also use the operator in deferrable mode for this action:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Profile scan job

To get a Data Profile scan job, you can use DataplexGetDataProfileScanResultOperator:

tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
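To act on a profile, you can inspect the returned job, for example to flag sparsely populated columns. The sketch below assumes a dict with `data_profile_result.profile.fields`, where each field has a `name` and a `profile` that contains `null_ratio`. This shape and the 0.5 threshold are illustrative assumptions.

```python
def mostly_null_columns(job: dict, threshold: float = 0.5) -> list:
    """Return column names whose null ratio exceeds the threshold."""
    fields = job.get("data_profile_result", {}).get("profile", {}).get("fields", [])
    return [
        field["name"]
        for field in fields
        if field.get("profile", {}).get("null_ratio", 0.0) > threshold
    ]


job = {
    "data_profile_result": {
        "profile": {
            "fields": [
                {"name": "value", "profile": {"null_ratio": 0.8}},
                {"name": "id", "profile": {"null_ratio": 0.0}},
            ]
        }
    }
}
print(mostly_null_columns(job))  # ['value']
```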
