Google Dataplex 運算子¶
Dataplex 是一個智慧型資料網狀架構,可在您的資料湖、資料倉儲和資料超市之間提供統一的分析和資料管理。
如需更多關於此任務的資訊,請造訪 Dataplex 生產文件 <產品文件
建立任務¶
在您建立 Dataplex 任務之前,您需要定義其主體。如需更多關於建立任務時可傳遞的可用欄位資訊,請造訪 Dataplex 建立任務 API。
一個簡單的任務配置範例如下
EXAMPLE_TASK_BODY = {
"trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
"execution_spec": {"service_account": SERVICE_ACC},
"spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
透過此配置,我們可以同步和非同步地建立任務: DataplexCreateTaskOperator
create_dataplex_task = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="create_dataplex_task",
)
create_dataplex_task_async = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
asynchronous=True,
task_id="create_dataplex_task_async",
)
刪除任務¶
要刪除任務,您可以使用
delete_dataplex_task_async = DataplexDeleteTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
task_id="delete_dataplex_task_async",
)
列出任務¶
要列出任務,您可以使用
list_dataplex_task = DataplexListTasksOperator(
project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
取得任務¶
要取得任務,您可以使用
get_dataplex_task = DataplexGetTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="get_dataplex_task",
)
等待任務¶
要等待非同步建立的任務,您可以使用
dataplex_task_state = DataplexTaskStateSensor(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="dataplex_task_state",
)
建立湖泊¶
在您建立 Dataplex 湖泊之前,您需要定義其主體。
如需更多關於建立湖泊時可傳遞的可用欄位資訊,請造訪 Dataplex 建立湖泊 API。
一個簡單的任務配置範例如下
EXAMPLE_LAKE_BODY = {
"display_name": "test_display_name",
"labels": [],
"description": "test_description",
"metastore": {"service": ""},
}
透過此配置,我們可以建立湖泊
create_lake = DataplexCreateLakeOperator(
project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)
刪除湖泊¶
要刪除湖泊,您可以使用
delete_lake = DataplexDeleteLakeOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
task_id="delete_lake",
trigger_rule=TriggerRule.ALL_DONE,
)
建立或更新資料品質掃描¶
在您建立 Dataplex 資料品質掃描之前,您需要定義其主體。如需更多關於建立資料品質掃描時可傳遞的可用欄位資訊,請造訪 Dataplex 建立資料品質 API。
一個簡單的資料品質掃描配置範例如下
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
{
"rules": [
{
"range_expectation": {
"min_value": "0",
"max_value": "10000",
},
"column": "value",
"dimension": "VALIDITY",
}
],
}
)
透過此配置,我們可以建立或更新資料品質掃描
DataplexCreateOrUpdateDataQualityScanOperator
create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
取得資料品質掃描¶
要取得資料品質掃描,您可以使用
DataplexGetDataQualityScanOperator
get_data_scan = DataplexGetDataQualityScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
刪除資料品質掃描¶
要刪除資料品質掃描,您可以使用
DataplexDeleteDataQualityScanOperator
delete_data_scan = DataplexDeleteDataQualityScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
執行資料品質掃描¶
您可以以非同步模式執行 Dataplex 資料品質掃描,以便稍後使用感測器檢查其狀態
DataplexRunDataQualityScanOperator
run_data_scan_async = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要檢查執行的 Dataplex 資料品質掃描是否成功,您可以使用
DataplexDataQualityJobStatusSensor
.
get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
對於此操作,您也可以在可延遲模式下使用運算子
run_data_scan_def = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
取得資料品質掃描工作¶
要取得資料品質掃描工作,您可以使用
DataplexGetDataQualityScanResultOperator
get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
對於此操作,您也可以在可延遲模式下使用運算子
get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
建立區域¶
在您建立 Dataplex 區域之前,您需要定義其主體。
如需更多關於建立區域時可傳遞的可用欄位資訊,請造訪 Dataplex 建立區域 API。
一個簡單的區域配置範例如下
EXAMPLE_ZONE = {
"type_": "RAW",
"resource_spec": {"location_type": "SINGLE_REGION"},
}
透過此配置,我們可以建立區域
create_zone = DataplexCreateZoneOperator(
task_id="create_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_ZONE,
zone_id=ZONE_ID,
)
刪除區域¶
要刪除區域,您可以使用
delete_zone = DataplexDeleteZoneOperator(
task_id="delete_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
建立資產¶
在您建立 Dataplex 資產之前,您需要定義其主體。
如需更多關於建立資產時可傳遞的可用欄位資訊,請造訪 Dataplex 建立資產 API。
一個簡單的資產配置範例如下
EXAMPLE_ASSET = {
"resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
"discovery_spec": {"enabled": True},
}
透過此配置,我們可以建立資產
create_asset = DataplexCreateAssetOperator(
task_id="create_asset",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_ASSET,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
)
刪除資產¶
要刪除資產,您可以使用
delete_asset = DataplexDeleteAssetOperator(
task_id="delete_asset",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
建立或更新資料剖析掃描¶
在您建立 Dataplex 資料剖析掃描之前,您需要定義其主體。如需更多關於建立資料剖析掃描時可傳遞的可用欄位資訊,請造訪 Dataplex 建立資料剖析 API。
一個簡單的資料剖析掃描配置範例如下
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
透過此配置,我們可以建立或更新資料剖析掃描
DataplexCreateOrUpdateDataProfileScanOperator
create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
取得資料剖析掃描¶
要取得資料剖析掃描,您可以使用
DataplexGetDataProfileScanOperator
get_data_scan = DataplexGetDataProfileScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
刪除資料剖析掃描¶
要刪除資料剖析掃描,您可以使用
DataplexDeleteDataProfileScanOperator
delete_data_scan = DataplexDeleteDataProfileScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
執行資料剖析掃描¶
您可以以非同步模式執行 Dataplex 資料剖析掃描,以便稍後使用感測器檢查其狀態
DataplexRunDataProfileScanOperator
run_data_scan_async = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要檢查執行的 Dataplex 資料剖析掃描是否成功,您可以使用
DataplexDataProfileJobStatusSensor
.
get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
對於此操作,您也可以在可延遲模式下使用運算子
run_data_scan_def = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
取得資料剖析掃描工作¶
要取得資料剖析掃描工作,您可以使用
DataplexGetDataProfileScanResultOperator
get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)