Skip to content

Commit

Permalink
Fix task completion time to use airflow task end time instead of dag …
Browse files Browse the repository at this point in the history
…end time (#869)

Signed-off-by: henneberger <[email protected]>

Co-authored-by: Willy Lulciuc <[email protected]>
  • Loading branch information
henneberger and wslulciuc authored Dec 15, 2020
1 parent 3c6cde6 commit 2afc493
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
8 changes: 6 additions & 2 deletions integrations/airflow/marquez_airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ def _report_task_instance(self, ti, dagrun, run_args, session):
self.log.debug(f'Setting task state: {ti.state}'
f' for {ti.task_id}')
if ti.state in {State.SUCCESS, State.SKIPPED}:
self._marquez.complete_run(run_id)
self._marquez.complete_run(
run_id,
DagUtils.to_iso_8601(ti.end_date))
else:
self._marquez.fail_run(run_id)
self._marquez.fail_run(
run_id,
DagUtils.to_iso_8601(ti.end_date))

def _extract_metadata(self, dagrun, task, ti=None):
extractor = self._get_extractor(task)
Expand Down
8 changes: 4 additions & 4 deletions integrations/airflow/marquez_airflow/marquez.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ def _register_dataset(self, step_dataset, run_id=None):
inputs = self._to_dataset_ids(step_dataset)
return inputs

def complete_run(self, run_id):
def complete_run(self, run_id, at):
self.get_or_create_marquez_client(). \
mark_job_run_as_completed(run_id=run_id)
mark_job_run_as_completed(run_id=run_id, at=at)

def fail_run(self, run_id):
def fail_run(self, run_id, at):
self.get_or_create_marquez_client().mark_job_run_as_failed(
run_id=run_id)
run_id=run_id, at=at)

def start_run(self, run_id, start):
self.get_or_create_marquez_client() \
Expand Down
12 changes: 8 additions & 4 deletions integrations/airflow/tests/test_marquez_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ def test_marquez_dag(mock_get_or_create_marquez_client, mock_uuid,
)

mock_marquez_client.mark_job_run_as_completed.assert_called_once_with(
run_id=run_id_completed
run_id=run_id_completed,
at=mock.ANY
)

# When a task run completes, the task outputs are also updated in order
Expand All @@ -215,7 +216,8 @@ def test_marquez_dag(mock_get_or_create_marquez_client, mock_uuid,

dag.handle_callback(dagrun, success=False, session=session)
mock_marquez_client.mark_job_run_as_failed.assert_called_once_with(
run_id=run_id_failed
run_id=run_id_failed,
at=mock.ANY
)

# Assert an attempt to version the outputs of a task is not made when
Expand Down Expand Up @@ -455,7 +457,8 @@ def test_marquez_dag_with_extractor(mock_get_or_create_marquez_client,
kwargs['context'].get('extract') == 'extract'

mock_marquez_client.mark_job_run_as_completed.assert_called_once_with(
run_id=run_id
run_id=run_id,
at=mock.ANY
)

# When a task run completes, the task outputs are also updated in order
Expand Down Expand Up @@ -676,7 +679,8 @@ def test_marquez_dag_with_extract_on_complete(
kwargs['context'].get('extract_on_complete') == 'extract_on_complete'

mock_marquez_client.mark_job_run_as_completed.assert_called_once_with(
run_id=run_id
run_id=run_id,
at=mock.ANY
)

# When a task run completes, the task outputs are also updated in order
Expand Down

0 comments on commit 2afc493

Please sign in to comment.