Open
Description
stream.group_by does not appear to set it's topic's type properly, if it's different than the source stream's key type. Here's an example program that demonstrates converting from int key to str key:
import faust
app = faust.App('example', broker='kafka://')
class StringModel(faust.Record):
string_value: str
int_topic = app.topic(
'int_topic',
key_type=int,
value_type=StringModel,
internal=True,
)
@app.agent(int_topic)
async def int_agent(int_topic):
async for k, v in int_topic.group_by(StringModel.string_value).items():
print(f'{k}: {v}')
@app.task
async def example_sender(app):
await int_topic.send(key='5', value=StringModel(string_value="five"))
if __name__ == '__main__':
app.main()
run the program like python bugdemo.py worker
Running the program generates a KeyDecodeError. It should not do that. A workaround is to explicitly specify the group_by topic to one with the correct key type
Full traceback
Traceback (most recent call last):
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 56, in loads_key
return cast(K, self._prepare_payload(typ, payload))
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 111, in _prepare_payload
return int(want_str(value))
ValueError: invalid literal for int() with base 10: 'five'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/agents/agent.py", line 646, in _execute_actor
await coro
File "/Users/tapple/cabbage/parsers/common/deepminer/async2/bugdemo.py", line 20, in int_agent
async for k, v in int_topic.group_by(StringModel.string_value).items():
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 290, in items
async for event in self.events():
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 299, in events
async for _ in self: # noqa: F841
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/streams.py", line 768, in _c_aiter
value, sensor_state = await it.next() # noqa: B305
File "faust/_cython/streams.pyx", line 87, in next
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/channels.py", line 502, in __anext__
return await self.queue.get()
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/mode/utils/queues.py", line 129, in get
return await super().get()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/queues.py", line 167, in get
yield from getter
File "faust/transport/_cython/conductor.pyx", line 54, in faust.transport._cython.conductor.ConductorHandler.__call__
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/schemas.py", line 137, in decode
k: K = schema_loads_key(app, message, loads=loads_key)
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/schemas.py", line 75, in loads_key
serializer=serializer or self.key_serializer,
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 61, in loads_key
sys.exc_info()[2]) from exc
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 56, in loads_key
return cast(K, self._prepare_payload(typ, payload))
File "/Users/tapple/.virtualenvs/deepminer/lib/python3.6/site-packages/faust/serializers/registry.py", line 111, in _prepare_payload
return int(want_str(value))
faust.exceptions.KeyDecodeError: invalid literal for int() with base 10: 'five'
Versions
- Python version 3.6.4
- Faust version 1.10.2
- Operating system MacOS 10.15.3
- Kafka version 2.4.0
- RocksDB version (if applicable)
Metadata
Metadata
Assignees
Labels
No labels