HTTP 操作器¶
以下程式碼範例使用 http_default
連線,這表示請求會傳送到 httpbin 網站以執行基本的 HTTP 操作。
HttpSensor¶
使用 HttpSensor
來輪詢,直到 response_check
可調用物件評估為 true
為止。
在這裡,我們輪詢直到 httpbin 給我們包含 httpbin
的回應文字。
task_http_sensor_check = HttpSensor(
task_id="http_sensor_check",
http_conn_id="http_default",
endpoint="",
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
此感測器也可以在可延遲模式中使用
task_http_sensor_check_async = HttpSensor(
task_id="http_sensor_check_async",
http_conn_id="http_default",
endpoint="",
deferrable=True,
request_params={},
poke_interval=5,
dag=dag,
)
HttpOperator¶
使用 HttpOperator
來呼叫 HTTP 請求並取得回應文字。
警告
透過 HttpOperator 設定 https
是違反直覺的
由於歷史原因,透過 HTTP 操作器設定 HTTPS
連線能力,嗯,是困難且違反直覺的。操作器預設為 http
協定,您可以透過 scheme
連線屬性來變更操作器使用的協定。然而,此欄位最初是為了資料庫類型的 URI 而新增到連線中,在資料庫類型的 URI 中,資料庫協定傳統上設定為 URI path
的第一個組件。因此,如果您想透過 URI 設定為 https
連線,您需要將 https
協定傳遞給 HttpOperator。儘管看起來很愚蠢,但您的連線 URI 會看起來像這樣:http://your_host:443/https
。然後,如果您想在 HttpOperator 中使用不同的 URL 路徑,您應該在執行任務時將您的路徑作為 endpoint
參數傳遞。例如,要對 https://your_host:443/my_endpoint
執行查詢,您需要將 endpoint 參數設定為 my_endpoint
。或者,如果您願意,您也可以對主機進行百分比編碼,包括 https://
前綴,只要它包含 ://
(百分比編碼 %3a%2f%2f
),路徑的第一個組件將不會被用作協定。您的 URI 定義可能會看起來像 http://https%3a%2f%2fyour_host:443/
。然而,在這種情況下,path
將完全不會被使用 - 如果您希望使用特定路徑發出請求,您仍然需要在任務中使用 endpoint
參數。儘管這違反直覺,但這是歷史上操作器/Hook 的運作方式,而且在不破壞向後相容性的情況下很難更改,因為還有其他操作器建立在 HttpOperator
之上,依賴該功能,而且已經有很多用戶在使用它了。
在第一個範例中,我們呼叫一個帶有 json 資料的 POST
請求,並且當我們收到相同的 json 資料時成功,否則任務將會失敗。
task_post_op = HttpOperator(
task_id="post_op",
endpoint="post",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()["json"]["priority"] == 5,
dag=dag,
)
在這裡,我們呼叫一個 GET
請求並傳遞參數給它。無論回應文字為何,任務都會成功。
task_get_op = HttpOperator(
task_id="get_op",
method="GET",
endpoint="get",
data={"param1": "value1", "param2": "value2"},
headers={},
dag=dag,
)
HttpOperator 預設會將回應主體以文字形式傳回。如果您想在將回應傳遞到下一個下游任務之前修改它,請使用 response_filter
。這在以下情況下很有用:
您正在使用的 API 傳回大型 JSON 酬載,而您只對資料的子集感興趣
API 傳回 xml 或 csv 格式的資料,而您想將其轉換為 JSON
您對回應的標頭感興趣,而不是主體
以下是從 REST API 檢索資料並僅傳回巢狀屬性而不是完整回應主體的範例。
task_get_op_response_filter = HttpOperator(
task_id="get_op_response_filter",
method="GET",
endpoint="get",
response_filter=lambda response: response.json()["nested"]["property"],
dag=dag,
)
在第三個範例中,我們執行 PUT
操作以根據提供給請求的資料來放置/設定資料。
task_put_op = HttpOperator(
task_id="put_op",
method="PUT",
endpoint="put",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
dag=dag,
)
在此範例中,我們呼叫 DELETE
操作到 delete
端點。這次我們將表單資料傳遞給請求。
task_del_op = HttpOperator(
task_id="del_op",
method="DELETE",
endpoint="delete",
data="some=data",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
在這裡,我們將表單資料傳遞給 POST
操作,這相當於一般的表單提交。
task_post_op_formenc = HttpOperator(
task_id="post_op_formenc",
endpoint="post",
data="name=Joe",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
HttpOperator
也允許重複呼叫 API 端點,通常用於循環遍歷其頁面。所有 API 回應都由 Operator 儲存在記憶體中,並以單一結果傳回。因此,與非分頁呼叫相比,它可能更耗費記憶體和 CPU。
預設情況下,HttpOperator 的結果將會變成 Response.text 的列表(而不是單個 Response.text 物件)。
範例 - 假設您的 API 傳回包含游標的 JSON 主體:您可以編寫一個 pagination_function
,它將接收請求的原始 request.Response
物件,並根據此游標產生新的請求參數(作為 dict
)。HttpOperator 將重複呼叫 API,直到該函數停止傳回任何內容。
def get_next_page_cursor(response) -> dict | None:
"""
Take the raw `request.Response` object, and check for a cursor.
If a cursor exists, this function creates and return parameters to call
the next page of result.
"""
next_cursor = response.json().get("cursor")
if next_cursor:
return dict(data={"cursor": next_cursor})
return None
task_get_paginated = HttpOperator(
task_id="get_paginated",
method="GET",
endpoint="get",
data={"cursor": ""},
pagination_function=get_next_page_cursor,
dag=dag,
)