Description
Describe the bug
When a topic is configured with compression.type=zstd
, aiokafka
consumers crash on the first fetch with:
Unexpected error while fetching data: UnknownError
Debug logging reveals that the broker returns error code 76 (UNSUPPORTED_COMPRESSION_TYPE
).
The same consumer works fine if the topic is switched back to compression.type=producer
, even though the producer continues to publish zstd
-compressed batches.
Root cause: aiokafka
still sends FetchRequest v4, which does not advertise zstd
support.
The broker therefore rejects the response whenever it knows it must serve zstd
(topic-level compression.type=zstd
).
If the broker is not forced to recompress (compression.type=producer
), it happily streams the original zstd
batches back, so the error does not appear.
Expected behaviour
If zstandard
support is available (via pip install aiokafka[zstd]
) and the broker supports zstd
(Kafka ≥ 2.1.0), the consumer should:
- Automatically use FetchRequest v10+ when it detects it might receive
zstd
Environment (please complete the following information):
- aiokafka version 0.12.0
- Kafka Broker version (
kafka-topics.sh --version
): - Other information (Confluent Cloud version, etc.):
Reproducible example
import asyncio
from aiokafka import AIOKafkaProducer
async def send_messages():
producer = AIOKafkaProducer(
bootstrap_servers='localhost:29092',
compression_type='zstd',
)
await producer.start()
try:
# Отправка 10 сообщений
for i in range(10):
message = f"Сообщение {i}".encode('utf-8')
await producer.send_and_wait("axixa2", message)
print(f"Отправлено: {message.decode('utf-8')}")
await asyncio.sleep(1) # Пауза для наглядности
finally:
await producer.stop()
if __name__ == "__main__":
asyncio.run(send_messages())
import asyncio
from aiokafka import AIOKafkaConsumer
async def
6B4C
consume_messages():
consumer = AIOKafkaConsumer(
'axixa2',
bootstrap_servers='localhost:29092',
group_id='my-group',
auto_offset_reset='earliest'
)
await consumer.start()
try:
async for msg in consumer:
print(f"Получено: {msg.value.decode('utf-8')} (партиция: {msg.partition})")
finally:
await consumer.stop()
if __name__ == "__main__":
asyncio.run(consume_messages())
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.2
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
networks:
kafka-net:
ipv4_address: 172.25.0.2
kafka:
image: confluentinc/cp-kafka:7.4.2
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 # Изменено на localhost
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_COMPRESSION_TYPE: zstd
networks:
kafka-net:
ipv4_address: 172.25.0.3
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "9092"]
interval: 5s
timeout: 10s
retries: 10
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
networks:
kafka-net:
ipv4_address: 172.25.0.4
producer:
image: confluentinc/cp-kafka:7.4.2
container_name: producer
depends_on:
- kafka
command: >
bash -c "
while ! nc -z kafka 9092; do sleep 1; done;
kafka-topics --bootstrap-server kafka:9092 --create --topic zstd-test --partitions 3 --replication-factor 1 --if-not-exists;
seq 1000 | kafka-console-producer --bootstrap-server kafka:9092 --topic zstd-test --compression-codec zstd;
sleep infinity
"
networks:
kafka-net:
ipv4_address: 172.25.0.5
networks:
kafka-net:
driver: bridge
ipam:
config:
- subnet: 172.25.0.0/24
I opened mr