Papermill¶
Apache Airflow 支援與 Papermill 整合。Papermill 是一個用於參數化和執行 Jupyter Notebooks 的工具。也許您有一個財務報告,希望在每個月的第一天或最後一天,或在年初或年末使用不同的值來執行。在您的 notebook 中使用參數,並使用 PapermillOperator
,這會變得輕而易舉。
用法¶
建立 notebook¶
為了參數化您的 notebook,請指定一個帶有標籤 parameters 的儲存格。Papermill 會尋找 parameters 儲存格,並將此儲存格視為執行時傳入的參數的預設值。Papermill 將新增一個帶有標籤 injected-parameters 的新儲存格,其中包含輸入參數,以便覆寫 parameters 中的值。如果沒有儲存格標記為 parameters,則注入的儲存格將插入到 notebook 的頂部。
請確保將您的 notebook 儲存在 Airflow 可以存取的位置。Papermill 支援 S3、GCS、Azure 和 Local。不支援 HDFS。
範例 DAG¶
使用 PapermillOperator
來執行 jupyter notebook
run_this = PapermillOperator(
task_id="run_example_notebook",
input_nb="/tmp/hello_world.ipynb",
output_nb="/tmp/out-{{ execution_date }}.ipynb",
parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
)
範例 DAG,用於驗證 notebook 中的訊息
@task
def check_notebook(inlets, logical_date):
"""
Verify the message in the notebook
"""
notebook = sb.read_notebook(inlets[0].url)
message = notebook.scraps["message"]
print(f"Message in notebook {message} for {logical_date}")
if message.data != f"Ran from Airflow at {logical_date}!":
return False
return True
with DAG(
dag_id="example_papermill_operator_verify",
schedule=SCHEDULE_INTERVAL,
start_date=START_DATE,
dagrun_timeout=DAGRUN_TIMEOUT,
catchup=False,
) as dag:
run_this = PapermillOperator(
task_id="run_example_notebook",
input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
output_nb="/tmp/out-{{ logical_date }}.ipynb",
parameters={"msgs": "Ran from Airflow at {{ logical_date }}!"},
)
run_this >> check_notebook(inlets=AUTO, logical_date="{{ logical_date }}")
範例 DAG,用於驗證 notebook 中的訊息,使用遠端 jupyter kernel
@task
def check_notebook(output_notebook, execution_date):
"""
Verify the message in the notebook
"""
notebook = sb.read_notebook(output_notebook)
message = notebook.scraps["message"]
print(f"Message in notebook {message} for {execution_date}")
if message.data != f"Ran from Airflow at {execution_date}!":
return False
return True
with DAG(
dag_id="example_papermill_operator_remote_verify",
schedule="@once",
start_date=START_DATE,
dagrun_timeout=DAGRUN_TIMEOUT,
catchup=False,
) as dag:
run_this = PapermillOperator(
task_id="run_example_notebook",
input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
output_nb="/tmp/out-{{ execution_date }}.ipynb",
parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
kernel_conn_id="jupyter_kernel_default",
)
run_this >> check_notebook(
output_notebook="/tmp/out-{{ execution_date }}.ipynb", execution_date="{{ execution_date }}"
)