[core] Fix GCS subscribers map race condition by dayshah · Pull Request #53781 · ray-project/ray · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[core] Fix GCS subscribers map race condition #53781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ void GcsServer::InstallEventListeners() {
gcs_job_manager_->OnNodeDead(node_id);
raylet_client_pool_->Disconnect(node_id);
gcs_healthcheck_manager_->RemoveNode(node_id);
pubsub_handler_->RemoveSubscriberFrom(node_id.Binary());
pubsub_handler_->AsyncRemoveSubscriberFrom(node_id.Binary());
gcs_autoscaler_state_manager_->OnNodeDead(node_id);
});

Expand All @@ -782,7 +782,7 @@ void GcsServer::InstallEventListeners() {
worker_failure_data->exit_detail(),
creation_task_exception);
gcs_placement_group_scheduler_->HandleWaitingRemovedBundles();
pubsub_handler_->RemoveSubscriberFrom(worker_id.Binary());
pubsub_handler_->AsyncRemoveSubscriberFrom(worker_id.Binary());
gcs_task_manager_->OnWorkerDead(worker_id, worker_failure_data);
});

Expand Down
22 changes: 13 additions & 9 deletions src/ray/gcs/gcs_server/pubsub_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,19 @@ void InternalPubSubHandler::HandleGcsUnregisterSubscriber(
send_reply_callback(Status::OK(), nullptr, nullptr);
}

void InternalPubSubHandler::RemoveSubscriberFrom(const std::string &sender_id) {
auto iter = sender_to_subscribers_.find(sender_id);
if (iter == sender_to_subscribers_.end()) {
return;
}
for (auto &subscriber_id : iter->second) {
gcs_publisher_.GetPublisher().UnregisterSubscriber(subscriber_id);
}
sender_to_subscribers_.erase(iter);
/// Removes every subscriber recorded under `sender_id` in sender_to_subscribers_
/// and unregisters each one from the GCS publisher. In the visible callers,
/// `sender_id` is a dead node's or dead worker's id in binary form.
///
/// The work is posted to io_service_ rather than done inline. Per the header
/// note, internal pubsub code already runs on the publisher's io_service_, so
/// posting keeps every access to sender_to_subscribers_ on that same context
/// instead of the caller's. As a result, removal happens after this call returns.
void InternalPubSubHandler::AsyncRemoveSubscriberFrom(const std::string &sender_id) {
io_service_.post(
// sender_id is captured by value because the caller's string may no longer
// exist when the posted handler runs.
// NOTE(review): `this` is captured as a raw pointer. This assumes the handler
// object outlives any work still queued on io_service_ — confirm.
[this, sender_id]() {
auto iter = sender_to_subscribers_.find(sender_id);
// Nothing recorded for this sender (never subscribed, or already removed): no-op.
if (iter == sender_to_subscribers_.end()) {
return;
}
for (auto &subscriber_id : iter->second) {
gcs_publisher_.GetPublisher().UnregisterSubscriber(subscriber_id);
}
// Erase only after the loop, so iter->second stays valid while it is iterated.
sender_to_subscribers_.erase(iter);
},
// NOTE(review): this is presumably the handler's name for io_service_
// instrumentation. It still reads "RemoveSubscriberFrom" after the rename to
// AsyncRemoveSubscriberFrom; consider updating it.
"RemoveSubscriberFrom");
}

} // namespace gcs
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/pubsub_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ class InternalPubSubHandler : public rpc::InternalPubSubHandler {
rpc::GcsUnregisterSubscriberReply *reply,
rpc::SendReplyCallback send_reply_callback) final;

std::string DebugString() const;

void RemoveSubscriberFrom(const std::string &sender_id);
/// This function is only for external callers. Internally, can just erase from
/// sender_to_subscribers_ and everything should be on the Publisher's io_service_.
void AsyncRemoveSubscriberFrom(const std::string &sender_id);

private:
/// Not owning the io service, to allow sharing it with pubsub::Publisher.
Expand Down
0