[core] Use core worker client pool in GCS #53654
Signed-off-by: dayshah <dhyey2019@gmail.com>
Pull Request Overview
This PR refactors the GCS components to replace the per-call core worker client factory with a shared core worker client pool, enabling the use of retryable gRPC clients for interactions between GCS and workers. Key changes include:
- Replacing all client_factory usages with a shared worker_client_pool across tests and production code.
- Updating GCS server components (job manager, actor scheduler, actor manager, etc.) to use the new worker_client_pool API.
- Adjusting test cases in various modules to work with the shared pool.
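To make the shape of the refactor concrete, below is a minimal, self-contained C++ sketch of the shared-pool idea. The GetOrConnect/Disconnect names and the client-factory concept mirror the terminology used in this PR, but the types are simplified stand-ins, not the actual ray::rpc::CoreWorkerClientPool implementation.

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Stand-ins for the real Ray types; illustrative only.
struct Address { std::string worker_id; };
struct CoreWorkerClient {
  explicit CoreWorkerClient(Address addr) : addr(std::move(addr)) {}
  Address addr;
};

using ClientFactory = std::function<std::shared_ptr<CoreWorkerClient>(const Address &)>;

// One pool, shared by every GCS component, caching one client per worker ID.
class CoreWorkerClientPool {
 public:
  explicit CoreWorkerClientPool(ClientFactory factory) : factory_(std::move(factory)) {}

  std::shared_ptr<CoreWorkerClient> GetOrConnect(const Address &addr) {
    auto it = clients_.find(addr.worker_id);
    if (it != clients_.end()) return it->second;  // reuse the cached client
    auto client = factory_(addr);                 // connect only once per worker
    clients_.emplace(addr.worker_id, client);
    return client;
  }

  // Explicit cleanup, invoked from the worker/node failure listeners.
  void Disconnect(const std::string &worker_id) { clients_.erase(worker_id); }

 private:
  ClientFactory factory_;
  std::unordered_map<std::string, std::shared_ptr<CoreWorkerClient>> clients_;
};

int main() {
  CoreWorkerClientPool pool(
      [](const Address &a) { return std::make_shared<CoreWorkerClient>(a); });
  auto first = pool.GetOrConnect({"worker-1"});
  auto second = pool.GetOrConnect({"worker-1"});
  std::cout << (first == second) << "\n";  // prints 1: both callers share one client
  pool.Disconnect("worker-1");             // dropped when the worker dies
}
```

Previously each component held its own factory (or its own unshared pool), so clients could not be reused or retired consistently; with a single shared pool the cache and its cleanup live in one place.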
Reviewed Changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc | Updated to use worker_client_pool_ instead of client_factory. |
| src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc | Modified to instantiate and use worker_client_pool_ accordingly. |
| src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc | Refactored to inject worker_client_pool_ in place of direct client creation. |
| src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc | Updated to use worker_client_pool_ for actor manager tests. |
| src/ray/gcs/gcs_server/test/export_api/gcs_actor_manager_export_event_test.cc | Modified to use worker_client_pool_ in export event tests. |
| src/ray/gcs/gcs_server/gcs_worker_manager.cc | Replaced uses of a temporary client_factory with worker_client_pool_ calls. |
| src/ray/gcs/gcs_server/gcs_server.h | Added an include and a member for the core worker client pool. |
| src/ray/gcs/gcs_server/gcs_server.cc | Refactored worker client pool initialization and its usage in the job and actor managers. |
| src/ray/gcs/gcs_server/gcs_job_manager.h & .cc | Updated the constructor and calls to use the worker_client_pool_. |
| src/ray/gcs/gcs_server/gcs_actor_scheduler.h & .cc | Replaced core worker client factory usage with the shared worker_client_pool_. |
| src/ray/gcs/gcs_server/gcs_actor_manager.h & .cc | Revised parameters and calls to use worker_client_pool_ for actor communication. |
Comments suppressed due to low confidence (1)
src/ray/gcs/gcs_server/gcs_job_manager.cc:370
- Previously a disconnect call was made when a job was marked dead. Verify that omitting worker_client_pool_.Disconnect(worker_id) here is intentional and that stale connections are properly handled elsewhere.
reply->mutable_job_info_list(jj)->set_is_running_tasks(false);
Signed-off-by: dayshah <dhyey2019@gmail.com>
🚢
How do we GC clients inside the pool?
@@ -769,6 +781,7 @@ void GcsServer::InstallEventListeners() {
    [this](const std::shared_ptr<rpc::WorkerTableData> &worker_failure_data) {
      auto &worker_address = worker_failure_data->worker_address();
      auto worker_id = WorkerID::FromBinary(worker_address.worker_id());
      worker_client_pool_.Disconnect(worker_id);
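Based on the exchange in this thread, cleanup is explicit rather than automatic: the worker-failure listener above calls Disconnect for the dead worker's ID, which removes its cached client from the shared pool (the simplified sketch earlier mirrors this erase-on-disconnect behavior), and the node-failure path discussed below has to do the same for every worker on a dead node.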
@jjyao cleaning up here on dead worker
We also need it when the node dies: if the node dies, this listener won't be called for each individual worker on the node.
Oh, I expected worker death to be propagated there in the case of node death too.
Anyway, updated to disconnect on node death as well.
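To illustrate why node death needs its own cleanup pass (per the comment above, the per-worker failure listener does not fire for workers on a node that dies), here is a small self-contained sketch. The container names are stand-ins, not the actual GCS data structures or the exact code added in this PR.

```cpp
#include <iostream>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Illustrative only; the containers stand in for GCS state and the client pool.
int main() {
  std::unordered_map<std::string, std::unordered_set<std::string>> workers_by_node{
      {"node-A", {"worker-1", "worker-2"}},
      {"node-B", {"worker-3"}},
  };
  std::set<std::string> pooled_clients{"worker-1", "worker-2", "worker-3"};

  // When a node dies, the per-worker failure listener never runs for its workers,
  // so each of their cached clients must be dropped in the node-failure path.
  const std::string dead_node = "node-A";
  for (const auto &worker_id : workers_by_node[dead_node]) {
    pooled_clients.erase(worker_id);  // analogous to worker_client_pool_.Disconnect(worker_id)
  }

  std::cout << "clients still cached: " << pooled_clients.size() << "\n";  // prints 1
}
```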
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
Why are these changes needed?
Replacing all uses of a core worker client factory / unshared client pool with a shared core worker client pool for the whole GCS server. This is needed to use the retryable gRPC client in the GCS. Retries will be implemented for existing RPCs, and new GCS -> worker RPCs will need retries as well, e.g. #51653.
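As a toy illustration of the retry motivation (this is not Ray's actual retryable gRPC client, which is considerably more involved), a caller object that is cached and shared through the pool gives the retry policy one stable home, so every GCS -> worker RPC routed through the pool picks it up uniformly:

```cpp
#include <functional>
#include <iostream>

// Illustrative only: a minimal retry wrapper of the kind a shared client pool
// could hand out to every GCS component.
class RetryingCaller {
 public:
  explicit RetryingCaller(int max_attempts) : max_attempts_(max_attempts) {}

  // Invokes `rpc` until it reports success or the attempt budget is exhausted.
  bool Call(const std::function<bool()> &rpc) {
    for (int attempt = 1; attempt <= max_attempts_; ++attempt) {
      if (rpc()) return true;
      std::cerr << "attempt " << attempt << " failed, retrying\n";
    }
    return false;
  }

 private:
  int max_attempts_;
};

int main() {
  // Because the caller is shared (e.g., cached in the pool), its retry policy
  // applies to every call site that uses it.
  RetryingCaller caller(3);
  int failures_left = 2;  // simulate two transient failures, then success
  bool delivered = caller.Call([&] { return failures_left-- <= 0; });
  std::cout << (delivered ? "delivered" : "gave up") << "\n";  // prints "delivered"
}
```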
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.