[core] Recover intermediate objects if needed while generator running by dayshah · Pull Request #53999 · ray-project/ray · GitHub

[core] Recover intermediate objects if needed while generator running #53999


Open
wants to merge 10 commits into master

Conversation

@dayshah (Contributor) commented Jun 22, 2025

Problem

Consider this sequence of events:

  1. A node goes down and an object is lost. The reconstruction of this object depends on a streaming generator output. That generator output was already used and destroyed because its (non-lineage) ref count dropped to 0, but the streaming generator is still running to produce more objects.
  2. We now try to recover the streaming generator output by telling the task manager to resubmit the task.
  3. The task manager will see that the task is not finished / failed and will assume that the object is currently being recovered.
  4. The streaming generator will finish and never be rerun. The object will never be recovered and the retry that depended on that object will never move past PENDING_ARGS_AVAILABLE.

For concrete examples, see the Python tests added here. They would hang forever without this fix.
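To make the timeline concrete, here is a minimal sketch of the kind of scenario the tests exercise, not the tests themselves. The task names, resource labels, and sizes are made up, and it assumes a recent Ray where a remote generator function returns a streaming ObjectRefGenerator by default:

import time
import numpy as np
import ray
from ray.cluster_utils import Cluster

cluster = Cluster()
cluster.add_node(num_cpus=0)  # head node, runs no tasks
ray.init(address=cluster.address)
cluster.add_node(num_cpus=1, resources={"gen": 1})           # survives
dying = cluster.add_node(num_cpus=1, resources={"sink": 1})  # will be killed

@ray.remote(max_retries=-1, resources={"gen": 1})
def slow_generator():
    for _ in range(5):
        yield np.zeros(10 * 1024 * 1024, dtype=np.uint8)  # plasma-backed output
        time.sleep(5)  # keep the generator running while recovery kicks in

@ray.remote(max_retries=-1, resources={"sink": 1})
def consumer(arr):
    return arr + 1  # large output, stored on the consumer's (dying) node

gen = slow_generator.remote()
first = next(gen)                    # first streaming output
result_ref = consumer.remote(first)
ray.wait([result_ref])               # consumer finished on the dying node
del first                            # non-lineage ref count drops to 0; the output is freed

# Lose the node holding the consumer's output. Recovering result_ref means
# re-running consumer, which needs `first`, which can only come from the
# still-running generator. Before this fix, this ray.get could hang forever.
cluster.remove_node(dying)
cluster.add_node(num_cpus=1, resources={"sink": 1})
print(ray.get(result_ref).shape)

The key points are that the generator is still running when the node dies and that the lost argument can only be reproduced by that generator.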

Solution

The solution is to cancel the running generator and resubmit it. We have to cancel rather than just wait for regular completion before resubmitting, because deadlock is possible due to generator backpressure, e.g. if calling ray.get(next(generator_ref)) depends on the completion of another task that needs the previous output of the generator. Note: cancelling in the backpressure waiter is a follow-up.
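To illustrate the backpressure hazard, here is a hedged sketch of the consumption pattern described above. The task names are made up, and the _generator_backpressure_num_objects option is assumed as a way to bound how far the generator can run ahead:

import ray

# At most one unconsumed output may be outstanding at a time (assumed option).
@ray.remote(_generator_backpressure_num_objects=1)
def backpressured_gen():
    for i in range(10):
        yield i

@ray.remote
def stage(x):
    return x * 2

gen = backpressured_gen.remote()
out = next(gen)
ray.get(stage.remote(out))  # downstream task that needs the previous output
# If the first output were lost here and recovery only waited for the generator
# to finish before resubmitting, the generator could stay blocked on backpressure
# (its outputs are not being consumed) while this ray.get waits on it, i.e. a
# deadlock. Cancelling the running generator first breaks that cycle.
ray.get(next(gen))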

Now, if we find out that a streaming generator output is lost and we can't pin a secondary location, this is what will happen based on the current generator task status (a sketch of this decision logic follows the list):

  1. If the task has been pushed to the worker and not completed → Cancel and Resubmit
  2. If the task is done (finished/failed) → Resubmit the task
  3. If the task is submitted, but hasn’t been pushed to the worker → Do nothing
  4. If the task is already in the submitter’s generators_to_resubmit_ map → Do nothing
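For reference, a minimal Python sketch of that decision table; it is purely illustrative, and TaskStatus, generators_to_resubmit, cancel_and_resubmit, and resubmit are hypothetical names rather than the actual C++ identifiers:

from enum import Enum, auto

class TaskStatus(Enum):
    PUSHED_TO_WORKER = auto()  # running on the executing worker
    FINISHED = auto()
    FAILED = auto()
    SUBMITTED = auto()         # queued, not yet pushed to a worker

def handle_lost_generator_output(status, task_id, generators_to_resubmit,
                                 cancel_and_resubmit, resubmit):
    # 4. Already queued for cancel + resubmit: do nothing.
    if task_id in generators_to_resubmit:
        return
    # 1. Still running on the worker: cancel it, then resubmit.
    if status is TaskStatus.PUSHED_TO_WORKER:
        cancel_and_resubmit(task_id)
    # 2. Finished or failed: just resubmit.
    elif status in (TaskStatus.FINISHED, TaskStatus.FAILED):
        resubmit(task_id)
    # 3. Submitted but not yet pushed to a worker: do nothing, it will
    #    run and produce the outputs anyway.
    else:
        return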

Follow-ups

We own the backpressure waiter; it's not user-defined code. If the generator is blocked on the executor due to backpressure, we can signal it when we get a cancel request to make it exit there. This ensures deadlock is impossible.
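A hedged sketch of the idea, using a hypothetical class rather than Ray's actual waiter: the executor blocks on a waiter object that core code owns, so a cancel request can set a flag and wake it instead of leaving it blocked forever.

import threading

class BackpressureWaiter:
    """Hypothetical sketch of a cancellable backpressure waiter."""

    def __init__(self):
        self._event = threading.Event()
        self._cancelled = False

    def notify_consumed(self):
        # The caller consumed an output; the generator may produce the next one.
        self._event.set()

    def cancel(self):
        # A cancel request arrived; wake the blocked generator so it can exit.
        self._cancelled = True
        self._event.set()

    def wait_for_consumption(self):
        self._event.wait()
        self._event.clear()
        if self._cancelled:
            raise KeyboardInterrupt("generator cancelled while backpressured")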

CancelTask currently doesn't handle transient network failures. That is a change with a larger scope than this PR.

Currently, if multiple objects from the same generator are queued up to be recovered when the recovery periodical runner runs, we could resubmit for the first object and then cancel and resubmit for the second if argument resolution and sequence numbering line up. Since this doesn't actually affect correctness and requires a bit of refactoring, it'll be in a follow-up PR.

dayshah added 3 commits June 22, 2025 15:27
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
@dayshah added the go label (add ONLY when ready to merge, run all tests) on Jun 23, 2025
if (!status.ok() ||
(!reply.attempt_succeeded() && reply.requested_task_running())) {
RAY_LOG(INFO) << "Failed to cancel generator " << task_id << " with status "
<< status.ToString();
@dayshah (Contributor, Author) commented Jun 23, 2025

Actor task cancellation is just not in a great spot, so proper cancellation failure handling is generally rough right now.

if (cancel_retry_timer->expiry().time_since_epoch() <=
std::chrono::high_resolution_clock::now().time_since_epoch()) {
cancel_retry_timer->expires_after(boost::asio::chrono::milliseconds(
RayConfig::instance().cancellation_retry_ms()));
@dayshah (Contributor, Author)

Maybe this shouldn't be a config variable and should use exponential backoff instead? But that's a separate change to existing behavior.

/* include_task_info */ true,
task_entry.spec.AttemptNumber() + 1);
}

@dayshah (Contributor, Author)

I just inlined these two. They were only called in one spot each and are relatively small. Inlining makes it easier to reason about the whole thing, IMO.

@dayshah dayshah marked this pull request as ready for review June 23, 2025 05:53
@dayshah dayshah requested review from pcmoritz, raulchen and a team as code owners June 23, 2025 05:53
@dayshah (Contributor, Author) commented Jun 23, 2025

Note: still need unit tests but want a green light on logic.

@edoakes (Collaborator) commented Jun 23, 2025

Defer to @israbbani for first pass

@israbbani (Contributor) left a comment

The overall approach looks reasonable AFAICS. I think we need another pair of eyes from @jjyao.

Is actor task cancellation best-effort, since we don't retry on failures? If so, how do we guarantee there will not be deadlock?

Comment on lines 357 to 360
// Resubmit was queued up.
if (still_executing) {
return true;
}
Contributor

Does this mean we've already resubmitted the task?

@dayshah (Contributor, Author)

No, still_executing means we are now cancelling and resubmitting because the task is still on the executing worker. If still_executing is false, it means the task completed by the time we set generator_to_queue_for_resubmit, so we should resubmit now instead of cancelling and resubmitting.

I'll make the variable names more clear and add a comment.

Comment on lines 379 to 380
// We should actually detect if the actor for this task is dead, but let's just assume
// it's not for now.
Contributor

Is this meant to be in here? Should it at least be a TODO?

@dayshah (Contributor, Author)

I didn't write this; it was here before. I don't think it's a problem today though: if the actor is permanently dead and we try resubmitting, we'll just fail the task and fail the corresponding object.

@dayshah (Contributor, Author)

Confirmed: we just fail the task and mark the task's object as failed if we try submitting and the actor is permanently dead.

The only issue is that the other objects needed will still be reconstructed. They'll still be released, though, because the task manager's fail path is still called.

So I'm removing the comment.

[task_id = spec.TaskId()](const Status &status, const rpc::CancelTaskReply &reply) {
if (!status.ok() ||
(!reply.attempt_succeeded() && reply.requested_task_running())) {
RAY_LOG(INFO) << "Failed to cancel generator " << task_id << " with status "
Contributor

It looks like we'll retry in HandlePushTaskReply if necessary. Does this mean cancellation is best-effort? If so, we can't guarantee that there won't be deadlock.

@dayshah (Contributor, Author) commented Jun 24, 2025

Yeah, no guarantee right now. We can guarantee no deadlock with the follow-up: if the executor side received the cancel and we ever get blocked due to backpressure, we cancel there.

Comment on lines +847 to +849
RAY_LOG(INFO) << "Failed to cancel generator " << task_id << " with status "
<< status.ToString();
return;
Contributor

Same question as the actor case. Is cancellation best-effort?

@dayshah (Contributor, Author)

More effort than the single-threaded actor case: we'll call kill_main_task to interrupt the thread.

Collaborator

Note this is still best-effort; users can intentionally or unintentionally catch the SIGINT and ignore it.

@dayshah (Contributor, Author) commented Jun 24, 2025

The overall approach looks reasonable AFAICS. I think we need another pair of eyes from @jjyao.

Is actor task cancellation best-effort, since we don't retry on failures? If so, how do we guarantee there will not be deadlock?

Single-threaded actor task cancellation is very low-effort; if the task is already running, it won't do anything. We can't guarantee no deadlock until I make the follow-up fix to propagate cancellation to the backpressure waiter.

@edoakes (Collaborator) commented Jun 24, 2025

ping me for review once @israbbani's comments are addressed

dayshah added 4 commits June 25, 2025 13:18
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
@dayshah (Contributor, Author) commented Jun 25, 2025

ping me for review once @israbbani's comments are addressed

addressed / responded to comments

@dayshah dayshah requested a review from israbbani June 25, 2025 21:08
@edoakes (Collaborator) commented Jun 26, 2025

Meta note before I forget: all three documented follow-ups here are critical. @dayshah please make sure we file and follow up on all of them (the rest of the team can help take some of the work).

@edoakes (Collaborator) left a comment

If I'm understanding correctly, the current implementation can cancel the task and resubmit the generator concurrently. This means there can be two invocations of the same streaming generator running at the same time (the one we're cancelling and the one we're resubmitting).

We document actor methods as having "at least once" semantics from the user's perspective, and I believe this can already happen under RPC failure conditions, so it isn't new but worth thinking about. For example, do we gracefully handle the case where the cancelled generator reports its results after the new generator has begun running and reported results earlier in the stream?

for i in range(3):
    yield np.zeros(10 * 1024 * 1024, dtype=np.uint8)
ray.get(signal_actor.wait.remote())
time.sleep(10)
Collaborator

Don't we SIGINT the task in the case of cancellation? If that's the case, we can catch the SIGINT and explicitly record it (e.g., ping a signal actor). That would be much more obvious/self-documenting test behavior.
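A hedged sketch of that test pattern; the SignalActor helper from Ray's test utils is assumed, and the generator body and names are hypothetical:

import time
import numpy as np
import ray
from ray._private.test_utils import SignalActor  # assumed test helper

cancel_seen = SignalActor.remote()

@ray.remote(max_retries=-1)
def generator(cancel_seen):
    try:
        for _ in range(3):
            yield np.zeros(10 * 1024 * 1024, dtype=np.uint8)
        time.sleep(600)  # stay running until recovery cancels the task
    except KeyboardInterrupt:
        # Cancellation is delivered as SIGINT (KeyboardInterrupt); record it
        # so the test can assert the cancel actually landed.
        cancel_seen.send.remote()
        raise

The test could then ray.get(cancel_seen.wait.remote()) before asserting that the lost outputs were recovered by the resubmitted generator.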

Comment on lines 157 to 158
# Recovery periodical runner runs every 100ms
time.sleep(0.1)
Collaborator

What exactly are we waiting for here, and why?

Comment on lines 888 to 889
CancelGenerator(cancel_retry_timer_, client, spec.TaskId(), spec.CallerWorkerId());
return true;
Collaborator

Why do we call CancelGenerator outside of the mutex?

@dayshah (Contributor, Author) commented Jun 26, 2025

Changed to call it inside the mutex. I expected everything left to be thread-safe, but the timer actually isn't, so I'm putting the timer behind an absl guard too, so we can't make this mistake in the future.

dayshah added 2 commits June 26, 2025 15:27
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
@dayshah (Contributor, Author) commented Jun 27, 2025

If I'm understanding correctly, the current implementation can cancel the task and resubmit the generator concurrently. This means there can be two invocations of the same streaming generator running at the same time (the one we're cancelling and the one we're resubmitting).

It waits for the PushTask RPC to come back before resubmitting. If the PushTask RPC fails due to a network failure, we'll go to the raylet to ask what's happening with the worker, and we still go down that path even with this change. So it should never be possible for two invocations of the same generator to be running at the same time; that could lead to a host of other problems.

For example, do we gracefully handle the case where the cancelled generator reports its results after the new generator has begun running and reported results earlier in the stream?

This is strictly an RPC ordering question, right: the ReportGeneratorItem comes in after the PushTaskReply is handled (and the resubmit happens)? I'll look into this, but I'm assuming it's handled somehow; streaming generators are used pretty heavily, and if ordering being off were enough to cause a bug, I'd assume we'd have seen it...

@dayshah dayshah requested a review from edoakes June 27, 2025 04:51
Signed-off-by: dayshah <dhyey2019@gmail.com>