Skip to content

Commit

Permalink
Add only_failed option to clear DagRun endpoint (#44988)
Browse files Browse the repository at this point in the history
* Add only_failed option to clear DagRun endpoint

* Fix CI
  • Loading branch information
pierrejeambrun authored Dec 18, 2024
1 parent cce2faf commit 1ce34cc
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 19 deletions.
1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class DAGRunClearBody(BaseModel):
"""DAG Run serializer for clear endpoint body."""

dry_run: bool = True
only_failed: bool = False


class DAGRunResponse(BaseModel):
Expand Down
4 changes: 4 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7048,6 +7048,10 @@ components:
type: boolean
title: Dry Run
default: true
only_failed:
type: boolean
title: Only Failed
default: false
type: object
title: DAGRunClearBody
description: DAG Run serializer for clear endpoint body.
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def clear_dag_run(
start_date=start_date,
end_date=end_date,
task_ids=None,
only_failed=False,
only_failed=body.only_failed,
dry_run=True,
session=session,
)
Expand All @@ -248,7 +248,7 @@ def clear_dag_run(
start_date=start_date,
end_date=end_date,
task_ids=None,
only_failed=False,
only_failed=body.only_failed,
session=session,
)
dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
Expand Down
5 changes: 5 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,11 @@ export const $DAGRunClearBody = {
title: "Dry Run",
default: true,
},
only_failed: {
type: "boolean",
title: "Only Failed",
default: false,
},
},
type: "object",
title: "DAGRunClearBody",
Expand Down
1 change: 1 addition & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ export type DAGResponse = {
*/
export type DAGRunClearBody = {
dry_run?: boolean;
only_failed?: boolean;
};

/**
Expand Down
55 changes: 38 additions & 17 deletions tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def setup(request, dag_maker, session=None):
start_date=START_DATE1,
):
task1 = EmptyOperator(task_id="task_1")
task2 = EmptyOperator(task_id="task_2")

dag_run1 = dag_maker.create_dagrun(
run_id=DAG1_RUN1_ID,
state=DAG1_RUN1_STATE,
Expand All @@ -99,18 +101,29 @@ def setup(request, dag_maker, session=None):

dag_run1.note = (DAG1_RUN1_NOTE, 1)

ti1 = dag_run1.get_task_instance(task_id="task_1")
ti1.task = task1
ti1.state = State.SUCCESS
for task in [task1, task2]:
ti = dag_run1.get_task_instance(task_id=task.task_id)
ti.task = task
ti.state = State.SUCCESS

dag_maker.create_dagrun(
session.merge(ti)

dag_run2 = dag_maker.create_dagrun(
run_id=DAG1_RUN2_ID,
state=DAG1_RUN2_STATE,
run_type=DAG1_RUN2_RUN_TYPE,
triggered_by=DAG1_RUN2_TRIGGERED_BY,
logical_date=LOGICAL_DATE2,
)

ti1 = dag_run2.get_task_instance(task_id=task1.task_id)
ti1.task = task1
ti1.state = State.SUCCESS

ti2 = dag_run2.get_task_instance(task_id=task2.task_id)
ti2.task = task2
ti2.state = State.FAILED

with dag_maker(DAG2_ID, schedule=None, start_date=START_DATE2, params=DAG2_PARAM):
EmptyOperator(task_id="task_2")
dag_maker.create_dagrun(
Expand All @@ -132,6 +145,7 @@ def setup(request, dag_maker, session=None):
dag_maker.dag_model
dag_maker.dag_model.has_task_concurrency_limits = True
session.merge(ti1)
session.merge(ti2)
session.merge(dag_maker.dag_model)
session.commit()

Expand Down Expand Up @@ -204,9 +218,9 @@ def get_dag_run_dict(run: DagRun):
"end_date": from_datetime_to_zulu(run.end_date),
"data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start),
"data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end),
"last_scheduling_decision": from_datetime_to_zulu(run.last_scheduling_decision)
if run.last_scheduling_decision
else None,
"last_scheduling_decision": (
from_datetime_to_zulu(run.last_scheduling_decision) if run.last_scheduling_decision else None
),
"run_type": run.run_type,
"state": run.state,
"external_trigger": run.external_trigger,
Expand Down Expand Up @@ -492,9 +506,11 @@ def get_dag_run_dict(run: DagRun):
"end_date": from_datetime_to_zulu(run.end_date),
"data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start),
"data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end),
"last_scheduling_decision": from_datetime_to_zulu_without_ms(run.last_scheduling_decision)
if run.last_scheduling_decision
else None,
"last_scheduling_decision": (
from_datetime_to_zulu_without_ms(run.last_scheduling_decision)
if run.last_scheduling_decision
else None
),
"run_type": run.run_type,
"state": run.state,
"external_trigger": run.external_trigger,
Expand Down Expand Up @@ -1024,16 +1040,21 @@ def test_clear_dag_run(self, test_client):
assert body["state"] == "queued"

@pytest.mark.parametrize(
"body",
[{"dry_run": True}, {}],
"body, dag_run_id, expected_state",
[
[{"dry_run": True}, DAG1_RUN1_ID, ["success", "success"]],
[{}, DAG1_RUN1_ID, ["success", "success"]],
[{}, DAG1_RUN2_ID, ["success", "failed"]],
[{"only_failed": True}, DAG1_RUN2_ID, ["failed"]],
],
)
def test_clear_dag_run_dry_run(self, test_client, session, body):
response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear", json=body)
def test_clear_dag_run_dry_run(self, test_client, session, body, dag_run_id, expected_state):
response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns/{dag_run_id}/clear", json=body)
assert response.status_code == 200
body = response.json()
assert body["total_entries"] == 1
for each in body["task_instances"]:
assert each["state"] == "success"
assert body["total_entries"] == len(expected_state)
for index, each in enumerate(sorted(body["task_instances"], key=lambda x: x["task_id"])):
assert each["state"] == expected_state[index]
dag_run = session.scalar(select(DagRun).filter_by(dag_id=DAG1_ID, run_id=DAG1_RUN1_ID))
assert dag_run.state == DAG1_RUN1_STATE

Expand Down

0 comments on commit 1ce34cc

Please sign in to comment.