[Serve] DeploymentResponse._to_object_ref() blocks until final results from actor · Issue #46893 · ray-project/ray · GitHub
[Serve] DeploymentResponse._to_object_ref() blocks until final results from actor #46893
Open
@kyle-v6x

Description

What happened + What you expected to happen

Since 2.10.0, DeploymentResponse._to_object_ref() awaits the final result of the task instead of returning the ObjectRef of the running task as soon as it is scheduled. This effectively prevents any async work from being done while previous tasks are still running (i.e. passing results by reference).

You can find more details here.
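
The difference described above can be illustrated without Ray. The following is a minimal sketch using only asyncio, not Ray's actual implementation: fake_task_stream, to_ref and measure are hypothetical names, and the async generator merely stands in for the object-ref generator that the response wraps. It shows that awaiting the first item holds the caller for the full task duration, while returning the pending awaitable hands control back immediately.

```python
import asyncio
import time


async def fake_task_stream(delay: float):
    """Hypothetical stand-in for the object-ref generator: yields once the work is done."""
    await asyncio.sleep(delay)  # simulates the actor running the task
    yield "result"


async def to_ref(gen, hold: bool):
    """Return an awaitable for the result.

    hold=True  -> waits for the final value before returning (reported behaviour)
    hold=False -> returns a pending awaitable right away (expected behaviour)
    """
    if hold:
        value = await gen.__anext__()  # caller is held for the full task duration

        async def done():
            return value

        return done()
    return gen.__anext__()  # not awaited here, so this returns immediately


async def measure(hold: bool, delay: float = 0.2):
    """Time the conversion step separately from the overall call."""
    start = time.perf_counter()
    ref = await to_ref(fake_task_stream(delay), hold)
    scheduling_time = time.perf_counter() - start
    result = await ref
    total_time = time.perf_counter() - start
    return scheduling_time, total_time, result


if __name__ == "__main__":
    for hold in (False, True):
        sched, total, result = asyncio.run(measure(hold))
        print(f"hold={hold}: scheduling={sched:.3f}s total={total:.3f}s result={result}")
```

Under these assumptions, the scheduling time with hold=True should be close to the task duration, whereas with hold=False it should be near zero.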

Versions / Dependencies

This affects ray[serve]>=2.10.0.

Reproduction script

import asyncio
from ray import serve
from ray.serve.handle import DeploymentHandle
import time
from fastapi import FastAPI

app = FastAPI()


async def _to_object_ref_or_gen(
        self,
        hold=False
):
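    """
    This replicates the behaviour locally for easier debugging.

    hold=True awaits the first item and reproduces the blocking behaviour;
    hold=False skips that await and returns the pending awaitable.
    """
    obj_ref_or_gen = await asyncio.wrap_future(self._object_ref_future)
    if hold:
        # This await blocks until the final result is available.
        obj_ref_or_gen = await obj_ref_or_gen.__anext__()
    else:
        # Not awaited here, so control returns to the caller immediately.
        obj_ref_or_gen = obj_ref_or_gen.__anext__()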

    self._object_ref_or_gen = obj_ref_or_gen

    return self._object_ref_or_gen

@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
@serve.ingress(app)
class Dispatcher:
    def __init__(self, foo_handler: DeploymentHandle):
        self.foo_handler = foo_handler

    @app.post("/")
    async def entry(self, hold: bool):
        handle = None
        metrics = {}
        try:
            start = time.time()
            handle = self.foo_handler.remote()
            metrics["call_time"], start = time.time() - start, time.time()
            ref = await asyncio.wait_for(_to_object_ref_or_gen(handle, hold), timeout=30)
            metrics["scheduling_time"], start = time.time() - start, time.time()
            result = await ref
            metrics["worker_time"] = time.time() - start
        except asyncio.TimeoutError:  # alias of the builtin TimeoutError on Python 3.11+
            if handle is not None:
                handle.cancel()
            raise TimeoutError("Scheduler timeout error. All workers seemingly full.")
        finally:
            print(f"\n\nMetrics: {metrics}\n\n")
        return result


@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
class Foo:
    def __call__(self):
        time.sleep(10)
        return True


foo = Foo.bind()
service = Dispatcher.bind(foo_handler=foo)


if __name__ == "__main__":
    from ray.cluster_utils import Cluster

    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 4,
            "num_gpus": 0,
            "resources": {"head": 1},
            "dashboard_host": "0.0.0.0",
        },
    )
    worker_node = cluster.add_node(
        num_cpus=4,
        num_gpus=2,
        resources={"worker": 1},
    )

    cluster.wait_for_nodes(2)
    deployment = serve.run(service)

    print("\n\nTesting programmatically...\n\n")
    deployment.entry.remote(hold=False).result()
    deployment.entry.remote(hold=True).result()

    input("\n\nTest script completed. Press Enter to shutdown.\n\n")
    serve.shutdown()
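
The script above issues one call at a time. The sketch below is a hedged, Ray-free illustration of why the blocking conversion matters once several calls are submitted from the same coroutine. All names (run_on_replica, submit, pipeline) are hypothetical, asyncio tasks stand in for object refs, and it assumes the backend can run the submitted tasks concurrently, which the single time.sleep replica in the script above would not.

```python
import asyncio
import time


async def run_on_replica(delay: float) -> float:
    """Hypothetical stand-in for work executed by a replica."""
    await asyncio.sleep(delay)
    return delay


async def submit(delay: float, block_until_done: bool) -> asyncio.Task:
    """Hypothetical stand-in for converting a response into a reference."""
    task = asyncio.ensure_future(run_on_replica(delay))  # work is scheduled here
    if block_until_done:
        await task  # the caller is held until the work has finished
    return task


async def pipeline(block_until_done: bool, n: int = 3, delay: float = 0.1):
    """Submit n tasks back to back, then collect all results."""
    start = time.perf_counter()
    refs = []
    for _ in range(n):
        refs.append(await submit(delay, block_until_done))
    results = await asyncio.gather(*refs)
    return time.perf_counter() - start, results


if __name__ == "__main__":
    for block in (False, True):
        elapsed, results = asyncio.run(pipeline(block))
        print(f"block_until_done={block}: elapsed={elapsed:.3f}s results={results}")
```

When the conversion blocks, the submissions are serialized and the elapsed time grows with the number of tasks; when it returns as soon as the task is scheduled, the tasks overlap.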

Issue Severity

High: It blocks me from completing my task.

Metadata

Labels

P0: Issues that should be fixed in short order
bug: Something that is supposed to be working; but isn't
core: Issues that should be addressed in Ray Core
jira-sync