10000 Add support for pipes and consume --header flag to filter out messages by defined headers by valichek · Pull Request #402 · birdayz/kaf · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add support for pipes and consume --header flag to filter out messages by defined headers #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

valichek
Copy link
@valichek valichek commented Jul 4, 2025

PR adds support for pipes.
Output of kaf consume can be used as input for kaf produce.
This is possible by adding json-each-row input/output format.

echo {"data":1} | kaf produce mqtt.messages.incoming --key 1 --header h1:hv1
> Sent record to partition 0 at offset 0.

kaf consume mqtt.messages.incoming --output json-each-row
> {"topic":"mqtt.messages.incoming","partition":0,"offset":0,"timestamp":"2025-07-04T12:53:46.841+02:00","headers":[{"key":"h1","value":"hv1"}],"key":"1","payload":"{data:1}"}


echo '{"topic":"mqtt.messages.incoming","partition":0,"offset":0,"timestamp":"2025-07-04T12:53:46.841+02:00","headers":[{"key":"h1","value":"hv1"}],"key":"1","payload":"{data:1}"}' | kaf produce mqtt.messages.incoming --input json-each-row

kaf consume mqtt.messages.incoming --output json-each-row
> {"topic":"mqtt.messages.incoming","partition":0,"offset":1,"timestamp":"2025-07-04T13:05:57.9+02:00","headers":[{"key":"h1","value":"hv1"}],"key":"1","payload":"{data:1}"}

Pipe from one topic to another
kaf consume topic-a --output json-each-row -f | kaf produce topic-b --input json-each-row

Important: kaf produce will overwrite key, partition and all the headers of input messages if provided
May solve #373, related to #108

PR adds consume --header flag to filter out messages by defined headers.

Consume messages with filtering by header
kaf consume mqtt.messages.incoming --header h1:hv1

@valichek valichek changed the title Add support for pipes Add support for pipes and consume --header flag to filter out messages by defined headers Jul 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants
0