
[core][autoscaler] Fix incorrectly terminating nodes misclassified as idle in autoscaler v1 #48519

Merged
11 commits merged into ray-project:master on Dec 3, 2024

Conversation

mimiliaogo
Contributor

@mimiliaogo mimiliaogo commented Nov 3, 2024

In autoscaler v1, nodes are incorrectly classified as idle based solely on their resource usage metrics. This misclassification can occur under the following conditions:

  1. Tasks running on the node do not have assigned resources.
  2. All tasks on the node are blocked on get or wait operations.

This will lead to the incorrect termination of nodes during downscaling.

To resolve this issue, use the idle_duration_ms reported by the raylet instead, which already accounts for the conditions above. ref: #39582
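Conceptually, the fix converts each node's reported idle duration into a "last used" timestamp that v1's existing idle_timeout_minutes check can compare against. A minimal sketch of that conversion (illustrative names, not the exact code in this PR):

```python
import time

def last_used_by_ip(idle_duration_ms_by_ip):
    """Convert raylet-reported idle durations (ms) into last-used timestamps (s)."""
    now = time.time()
    # A node idle for N ms was last used N / 1000 seconds ago; a node with
    # idle_duration_ms == 0 was in use just now.
    return {ip: now - ms / 1000 for ip, ms in idle_duration_ms_by_ip.items()}

# The v1 downscaler keeps its original rule: a node may be terminated only if
# last_used < now - 60 * idle_timeout_minutes.
```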

Before: NodeDiedError

(screenshot)

After

(screenshot)

Reproduction Script (on local fake nodes)

  • Setting:
    head node: < 10 CPUs, worker nodes: 10 CPUs
  • Code:
    import ray
    import time
    import os
    import random
    
    @ray.remote(max_retries=5, num_cpus=10)
    def inside_ray_task_with_outside():
        print('start inside_ray_task_with_outside')
        sleep_time = 15 
        start_time = time.perf_counter()
        while True:
            if(time.perf_counter() - start_time < sleep_time):
                time.sleep(0.001)
            else:
                break
    
    @ray.remote(max_retries=5, num_cpus=10)
    def inside_ray_task_without_outside():
        print('start inside_ray_task_without_outside task')
        sleep_time = 50 
        start_time = time.perf_counter()
        while True:
            if(time.perf_counter() - start_time < sleep_time):
                time.sleep(0.001)
            else:
                break
    
    @ray.remote(max_retries=0, num_cpus=10)
    def outside_ray_task():
        # Blocks on ray.get while the inner tasks run on other nodes; with the
        # bug, the node running this task looks idle and can be terminated.
        print('start outside_ray_task task')
        future_list = [inside_ray_task_with_outside.remote(), 
                            inside_ray_task_without_outside.remote()]
        ray.get(future_list)
    
    
    if __name__ == '__main__':
        ray.init()
        ray.get(outside_ray_task.remote())
    

Related issue number

Closes #46492

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@mimiliaogo mimiliaogo force-pushed the autoscaler-v1-idle-check branch 2 times, most recently from 8cd0be3 to 4f03be0 Compare November 3, 2024 18:46
@mimiliaogo mimiliaogo changed the title Fix incorrectly terminating nodes misclassified as idle in autoscaler v1 [core][autoscaler] Fix incorrectly terminating nodes misclassified as idle in autoscaler v1 Nov 3, 2024
@kevin85421 kevin85421 self-assigned this Nov 5, 2024
@mimiliaogo mimiliaogo marked this pull request as ready for review November 6, 2024 17:59
@mimiliaogo mimiliaogo requested review from hongchaodeng and a team as code owners November 6, 2024 17:59
@mimiliaogo mimiliaogo force-pushed the autoscaler-v1-idle-check branch from 0ba6dec to b65602f Compare November 12, 2024 17:38
@kevin85421
Member

I will review this PR today

horizon = now - (60 * self.config["idle_timeout_minutes"])

# local import to avoid circular dependencies
from ray.autoscaler.v2.sdk import get_cluster_resource_state
Member

Autoscaler v1 relying on Autoscaler v2's functions seems hacky to me. We should avoid that.

@mimiliaogo
Contributor Author

Hi, here are my investigations:

  1. Autoscaler v1 uses GetAllResourceUsage to get the current resource usage, which actually contains a field for idle time. However, I found that this idle time value is always 0 and is not currently used. Instead, v1 uses a simple check on resource utilization here to decide whether a node is idle, which misclassifies a blocked worker as idle.
    This is the previous PR about reporting idle time. Further investigation is needed to check whether anything is wrong in how idle time is reported.

  2. On the other hand, Autoscaler v2 uses GetClusterResourceState to get the resource state. The idle time reported there is correct.

The screenshot shows that the idle time from (1) is always 0, while the idle time from (2) is correct.
(screenshot)

  3. Here's a related PR that addresses the misclassification of idle nodes by setting the footprint of a busy worker, so the idle time reported by the local resource manager should already be handled correctly. However, I haven't figured out why the idle time in (1) is always 0.

From the investigations above, we now have two choices:

  1. Debug why the idle time in GetAllResourceUsage is not correct; it should be consistent with the one reported by GetClusterResourceState.
  2. Directly call GetClusterResourceState to get the correct idle time (rough sketch below).
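For illustration, option 2 amounts to something like this (a rough sketch; the import matches ray.autoscaler.v2.sdk as shown in the diff above, but the exact signature and field names here are assumptions):

```python
# Sketch of option 2: read per-node idle durations via the v2 SDK.
# Assumption: get_cluster_resource_state takes a GCS client/address and returns
# a ClusterResourceState whose node_states carry node_id and idle_duration_ms;
# the real signature and field names may differ.
from ray.autoscaler.v2.sdk import get_cluster_resource_state

def idle_duration_ms_by_node_id(gcs_client):
    state = get_cluster_resource_state(gcs_client)
    return {node.node_id: node.idle_duration_ms for node in state.node_states}
```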

IMO, option 1 would be the best practice, but I guess it will take a certain amount of time; option 2 is a quicker fix.
If anyone from the team can offer insights on option 1, that would be very helpful.

@kevin85421 What do you think?

@kevin85421
Member

IMO, option 1 would be the best practice, but I guess it will take a certain amount of time; option 2 is a quicker fix.
If anyone from the team can offer insights on option 1, that would be very helpful.

Let me discuss this with my colleagues. Proceeding with option (2) first, and then having me take it over, is also an option.

@mimiliaogo mimiliaogo force-pushed the autoscaler-v1-idle-check branch 2 times, most recently from f663683 to efbe62e Compare November 18, 2024 04:27
@mimiliaogo
Contributor Author

@kevin85421 Option 2 is done; the test error doesn't seem to be related to my changes.
If debugging option 1 is needed, I can also take some time to look at it.

@kevin85421
Member

cc @rickyyx

@rickyyx
Contributor

rickyyx commented Nov 19, 2024

Niceeeee!

Option 2 is a fine approach to me; while GetClusterResourceState is used by Autoscaler V2, it should also work when V2 is turned off.

So having the V1 autoscaler poll that endpoint is fine.

Member

@kevin85421 kevin85421 left a comment

Would you mind adding some tests since this PR is not only for KubeRay Autoscaler?

ray_nodes_idle_duration_ms_by_ip = (
    self.load_metrics.ray_nodes_idle_duration_ms_by_ip
)
now = time.time()
Member

Do we need to reset now, or is it OK to use the arg now?

Contributor Author

Yes, we can.

@@ -238,13 +239,32 @@ def get_latest_readonly_config():
prom_metrics=self.prom_metrics,
)

def get_cluster_resource_state(self):
Member

If a function is only called by other member functions within the same class, we typically prefix the function name with an underscore _.

Suggested change:
- def get_cluster_resource_state(self):
+ def _get_cluster_resource_state(self):

@@ -238,13 +239,32 @@ def get_latest_readonly_config():
prom_metrics=self.prom_metrics,
)

def get_cluster_resource_state(self):
Member

We decided to work around this issue by using the Autoscaler V2 API. Do we still need to define get_cluster_resource_state in this file, or can we directly import it from v2/sdk.py?

)
now = time.time()
last_used = {
    ip: now - duration
Member

In load_metrics.py, the type hint of idle_duration_ms is int. Can we directly subtract idle_duration_ms from time.time()?

@mimiliaogo
Contributor Author

mimiliaogo commented Nov 24, 2024

Test added. For the original version, v1 fails and v2 passes, as expected:
(screenshot)

After this PR, both v1 and v2 pass:
(screenshot)

@@ -97,10 +97,12 @@ def update(
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None,
cluster_full_of_actors_detected: bool = False,
node_last_used_time_s: float = time.time(),
Member

@kevin85421 kevin85421 Nov 25, 2024

What's the goal of setting this as the default value? It is a bit weird to me: time.time() will be called only once, when the function is defined. For example, the following program prints the same timestamp twice.

import time
def f(t = time.time()):
    print(t)

f()
time.sleep(5)
f()

Maybe use node_last_used_time_s: Optional[float] = None instead?
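For reference, the usual pattern is to default to None and resolve the timestamp inside the function body; a rough sketch (not the exact code in this PR):

```python
import time
from typing import Optional

def update(node_last_used_time_s: Optional[float] = None):
    # time.time() is evaluated at call time, not at function-definition time.
    now = time.time()
    return node_last_used_time_s if node_last_used_time_s is not None else now
```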

Contributor Author

Good catch!

):
    self.static_resources_by_ip[ip] = static_resources
    self.raylet_id_by_ip[ip] = raylet_id
    self.cluster_full_of_actors_detected = cluster_full_of_actors_detected
    self.ray_nodes_last_used_time_by_ip[ip] = node_last_used_time_s
Member

In which cases will node_last_used_time_s not be set? If node_last_used_time_s is None, an error may occur when performing time (float) - None.

Contributor Author

Check added in be86afd.

@kevin85421
Member

By the way, would you mind opening an issue in the KubeRay repo to track the progress of adding an end-to-end test for this PR?

@mimiliaogo
Contributor Author

track the progress of adding an end-to-end test

Issue opened: ray-project/kuberay#2568

@@ -97,6 +97,7 @@ def update(
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None,
cluster_full_of_actors_detected: bool = False,
node_last_used_time_s: Optional[float] = None,
Member

In which case will this not be set and the default value be used? How about making the field required instead of optional?

Contributor Author

In the testing code, we manually call load_metrics.update(), so it's more convenient to default last_used_time to now.

Member

How about making the field required instead? The current implementation makes ray_nodes_last_used_time_by_ip have two different definitions.

):
    self.last_used_time_by_ip[ip] = now
    self.ray_nodes_last_used_time_by_ip[ip] = (
        node_last_used_time_s if node_last_used_time_s else now
Member

When checking whether a variable is None, it's better to use is.

Member

node_last_used_time_s if node_last_used_time_s is not None else now

@@ -318,6 +337,9 @@ def update_load_metrics(self):
infeasible_bundles,
pending_placement_groups,
cluster_full,
time.time()
Member

nit: make it into a single line or two lines?

time.time() - idle_duration_ms / 1000,  # node_last_used_time_s = now - idle_duration

if node_id in ray_nodes_idle_duration_ms_by_id:
    idle_duration_ms = ray_nodes_idle_duration_ms_by_id[node_id]
else:
    logger.warning(
Member

In which cases will this condition occur?

Contributor Author

@mimiliaogo mimiliaogo Nov 26, 2024

In theory, get_all_resource_usage and get_cluster_resource_state should return the same set of nodes. However, since they are implemented in Autoscaler v1 and v2 respectively, I'm uncertain if there may be any discrepancies between them in some special cases.

Member

We may need to understand how severe the inconsistency is to determine whether this warrants a warning or a panic. @rickyyx Would you mind providing some insights?

Contributor

Both seem to be getting data from GCS, which should be mostly consistent, I believe, but yeah, the codepaths that generate the data are different, so I think graceful handling with warnings is fine (they should eventually be consistent).

BTW, is there any reason we don't use v2's info from get_cluster_resource_state here entirely? I think the only v1 bit of info that's missing in v2 is "cluster_full", which we could probably also add to v2.

I don't have a strong opinion between the options below, given that the source of info is GCS in both cases and the likelihood of inconsistency is low, IMO:

  1. use mostly v1's info, and patch with v2's idle info
  2. use mostly v2's info, and patch with v1's cluster_full or other missing ones in v2.

I think merging this PR to fix the idle issue is fine, and we could follow up to bring v2's RPC to parity with v1 so we could use v2 entirely here. Or, if we are pushing v2 really hard, we might just deprecate v1 in the future.

Member

  1. use mostly v1's info, and patch with v2's idle info
  2. use mostly v2's info, and patch with v1's cluster_full or other missing ones in v2.

How about we go with option 1 so that we can reduce the dependency between V1 and V2?

@MortalHappiness MortalHappiness added the go add ONLY when ready to merge, run all tests label Nov 30, 2024
@mimiliaogo mimiliaogo force-pushed the autoscaler-v1-idle-check branch from be86afd to 914f5b5 Compare December 1, 2024 06:55
assert autoscaler.resource_demand_scheduler.node_types["worker"][
    "resources"
] == {"CPU": 1}
# def _aggressiveAutoscalingHelper(self, foreground_node_launcher: bool = False):
Contributor Author

@mimiliaogo mimiliaogo Dec 1, 2024

This function is only used by the commented-out function above, so I commented it out as well.

Member

@rickyyx, why are testAggressiveAutoscaling and testAggressiveAutoscalingWithForegroundLauncher commented out? Should we remove _aggressiveAutoscalingHelper if it is not used?

Contributor

Looks like it was commented out here: #38459

@rynewang (author) probably has the most context.

This is probably irrelevant to this PR; we could leave it as is and open another PR to either clean up this dead code or re-enable the test (if possible).

Member

This is probably irrelevant to this PR; we could leave it as is and open another PR to either clean up this dead code or re-enable the test (if possible).

In that case, this PR is ready for merge.

@mimiliaogo mimiliaogo force-pushed the autoscaler-v1-idle-check branch from 914f5b5 to cbee98b Compare December 1, 2024 16:12
@mimiliaogo
Contributor Author

The general logic for adding node_idle_duration_s in the testing code is:

  1. If dynamic resources != static resources, node_idle_duration_s = 0.
  2. Otherwise, set it to a dummy value (e.g., 3 seconds; see the sketch below). As far as I know, the tests here are not for idle-timeout testing (the timeout config is 5 minutes by default); the dummy value just has to stay below the timeout so idle nodes are not killed, preserving the original behavior.
  3. I found that in test_autoscaling_policy.py, all of the "Removing nodes" test results are due to launch failures. Not sure whether this is intended. Because of that, always setting node_idle_duration_s to 0 won't affect the test results.
    (screenshot)

I might not fully understand what every test function is doing, please correct me if anything is missing.
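A minimal sketch of the rule in (1) and (2) above (illustrative only; the actual test helpers differ):

```python
DUMMY_IDLE_DURATION_S = 3  # well below the default 5-minute idle_timeout_minutes

def node_idle_duration_s(static_resources, dynamic_resources):
    # A node whose available (dynamic) resources differ from its total (static)
    # resources is in use, so it reports zero idle time.
    if dynamic_resources != static_resources:
        return 0
    # Otherwise report a small dummy idle time that stays below the timeout,
    # so idle nodes are not unexpectedly scaled down in unrelated tests.
    return DUMMY_IDLE_DURATION_S
```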

@mimiliaogo mimiliaogo requested a review from kevin85421 December 2, 2024 01:53
assert autoscaler.resource_demand_scheduler.node_types["worker"][
    "resources"
] == {"CPU": 1}
# def _aggressiveAutoscalingHelper(self, foreground_node_launcher: bool = False):
Member

@rickyyx, why are testAggressiveAutoscaling and testAggressiveAutoscalingWithForegroundLauncher commented out? Should we remove _aggressiveAutoscalingHelper if it is not used?

@@ -320,6 +320,8 @@ def update_nodes(self):
SMALL_CLUSTER, **{"available_node_types": TYPES_A, "head_node_type": "empty_node"}
)

DUMMY_IDLE_DURATION_S = 3
Member

Can you add some comments for DUMMY_IDLE_DURATION_S? For example, explain when DUMMY_IDLE_DURATION_S should be used (e.g., when static resources (total resources) are not equal to dynamic resources (available resources)) and DUMMY_IDLE_DURATION_S should not trigger scale down? Thanks!

Contributor Author

Done: 1e424ce

@kevin85421 kevin85421 removed the go add ONLY when ready to merge, run all tests label Dec 3, 2024
@kevin85421 kevin85421 added the go add ONLY when ready to merge, run all tests label Dec 3, 2024
@rickyyx rickyyx enabled auto-merge (squash) December 3, 2024 20:41
@rickyyx rickyyx merged commit fc3cfef into ray-project:master Dec 3, 2024
7 checks passed
jecsand838 pushed a commit to jecsand838/ray that referenced this pull request Dec 4, 2024
… idle in autoscaler v1 (ray-project#48519)

@rickyyx
Contributor

rickyyx commented Dec 5, 2024

Great work @mimiliaogo! And thanks for the reviews on this too, @kevin85421.

dentiny pushed a commit to dentiny/ray that referenced this pull request Dec 7, 2024
… idle in autoscaler v1 (ray-project#48519)

ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Dec 17, 2024
… idle in autoscaler v1 (ray-project#48519)

Labels
go add ONLY when ready to merge, run all tests
Development

Successfully merging this pull request may close these issues.

[Ray Autoscaler] Autoscaler kills working nodes unexpectedly
4 participants