8000 KeyDecodeError when group_by changes key type · Issue #543 · robinhood/faust · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
KeyDecodeError when group_by changes key type #543
Open
@tapple

Description

@tapple

stream.group_by does not appear to set it's topic's type properly, if it's different than the source stream's key type. Here's an example program that demonstrates converting from int key to str key:

import faust

app = faust.App('example', broker='kafka://')


class StringModel(faust.Record):
    string_value: str


int_topic = app.topic(
    'int_topic',
    key_type=int,
    value_type=StringModel,
    internal=True,
)


@app.agent(int_topic)
async def int_agent(int_topic):
    async for k, v in int_topic.group_by(StringModel.string_value).items():
        print(f'{k}: {v}')


@app.task
async def example_sender(app):
    await int_topic.send(key='5', value=StringModel(string_value="five"))


if __name__ == '__main__':
    app.main()

run the program like python bugdemo.py worker

Running the program generates a KeyDecodeError. It should not do that. A workaround is to explicitly specify the group_by topic to one with the correct key type

Full traceback

Traceback (most recent call last):
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 56, in loads_key
    return cast(K, self._prepare_payload(typ, payload))
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 111, in _prepare_payload
    return int(want_str(value))
ValueError: invalid literal for int() with base 10: 'five'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/agents/agent.py", line 646, in _execute_actor
    await coro
  File "/Users/tapple/cabbage/parsers/common/deepminer/async2/bugdemo.py", line 20, in int_agent
    async for k, v in int_topic.group_by(StringModel.string_value).items():
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 290, in items
    async for event in self.events():
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 299, in events
    async for _ in self:  # noqa: F841
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 768, in _c_aiter
    value, sensor_state = await it.next()  # noqa: B305
  File "faust/_cython/streams.pyx", line 87, in next
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/channels.py", line 502, in __anext__
    return await self.queue.get()
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/mode/utils/queues.py", line 129, in get
    return await super().get()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/queues.py", line 167, in get
    yield from getter
  File "faust/transport/_cython/conductor.pyx", line 54, in faust.transport._cython.conductor.ConductorHandler.__call__
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/schemas.py", line 137, in decode
    k: K = schema_loads_key(app, message, loads=loads_key)
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/schemas.py", line 75, in loads_key
    serializer=serializer or self.key_serializer,
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 61, in loads_key
    sys.exc_info()[2]) from exc
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 56, in loads_key
    return cast(K, self._prepare_payload(typ, payload))
  File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 111, in _prepare_payload
    return int(want_str(value))
faust.exceptions.KeyDecodeError: invalid literal for int() with base 10: 'five'

Versions

  • Python version 3.6.4
  • Faust version 1.10.2
  • Operating system MacOS 10.15.3
  • Kafka version 2.4.0
  • RocksDB version (if applicable)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions

      0