feat: add cluster metadata request via aiokafka admin client in ping by murzinov01 · Pull Request #2212 · ag2ai/faststream

feat: add cluster metadata request via aiokafka admin client in ping #2212

Merged · 11 commits · May 18, 2025
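
In short: `KafkaBroker` now starts an `AIOKafkaAdminClient` alongside the producer, and `ping()` succeeds only when a `describe_cluster()` metadata request gets an answer from the cluster, instead of merely checking that the local producer object is still open. A minimal standalone sketch of the approach (simplified from the diff below, assuming an already-started admin client):

```python
import anyio
from aiokafka.admin import AIOKafkaAdminClient


async def ping(admin: AIOKafkaAdminClient, timeout: float = 10.0) -> bool:
    """Return True once the cluster answers a metadata request, False on timeout."""
    sleep_time = timeout / 10

    with anyio.move_on_after(timeout) as cancel_scope:
        while True:
            if cancel_scope.cancel_called:
                return False
            try:
                # A metadata round trip succeeds only if a broker responds,
                # so this checks real connectivity, not local object state.
                await admin.describe_cluster()
                return True
            except Exception:
                # Any failure just means "not reachable yet"; retry until timeout.
                await anyio.sleep(sleep_time)

    return False
```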
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
@@ -689,6 +689,7 @@ search:
- [KafkaRouter](api/faststream/kafka/router/KafkaRouter.md)
- schemas
- params
- [AdminClientConnectionParams](api/faststream/kafka/schemas/params/AdminClientConnectionParams.md)
- [ConsumerConnectionParams](api/faststream/kafka/schemas/params/ConsumerConnectionParams.md)
- security
- [parse_security](api/faststream/kafka/security/parse_security.md)
11 changes: 11 additions & 0 deletions docs/docs/api/faststream/kafka/schemas/params/AdminClientConnectionParams.md
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.schemas.params.AdminClientConnectionParams
27 changes: 21 additions & 6 deletions faststream/kafka/broker/broker.py
@@ -1,5 +1,6 @@
import logging
import warnings
from contextlib import suppress
from functools import partial
from typing import (
TYPE_CHECKING,
@@ -18,6 +19,7 @@
)

import aiokafka
import aiokafka.admin
import anyio
from aiokafka.partitioner import DefaultPartitioner
from aiokafka.producer.producer import _missing
@@ -29,7 +31,10 @@
from faststream.kafka.broker.logging import KafkaLoggingBroker
from faststream.kafka.broker.registrator import KafkaRegistrator
from faststream.kafka.publisher.producer import AioKafkaFastProducer
-from faststream.kafka.schemas.params import ConsumerConnectionParams
+from faststream.kafka.schemas.params import (
+    AdminClientConnectionParams,
+    ConsumerConnectionParams,
+)
from faststream.kafka.security import parse_security
from faststream.types import EMPTY
from faststream.utils.data import filter_by_dict
@@ -42,6 +47,7 @@

from aiokafka import ConsumerRecord
from aiokafka.abc import AbstractTokenProvider
from aiokafka.admin.client import AIOKafkaAdminClient
from fast_depends.dependencies import Depends
from typing_extensions import TypedDict, Unpack

@@ -236,6 +242,7 @@ class KafkaBroker(
):
url: List[str]
_producer: Optional["AioKafkaFastProducer"]
_admin_client: Optional["AIOKafkaAdminClient"]

def __init__(
self,
@@ -579,6 +586,7 @@ def __init__(

self.client_id = client_id
self._producer = None
self._admin_client = None

async def _close(
self,
@@ -589,6 +597,9 @@ async def _close(
if self._producer is not None: # pragma: no branch
await self._producer.stop()
self._producer = None
if self._admin_client is not None:
await self._admin_client.close()
self._admin_client = None

await super()._close(exc_type, exc_val, exc_tb)

@@ -637,11 +648,15 @@ async def _connect(  # type: ignore[override]
security_params = parse_security(self.security)
kwargs.update(security_params)

self._admin_client = aiokafka.admin.client.AIOKafkaAdminClient(
**filter_by_dict(AdminClientConnectionParams, kwargs),
)
producer = aiokafka.AIOKafkaProducer(
**kwargs,
client_id=client_id,
)

await self._admin_client.start()
await producer.start()
self._producer = AioKafkaFastProducer(
producer=producer,
@@ -908,17 +923,17 @@ async def publish_batch(
async def ping(self, timeout: Optional[float]) -> bool:
sleep_time = (timeout or 10) / 10

-    with anyio.move_on_after(timeout) as cancel_scope:
-        if self._producer is None:
-            return False
+    if self._admin_client is None:
+        return False

+    with anyio.move_on_after(timeout) as cancel_scope:
        while True:
            if cancel_scope.cancel_called:
                return False

-            if not self._producer._producer._closed:
+            with suppress(Exception):
+                await self._admin_client.describe_cluster()
                return True

await anyio.sleep(sleep_time)

return False
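
For illustration, a liveness probe built on the new behavior could look like this (hypothetical wiring, not part of this PR; `ping()` returns `False` until `connect()` has created the admin client):

```python
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")


async def kafka_alive() -> bool:
    # ping() now performs a describe_cluster() round trip, so it only
    # returns True if a broker answers within the timeout.
    return await broker.ping(timeout=5.0)
```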
54 changes: 53 additions & 1 deletion faststream/kafka/schemas/params.py
@@ -6,6 +6,55 @@
from typing_extensions import TypedDict


class AdminClientConnectionParams(TypedDict, total=False):
"""A class to represent the connection parameters for an admin client.

Attributes:
bootstrap_servers : Required. The bootstrap servers to connect to.
loop : Optional. The event loop to use for asynchronous operations.
client_id : The client ID to use for the connection.
request_timeout_ms : The timeout for network requests in milliseconds.
retry_backoff_ms : The backoff time in milliseconds for retrying failed requests.
metadata_max_age_ms : The maximum age of metadata in milliseconds.
security_protocol : The security protocol to use for the connection. Must be one of "SSL" or "PLAINTEXT".
api_version : The API version to use for the connection.
connections_max_idle_ms : The maximum idle time in milliseconds before closing a connection.
ssl_context : Pre-configured SSLContext for wrapping socket connections.
sasl_mechanism : The SASL mechanism to use for authentication. Must be one of "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", or "OAUTHBEARER".
sasl_plain_password : The password to use for PLAIN SASL mechanism.
sasl_plain_username : The username to use for PLAIN SASL mechanism.
sasl_kerberos_service_name : Service name to include in GSSAPI sasl mechanism handshake.
sasl_kerberos_domain_name : Kerberos domain name to use in GSSAPI sasl mechanism handshake.
sasl_oauth_token_provider : OAuthBearer token provider instance.
"""

bootstrap_servers: Union[str, List[str]]
loop: Optional[AbstractEventLoop]
client_id: str
request_timeout_ms: int
retry_backoff_ms: int
metadata_max_age_ms: int
security_protocol: Literal[
"SSL",
"PLAINTEXT",
]
api_version: str
connections_max_idle_ms: int
sasl_mechanism: Literal[
"PLAIN",
"GSSAPI",
"SCRAM-SHA-256",
"SCRAM-SHA-512",
"OAUTHBEARER",
]
sasl_plain_password: str
sasl_plain_username: str
sasl_kerberos_service_name: str
sasl_kerberos_domain_name: str
ssl_context: ssl.SSLContext
sasl_oauth_token_provider: AbstractTokenProvider


class ConsumerConnectionParams(TypedDict, total=False):
"""A class to represent the connection parameters for a consumer.

@@ -19,10 +68,13 @@ class ConsumerConnectionParams(TypedDict, total=False):
security_protocol : The security protocol to use for the connection. Must be one of "SSL" or "PLAINTEXT".
api_version : The API version to use for the connection.
connections_max_idle_ms : The maximum idle time in milliseconds before closing a connection.
ssl_context : Pre-configured SSLContext for wrapping socket connections.
sasl_mechanism : The SASL mechanism to use for authentication. Must be one of "PLAIN", "GSSAPI", "SCRAM-SHA-256", "SCRAM-SHA-512", or "OAUTHBEARER".
sasl_plain_password : The password to use for PLAIN SASL mechanism.
sasl_plain_username : The username to use for PLAIN SASL mechanism.
-        sasl_kerberos_service_name : The service
+        sasl_kerberos_service_name : Service name to include in GSSAPI sasl mechanism handshake.
sasl_kerberos_domain_name : Kerberos domain name to use in GSSAPI sasl mechanism handshake.
sasl_oauth_token_provider : OAuthBearer token provider instance.
"""

bootstrap_servers: Union[str, List[str]]
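In `_connect`, the broker's kwargs are narrowed with `filter_by_dict(AdminClientConnectionParams, kwargs)` so the admin client receives only the keys this TypedDict declares. A rough sketch of that filtering idea (the real `faststream.utils.data.filter_by_dict` may differ in details):

```python
from typing import Any, Dict


def filter_by_dict(typed_dict: Any, kwargs: Dict[str, Any]) -> Dict[str, Any]:
    # Keep only the keys the TypedDict declares, dropping producer-only
    # options (acks, linger_ms, ...) that the admin client would reject.
    allowed = typed_dict.__annotations__
    return {key: value for key, value in kwargs.items() if key in allowed}


# filter_by_dict(AdminClientConnectionParams, {"bootstrap_servers": "host:9092", "acks": "all"})
# returns {"bootstrap_servers": "host:9092"}
```
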
6 changes: 5 additions & 1 deletion tests/a_docs/kafka/test_security.py
@@ -10,8 +10,12 @@
def patch_aio_consumer_and_producer() -> Tuple[MagicMock, MagicMock]:
try:
producer = MagicMock(return_value=AsyncMock())
admin_client = MagicMock(return_value=AsyncMock())

-        with patch("aiokafka.AIOKafkaProducer", new=producer):
+        with (
+            patch("aiokafka.AIOKafkaProducer", new=producer),
+            patch("aiokafka.admin.client.AIOKafkaAdminClient", new=admin_client),
+        ):
yield producer
finally:
pass
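
With both constructors patched, tests can exercise the full connect/close lifecycle without a running Kafka cluster. A hypothetical test built on this fixture (test name and wiring assumed for illustration):

```python
import pytest

from faststream.kafka import KafkaBroker


@pytest.mark.asyncio()
async def test_connect_without_kafka(patch_aio_consumer_and_producer):
    # The fixture mocks both AIOKafkaProducer and AIOKafkaAdminClient,
    # so connect() starts them without touching the network.
    broker = KafkaBroker("localhost:9092")
    await broker.connect()
    await broker.close()
```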