Bug: assert self._producer, NOT_CONNECTED_YET. Please, `connect()` the broker first. · Issue #2013 · ag2ai/faststream · GitHub

Closed
JohnConnor123 opened this issue Dec 28, 2024 · 13 comments · Fixed by #2024
Labels
AioKafka Issues related to `faststream.kafka` module bug Something isn't working

Comments

@JohnConnor123
JohnConnor123 commented Dec 28, 2024

Version faststream[kafka]==0.5.33 does not work when connected to a server. After 5 hours in the debugger with the whole team, we resolved the error below by rolling back to version 0.5.29.

  File "E:\Poetry\virtualenvs\audioserver-E3nwuXL0-py3.12\Lib\site-packages\faststream\kafka\publisher\usecase.py", line 291, in publish
    assert self._producer, NOT_CONNECTED_YET  # nosec B101
           ^^^^^^^^^^^^^^
AssertionError: Please, `connect()` the broker first.
@JohnConnor123 JohnConnor123 added the bug Something isn't working label Dec 28, 2024
@JohnConnor123
Author

If you are interested in the detailed traceback: https://pastebin.com/JvfLd66b

@Lancetnik
Collaborator

@JohnConnor123 logs are great, but can you show me code that reproduces the problem?

@JohnConnor123
Author

> @JohnConnor123 logs are great, but can you show me code that reproduces the problem?

I can't show the code because it's confidential. I can only say that the problem was reproducible when calling .publish for all team members and on all operating systems.

@Lancetnik
Collaborator

Sorry, but I can't help you without the code. Just create a minimal reproducible example (MRE) without your production logic. Something like:

from faststream.kafka import KafkaBroker
from faststream import FastStream

broker = KafkaBroker()
app = FastStream(broker)

publisher = broker.publisher("out")

@broker.subscriber("test")
async def handler(msg):
    await publisher.publish(msg)

@app.after_startup
async def _():
    await broker.publish(None, "test")

@Lancetnik
Collaborator

Btw, does your code work only with 0.5.29? What about 0.5.30?

@Lancetnik Lancetnik added the AioKafka Issues related to `faststream.kafka` module label Dec 28, 2024
@Lancetnik
Collaborator

So, @JohnConnor123, can you help me reproduce the problem?

@JohnConnor123
Author
JohnConnor123 commented Dec 28, 2024

> Btw, does your code work only with 0.5.29? What about 0.5.30?

I didn't test this with 0.5.30, only with 0.5.29 and 0.5.33.

@Lancetnik
Collaborator

> > Btw, does your code work only with 0.5.29? What about 0.5.30?
>
> I didn't test this with 0.5.30, only with 0.5.29 and 0.5.33.

Well, I can't do anything without additional information from your side. Please help me solve the problem by providing any example I can use to reproduce it.

@JohnConnor123
Author
JohnConnor123 commented Dec 29, 2024

> > > Btw, does your code work only with 0.5.29? What about 0.5.30?
> >
> > I didn't test this with 0.5.30, only with 0.5.29 and 0.5.33.
>
> Well, I can't do anything without additional information from your side. Please help me solve the problem by providing any example I can use to reproduce it.

P.S. Sorry for the screenshots instead of code. This is the most I can provide. This issue was created as a hint to help other developers fix the problem.
[three screenshots]

@Lancetnik
Collaborator
Lancetnik commented Dec 30, 2024

Well, I think that's enough. I'll take care of it.

@K1rL3s
K1rL3s commented Jan 5, 2025

TL;DR

The issue likely occurs because publishers are created after await broker.connect() (or .setup()).
Try calling await broker.connect() after invoking BrokerProvider.provide_publishers.

My story

I encountered a similar issue when attempting to call await broker.publisher(...).publish(...) from another process. I had a FastStream instance running separately and a FastAPI instance that needed to publish messages to the queue. I created the broker using Dishka and then reused the provider in both instances:

class NatsProvider(Provider):
    @provide(scope=Scope.APP)
    async def broker(self) -> AsyncIterable[NatsBroker]:
        broker = NatsBroker()
        include_routers(broker)  # Just `broker.include_routers(my_router)`
        await broker.connect()  # Trap is here
        yield broker
        await broker.close()

My mistake was assuming otherwise: await broker.connect() sets up only the publishers created before it is called.

However, if you use the similar method await broker.start(), which also doesn't block the thread, new publishers will work. This is because it sets the flag self.running = True, which is later checked when new publishers are registered.
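To make the mechanism concrete, here is a toy model in plain Python. It is not FastStream's actual implementation: `ToyBroker`, `ToyPublisher`, and the `sent` list are hypothetical names invented for illustration, and only the `running` flag and the error message mirror what is described above. It sketches why a publisher created after `connect()` has no producer, while one created after `start()` does.

```python
import asyncio
from typing import List, Optional, Tuple

NOT_CONNECTED_YET = "Please, `connect()` the broker first."


class ToyPublisher:
    """Hypothetical stand-in for a publisher object."""

    def __init__(self, subject: str) -> None:
        self.subject = subject
        self._producer: Optional[List[Tuple[str, str]]] = None  # filled in by setup()

    def setup(self, producer: List[Tuple[str, str]]) -> None:
        self._producer = producer

    async def publish(self, message: str) -> None:
        assert self._producer is not None, NOT_CONNECTED_YET
        self._producer.append((self.subject, message))


class ToyBroker:
    """Hypothetical stand-in for a broker; `sent` plays the role of the producer."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str]] = []
        self.running = False
        self._publishers: List[ToyPublisher] = []

    def publisher(self, subject: str) -> ToyPublisher:
        pub = ToyPublisher(subject)
        self._publishers.append(pub)
        if self.running:  # only a started broker sets up late publishers
            pub.setup(self.sent)
        return pub

    async def connect(self) -> None:
        # Sets up only the publishers that exist at this moment.
        for pub in self._publishers:
            pub.setup(self.sent)

    async def start(self) -> None:
        await self.connect()
        self.running = True


async def demo() -> Tuple[bool, bool]:
    """Return (late publisher failed after connect, late publisher worked after start)."""
    broker = ToyBroker()
    await broker.connect()
    failed = False
    try:
        await broker.publisher("topic").publish("message")
    except AssertionError:
        failed = True

    broker = ToyBroker()
    await broker.start()
    await broker.publisher("topic").publish("message")
    worked = broker.sent == [("topic", "message")]
    return failed, worked
```

Running `asyncio.run(demo())` exercises both paths: the first publisher is created after `connect()` and never receives a producer, the second is created after `start()` and is set up on creation.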

Examples

Example with this error:

import asyncio
from faststream.nats import NatsBroker

async def main() -> None:
    broker = NatsBroker()
    await broker.connect()  # connect before creating publisher

    await broker.publish("message1", "topic")  # no error

    publisher = broker.publisher("topic")
    await publisher.publish("message2")  # AssertionError: Please, `connect()` the broker first.

if __name__ == "__main__":
    asyncio.run(main())

Working example:

import asyncio
from faststream.nats import NatsBroker

async def main() -> None:
    broker = NatsBroker()
    publisher = broker.publisher("topic")
    await broker.connect()  # connect after creating publisher

    await broker.publish("message1", "topic")  # no error

    await publisher.publish("message2")  # no error

if __name__ == "__main__":
    asyncio.run(main())

A more complex example with an error using FastStream, FastAPI, and Dishka:

di.py

from collections.abc import AsyncIterable

from dishka import provide, Provider, Scope
from faststream.nats import NatsBroker

class NatsProvider(Provider):
    @provide(scope=Scope.APP)
    async def broker(self) -> AsyncIterable[NatsBroker]:
        broker = NatsBroker()
        await broker.connect()
        yield broker
        await broker.close()

query.py

import asyncio

from dishka import FromDishka, make_async_container
from dishka.integrations.faststream import inject, setup_dishka
from faststream import FastStream
from faststream.nats import NatsBroker, NatsRouter

from di import NatsProvider

router = NatsRouter()

@router.subscriber("topic")
@inject
async def query_handler(message: str, broker: FromDishka[NatsBroker]) -> None:
    if message == "message":
        await broker.publisher("topic").publish(message * 2)  # no error
    print(len(message), message)

async def main() -> None:
    container = make_async_container(NatsProvider())
    broker = await container.get(NatsBroker)
    broker.include_router(router)
    app = FastStream(broker)
    setup_dishka(container, app)
    await app.run()

if __name__ == '__main__':
    asyncio.run(main())

web.py

import uvicorn
from dishka import FromDishka, make_async_container
from dishka.integrations.fastapi import inject, setup_dishka
from fastapi import FastAPI
from faststream.nats import NatsBroker

from di import NatsProvider

app = FastAPI()


@app.get("/error")
@inject
async def web_handler_error(broker: FromDishka[NatsBroker]) -> str:
    # Publisher is created after the broker was connected in the provider
    await broker.publisher("topic").publish("message")  # error
    return "ok"

@app.get("/ok")
@inject
async def web_handler_ok(broker: FromDishka[NatsBroker]) -> str:
    # Publishing through the broker itself does not need a publisher object
    await broker.publish("message", "topic")  # ok
    return "ok"

def main() -> None:
    container = make_async_container(NatsProvider())
    setup_dishka(container, app)
    uvicorn.run(app, host="0.0.0.0", port=8000)

if __name__ == '__main__':
    main()

Now, if you run web.py and query.py and visit localhost:8000/ok, everything will work. However, visiting localhost:8000/error will result in an error: AssertionError: Please, 'connect()' the broker first.

So, broker.publisher(...).publish(...) works inside FastStream handlers but fails from another process.

0.5.29 vs 0.5.33

I have no idea why the problem appears in version 0.5.33 but not in version 0.5.29.
As a temporary workaround, call broker.connect() (or .setup()) after creating publishers (after calling BrokerProvider.provide_publishers).

@Lancetnik
Collaborator
Lancetnik commented Jan 8, 2025

I tried to reproduce the problem with the following code:

import asyncio

from faststream.nats import NatsBroker

async def main() -> None:
    async with NatsBroker() as broker:
        publisher = broker.publisher("topic")
        await publisher.publish("message2")  # AssertionError: Please, `connect()` the broker first.

if __name__ == "__main__":
    asyncio.run(main())

But it fails on both 0.5.29 and 0.5.33, so I suppose the original report is a different case. Still, I hope that fixing this one will make the original case work as well.

@Lancetnik
Collaborator

Also, it looks like a duplicate of #1954.

@Lancetnik Lancetnik moved this to In Progress in FastStream Jan 8, 2025
@Lancetnik Lancetnik moved this from In Progress to Waiting for merge in FastStream Jan 8, 2025
github-merge-queue bot pushed a commit that referenced this issue Jan 8, 2025
…2024)

* fix: allow to create publisher in connected broker

* chore: bump version
@github-project-automation github-project-automation bot moved this from Waiting for merge to Done in FastStream Jan 8, 2025