docs: Add information on how you can organize DLQ message queue for different queue managers. · Issue #2210 · ag2ai/faststream · GitHub

docs: Add information on how you can organize DLQ message queue for different queue managers. #2210


Open
powersemmi opened this issue May 7, 2025 · 1 comment
Labels
documentation (Improvements or additions to documentation), enhancement (New feature or request)

Comments

@powersemmi

Summary

I think part of the community needs guidance on organizing dead-letter queues (DLQs).

Motivation

I think the documentation should state whether each queue manager offers a simple way to implement DLQs:
where built-in features exist, as in Rabbit,
and where a workaround has to be written, as in Redis or Kafka.
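
For brokers without a built-in dead-letter mechanism, the workaround usually amounts to counting failed attempts and republishing the message to a separate destination. The block below is only a broker-agnostic sketch of that idea, not FastStream API: `with_dlq`, `fake_publish`, and the `orders.dlq` name are hypothetical, and the in-memory list stands in for a real Redis or Kafka client.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]
Publish = Callable[[str, Any], Awaitable[None]]


def with_dlq(handler: Handler, publish: Publish, dlq_name: str, max_attempts: int = 3) -> Handler:
    """Retry `handler` up to `max_attempts` times, then forward the message to `dlq_name`."""

    async def wrapper(message: Any) -> None:
        for attempt in range(1, max_attempts + 1):
            try:
                await handler(message)
                return  # processed successfully, nothing to dead-letter
            except Exception:
                if attempt == max_attempts:
                    # Retries exhausted: hand the message to the dead-letter destination.
                    await publish(dlq_name, message)

    return wrapper


# In-memory stand-in for a broker client (assumption, for illustration only).
dead_letters: list[tuple[str, Any]] = []


async def fake_publish(destination: str, message: Any) -> None:
    dead_letters.append((destination, message))


async def failing_handler(message: Any) -> None:
    raise ValueError("cannot process")


asyncio.run(with_dlq(failing_handler, fake_publish, "orders.dlq")({"id": 1}))
```

In a real service, `publish` would wrap the broker's own publish call, and the retry loop would probably add a delay between attempts.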

Proposed Solution

  1. Rabbit
  2. Redis
  3. Kafka

It may be worthwhile to add a table showing which queue manager supports which kind of queue (and what FastStream supports).

@powersemmi powersemmi added the enhancement New feature or request label May 7, 2025
@Lancetnik Lancetnik added the documentation Improvements or additions to documentation label May 7, 2025
@Flosckow
Contributor
Flosckow commented May 15, 2025

In NATS, you can implement a DLQ like this using JetStream: https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#dead-letter-queues-type-functionality

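# imports (not shown in the original snippet)
from faststream.nats import NatsBroker, NatsMessage, PullSub
from nats.js.api import ConsumerConfig

broker = NatsBroker()  # assumed; the original snippet used `broker` without defining it

# publisher (call from async code; test_publisher and message are defined elsewhere)
await test_publisher.publish(stream="stream", subject="test", message={"message": message})

# consumer: JetStream stops redelivering after max_deliver attempts
config = ConsumerConfig(max_deliver=5)

@broker.subscriber(
    subject="test",
    stream="stream",
    pull_sub=PullSub(batch_size=10),
    config=config,
)
async def handle_pull_test(data: dict, msg: NatsMessage):
    try:
        div_result = 5 / 0  # raises ZeroDivisionError
        await msg.ack()
    except Exception:
        await msg.nack()  # negative acknowledgement: ask for redelivery
        print(msg)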

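After 5 failed delivery attempts, JetStream stops redelivering the message and publishes a max-deliveries advisory. As the linked documentation describes, a separate stream that captures those advisories (for example, one named stream_dlq) can then serve as the DLQ, and you can work with the failed messages from there. It is important to note that a message that reaches the maximum number of delivery attempts still remains in the original stream until it is deleted or manually acknowledged.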
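
The NATS documentation linked above builds its DLQ-type functionality on max-deliveries advisories. As a rough sketch of how a DLQ worker might use them, the helpers below build the advisory subject and pull the stream name and sequence number out of an advisory payload. The helper names are hypothetical, and the subject pattern and JSON field names (`stream`, `consumer`, `stream_seq`, `deliveries`) are taken from my reading of the NATS docs, so verify them against your server version.

```python
import json

# Advisory subject prefix as described in the NATS JetStream documentation.
ADVISORY_PREFIX = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"


def max_deliveries_subject(stream: str, consumer: str = "*") -> str:
    """Subject on which max-deliveries advisories for `stream` are published."""
    return f"{ADVISORY_PREFIX}.{stream}.{consumer}"


def parse_max_deliveries(payload: bytes) -> tuple[str, int]:
    """Return (stream name, stream sequence) of the message that exhausted its deliveries."""
    advisory = json.loads(payload)
    return advisory["stream"], advisory["stream_seq"]


# Hand-written sample payload, for illustration only (not captured from a real server).
sample = json.dumps(
    {
        "type": "io.nats.jetstream.advisory.v1.max_deliver",
        "stream": "stream",
        "consumer": "test_consumer",
        "stream_seq": 42,
        "deliveries": 5,
    }
).encode()

subject = max_deliveries_subject("stream")
stream_name, stream_seq = parse_max_deliveries(sample)
```

With the stream name and sequence number in hand, the worker can look up the original message in the source stream, process or archive it, and then delete it.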
