diff --git a/docs/docs/en/getting-started/publishing/decorator.md b/docs/docs/en/getting-started/publishing/decorator.md index 98e2ead988..0b1a460f61 100644 --- a/docs/docs/en/getting-started/publishing/decorator.md +++ b/docs/docs/en/getting-started/publishing/decorator.md @@ -21,7 +21,7 @@ It creates a structured DataPipeline unit with an input and output. The order of :material-checkbox-marked:{.checked_mark} **Easy to use** - Publishing messages in **FastStream** is intuitive and requires minimal effort. - :material-checkbox-marked:{.checked_mark} **AsyncAPI support** - [```AsyncAPI```](../asyncapi/export.md#section{.css-styles}) is a specification for describing asynchronous APIs used in messaging applications. This method currently does not support this standard. + :material-checkbox-marked:{.checked_mark} **AsyncAPI support** - [```AsyncAPI```](../asyncapi/export.md#section{.css-styles}) is a specification for describing asynchronous APIs used in messaging applications. :fontawesome-solid-square-xmark:{.x_mark} **No testing support** - This method lacks full [```Testing```](./test.md#section{.css-styles}) support. diff --git a/docs/docs/en/getting-started/subscription/filtering.md b/docs/docs/en/getting-started/subscription/filtering.md index b5b93c7558..ec0f252e12 100644 --- a/docs/docs/en/getting-started/subscription/filtering.md +++ b/docs/docs/en/getting-started/subscription/filtering.md @@ -98,3 +98,48 @@ And this one will be delivered to the `default_handler` ```python hl_lines="2" {!> docs_src/getting_started/subscription/redis/filter.py [ln:29.5,30.5,31.5,32.5] !} ``` + +--- + +## Technical Information + +Let's break down how message filtering works in a subscription mechanism. + +### Core Filtering Logic + +Consider a simple example of a filter implementation: + +```python +for handler in subscriber.handlers: + if await handler.filter(msg): + return await handler.process(msg) + +raise HandlerNotFoundError +``` + +This code selects the first suitable handler to process the message. This means the **default handler should be placed last** in the list. If no logical handlers match, the message must still be processed. For this, we need a special trash handler that defines the system's default behavior for such cases. + +### Implementing the Default Handler + +The default handler should be declared as follows: + +```python +subscriber = broker.subscriber() + +@subscriber(filter=...) +async def handler(): ... + +@subscriber() +async def default_handler(): ... +``` + +Here, `@subscriber()` is equivalent to `@subscriber(filter=lambda _: True)`, meaning it **accepts all** messages. This ensures that no message goes unprocessed, even if no specific handler is found. + +### Summary + +- Handlers are checked in order, and the first matching one processes the message. +- The default handler must be **placed last** to ensure all messages are handled. +- `@subscriber()` without parameters acts as a universal handler, accepting everything. +- A trash handler must properly finalize the subscription and inform the broker about unnecessary data. + +Properly managing subscribers allows for precise message processing control and prevents data loss.