Open
Description
What happened and what you expected to happen
Since Ray 2.10.0, `DeploymentResponse._to_object_ref()` awaits the final result of the task instead of returning the `ObjectRef` of the running task as soon as it has been scheduled. This effectively prevents any async task from being computed while previous tasks are still running (i.e., when passing results by reference).
More details are available in the linked discussion (the link was not preserved in this copy of the issue).
Versions / Dependencies
This issue affects `ray[serve]>=2.10.0`.
Reproduction script
import asyncio
from ray import serve
from ray.serve.handle import DeploymentHandle
import time
from fastapi import FastAPI
# FastAPI application used as the HTTP ingress: Dispatcher is wrapped with
# @serve.ingress(app) and registers its route via @app.post("/").
app: FastAPI = FastAPI()
async def _to_object_ref_or_gen(
self,
hold=False
):
"""
This replicates the behaviour locally for easier debugging.
"""
obj_ref_or_gen = await asyncio.wrap_future(self._object_ref_future)
if hold:
obj_ref_or_gen = await obj_ref_or_gen.__anext__() # This call blocks until the result
else:
obj_ref_or_gen = obj_ref_or_gen.__anext__()
self._object_ref_or_gen = obj_ref_or_gen
return self._object_ref_or_gen
@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1,
)
@serve.ingress(app)
class Dispatcher:
    """Ingress deployment that forwards each request to ``Foo`` and times it."""

    def __init__(self, foo_handler: DeploymentHandle):
        self.foo_handler = foo_handler

    @app.post("/")
    async def entry(self, hold: bool):
        """Call the downstream deployment once and print per-phase timings.

        Three durations (seconds) are recorded in ``metrics``:

        * ``call_time``: issuing ``self.foo_handler.remote()``.
        * ``scheduling_time``: waiting for ``_to_object_ref_or_gen``.
        * ``worker_time``: awaiting the value that helper returned.

        They are printed in every case, including on error.

        Args:
            hold: Forwarded to ``_to_object_ref_or_gen``.

        Returns:
            The value obtained by awaiting what ``_to_object_ref_or_gen``
            returned.

        Raises:
            TimeoutError: If ``_to_object_ref_or_gen`` does not finish within
                30 seconds. The in-flight request is cancelled first.
        """
        metrics = {}
        try:
            # perf_counter is monotonic, unlike time.time(), so it is the right
            # clock for measuring durations.
            start = time.perf_counter()
            response = self.foo_handler.remote()
            now = time.perf_counter()
            metrics["call_time"], start = now - start, now

            try:
                ref = await asyncio.wait_for(
                    _to_object_ref_or_gen(response, hold), timeout=30
                )
            except asyncio.TimeoutError as err:
                # Before Python 3.11, wait_for raises asyncio.TimeoutError,
                # which is NOT the builtin TimeoutError. From 3.11 on it is an
                # alias, so this clause works on every version. It is scoped to
                # the scheduling step only, so a timeout raised while awaiting
                # the result below is not mislabelled as a scheduler timeout.
                response.cancel()
                raise TimeoutError(
                    "Scheduler timeout error. All workers seemingly full."
                ) from err

            now = time.perf_counter()
            metrics["scheduling_time"], start = now - start, now
            result = await ref
            metrics["worker_time"] = time.perf_counter() - start
        finally:
            print(f"\n\nMetrics: {metrics}\n\n")
        return result
@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0})
class Foo:
    """Downstream deployment whose call blocks for a fixed delay."""

    def __call__(self):
        """Block the calling thread for 10 seconds, then return ``True``."""
        delay_seconds = 10
        time.sleep(delay_seconds)
        return True
# Application graph: Foo is bound first so Dispatcher receives it as its
# ``foo_handler`` constructor argument.
foo = Foo.bind()
service = Dispatcher.bind(foo_handler=foo)

if __name__ == "__main__":
    from ray.cluster_utils import Cluster

    # Local two-node cluster: a head node with no GPUs and a worker node that
    # advertises 2 GPUs, each tagged with a custom resource ("head"/"worker").
    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 4,
            "num_gpus": 0,
            "resources": {"head": 1},
            "dashboard_host": "0.0.0.0",
        },
    )
    # Nested try/finally guarantees Serve and the local cluster are torn down
    # even if a request raises or the user interrupts the prompt (Ctrl-C).
    try:
        cluster.add_node(
            num_cpus=4,
            num_gpus=2,
            resources={"worker": 1},
        )
        cluster.wait_for_nodes(2)
        deployment = serve.run(service)
        try:
            print("\n\nTesting programmatically...\n\n")
            deployment.entry.remote(hold=False).result()
            deployment.entry.remote(hold=True).result()
            input("\n\nTest script completed. Press Enter to shutdown.\n\n")
        finally:
            serve.shutdown()
    finally:
        cluster.shutdown()
Issue Severity
High: It blocks me from completing my task.