-
Notifications
You must be signed in to change notification settings - Fork 233
Bug: assert self._producer, NOT_CONNECTED_YET. Please, connect()
the broker first.
#2013
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
If you are interested in detailed traceback: https://pastebin.com/JvfLd66b |
@JohnConnor123 logs are great, but can you show me the code with the problem reproduction? |
I can't show the code because it's confidential information. I can only say that the problem was reproducible when calling .publish on all team members and on all operating systems. |
Sorry, but I can't help you without the code. Just create a special MRE without your production logic. Smth like from faststream.kafka import KafkaBroker
from faststream import FastStream
broker = KafkaBroker()
app = FastStream(broker)
publisher = broker.publisher("out")
@broker.subscriber("test")
async def handler(msg):
await publisher.publish(msg)
@app.after_startup
async def _():
await broker.publish(None, "test") |
Btw, is you code work only with 0.5.29? What about 0.5.30? |
So, @JohnConnor123 can you help me with the problem reproducing? |
I didn't test this with 0.5.30, only with 0.29 and 0.33 |
Well, but I can just do nothing without additional information from your side. Please, let me solve the problem and provide me any example I can use to reproduce it. |
Well, I think, it's enough - I'll take care about it |
TL;DRThe issue likely occurs because publishers are created after My storyI encountered a similar issue when attempting to call class NatsProvider(Provider):
@provide(scope=Scope.APP)
async def broker(self) -> AsyncIterable[NatsBroker]:
broker = NatsBroker()
include_routers(broker) # Just `broker.include_routers(my_router)`
await broker.connect() # Trap is here
yield broker
await broker.close() My mistake was that
ExamplesExample with this error: import asyncio
from faststream.nats import NatsBroker
async def main() -> None:
broker = NatsBroker()
await broker.connect() # connect before creating publisher
await broker.publish("message1", "topic") # no error
publisher = broker.publisher("topic")
await publisher.publish("message2") # AssertionError: Please, `connect()` the broker first.
if __name__ == "__main__":
asyncio.run(main()) Working example: import asyncio
from faststream.nats import NatsBroker
async def main() -> None:
broker = NatsBroker()
publisher = broker.publisher("topic")
await broker.connect() # connect after creating publisher
await broker.publish("message1", "topic") # no error
await publisher.publish("message2") # no error
if __name__ == "__main__":
asyncio.run(main()) A more complex example with an error using FastStream, FastAPI, and Dishka: di.pyfrom collections.abc import AsyncIterable
from dishka import provide, Provider, Scope
from faststream.nats import NatsBroker
class NatsProvider(Provider):
@provide(scope=Scope.APP)
async def broker(self) -> AsyncIterable[NatsBroker]:
broker = NatsBroker()
await broker.connect()
yield broker
await broker.close() query.pyimport asyncio
from dishka import FromDishka, make_async_container
from dishka.integrations.faststream import inject, setup_dishka
from faststream import FastStream
from faststream.nats import NatsBroker, NatsRouter
from di import NatsProvider
router = NatsRouter()
@router.subscriber("topic")
@inject
async def query_handler(message: str, broker: FromDishka[NatsBroker]) -> None:
if message == "message":
await broker.publisher("topic").publish(message * 2) # no error
print(len(message), message)
async def main() -> None:
container = make_async_container(NatsProvider())
broker = await container.get(NatsBroker)
broker.include_router(router)
app = FastStream(broker)
setup_dishka(container, app)
await app.run()
if __name__ == '__main__':
asyncio.run(main()) web.pyimport uvicorn
from dishka import FromDishka, make_async_container
from dishka.integrations.fastapi import inject, setup_dishka
from fastapi import FastAPI
from faststream.nats import NatsBroker
from di import NatsProvider
app = FastAPI()
@app.get("/error")
@inject
async def web_handler(broker: FromDishka[NatsBroker]) -> str:
await broker.publisher("topic").publish("message") # error
return "ok"
@app.get("/ok")
@inject
async def web_handler(broker: FromDishka[NatsBroker]) -> str:
await broker.publish("message", "topic") # ok
return "ok"
def main() -> None:
container = make_async_container(NatsProvider())
setup_dishka(container, app)
uvicorn.run(app, host="0.0.0.0", port=8000)
if __name__ == '__main__':
main() Now, if you run So, 0.5.29 vs 0.5.33I have no idea why the problem is not in version 0.5.29 and is in version 0.5.33. |
I tried to reproduce the problem by the following code import asyncio
from faststream.nats import NatsBroker
async def main() -> None:
async with NatsBroker() as broker:
publisher = broker.publisher("topic")
await publisher.publish("message2") # AssertionError: Please, `connect()` the broker first.
if __name__ == "__main__":
asyncio.run(main()) But it doesn't work with 0.5.29 and 0.5.33 both. So, it should be an another case, I suppose. But, I hope, that if we fix this one - the original case should work as well |
Also, it looks like a copy of #1954 |
Uh oh!
There was an error while loading. Please reload this page.
Version faststream[kafka]==0.5.33 does not work when connected to a server. 5 hours of work in the debugger with the whole team and, as a result, the error below was resolved by rolling back to version 0.5.29
The text was updated successfully, but these errors were encountered: