[core] Recover intermediate objects if needed while generator running #53999
Conversation
if (!status.ok() ||
    (!reply.attempt_succeeded() && reply.requested_task_running())) {
  RAY_LOG(INFO) << "Failed to cancel generator " << task_id << " with status "
                << status.ToString();
Actor task cancellation is just not in a great spot, so proper handling of cancellation failures is generally rough right now.
if (cancel_retry_timer->expiry().time_since_epoch() <=
    std::chrono::high_resolution_clock::now().time_since_epoch()) {
  cancel_retry_timer->expires_after(boost::asio::chrono::milliseconds(
      RayConfig::instance().cancellation_retry_ms()));
Maybe this shouldn't be a config variable and should use exponential backoff instead? But that's a separate change to existing behavior.
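For reference, a minimal sketch of the exponential-backoff idea (in Python for brevity; the real retry timer is a boost::asio timer driven by `cancellation_retry_ms`, and every name below is illustrative):

```python
import random

# Illustrative only: replace a fixed cancellation_retry_ms interval with
# capped exponential backoff plus jitter. Not Ray code; names are made up.
def next_cancel_retry_delay_ms(attempt: int, base_ms: int = 100, cap_ms: int = 5000) -> float:
    # Double the delay each attempt, cap it, and add full jitter so many
    # callers don't retry in lockstep.
    return random.uniform(0, min(cap_ms, base_ms * (2 ** attempt)))

# attempt 0 -> up to 100 ms, attempt 3 -> up to 800 ms, attempt 10 -> capped at 5 s.
```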
    /* include_task_info */ true,
    task_entry.spec.AttemptNumber() + 1);
}
I just inlined these two functions. They were each only called in one spot and are relatively small; inlining makes it easier to reason about the whole thing, IMO.
Note: this still needs unit tests, but I want a green light on the logic first.
Defer to @israbbani for first pass.
The overall approach looks reasonable AFAICS. I think we need another pair of eyes from @jjyao.
Is actor task cancellation best-effort, since we don't retry on failures? If so, how do we guarantee there won't be deadlock?
src/ray/core_worker/task_manager.cc (outdated)
// Resubmit was queued up.
if (still_executing) {
  return true;
}
Does this mean we've already resubmitted the task?
No, `still_executing` means we are now cancelling and resubmitting because the task is still running on the executing worker. If `still_executing` is false, it means the task completed by the time we set `generator_to_queue_for_resubmit`, so we should resubmit now instead of cancelling and resubmitting.
I'll make the variable names clearer and add a comment.
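The branch being described, as an illustrative sketch (Python pseudocode of the C++ logic; the helper names are hypothetical):

```python
# Illustrative only: this mirrors the branch discussed above, not the actual
# C++ in task_manager.cc.
def cancel_then_resubmit(task_id: str) -> None:  # hypothetical helper
    print(f"cancelling {task_id}; will resubmit once the cancel reply arrives")

def resubmit_now(task_id: str) -> None:  # hypothetical helper
    print(f"resubmitting {task_id} immediately")

def handle_queued_generator_resubmit(still_executing: bool, task_id: str) -> None:
    if still_executing:
        # The generator task is still running on the executing worker:
        # cancel it and resubmit after the cancellation completes.
        cancel_then_resubmit(task_id)
    else:
        # The task finished by the time generator_to_queue_for_resubmit was
        # set: there is nothing to cancel, so resubmit right away.
        resubmit_now(task_id)
```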
src/ray/core_worker/task_manager.cc (outdated)
// We should actually detect if the actor for this task is dead, but let's just assume
// it's not for now.
Is this meant to be in here? Should it at least be a TODO?
I didn't write this; it was here before. I don't think it's a problem today, though: if the actor is permanently dead and we try resubmitting, we'll just fail the task and fail the corresponding object.
Confirmed: we just fail the task and mark the task's object as failed if we try submitting and the actor is permanently dead.
The only issue is that the other objects needed will still be reconstructed; they'll still be released, though, because the task manager's fail path is still called.
So I'm removing the comment.
[task_id = spec.TaskId()](const Status &status, const rpc::CancelTaskReply &reply) {
  if (!status.ok() ||
      (!reply.attempt_succeeded() && reply.requested_task_running())) {
    RAY_LOG(INFO) << "Failed to cancel generator " << task_id << " with status "
It looks like we'll retry in `HandlePushTaskReply` if necessary. Does this mean cancellation is best-effort? If so, we can't guarantee that there won't be deadlock.
Yeah, there's no guarantee right now. With the follow-up we can guarantee no deadlock: if the executor side has received the cancel and we ever get blocked due to backpressure, we cancel there.
RAY_LOG(INFO) << "Failed to cancel generator " << task_id << " with status "
              << status.ToString();
return;
Same question as the actor case. Is cancellation best-effort?
More effort than in the single-threaded actor case. We'll call `kill_main_task` to interrupt the thread.
Note this is still best-effort: users can intentionally or unintentionally catch the SIGINT and ignore it.
Single-threaded actor task cancellation is very low effort; if the task is already running, it won't do anything. We can't guarantee no deadlock until I make the follow-up fix to propagate cancellation to the backpressure waiter.
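To illustrate why it's best-effort: a user task can swallow the interrupt entirely. A minimal sketch, assuming cancellation reaches the running Python task as a SIGINT/`KeyboardInterrupt` as discussed above (the function itself is made up):

```python
import time
import ray

@ray.remote
def stubborn_generator():
    for i in range(100):
        try:
            yield i
            time.sleep(1)
        except KeyboardInterrupt:
            # The cancellation signal lands here (most likely during the
            # sleep). Swallowing it means ray.cancel() never stops this task.
            pass
```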
Ping me for review once @israbbani's comments are addressed.
Addressed / responded to comments.
Meta note before I forget: all three documented follow-ups here are critical. @dayshah please make sure we file and follow up on all of them (the rest of the team can help take some of the work).
If I'm understanding correctly, the current implementation can cancel the task and resubmit the generator concurrently. This means there can be two invocations of the same streaming generator running at the same time (the one we're cancelling and the one we're resubmitting).
We document actor methods as having "at least once" semantics from the user's perspective, and I believe this can already happen under RPC failure conditions, so it isn't new but worth thinking about. For example, do we gracefully handle the case where the cancelled generator reports its results after the new generator has begun running and reported results earlier in the stream?
for i in range(3):
    yield np.zeros(10 * 1024 * 1024, dtype=np.uint8)
ray.get(signal_actor.wait.remote())
time.sleep(10)
Don't we `SIGINT` the task in the case of cancellation? If that's the case, we can catch the `SIGINT` and explicitly record it (e.g., ping a signal actor). That would be much more obvious/self-documenting test behavior.
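Something like the following could make that explicit. A hedged sketch: `CancelRecorder` and its methods are made up for illustration (the real tests use a SignalActor-style utility), and it assumes cancellation reaches the task as `SIGINT`/`KeyboardInterrupt` as discussed above:

```python
import time
import numpy as np
import ray

@ray.remote
class CancelRecorder:
    # Hypothetical helper actor used to record that cancellation was observed.
    def __init__(self):
        self.cancelled = False

    def record(self):
        self.cancelled = True

    def was_cancelled(self):
        return self.cancelled

recorder = CancelRecorder.remote()

@ray.remote
def generator():
    try:
        for i in range(3):
            yield np.zeros(10 * 1024 * 1024, dtype=np.uint8)
        time.sleep(30)  # stay "running" so there is something to cancel
    except KeyboardInterrupt:
        # Cancellation arrives as SIGINT -> KeyboardInterrupt; record it so
        # the test can assert the generator was actually cancelled.
        ray.get(recorder.record.remote())
        raise
```

The test could then assert on `ray.get(recorder.was_cancelled.remote())` instead of relying on sleeps.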
# Recovery periodical runner runs every 100ms
time.sleep(0.1)
What exactly are we waiting for here, and why?
CancelGenerator(cancel_retry_timer_, client, spec.TaskId(), spec.CallerWorkerId());
return true;
Why do we call `CancelGenerator` outside of the mutex?
Changed it to call inside the mutex. I expected everything left to be thread-safe, but the timer actually isn't, so I'm also putting the timer behind an absl guard so we can't make this mistake in the future.
It waits for the PushTask RPC to come back before resubmitting. If the PushTask RPC fails due to a network failure, we'll go to the raylet to ask what's happening with the worker, and we still go down that path even with this change. So it should never be possible for two invocations of the same generator to be running at the same time; that could lead to a host of other problems.
This is strictly an RPC ordering question, right: could a `ReportGeneratorItem` come in after the `PushTaskReply` is handled (and the resubmit happens)? I'll look into this, but I'm assuming it's handled somehow; streaming generators are used pretty heavily, and if ordering being off were enough to cause a bug, I'd assume we'd have seen it.
Problem
Consider this sequence of events:
For concrete examples, see the Python tests added here. They would hang forever without this fix.
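A hedged sketch of the kind of workload the tests exercise (not the actual test code; `_generator_backpressure_num_objects`, the sizes, and the multi-node failure are assumptions used for illustration):

```python
import numpy as np
import ray

ray.init()

@ray.remote(_generator_backpressure_num_objects=1)
def generator():
    for i in range(3):
        # With a backpressure threshold of 1, later yields block until the
        # earlier outputs have been consumed by the caller.
        yield np.zeros(10 * 1024 * 1024, dtype=np.uint8)

@ray.remote
def consumer(chunk):
    return int(chunk.sum())

gen = generator.remote()
first_ref = next(gen)

# Suppose the node holding the first output dies here. Recovering it requires
# re-running the generator, but the generator is still running and
# backpressure keeps it from finishing on its own, because the driver is
# stuck below and never consumes more outputs. Without recovering
# intermediate objects while the generator is running, this ray.get() hangs
# forever.
print(ray.get(consumer.remote(first_ref)))
```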
Solution
The solution is to cancel the running generator and resubmit it. We have to cancel, and can't just wait for regular completion before resubmitting, because deadlock is possible due to generator backpressure, e.g. if calling `ray.get(next(generator_ref))` is dependent on the completion of another task that needs the previous output of the generator. Note: cancelling in the backpressure waiter is a follow-up.

Now, if we find out that a streaming generator output is lost and we can't pin a secondary location, this is what will happen based on the current generator task status:
Follow-ups
1. We own the backpressure waiter; it's not user-defined code. If the generator is blocked on the executor due to backpressure, we can signal it when we get a cancel request to make it exit there. This ensures deadlock is impossible.
2. CancelTask currently doesn't handle transient network failures. That's a change with a larger scope than this PR.
3. Currently, if multiple objects from the same generator are queued up to be recovered when the recovery periodical runner runs, we could resubmit for the first object and then cancel and resubmit for the second if argument resolution and sequence numbering line up. Since this doesn't actually affect correctness and requires a bit of refactoring, it'll be in a follow-up PR.