Google Cloud Datastore Operators

Firestore 在 Datastore 模式中是一個 NoSQL 文件資料庫,專為自動擴展、高效能和易於應用程式開發而建構。

有關此服務的更多資訊,請造訪Datastore 產品文件

先決條件任務

要使用這些 Operators,您必須執行以下幾項操作

匯出實體

若要將實體從 Google Cloud Datastore 匯出到 Cloud Storage,請使用 CloudDatastoreExportEntitiesOperator

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET_NAME,
    project_id=PROJECT_ID,
    overwrite_existing=True,
)

匯入實體

若要將實體從 Cloud Storage 匯入到 Google Cloud Datastore,請使用 CloudDatastoreImportEntitiesOperator

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

import_task = CloudDatastoreImportEntitiesOperator(
    task_id="import_task",
    bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
    file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
    project_id=PROJECT_ID,
)

分配 ID

若要為不完整的鍵分配 ID,請使用 CloudDatastoreAllocateIdsOperator

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

allocate_ids = CloudDatastoreAllocateIdsOperator(
    task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)

Operator 需要的部分鍵範例

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

KEYS = [
    {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
        "path": {"kind": "airflow"},
    }
]

開始交易

若要開始新的交易,請使用 CloudDatastoreBeginTransactionOperator

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
    task_id="begin_transaction_commit",
    transaction_options=TRANSACTION_OPTIONS,
    project_id=PROJECT_ID,
)

Operator 需要的交易選項範例

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}

提交交易

若要提交交易,可選擇性地建立、刪除或修改一些實體,請使用 CloudDatastoreCommitOperator

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)

Operator 需要的提交資訊範例

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

    COMMIT_BODY = {
        "mode": "TRANSACTIONAL",
        "mutations": [
            {
                "insert": {
                    "key": KEYS[0],
                    "properties": {"string": {"stringValue": "airflow is awesome!"}},
                }
            }
        ],
        "singleUseTransaction": {"readWrite": {}},
    }

執行查詢

若要執行實體查詢,請使用 CloudDatastoreRunQueryOperator

tests/system/google/cloud/datastore/example_datastore_query.py[原始碼]

run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)

Operator 需要的查詢範例

tests/system/google/cloud/datastore/example_datastore_query.py[原始碼]

    QUERY = {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
        "readOptions": {"transaction": begin_transaction_query.output},
        "query": {},
    }

回滾交易

若要回滾交易,請使用 CloudDatastoreRollbackOperator

tests/system/google/cloud/datastore/example_datastore_rollback.py[原始碼]

rollback_transaction = CloudDatastoreRollbackOperator(
    task_id="rollback_transaction",
    transaction=begin_transaction_to_rollback.output,
)

取得操作狀態

若要取得長時間執行操作的目前狀態,請使用 CloudDatastoreGetOperationOperator

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

get_operation = CloudDatastoreGetOperationOperator(
    task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)

刪除操作

若要刪除操作,請使用 CloudDatastoreDeleteOperationOperator

tests/system/google/cloud/datastore/example_datastore_commit.py[原始碼]

delete_export_operation = CloudDatastoreDeleteOperationOperator(
    task_id="delete_export_operation",
    name="{{ task_instance.xcom_pull('export_task')['name'] }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

參考資料

如需更多資訊,請參閱

此條目是否有幫助?