Skip to content

Commit

Permalink
fixup! Swap Dag Parsing to use the TaskSDK machinery.
Browse files Browse the repository at this point in the history
  • Loading branch information
ashb committed Dec 18, 2024
1 parent d4935ea commit e61c595
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@ def terminate(self):
"""Send termination signal to DAG parsing processor manager to terminate all DAG file processors."""
if self._process and self._process.is_alive():
self.log.info("Sending termination message to manager.")
self._parent_signal_conn.send(None)
try:
self._parent_signal_conn.send(None)
except ConnectionError:
pass
self._parent_signal_conn.close()

def end(self):
Expand Down Expand Up @@ -321,7 +324,7 @@ class DagFileProcessorManager:
stale_dag_threshold: float = attrs.field(factory=_config_int_factory("scheduler", "stale_dag_threshold"))
last_dag_dir_refresh_time: float = attrs.field(default=0, init=False)

log: logging.Logger = log
log: logging.Logger = attrs.field(default=log, init=False)

_last_deactivate_stale_dags_time: float = attrs.field(default=0, init=False)
print_stats_interval: float = attrs.field(
Expand Down

0 comments on commit e61c595

Please sign in to comment.