diff --git a/airflow/assets/manager.py b/airflow/assets/manager.py index 364d01607e5c4..99de69176a878 100644 --- a/airflow/assets/manager.py +++ b/airflow/assets/manager.py @@ -174,11 +174,6 @@ def register_asset_change( if alias_ref.dag.is_active and not alias_ref.dag.is_paused } - dags_to_reparse = dags_to_queue_from_asset_alias - dags_to_queue_from_asset - if dags_to_reparse: - file_locs = {dag.fileloc for dag in dags_to_reparse} - cls._send_dag_priority_parsing_request(file_locs, session) - cls.notify_asset_changed(asset=asset) Stats.incr("asset.updates") diff --git a/tests/assets/test_manager.py b/tests/assets/test_manager.py index afbbfc23adee1..b47dae7f2e7f1 100644 --- a/tests/assets/test_manager.py +++ b/tests/assets/test_manager.py @@ -34,7 +34,6 @@ DagScheduleAssetReference, ) from airflow.models.dag import DagModel -from airflow.models.dagbag import DagPriorityParsingRequest from airflow.sdk.definitions.asset import Asset, AssetAlias from tests.listeners import asset_listener @@ -139,7 +138,6 @@ def test_register_asset_change_with_alias(self, session, dag_maker, mock_task_in # Ensure we've created an asset assert session.query(AssetEvent).filter_by(asset_id=asm.id).count() == 1 assert session.query(AssetDagRunQueue).count() == 2 - assert session.query(DagPriorityParsingRequest).count() == 2 def test_register_asset_change_no_downstreams(self, session, mock_task_instance): asset_manager = AssetManager()