8000 fix(redis): prevent memory leak by deleting acknowledged stream messages, added test coverage for message deletion behaviour by shashank-sarvam · Pull Request #2232 · ag2ai/faststream · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(redis): prevent memory leak by deleting acknowledged stream messages, added test coverage for message deletion behaviour #2232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

shashank-sarvam 8000
Copy link

Description

This PR implements automatic deletion of Redis Stream messages after acknowledgment to prevent memory accumulation in long-running FastStream applications.

Problem: Redis Stream messages were only being acknowledged (XACK) but never deleted from the stream, causing Redis memory to grow indefinitely as processed messages accumulated.

Solution: Added XDEL call immediately after XACK in the _RedisStreamMessageMixin.ack() method to automatically remove messages from the stream after successful processing.

Impact: This changes Redis Streams behavior from persistent event log to message queue semantics, preventing memory leaks while maintaining backward compatibility for typical use cases.

Fixes #2223

Type of change

Please delete options that are not relevant.

  • [x ] New feature (a non-breaking change that adds functionality)

Checklist

  • [ x] My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • [ x] I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • [x ] My changes do not generate any new warnings
  • [ x] I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • [ x] Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh
  • [ x] I have ensured that static analysis tests are passing by running scripts/static-analysis.sh
  • I have included code examples to illustrate the modifications

@CLAassistant
Copy link
CLAassistant commented May 26, 2025

CLA assistant check
All committers have signed the CLA.

@@ -128,6 +128,7 @@ async def ack(
ids = self.raw_message["message_ids"]
channel = self.raw_message["channel"]
await redis.xack(channel, group, *ids) # type: ignore[no-untyped-call]
await redis.xdel(channel, *ids) # type: ignore[no-untyped-call]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but this is not that I ment. We shouldn't ack and remove messages by the same call. Also, it shouldn't be a default behavior as well. Please, move this logic to separated message.delete() method

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature: Delete data from the Redis Stream to which Faststream has subscribed to
3 participants
0