8000 UnknownError while consuming from a zstd-compressed topic - broker returns error code 76 (UNSUPPORTED_COMPRESSION_TYPE) · Issue #1112 · aio-libs/aiokafka · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
UnknownError while consuming from a zstd-compressed topic - broker returns error code 76 (UNSUPPORTED_COMPRESSION_TYPE) #1112
Open
@mesteruh

Description

@mesteruh

Describe the bug
When a topic is configured with compression.type=zstd, aiokafka consumers crash on the first fetch with:
Unexpected error while fetching data: UnknownError

Debug logging reveals that the broker returns error code 76 (UNSUPPORTED_COMPRESSION_TYPE).
The same consumer works fine if the topic is switched back to compression.type=producer, even though the producer continues to publish zstd-compressed batches.

Root cause: aiokafka still sends FetchRequest v4, which does not advertise zstd support.
The broker therefore rejects the response whenever it knows it must serve zstd (topic-level compression.type=zstd).
If the broker is not forced to recompress (compression.type=producer), it happily streams the original zstd batches back, so the error does not appear.

Expected behaviour
If zstandard support is available (via pip install aiokafka[zstd]) and the broker supports zstd (Kafka ≥ 2.1.0), the consumer should:

  1. Automatically use FetchRequest v10+ when it detects it might receive zstd

Environment (please complete the following information):

  • aiokafka version 0.12.0
  • Kafka Broker version (kafka-topics.sh --version):
  • Other information (Confluent Cloud version, etc.):

Reproducible example

import asyncio
from aiokafka import AIOKafkaProducer

async def send_messages():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:29092', 
        compression_type='zstd',

    )
    await producer.start()
    try:
        # Отправка 10 сообщений
        for i in range(10):
            message = f"Сообщение {i}".encode('utf-8')
            await producer.send_and_wait("axixa2", message)
            print(f"Отправлено: {message.decode('utf-8')}")
            await asyncio.sleep(1)  # Пауза для наглядности
    finally:
        await producer.stop()

if __name__ == "__main__":
    asyncio.run(send_messages())
import asyncio
from aiokafka import AIOKafkaConsumer

async def 
6B4C
consume_messages():
    consumer = AIOKafkaConsumer(
        'axixa2',
        bootstrap_servers='localhost:29092',
        group_id='my-group',
        auto_offset_reset='earliest'
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(f"Получено: {msg.value.decode('utf-8')} (партиция: {msg.partition})")
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(consume_messages())
version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks:
      kafka-net:
        ipv4_address: 172.25.0.2

  kafka:
    image: confluentinc/cp-kafka:7.4.2
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092  # Изменено на localhost
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_COMPRESSION_TYPE: zstd
    networks:
      kafka-net:
        ipv4_address: 172.25.0.3
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "9092"]
      interval: 5s
      timeout: 10s
      retries: 10

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      kafka-net:
        ipv4_address: 172.25.0.4

  producer:
    image: confluentinc/cp-kafka:7.4.2
    container_name: producer
    depends_on:
      - kafka
    command: >
      bash -c "
        while ! nc -z kafka 9092; do sleep 1; done;
        kafka-topics --bootstrap-server kafka:9092 --create --topic zstd-test --partitions 3 --replication-factor 1 --if-not-exists;
        seq 1000 | kafka-console-producer --bootstrap-server kafka:9092 --topic zstd-test --compression-codec zstd;
        sleep infinity
      "
    networks:
      kafka-net:
        ipv4_address: 172.25.0.5

networks:
  kafka-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.25.0.0/24

I opened mr

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions

      0