-
Notifications
You must be signed in to change notification settings - Fork 423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RayJob] implement deletion policy API #2643
base: master
Are you sure you want to change the base?
Conversation
08bdbfb
to
b2d43be
Compare
b2d43be
to
472b4c2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you resolve the conflicts? Also, should we add some tests for this feature?
472b4c2
to
0db79c3
Compare
Fixed conflicts, will add tests tomorrow |
30adbd6
to
33747ec
Compare
Added unit tests, going to skip e2e tests for now since it's currently not trivial to enable feature gates in the e2e tests |
Signed-off-by: Andrew Sy Kim <[email protected]>
33747ec
to
f08fe12
Compare
f08fe12
to
fed8484
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
fed8484
to
5637ea0
Compare
rayJobInstance.Spec.DeletionPolicy != nil && | ||
*rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy && | ||
len(rayJobInstance.Spec.ClusterSelector) == 0 { | ||
logger.Info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
"deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
"ttlSecondsAfterFinished", ttlSeconds,
"Status.endTime", rayJobInstance.Status.EndTime,
"Now", nowTime,
"ShutdownTime", shutdownTime)
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
to the above of if features.Enabled(features.RayJobDeletionPolicy) &&
and remove the similar logics from L391 to L403.
@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns | |||
return isClusterDeleted, nil | |||
} | |||
|
|||
func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to return bool
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, this is an oversight
@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns | |||
return isClusterDeleted, nil | |||
} | |||
|
|||
func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function may not work when autoscaling is enabled. If autoscaling is enabled, Pod deletion is always determined by the Ray Autoscaler. KubeRay will not delete any Pods, even if the number of Pods exceeds the goal state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point, but autoscaling with RayJob is pretty uncommon though right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to fix this is to also set max replicas to 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but autoscaling with RayJob is pretty uncommon though right?
I checked with my colleagues, and this may be incorrect. Autoscaling is not very common for Ray Train. However, it is commonly used for Ray Data, Ray Tune, and RLlib.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most Ray Data users use autoscaling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mean the Ray API, I mean autoscaling is not common when using the RayJob custom resource. I am sure Ray Data with RayCluster + autoscaling is very common
@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns | |||
return isClusterDeleted, nil | |||
} | |||
|
|||
func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically, a K8s controller should only write to the CR status and treat the CR spec as read-only, but implementing this feature without writing to the CR spec is challenging for us.
Perhaps a compromise solution is to add a new field to the RayCluster CRD (e.g., suspendWorkers: bool
), where the RayJob controller only sets this field to true, and the RayCluster is responsible for deleting all Ray worker Pods.
This way, the RayJob controller doesn't need to modify replicas and minReplicas, which can also be modified by the Ray Autoscaler or users. Allowing multiple stakeholders to modify a field is typically the root cause of KubeRay's instability issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Controllers writing to spec is not necessarily bad, but I see what you mean. I think it would be wrong to write to RayJob spec from RayJob controller, but in this case we're writing to RayCluster spec from RayJob controller. I feel that updating replcias, minReplicas and maxReplicas for ephemeral RayCluster specifcally is actually fine because we don't care about the RayCluster spec once the cluster is deleted.
Will think about this more and get back to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main concern is that multiple personas can modify these fields, such as users, the Autoscaler, and the RayJob controller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps a compromise solution is to add a new field to the RayCluster CRD (e.g., suspendWorkers: bool), where the RayJob controller only sets this field to true, and the RayCluster is responsible for deleting all Ray worker Pods.
@kevin85421 how about a suspend
field per worker group in WorkerGroupSpec? This allows for granularity of suspension per worker, and from RayJob we can just set suspend: true
for all worker groups
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's the draft PR #2663
Let me know what you think, I will clean up the PR and add tests if the API looks good to you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API looks good to me.
nowTime := time.Now() | ||
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second) | ||
|
||
if features.Enabled(features.RayJobDeletionPolicy) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update validateRayJobSpec
to ensure that the combination of ShutdownAfterJobFinishes: true
and rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy
is invalid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can split this issue into 3 PRs if the comment makes sense to you?
- Add a new field and feature flag in RayJob.
- Add a new field in RayCluster CRD to terminate all worker Pods.
- Implement the deletion policy API based on (2)
Signed-off-by: Andrew Sy Kim <[email protected]>
5637ea0
to
ff2deca
Compare
Why are these changes needed?
Implement RayJob DeletionPolicy API
Related issue number
#2615
Checks