Fix an issue of releasing lock for rq export job when the worker subprocess is killed #8721
Conversation
Walkthrough: The changes in this pull request involve multiple files that enhance dataset caching, locking mechanisms, and export functionality. Key modifications include the introduction of new constants for lock timeouts, an exception class for lock errors, and threading support for export operations. Existing functions have been updated with additional parameters and type hints for better clarity. The retry logic for export-related tests has also been adjusted to allow for more attempts. Overall, these updates improve error handling, flexibility, and maintainability within the dataset management system.
/check
❌ Some checks failed
Actionable comments posted: 10
🧹 Outside diff range and nitpick comments (9)
cvat/apps/dataset_manager/default_settings.py (2)
Lines 11-18: Consider adding minimum value guidance in the docstring. The implementation looks good, with a reasonable default of 5 minutes and clear documentation of the auto-extension mechanism.
Consider adding a note about the minimum safe value recommendation to prevent potential issues:

```diff
 """
 Default lifetime for the export cache lock, in seconds.
 This value should be short enough to minimize the waiting time until the lock is automatically
 released (e.g., in cases where a worker process is killed by the OOM killer and the lock is not
 released). The lock will be automatically extended as needed for the duration of the worker process.
+Note: It's recommended to keep this value above 60 seconds to account for potential network delays
+and system load during the export process.
 """
```
Lines 23-27: Enhance the deprecation warning message. The deprecation warning should include guidance on how to migrate to the new environment variable.

```diff
-    "The CVAT_DATASET_CACHE_LOCK_TIMEOUT is deprecated, "
-    "use DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT instead", DeprecationWarning)
+    "The CVAT_DATASET_CACHE_LOCK_TIMEOUT environment variable is deprecated. "
+    "Please use CVAT_DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT instead with the same value format.",
+    DeprecationWarning)
```

cvat/apps/dataset_manager/util.py (2)
Lines 103-104: Add a docstring to explain the exception usage. The `ExtendLockError` class is well-named and follows Python's exception naming convention. Consider adding a docstring to explain when this exception is raised.

```diff
 class ExtendLockError(Exception):
+    """Raised when a lock extension operation fails, typically during export job processing."""
     pass
```
Line 118: Document the `num_extensions` parameter and its implications. The implementation looks good. The parameter is well-typed, properly validated, and correctly passed to the Redlock constructor. Consider enhancing the function's docstring to explain:
- The purpose and behavior of `num_extensions`
- The relationship between `num_extensions` and `ttl`
- What happens when the maximum number of extensions is reached

```diff
 def get_export_cache_lock(
     export_path: os.PathLike[str],
     *,
     ttl: int | timedelta,
     block: bool = True,
     acquire_timeout: Optional[int | timedelta] = None,
     num_extensions: int | None = None,
 ) -> Generator[Lock, Any, Any]:
+    """Acquire a distributed lock for export cache operations.
+
+    Args:
+        export_path: Path to the export file to lock
+        ttl: Time-to-live for the lock
+        block: Whether to block waiting for the lock
+        acquire_timeout: Maximum time to wait for lock acquisition
+        num_extensions: Maximum number of times the lock can be extended.
+            None means unlimited extensions.
+
+    Raises:
+        ValueError: If ttl, acquire_timeout, or num_extensions is negative
+        LockNotAvailableError: If the lock cannot be acquired
+        ExtendLockError: If lock extension fails after reaching num_extensions
+    """
```

Also applies to: 132-133, 143-143
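A usage sketch of this helper, assuming it is exposed as a context manager (its `Generator[Lock, Any, Any]` return type suggests a `@contextmanager` wrapper) and that `LockNotAvailableError` is raised on acquisition timeout; the path and numeric values are illustrative:

```python
from datetime import timedelta

from cvat.apps.dataset_manager.util import LockNotAvailableError, get_export_cache_lock

export_path = "/data/cache/export/task_42.zip"  # illustrative path

try:
    with get_export_cache_lock(
        export_path,
        ttl=timedelta(minutes=5),                # short TTL, auto-extended by the worker
        acquire_timeout=timedelta(seconds=305),  # TTL plus a small buffer
        num_extensions=12,                       # cap extensions to the job timeout
    ):
        ...  # read or write the export file while holding the lock
except LockNotAvailableError:
    ...  # e.g., postpone the job instead of failing the request
```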
tests/python/rest_api/utils.py (2)
Line 47: LGTM! Consider adding a comment explaining the increased retry count. The increase in `max_retries` from 30 to 100 makes the tests more resilient to temporary delays that might occur when worker subprocesses are killed and locks need to be released. However, it would be helpful to add a comment explaining this rationale. Consider:
- Adding a comment explaining why 100 retries are needed
- Moving the retry count to a constant at the module level:

```diff
+# Maximum number of retries for export operations
+# Set to 100 (10 seconds with 0.1s interval) to handle cases where
+# worker subprocess termination causes temporary delays in lock release
+EXPORT_MAX_RETRIES = 100

 def wait_and_download_v1(
     endpoint: Endpoint,
     *,
-    max_retries: int = 100,
+    max_retries: int = EXPORT_MAX_RETRIES,
     interval: float = 0.1,
     download_result: bool = True,
     **kwargs,
 ) -> Optional[bytes]:
```

Also applies to: 78-78, 118-118, 156-156
Line range hint 89-89: Update docstrings to reflect the new default value. The docstrings of both `export_v1` and `export_v2` still mention that `max_retries` defaults to 30, but the actual default is now 100. Update the docstrings:

```diff
-        max_retries (int, optional): Number of retries when checking process status. Defaults to 30.
+        max_retries (int, optional): Number of retries when checking process status. Defaults to 100.
```
- max_retries (int, optional): Number of retries when checking process status. Defaults to 30. + max_retries (int, optional): Number of retries when checking process status. Defaults to 100.Also applies to: 169-169
cvat/apps/dataset_manager/views.py (3)
Lines 282-285: Avoid busy waiting and optimize thread coordination. The loop checking `export_thread.is_alive()` with a fixed `sleep(5)` can lead to delays in handling events or excessive waiting times. Consider using synchronization primitives such as `threading.Event`, or `export_thread.join(timeout)`, to optimize waiting:

```python
while export_thread.is_alive():
    if stop_event.is_set():
        raise ExtendLockError("Export aborted because the lock extension failed.")
    export_thread.join(timeout=1)
```

This approach reduces the fixed sleep interval and responds more quickly to thread completion or stop events.
Line range hint 204-311: Ensure proper exception handling in the `export` function. The `export` function may encounter exceptions that are not properly logged or handled, especially from threads. Implement comprehensive exception handling within the `export` function to capture and log any unexpected issues, and ensure that all resources are properly cleaned up in case of errors.
Lines 160-168: Implement exponential backoff for lock extension retries. The current retry logic attempts to extend the lock immediately up to the maximum number of retry attempts, which might not be effective if the underlying issue is transient. Implement an exponential backoff strategy for retries to provide a more robust retry mechanism:

```python
import time

for attempt_number in range(1, self.max_retry_attempt_count + 1):
    try:
        self.lock.extend()
        self._reset()
        break
    except Exception as ex:
        self.logger.glob.exception(...)
        sleep_time = 2 ** attempt_number
        time.sleep(sleep_time)
        if attempt_number == self.max_retry_attempt_count:
            self.stop_event.set()
            return
```

This approach increases the wait time between retries, allowing transient issues to resolve.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
- cvat/apps/dataset_manager/default_settings.py (1 hunks)
- cvat/apps/dataset_manager/util.py (4 hunks)
- cvat/apps/dataset_manager/views.py (9 hunks)
- cvat/apps/engine/background.py (5 hunks)
- cvat/rqworker.py (0 hunks)
- tests/python/rest_api/utils.py (4 hunks)
💤 Files with no reviewable changes (1)
- cvat/rqworker.py
🔇 Additional comments (6)
cvat/apps/dataset_manager/default_settings.py (2)
Line 6: LGTM! The `warnings` module import is appropriately added to support the deprecation warning functionality.
Lines 11-35: Verify the lock release behavior under stress conditions. While the implementation looks good, it's crucial to verify the lock release behavior when workers are killed under various conditions.
✅ Verification successful
Based on the search results, I can now generate the final response since I have enough information about the lock handling implementation and testing:
Lock release behavior has been thoroughly tested under stress conditions
The codebase includes comprehensive test coverage for lock release scenarios:
- Tests verify lock behavior when workers are killed (test_concurrent_export_and_cleanup)
- Lock auto-extension mechanism is tested with proper timeouts
- Race conditions and TOCTOU issues are covered in test cases
- OOM killer scenarios are handled via the auto-release mechanism
- Lock acquisition timeouts are properly configured with buffer time
The implementation includes:
- Automatic lock extension during worker process lifetime
- Configurable lock TTL (default 5 minutes) to handle killed workers
- Lock acquire timeout with buffer to handle unreleased locks
- Proper cleanup on process termination
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Look for related test files and lock handling code
# Find related test files
echo "=== Related test files ==="
fd -e py -e yaml test_export
# Look for lock-related error handling
echo -e "\n=== Lock handling code ==="
rg -A 5 "lock|Lock" --type py
# Look for process termination handling
echo -e "\n=== Process termination handling ==="
rg -A 5 "kill|terminate|exit" --type py
Length of output: 89757
cvat/apps/dataset_manager/util.py (1)
Line range hint 103-143: Verify lock release behavior during subprocess termination. The changes look good for controlling lock extensions. However, let's verify how these changes integrate with worker subprocess termination handling.
cvat/apps/dataset_manager/views.py (1)
Line 256: Handle a possible `None` value from `rq.get_current_job()`. There is a potential risk that `rq.get_current_job()` might return `None`, which would cause an `AttributeError` when accessing `timeout`. Confirm that `rq.get_current_job()` cannot be `None` in this context. If there is a possibility, add a check to handle `None` values:

```python
current_job = rq.get_current_job()
if current_job is None:
    raise RuntimeError("Failed to retrieve the current RQ job.")
```

Ensure that the code handles cases where the current job is not available to prevent unexpected crashes.
cvat/apps/engine/background.py (2)
Line 17: The import of `DjangoScheduler` is appropriate; it is necessary for handling scheduled job cancellations later in the code.

Lines 104-110: Verify proper cancellation of deferred and scheduled jobs. The code introduces cancellation of deferred and scheduled jobs using `rq_job.cancel` and `scheduler.cancel`. Ensure that this approach correctly handles job dependencies and does not leave any dependent jobs in an inconsistent state.

Run the following script to list any dependent jobs and verify their statuses after cancellation; it will help confirm that dependents are appropriately managed when a job is cancelled.
✅ Verification successful
Job cancellation implementation is properly handling dependencies
The code analysis reveals that the job cancellation implementation is well-designed and safe:
- The `enqueue_dependents` parameter is consistently controlled by the `ONE_RUNNING_JOB_IN_QUEUE_PER_USER` setting across all cancellation calls
- The implementation is used in multiple places (views.py, background.py) with consistent behavior
- Job dependency handling is properly implemented in utils.py with appropriate checks
- After cancellation, jobs are properly cleaned up with `rq_job.delete()`
- The scheduler cancellation is handled separately for scheduled jobs to ensure proper cleanup (a sketch of this pattern follows below)
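A minimal sketch of the cancellation pattern summarized above, assuming rq's `Job.cancel(enqueue_dependents=...)` and rq-scheduler's `Scheduler.cancel()` APIs; the setting name comes from the review, while the queue wiring and helper name are illustrative rather than CVAT's actual code:

```python
import django_rq
from django.conf import settings


def cancel_export_job(queue_name: str, job_id: str) -> None:
    """Illustrative helper: cancel a deferred or scheduled export job and clean it up."""
    queue = django_rq.get_queue(queue_name)
    rq_job = queue.fetch_job(job_id)
    if rq_job is None:
        return

    if rq_job.is_scheduled:
        # Scheduled jobs also live in the scheduler's sorted set
        # (rq:scheduler:scheduled_jobs), so they are removed there as well.
        scheduler = django_rq.get_scheduler(queue_name)
        scheduler.cancel(rq_job)

    # Whether dependents are re-enqueued follows the same setting that controls
    # the one-running-job-per-user queueing policy.
    rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)
    rq_job.delete()
```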
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that cancelling a job properly handles its dependents.
# Fetch all jobs in the queue
rq info -u redis://localhost:6379 --all -r
# Check for dependents of the cancelled job
JOB_ID="<INSERT_CANCELLED_JOB_ID>"
redis-cli SMEMBERS "rq:job:$JOB_ID:dependents"
Length of output: 341
Script:
#!/bin/bash
# Let's analyze the codebase to understand the job cancellation implementation and its handling of dependencies
# Search for job cancellation related code
rg -A 5 "cancel.*enqueue_dependents"
# Look for any tests related to job cancellation
fd test.*py --exec rg -l "cancel.*job"
# Search for scheduler cancellation implementation
ast-grep --pattern 'def cancel($$$)'
# Look for any configuration or settings related to job dependencies
rg "ONE_RUNNING_JOB_IN_QUEUE_PER_USER"
Length of output: 3061
Script:
#!/bin/bash
# Let's examine the job dependency handling implementation and tests
# Look for job dependency implementation
rg -A 10 "should_be_dependent" cvat/apps/engine/utils.py
# Check test implementation for job cancellation
rg -A 10 "test.*cancel.*job" tests/python/rest_api/test_queues.py
# Look for any error handling related to job cancellation
rg -B 2 -A 5 "except.*Job" cvat/apps/engine/
Length of output: 3580
```python
default_dataset_lock_acquire_timeout = default_dataset_export_lock_ttl + 5
"""
Set default lock acquire timeout to the default lock lifetime + small buffer
to handle possible cases when a lock wasn't released by the worker process
and will be released automatically by Redis
"""
DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT = int(os.getenv("DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT", default_dataset_lock_acquire_timeout))
```
🛠️ Refactor suggestion
Consider increasing the buffer time and adding value validation.
The current implementation has two potential issues:
- The 5-second buffer might be insufficient under high system load
- There's no validation to prevent setting these timeouts too low
Consider implementing these improvements:
```diff
-default_dataset_lock_acquire_timeout = default_dataset_export_lock_ttl + 5
+# Use 30-second buffer to better handle high-load scenarios
+default_dataset_lock_acquire_timeout = default_dataset_export_lock_ttl + 30
 """
 Set default lock acquire timeout to the default lock lifetime + small buffer
 to handle possible cases when a lock wasn't released by the worker process
 and will be released automatically by Redis
 """
-DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT = int(os.getenv("DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT", default_dataset_lock_acquire_timeout))
+raw_timeout = os.getenv("DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT", default_dataset_lock_acquire_timeout)
+timeout = int(raw_timeout)
+if timeout < 60:  # Minimum safe timeout
+    warnings.warn(
+        f"DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT value {timeout} is too low. "
+        "Setting it below 60 seconds may cause issues under load.",
+        RuntimeWarning)
+DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT = timeout
```
📝 Committable suggestion
```python
# Use 30-second buffer to better handle high-load scenarios
default_dataset_lock_acquire_timeout = default_dataset_export_lock_ttl + 30
"""
Set default lock acquire timeout to the default lock lifetime + small buffer
to handle possible cases when a lock wasn't released by the worker process
and will be released automatically by Redis
"""
raw_timeout = os.getenv("DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT", default_dataset_lock_acquire_timeout)
timeout = int(raw_timeout)
if timeout < 60:  # Minimum safe timeout
    warnings.warn(
        f"DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT value {timeout} is too low. "
        "Setting it below 60 seconds may cause issues under load.",
        RuntimeWarning)
DATASET_CACHE_LOCK_ACQUIRE_TIMEOUT = timeout
```
cvat/apps/dataset_manager/views.py (outdated)
```python
class ExtendLockThread(threading.Thread):
    def __init__(
        self,
        *,
        lock: Redlock,
        lock_extend_interval: int,
        stop_event: threading.Event,
    ):
        super().__init__(target=self._extend_lock)

        self.lock = lock
        self.lock_extend_interval = lock_extend_interval
        self.cur_sleep_interval = lock_extend_interval
        self.stop_event = stop_event
        self.logger = ServerLogManager(__name__)
        self.max_retry_attempt_count = 3

    def _reset(self):
        self.cur_sleep_interval = self.lock_extend_interval

    def _extend_lock(self):
        """
        Extend the lock's TTL every <lock_extend_interval> seconds until <stop_event> is set.
        The stop event is checked every second to minimize waiting time when the export process is completed.
        """

        while not self.stop_event.is_set():
            sleep(1)
            self.cur_sleep_interval -= 1

            if self.cur_sleep_interval:
                continue

            self.logger.glob.debug(
                f"Extend lock {self.lock.key}, number of remaining extensions: "
                f"{self.lock.num_extensions - self.lock._extension_num}"
            )
            for attempt_number in range(1, self.max_retry_attempt_count + 1):
                try:
                    self.lock.extend()
                    self._reset()
                except Exception as ex:
                    self.logger.glob.exception(
                        f"Attempt number: {attempt_number}, "
                        f"an exception occurred during lock {self.lock.key} extension: ",
                        str(ex),
                    )
                    if attempt_number == self.max_retry_attempt_count:
                        self.stop_event.set()
                        return
```
🛠️ Refactor suggestion
Avoid accessing protected members of Redlock. In line 154, accessing the protected member `self.lock._extension_num` directly violates encapsulation principles and may lead to issues if the internal implementation of `Redlock` changes. Consider modifying the logic to avoid relying on protected members. If necessary, extend the `Redlock` class to include a public method or property that provides the required information.
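A minimal sketch of that option, assuming the lock is pottery's `Redlock` (which accepts `num_extensions` and tracks `_extension_num` internally, matching the attributes used above); if a different `Redlock` implementation is in use, substitute its import. The subclass and property names are illustrative:

```python
from pottery import Redlock


class ExtendableRedlock(Redlock):
    """Redlock subclass that exposes the remaining extension count publicly."""

    @property
    def remaining_extensions(self) -> int:
        # The protected attribute is still read, but only in this one place,
        # so a change in the library only has to be absorbed here.
        return self.num_extensions - self._extension_num
```

The debug log line could then read `lock.remaining_extensions` instead of computing the difference from a protected attribute at the call site.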
cvat/apps/dataset_manager/views.py (outdated)
```python
)
acquire_timeout=EXPORT_CACHE_LOCK_ACQUIRE_TIMEOUT,
ttl=EXPORT_LOCK_TTL,
num_extensions=math.ceil(rq.get_current_job().timeout / EXPORT_LOCK_EXTEND_INTERVAL),
```
Validate division to prevent a potential ZeroDivisionError. Ensure that `EXPORT_LOCK_EXTEND_INTERVAL` is not zero to avoid a `ZeroDivisionError` when calculating `num_extensions`. Add a check before the calculation:

```python
if EXPORT_LOCK_EXTEND_INTERVAL <= 0:
    raise ValueError("EXPORT_LOCK_EXTEND_INTERVAL must be greater than zero.")
```

This validation will prevent runtime errors due to invalid configuration values.
cvat/apps/dataset_manager/views.py (outdated)
```python
while export_thread.is_alive():
    if stop_event.is_set():
        raise ExtendLockError("Export aborted because the lock extension failed.")
    sleep(5)

export_thread.join()
stop_event.set()
extend_lock_thread.join()
```
Handle exceptions from threads properly. If `export_thread` encounters an exception, it may not be propagated to the main thread, potentially leading to silent failures. Modify the `ExportThread` class to store any exceptions that occur during execution and check for them in the main thread after joining:

```python
# In ExportThread
def __init__(self, ...):
    ...
    self.exception = None

def _export_dataset(self):
    try:
        ...
    except Exception as e:
        self.exception = e

# In the main thread after export_thread.join()
export_thread.join()
if export_thread.exception:
    raise export_thread.exception
```

This ensures that any exceptions in the `export_thread` are not silently ignored.
cvat/apps/dataset_manager/views.py (outdated)
```python
try:
    self.lock.extend()
    self._reset()
except Exception as ex:
```
🛠️ Refactor suggestion
Catch specific exceptions instead of a general `Exception`. Catching all exceptions with `except Exception` can mask unexpected errors and make debugging more difficult. It's better to catch specific exceptions that you anticipate could occur during `self.lock.extend()`. Identify the specific exceptions that `self.lock.extend()` might raise (e.g., `LockExtensionError`, `LockNotOwnedError`) and handle them explicitly.

```diff
-                except Exception as ex:
+                except LockExtensionError as ex:
```

Adjust the exception handling as needed based on the actual exceptions raised by the `extend` method.
Committable suggestion skipped: line range outside the PR's diff.
/check
🚫 Workflows have been canceled
/check
❌ Some checks failed
Codecov Report. Attention: Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##           develop    #8721      +/-   ##
===========================================
- Coverage    73.90%   73.88%   -0.03%
===========================================
  Files          409      408       -1
  Lines        43932    43969      +37
  Branches      3986     3986
===========================================
+ Hits         32470    32487      +17
- Misses       11462    11482      +20
```
@klakhov, Could you please take a look at the failed cypress test?
changelog.d/20241218_120228_maria_fix_export_job_lock_releasing.md (outdated; resolved)
Quality Gate passed
Motivation and context

The main problem fixed by this PR is as follows: in the previous implementation, "long" locks were used when exporting a resource or deleting an export cache. If the export process was killed (e.g., by the OOM killer with signal 9), the acquired lock was not released and remained active until the auto-release timeout expired (e.g., 4 hours). A subsequent user request to export a dataset could not acquire the lock, causing the job to be scheduled for execution after 60 seconds (the default value). When the scheduled job ran again, it still could not acquire the lock, and the entire process was repeated. Additionally, if a user initiated the export process after the job was marked as scheduled, they were unable to re-initiate the process and received an error because the RQ job status was not set and handled correctly (it remained STARTED).

One more problem that was found and fixed is that two users with rights to export a resource could not run the export in parallel (with the same options, such as format and save_images): one of them received a LockNotAvailableError.

How it was fixed:
- … SCHEDULED status and are removed from scheduler jobs (the rq:scheduler:scheduled_jobs set)
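A minimal sketch of the locking approach described above: a short lock TTL combined with periodic extension while the worker is alive, so that a killed worker only blocks other exports until the TTL expires. This is an illustration, not the PR's code; the lock object is a stand-in with an `extend()` method, and the interval value is an example:

```python
import threading


def run_with_auto_extended_lock(lock, do_export, extend_interval: int = 60) -> None:
    """Run do_export() while periodically extending `lock` (a stand-in object
    with an extend() method). If the process is killed, the lock simply
    expires after its short TTL instead of staying held for hours."""
    stop_event = threading.Event()

    def keep_extending() -> None:
        # wait() returns False on timeout, True once the export signals completion
        while not stop_event.wait(timeout=extend_interval):
            lock.extend()  # push the expiration forward while work is in progress

    extender = threading.Thread(target=keep_extending, daemon=True)
    extender.start()
    try:
        do_export()
    finally:
        stop_event.set()
        extender.join()
```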
How has this been tested?
Checklist
- I submit my changes into the `develop` branch
- I have increased versions of npm packages if it is necessary (cvat-canvas, cvat-core, cvat-data and cvat-ui)

License
- I submit my code changes under the same license that covers the project. Feel free to contact the maintainers if that's a concern.
Summary by CodeRabbit

Release Notes
- New Features
- Bug Fixes
- Documentation
- Tests: increased `max_retries` for export-related functions to enhance reliability during export processes.