[Data] Fix ActorPool autoscaler to properly scale up by alexeykudinkin · Pull Request #53983 · ray-project/ray · GitHub

[Data] Fix ActorPool autoscaler to properly scale up #53983


Merged
merged 71 commits into ray-project:master on Jun 26, 2025

Conversation

alexeykudinkin
Contributor
@alexeykudinkin alexeykudinkin commented Jun 20, 2025

Why are these changes needed?

Currently, the ActorPool autoscaler doesn't actually scale up until all of its num_actors * max_tasks_in_flight task slots are full. This is a seriously limiting factor:

  • It requires the AP to accumulate a backlog of 4x its # of actors before autoscaling even starts
  • 3 out of 4 tasks sit in the queue, meaning their execution won't start until previous ones complete

Changes

  1. Revisited the Actor Pool autoscaling protocol to base it on utilization, defined as # of submitted tasks / (number of running actors * max_concurrency); see the sketch after this list
  2. Added AutoscalingConfig to make all autoscaling configuration explicitly available inside DataContext
  3. Set the default upscaling threshold to 200% (currently set to 400%)
  4. Updated tests
  5. Added docs
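
A minimal sketch of the new utilization-based protocol, assuming hypothetical names and the defaults described in this PR (not the exact Ray Data implementation):

UPSCALING_THRESHOLD = 2.0    # 200% per this PR (previously effectively 400%)
DOWNSCALING_THRESHOLD = 0.5  # 50%, per DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD below

def actor_pool_util(num_submitted_tasks: int,
                    num_running_actors: int,
                    max_concurrency: int) -> float:
    """util = # of submitted tasks / (number of running actors * max_concurrency)."""
    if num_running_actors == 0:
        return 0.0
    return num_submitted_tasks / (num_running_actors * max_concurrency)

def decide(util: float) -> str:
    if util >= UPSCALING_THRESHOLD:
        return "upscale"
    if util <= DOWNSCALING_THRESHOLD:
        return "downscale"
    return "no-op"

# e.g. 16 submitted tasks, 2 running actors, max_concurrency=4 -> util 2.0 -> "upscale"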

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@alexeykudinkin alexeykudinkin requested a review from a team as a code owner June 20, 2025 23:33
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Jun 24, 2025
@alexeykudinkin alexeykudinkin requested a review from a team as a code owner June 25, 2025 02:31
Tidying up

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

Ignore free slots when autoscaling

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Contributor
@raulchen raulchen left a comment

overall LGTM

from dataclasses import dataclass
from typing import Optional

from ray.data._internal.execution.interfaces.execution_options import ExecutionResources
from ray.util.annotations import DeveloperAPI


@dataclass
class ScalingConfig:
Contributor

I slightly prefer the old name ScalingAction. Also consider naming it ActorPoolScalingAction to make it more explicit. And we may also add a ClusterScalingAction.

Contributor Author
@alexeykudinkin alexeykudinkin Jun 26, 2025

Yeah, I've realized that this is rather an intent/request; renamed it to APAutoscalingRequest.

def current_in_flight_tasks(self) -> int:
"""Number of current in-flight tasks."""
def num_tasks_in_flight(self) -> int:
"""Number of current in-flight tasks (running + pending tasks)."""
Contributor

Suggested change
"""Number of current in-flight tasks (running + pending tasks)."""
"""Number of current in-flight tasks (tasks that have been submitted to the actor pool)."""

# For every running task we'd allow 1 more task to be enqueued
compute_strategy.max_tasks_in_flight_per_actor
or data_context.max_tasks_in_flight_per_actor
or max_actor_concurrency * 2
Contributor

nit, define a constant for the magic number 2.

Contributor Author

Captured it in the comment above; not really sure what value the constant would add.

Member

+1 to defining a constant. It's easier to understand if you're skimming the code and not reading all of the comments.
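
A sketch of the suggested refactor; the constant name and the max_tasks_in_flight variable are hypothetical, mirroring the fragment quoted above:

# For every running task, allow one more task to be enqueued.
DEFAULT_ENQUEUED_TASKS_PER_RUNNING_TASK = 2

max_tasks_in_flight = (
    compute_strategy.max_tasks_in_flight_per_actor
    or data_context.max_tasks_in_flight_per_actor
    or max_actor_concurrency * DEFAULT_ENQUEUED_TASKS_PER_RUNNING_TASK
)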

# Make sure after scaling up the actor pool size won't exceed its max size
target_num_actors = min(
    config.delta, max(self.max_size() - self.current_size(), 0)
)

logger.info(
Contributor

This can be too verbose; let's use debug level. Same for the scale-down.

Contributor Author

SG

# Make sure after scaling down actor pool size won't fall below its
# min size
target_num_actors = min(
    abs(config.delta), max(self.current_size() - self.min_size(), 0)
)
Contributor

We can allow the autoscaling policy to downscale below the min, i.e., when inputs are complete.

Contributor Author

Fair point. I was actually thinking to just shut down the actor pool once processing is done, but now that I'm thinking about it, it's better to incrementally release workers as soon as they can be released.
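
A sketch of that idea; the _inputs_done flag and helper names are hypothetical, not from this PR:

def _downscale_floor(self) -> int:
    # Once all inputs are consumed, the pool may shrink all the way to 0;
    # otherwise, respect the configured minimum size.
    return 0 if self._inputs_done else self.min_size()

def _max_downscale_delta(self) -> int:
    return max(self.current_size() - self._downscale_floor(), 0)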

@@ -338,6 +380,11 @@ class DataContext:
call is made with a S3 URI.
wait_for_min_actors_s: The default time to wait for minimum requested
actors to start before raising a timeout, in seconds.
max_tasks_in_flight_per_actor: Max number of tasks that could be enqueued
into the individual Actor's queue. Note that running tasks are not counted
Contributor

Suggested change
into the individual Actor's queue. Note that running tasks are not counted
into the individual Actor's local queue. Note that running tasks are not counted

@@ -338,6 +380,11 @@ class DataContext:
call is made with a S3 URI.
wait_for_min_actors_s: The default time to wait for minimum requested
actors to start before raising a timeout, in seconds.
max_tasks_in_flight_per_actor: Max number of tasks that could be enqueued
into the individual Actor's queue. Note that running tasks are not counted
towards this limit. This setting allows Actors to start pulling and buffering
Contributor

Note that running tasks are not counted towards this limit.

This is the total number, so running tasks are also counted, right?

Contributor Author

Yeah, this is a remnant of when this parameter didn't overlap with max_concurrency. Will update.

into the individual Actor's queue. Note that running tasks are not counted
towards this limit. This setting allows Actors to start pulling and buffering
blocks for the tasks waiting in the queue to make sure tasks could start
executing immediately once taken from the queue.
Contributor

I find it a little bit hard to understand for regular users.
We can just say this setting allows Actors to pipeline task execution with block transfer.

Contributor Author

Ok, let me take a stab at rephrasing
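
For reference, a usage sketch of the new setting (this PR adds max_tasks_in_flight_per_actor to DataContext; the value 4 is just illustrative):

import ray

ctx = ray.data.DataContext.get_current()
# Let each actor enqueue up to 4 tasks so it can pre-fetch their input
# blocks, pipelining block transfer with task execution.
ctx.max_tasks_in_flight_per_actor = 4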

 - Make validation whether request can be applied private (removed dup)
 - Log debugging warnings internally

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…aled down to 0 (upon completion)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

return num_actors
return None
Contributor

nit, why return None instead of 0?

Contributor Author

Just to more clearly signal that no action was taken (rather than attempted and failed, e.g. in the case of downscaling).

DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD: float = env_float(
"RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD",
0.5,
)
Contributor

This might be too low; we can keep an eye on it and revisit later.
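
For reference, a sketch of overriding this default via the environment variable quoted above, assuming (as with other env-derived defaults) that it's read when Ray Data is first imported:

import os

# Downscale more eagerly: shrink the pool once utilization drops below 75%
# instead of the default 50%.
os.environ["RAY_DATA_DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD"] = "0.75"

import ray.data  # must come after the env var is set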

Member
@bveeramani bveeramani left a comment

LGTM.

@alexeykudinkin I'm running the release tests here: https://buildkite.com/ray-project/release/builds/46720#.

Would you mind skimming it and making sure there's no major regression before merging?


return num_actors
return None
Member

Return 0? It's not obvious when this method returns 0 vs. None.

Suggested change
return None
return 0

Member

Also, maybe document the expected return value here or in the base class? I think it's implicit right now.

Contributor Author

Answered this comment from Hao above (None signals that no action was taken).
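
A sketch of the documented contract being requested; the method name and wording are illustrative:

from typing import Optional

def _apply_upscaling(self, delta: int) -> Optional[int]:
    """Attempt to add `delta` actors to the pool.

    Returns:
        The number of actors actually added, or None if no scaling action
        was taken at all (as opposed to attempted but clamped to zero).
    """
    ...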

Comment on lines +106 to 108
def get_pool_util(self) -> float:
"""Calculate the utilization of the given actor pool."""
...
Member

Should this be the responsibility of the autoscaler? You can compute this just using public actor pool methods, and I think the actor pool interface is already really bloated

Contributor Author

Yeah, I went back and forth on this. Ultimately I decided that the scope of the AP should include defining what utilization actually means (in terms of # of actors used, concurrency slots, or whatnot), as these are intrinsic actor-pool details. The autoscaler, on the other hand, operates at a higher level (e.g., if util > threshold then upscale).
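
A sketch of how that split might look on the pool side, with hypothetical accessor names:

class ActorPool:
    def get_pool_util(self) -> float:
        """Utilization: tasks submitted to this pool per concurrency slot
        across its currently running actors."""
        running = self.num_running_actors()  # hypothetical accessor
        if running == 0:
            return 0.0
        return self.num_tasks_in_flight() / (
            running * self.max_actor_concurrency()  # hypothetical accessor
        )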


Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin merged commit 5525301 into ray-project:master Jun 26, 2025
4 of 5 checks passed
minerharry pushed a commit to minerharry/ray that referenced this pull request Jun 27, 2025
Labels
go add ONLY when ready to merge, run all tests
5 participants