OpenSearch

OpenSearch

操作器

在 OpenSearch 中建立索引

使用 OpenSearchCreateIndexOperator 以在 OpenSearch 網域中建立新索引。

tests/system/opensearch/example_opensearch.py[原始碼]

create_index = OpenSearchCreateIndexOperator(
    task_id="create_index",
    index_name=INDEX_NAME,
    index_body={"settings": {"index": {"number_of_shards": 1}}},
)

在 OpenSearch 的索引中新增文件

使用 OpenSearchAddDocumentOperator 以將單一文件新增至 OpenSearch 索引

tests/system/opensearch/example_opensearch.py[原始碼]

add_document_by_args = OpenSearchAddDocumentOperator(
    task_id="add_document_with_args",
    index_name=INDEX_NAME,
    doc_id=1,
    document={"log_group_id": 1, "logger": "python", "message": "hello world"},
)

add_document_by_class = OpenSearchAddDocumentOperator(
    task_id="add_document_by_class",
    doc_class=LogDocument(log_group_id=2, logger="airflow", message="hello airflow"),
)

對 OpenSearch 索引執行查詢

使用 OpenSearchQueryOperator 以對 OpenSearch 索引執行查詢。

tests/system/opensearch/example_opensearch.py[原始碼]

search_low_level = OpenSearchQueryOperator(
    task_id="low_level_query",
    index_name="system_test",
    query={"query": {"bool": {"must": {"match": {"message": "hello world"}}}}},
)
search = Search()
search._index = [INDEX_NAME]
search_object = search.filter("term", logger="airflow").query("match", message="hello airflow")

search_high_level = OpenSearchQueryOperator(task_id="high_level_query", search_object=search_object)

這個條目有幫助嗎?