Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Task instance history from task SDK #44952

Open
amoghrajesh opened this issue Dec 16, 2024 · 0 comments
Open

Handle Task instance history from task SDK #44952

amoghrajesh opened this issue Dec 16, 2024 · 0 comments
Assignees
Labels
area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk

Comments

@amoghrajesh
Copy link
Contributor

amoghrajesh commented Dec 16, 2024

Body

Usage:

if ti.state == TaskInstanceState.RUNNING:
# If the task instance is in the running state, it means it raised an exception and
# about to retry so we record the task instance history. For other states, the task
# instance was cleared and already recorded in the task instance history.
from airflow.models.taskinstancehistory import TaskInstanceHistory
TaskInstanceHistory.record_ti(ti, session=session)

@amoghrajesh amoghrajesh added kind:meta High-level information important to the community and removed kind:meta High-level information important to the community labels Dec 16, 2024
@amoghrajesh amoghrajesh self-assigned this Dec 16, 2024
@amoghrajesh amoghrajesh added area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk labels Dec 16, 2024
kaxil pushed a commit that referenced this issue Dec 18, 2024
…lowSensorTimeout` (#44954)

related: #44414

We already have support for handling terminal states from the task execution side as well as the task SDK client side. (almost) and failed state is part of the terminal state.

This PR extends the task runner's run function to handle cases when we have to fail a task: `AirflowFailException, AirflowSensorTimeout`. It is functionally very similar to #44786

As part of failing a task, multiple other things also needs to be done like:
- Callbacks: which will eventually be converted to teardown tasks
- Retries: Handled in #44351
- unmapping TIs: #44351
- Handling task history: will be handled by #44952
- Handling downstream tasks and non teardown tasks: will be handled by #44951

### Testing performed
#### End to End with Postman

1. Run airflow with breeze and run any DAG
![image](https://github.com/user-attachments/assets/fafc89ea-4e28-4802-912b-d72bf401d94b)

2. Login to metadata DB and get the "id" for your task instance from TI table
![image](https://github.com/user-attachments/assets/75440f0f-f62a-4277-a2e6-cb78bd666dd4)

3. Send a request to `fail` your task
![image](https://github.com/user-attachments/assets/5991e944-f416-4b79-9954-15f1a6ebdd79)

Or using curl:
```
curl --location --request PATCH 'http://localhost:29091/execution/task-instances/0193cec2-f46b-7348-9c27-9869d835dc7b/state' \
--header 'Content-Type: application/json' \
--data '{
    "state": "failed",
    "end_date": "2024-10-31T12:00:00Z"
}'
```

4. Refresh back the Airflow UI to see that the task is in failed state.
![image](https://github.com/user-attachments/assets/bb866dc6-e1d6-435e-abe4-2d04c97280ad)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk
Development

No branches or pull requests

1 participant