Google Cloud Datastore Operators¶
Firestore 在 Datastore 模式中是一個 NoSQL 文件資料庫,專為自動擴展、高效能和易於應用程式開發而建構。
有關此服務的更多資訊,請造訪Datastore 產品文件
先決條件任務¶
要使用這些 Operators,您必須執行以下幾項操作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用計費功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 程式庫。
pip install 'apache-airflow[google]'詳細資訊請參閱安裝。
匯出實體¶
若要將實體從 Google Cloud Datastore 匯出到 Cloud Storage,請使用 CloudDatastoreExportEntitiesOperator
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
bucket=BUCKET_NAME,
project_id=PROJECT_ID,
overwrite_existing=True,
)
匯入實體¶
若要將實體從 Cloud Storage 匯入到 Google Cloud Datastore,請使用 CloudDatastoreImportEntitiesOperator
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
project_id=PROJECT_ID,
)
分配 ID¶
若要為不完整的鍵分配 ID,請使用 CloudDatastoreAllocateIdsOperator
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
Operator 需要的部分鍵範例
KEYS = [
{
"partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
"path": {"kind": "airflow"},
}
]
開始交易¶
若要開始新的交易,請使用 CloudDatastoreBeginTransactionOperator
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
transaction_options=TRANSACTION_OPTIONS,
project_id=PROJECT_ID,
)
Operator 需要的交易選項範例
TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
提交交易¶
若要提交交易,可選擇性地建立、刪除或修改一些實體,請使用 CloudDatastoreCommitOperator
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
Operator 需要的提交資訊範例
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
"mutations": [
{
"insert": {
"key": KEYS[0],
"properties": {"string": {"stringValue": "airflow is awesome!"}},
}
}
],
"singleUseTransaction": {"readWrite": {}},
}
執行查詢¶
若要執行實體查詢,請使用 CloudDatastoreRunQueryOperator
run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
Operator 需要的查詢範例
QUERY = {
"partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
"readOptions": {"transaction": begin_transaction_query.output},
"query": {},
}
回滾交易¶
若要回滾交易,請使用 CloudDatastoreRollbackOperator
rollback_transaction = CloudDatastoreRollbackOperator(
task_id="rollback_transaction",
transaction=begin_transaction_to_rollback.output,
)
取得操作狀態¶
若要取得長時間執行操作的目前狀態,請使用 CloudDatastoreGetOperationOperator
get_operation = CloudDatastoreGetOperationOperator(
task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
刪除操作¶
若要刪除操作,請使用 CloudDatastoreDeleteOperationOperator
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
name="{{ task_instance.xcom_pull('export_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)