[Data] Fix ActorPool autoscaler to properly scale up #53983
Conversation
Tidying up
Ignore free slots when autoscaling
overall LGTM
from typing import Optional

from ray.data._internal.execution.interfaces.execution_options import ExecutionResources
from ray.util.annotations import DeveloperAPI


@dataclass
class ScalingConfig:
I slightly prefer the old name ScalingAction. Also consider naming it ActorPoolScalingAction to make it more explicit. And we may also add a ClusterScalingAction.
Yeah, I've realized that this is rather an intent/request, so I renamed it to APAutoscalingRequest.
def current_in_flight_tasks(self) -> int:
    """Number of current in-flight tasks."""
def num_tasks_in_flight(self) -> int:
    """Number of current in-flight tasks (running + pending tasks)."""
"""Number of current in-flight tasks (running + pending tasks).""" | |
"""Number of current in-flight tasks (tasks that have been submitted to the actor pool).""" |
# running task we'd allow 1 more task to be enqueued
compute_strategy.max_tasks_in_flight_per_actor
or data_context.max_tasks_in_flight_per_actor
or max_actor_concurrency * 2
nit, define a constant for the magic number 2.
Captured it in the comment above, not really sure what value the constant will add
+1 to defining a constant. It's easier to understand if you're skimming the code and not reading all of the comments.
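For illustration, a minimal sketch of what naming that factor could look like; the constant name and helper function are hypothetical, not part of the PR:

# Hypothetical constant name; the PR itself keeps the literal 2 with a comment.
DEFAULT_MAX_TASKS_IN_FLIGHT_FACTOR = 2

def resolve_max_tasks_in_flight(compute_strategy, data_context, max_actor_concurrency: int) -> int:
    # Fall back to allowing one extra enqueued task per running task
    # (max_actor_concurrency * factor) when no explicit limit is configured.
    return (
        compute_strategy.max_tasks_in_flight_per_actor
        or data_context.max_tasks_in_flight_per_actor
        or max_actor_concurrency * DEFAULT_MAX_TASKS_IN_FLIGHT_FACTOR
    )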
    config.delta, max(self.max_size() - self.current_size(), 0)
)

logger.info(
This can be too verbose. Let's use debug level. Same for the scale-down.
SG
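A minimal sketch of the clamped scale-up with debug-level logging, using standalone arguments in place of the pool's methods (the function name is illustrative):

import logging

logger = logging.getLogger(__name__)

def clamp_scale_up(requested_delta: int, current_size: int, max_size: int) -> int:
    # Clamp the requested delta so the pool never exceeds its max size.
    target_num_actors = min(requested_delta, max(max_size - current_size, 0))
    # Debug level keeps per-tick autoscaling decisions out of the default logs.
    logger.debug(
        "Scaling up actor pool by %d actor(s) (current=%d, max=%d)",
        target_num_actors, current_size, max_size,
    )
    return target_num_actors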
# Make sure after scaling down actor pool size won't fall below its
# min size
target_num_actors = min(
    abs(config.delta), max(self.current_size() - self.min_size(), 0)
We can allow the autoscaling policy to scale down below the min, e.g. when inputs are complete.
Fair point. I was actually thinking of just shutting down the actor pool once processing is done, but now that I think about it, it's better to incrementally release workers as soon as they can be released.
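A minimal sketch of a scale-down clamp that takes input completion into account; the inputs_done flag and function name are assumptions for illustration:

def clamp_scale_down(requested_delta: int, current_size: int, min_size: int, inputs_done: bool) -> int:
    # Once all inputs are consumed the pool may shrink all the way to 0;
    # otherwise it must not fall below its configured min size.
    floor = 0 if inputs_done else min_size
    return min(abs(requested_delta), max(current_size - floor, 0))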
python/ray/data/context.py
@@ -338,6 +380,11 @@ class DataContext:
    call is made with a S3 URI.
wait_for_min_actors_s: The default time to wait for minimum requested
    actors to start before raising a timeout, in seconds.
max_tasks_in_flight_per_actor: Max number of tasks that could be enqueued
    into the individual Actor's queue. Note that running tasks are not counted
into the individual Actor's queue. Note that running tasks are not counted
into the individual Actor's local queue. Note that running tasks are not counted
python/ray/data/context.py
@@ -338,6 +380,11 @@ class DataContext:
    call is made with a S3 URI.
wait_for_min_actors_s: The default time to wait for minimum requested
    actors to start before raising a timeout, in seconds.
max_tasks_in_flight_per_actor: Max number of tasks that could be enqueued
    into the individual Actor's queue. Note that running tasks are not counted
    towards this limit. This setting allows Actors to start pulling and buffering
Note that running tasks are not counted towards this limit.
This is the total number, so running tasks are also counted, right?
Yeah, this is a remnant of when this parameter was not overlapping with max_concurrency. Will update.
python/ray/data/context.py
into the individual Actor's queue. Note that running tasks are not counted
towards this limit. This setting allows Actors to start pulling and buffering
blocks for the tasks waiting in the queue to make sure tasks could start
executing immediately once taken from the queue.
I find it a little bit hard to understand for regular users.
We can just say this setting allows Actors to pipeline task execution with block transfer.
Ok, let me take a stab at rephrasing
- Make validation of whether a request can be applied private (removed dup)
- Log debugging warnings internally
…aled down to 0 (upon completion)
return num_actors
return None
nit, why return None instead of 0?
Just to more clearly signal that no action was taken (rather than attempted and failed, e.g. in the case of downscaling).
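A minimal sketch of that distinction (the function name and docstring are illustrative, not the PR's exact code):

from typing import Optional

def try_scale_up(requested_delta: int, current_size: int, max_size: int) -> Optional[int]:
    """Return the number of actors added, 0 if an attempt was made but the
    pool is already at its max size, or None if no scaling action was taken."""
    if requested_delta <= 0:
        return None  # Nothing requested -- no action taken at all.
    return min(requested_delta, max(max_size - current_size, 0))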
DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD: float = env_float(
    "RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD",
    0.5,
)
This might be too low. We can keep an eye on it and revisit later.
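Since the default is read from an environment variable (per the snippet above), it can be tuned without code changes; a minimal sketch, assuming the constant is read when ray.data is first imported:

import os

# Hypothetical override of the default downscaling threshold; set it before
# ray.data reads the constant (i.e. before the first import).
os.environ["RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD"] = "0.7"

import ray.data  # noqa: E402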
LGTM.
@alexeykudinkin I'm running the release tests here: https://buildkite.com/ray-project/release/builds/46720#.
Would you mind skimming it and making sure there's no major regression before merging?
return num_actors
return None
Return 0? Not obvious when this method returns 0 vs None
return None
return 0
Also, maybe document the expected return value here or in the base class? I think it's implicit right now.
Answered this comment from Hao above (None to signal no action taken)
def get_pool_util(self) -> float:
    """Calculate the utilization of the given actor pool."""
    ...
Should this be the responsibility of the autoscaler? You can compute this just using public actor pool methods, and I think the actor pool interface is already really bloated
Yeah, I went back and forth on this. Ultimately decided that the scope of the AP should include defining what util actually means (in terms of # of actors used, concurrency slots, or whatnot), as these are intrinsic actor pool details. The autoscaler, on the other hand, operates at a higher level (e.g. if util > threshold, then upscale).
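For reference, a minimal sketch of the utilization definition from the PR description, written as a standalone function (the parameter names are assumptions):

def get_pool_util(num_tasks_in_flight: int, num_running_actors: int, max_actor_concurrency: int) -> float:
    """Utilization = submitted (in-flight) tasks / total concurrency slots."""
    if num_running_actors == 0:
        return 0.0
    return num_tasks_in_flight / (num_running_actors * max_actor_concurrency)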
Why are these changes needed?

Currently, ActorPool autoscaling actually isn't scaling up until all of its num_actors * max_tasks_in_flight slots are full. This is a seriously limiting factor:

- It requires the AP to accumulate a backlog of 4x its # of actors before autoscaling even starts
- 3 out of 4 tasks are sitting in the queue, meaning their execution won't start until previous ones complete

Changes

1. Revisiting the Actor Pool autoscaling protocol to be based on utilization, defined as # of submitted tasks / (number of running actors * max_concurrency)
2. Added AutoscalingConfig to make all configuration explicitly available inside DataContext
3. Setting the default upscaling threshold to 200% (currently it is set to 400%)
4. Updated tests
5. Added docs

Related issue number

Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- [ ] I've run scripts/format.sh to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(
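To make the utilization-based protocol concrete, a minimal sketch of a threshold-based scaling decision using the defaults mentioned in this PR (2.0, i.e. 200%, for upscaling and 0.5 for downscaling); the function name and single-actor step size are illustrative assumptions:

from typing import Optional

UPSCALING_THRESHOLD = 2.0    # 200%, per the PR description
DOWNSCALING_THRESHOLD = 0.5  # per DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD

def decide_scaling_delta(util: float, current_size: int, min_size: int, max_size: int) -> Optional[int]:
    """Return a positive delta to scale up, a negative delta to scale down,
    or None when no action is needed."""
    if util >= UPSCALING_THRESHOLD and current_size < max_size:
        return 1
    if util <= DOWNSCALING_THRESHOLD and current_size > min_size:
        return -1
    return None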