8000 Resolve whole references tree by kleewho · Pull Request #250 · deviceinsight/kafkactl · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Resolve whole references tree #250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions cmd/consume/consume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consume_test

import (
"encoding/hex"
"fmt"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -149,6 +150,56 @@ func TestConsumeFromTimestampIntegration(t *testing.T) {
testutil.AssertArraysEquals(t, []string{"g", "h"}, messages)
}

func TestConsumeRegistryProtobufWithNestedDependenciesIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

bazMsg := `syntax = "proto3";
package baz;

message Baz {
string field = 1;
}
`

barMsg := `syntax = "proto3";
package bar;

import "baz/protobuf/baz.proto";

message Bar {
baz.Baz bazField = 1;
}
`

fooMsg := `syntax = "proto3";
package foo;

import "bar/protobuf/bar.proto";

message Foo {
bar.Bar barField = 1;
}`

value := `{"barField":{"bazField":{"field":"value"}}}`

testutil.RegisterSchema(t, "baz", bazMsg, srclient.Protobuf)
testutil.RegisterSchema(t, "bar", barMsg, srclient.Protobuf, srclient.Reference{Name: "baz/protobuf/baz.proto", Version: 1, Subject: "baz"})
topicName := testutil.CreateTopic(t, "consume-topic")
testutil.RegisterSchema(t, topicName+"-value", fooMsg, srclient.Protobuf, srclient.Reference{Name: "bar/protobuf/bar.proto", Version: 1, Subject: "bar"})

kafkaCtl := testutil.CreateKafkaCtlCommand()
if _, err := kafkaCtl.Execute("produce", topicName, "--key", "test-key", "--value", value); err != nil {
t.Fatalf("failed to execute command: %v", err)
}
testutil.AssertEquals(t, "message produced (partition=0\toffset=0)", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", topicName, "--from-beginning", "--exit", "--print-keys"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, fmt.Sprintf("test-key#%s", value), kafkaCtl.GetStdOut())
}

func TestConsumeToTimestampIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

Expand Down
41 changes: 2 additions & 39 deletions internal/consume/RegistryProtobufMessageDeserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"github.com/IBM/sarama"
"github.com/deviceinsight/kafkactl/v5/internal"
"github.com/deviceinsight/kafkactl/v5/internal/helpers/protobuf"
"github.com/deviceinsight/kafkactl/v5/internal/output"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/riferrei/srclient"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -64,12 +64,7 @@ func (deserializer RegistryProtobufMessageDeserializer) deserialize(rawData []by
return nil, err
}
output.Debugf("fetched schema %d", schemaID)
resolvedSchemas, err := deserializer.resolveDependencies(schema.References())
if err != nil {
return nil, err
}
resolvedSchemas["."] = schema.Schema()
fileDesc, err := deserializer.schemaToFileDescriptor(schema)
fileDesc, err := protobuf.SchemaToFileDescriptor(deserializer.registry, schema)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -135,35 +130,3 @@ func findMessageDescriptor(indexes []int64, descriptors []*desc.MessageDescripto
}
return findMessageDescriptor(indexes[1:], descriptors)
}

func (deserializer RegistryProtobufMessageDeserializer) schemaToFileDescriptor(schema *srclient.Schema) (*desc.FileDescriptor, error) {
dependencies, err := deserializer.resolveDependencies(schema.References())
if err != nil {
return nil, err
}
dependencies["."] = schema.Schema()

return parseFileDescriptor(".", dependencies)
}

func (deserializer RegistryProtobufMessageDeserializer) resolveDependencies(references []srclient.Reference) (map[string]string, error) {
resolved := map[string]string{}
for _, r := range references {
latest, err := deserializer.registry.GetLatestSchema(r.Subject)
if err != nil {
return map[string]string{}, err
}
resolved[r.Subject] = latest.Schema()
}

return resolved, nil
}

func parseFileDescriptor(filename string, resolvedSchemas map[string]string) (*desc.FileDescriptor, error) {
parser := protoparse.Parser{Accessor: protoparse.FileContentsFromMap(resolvedSchemas)}
parsedFiles, err := parser.ParseFiles(filename)
if err != nil {
return nil, err
}
return parsedFiles[0], nil
}
33 changes: 24 additions & 9 deletions internal/helpers/protobuf/protobuf.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package protobuf

import (
"fmt"
"os"
"path/filepath"
"slices"
"strings"

"google.golang.org/protobuf/proto"
Expand All @@ -18,7 +18,7 @@ import (
)

func SchemaToFileDescriptor(registry srclient.ISchemaRegistryClient, schema *srclient.Schema) (*desc.FileDescriptor, error) {
dependencies, err := resolveDependencies(registry, schema.References())
dependencies, err := resolveSchemaDependencies(registry, schema)
if err != nil {
return nil, err
}
Expand All @@ -27,17 +27,32 @@ func SchemaToFileDescriptor(registry srclient.ISchemaRegistryClient, schema *src
return ParseFileDescriptor(".", dependencies)
}

func resolveDependencies(registry srclient.ISchemaRegistryClient, references []srclient.Reference) (map[string]string, error) {
resolved := map[string]string{}
func resolveSchemaDependencies(registry srclient.ISchemaRegistryClient, schema *srclient.Schema) (map[string]string, error) {
dependencies := map[string]string{}
err := resolveRefsDependencies(registry, dependencies, schema.References())
if err != nil {
return nil, err
}
return dependencies, nil
}

func resolveRefsDependencies(registry srclient.ISchemaRegistryClient, resolved map[string]string, references []srclient.Reference) error {
for _, r := range references {
refSchema, err := registry.GetSchemaByVersion(r.Subject, r.Version)
if _, ok := resolved[r.Name]; ok {
continue
}
latest, err := registry.GetSchemaByVersion(r.Subject, r.Version)
if err != nil {
return map[string]string{}, errors.Wrap(err, fmt.Sprintf("couldn't fetch latest schema for subject %s", r.Subject))
return err
}
resolved[r.Name] = latest.Schema()
err = resolveRefsDependencies(registry, resolved, latest.References())
if err != nil {
return err
}
resolved[r.Subject] = refSchema.Schema()
}

return resolved, nil
return nil
}

func ParseFileDescriptor(filename string, resolvedSchemas map[string]string) (*desc.FileDescriptor, error) {
Expand Down Expand Up @@ -92,7 +107,7 @@ func makeDescriptors(context internal.ProtobufConfig) []*desc.FileDescriptor {
var ret []*desc.FileDescriptor

ret = appendProtosets(ret, context.ProtosetFiles)
importPaths := append([]string{}, context.ProtoImportPaths...)
importPaths := slices.Clone(context.ProtoImportPaths)

// extend import paths with existing files directories
// this allows to specify only proto file path
Expand Down
30 changes: 12 additions & 18 deletions internal/testutil/helpers.go
6D40
Original file line numberDiff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
)

func CreateTopic(t *testing.T, topicPrefix string, flags ...string) string {

kafkaCtl := CreateKafkaCtlCommand()
topicName := GetPrefixedName(topicPrefix)

Expand All @@ -36,9 +35,19 @@ func CreateTopic(t *testing.T, topicPrefix string, flags ...string) string {
return topicName
}

func CreateTopicWithSchema(t *testing.T, topicPrefix, keySchema, valueSchema string, schemaType srclient.SchemaType,
flags ...string) string {
func RegisterSchema(t *testing.T, subjectName, schema string, schemaType srclient.SchemaType, references ...srclient.Reference) {
schemaRegistry := srclient.NewSchemaRegistryClient("http://localhost:18081")

if schema, err := schemaRegistry.CreateSchema(subjectName, schema, schemaType, references...); err != nil {
t.Fatalf("unable to register schema for value: %v", err)
} else {
output.TestLogf("registered schema %q with ID=%d", schema, schema.ID())
}
}

func CreateTopicWithSchema(t *testing.T, topicPrefix, keySchema, valueSchema string, schemaType srclient.SchemaType,
flags ...string,
) string {
topicName := CreateTopic(t, topicPrefix, flags...)

schemaRegistry := srclient.NewSchemaRegistryClient("http://localhost:18081")
Expand All @@ -63,12 +72,10 @@ func CreateTopicWithSchema(t *testing.T, topicPrefix, keySchema, valueSchema str
}

func VerifyTopicExists(t *testing.T, topic string) {

kafkaCtl := CreateKafkaCtlCommand()

findTopic := func(_ uint) error {
_, err := kafkaCtl.Execute("get", "topics", "-o", "compact")

if err != nil {
return err
}
Expand All @@ -84,7 +91,6 @@ func VerifyTopicExists(t *testing.T, topic string) {
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("could not find topic %s: %v", topic, err)
}
Expand All @@ -95,7 +101,6 @@ func VerifyTopicExists(t *testing.T, topic string) {
}

func CreateConsumerGroup(t *testing.T, groupPrefix string, topics ...string) string {

kafkaCtl := CreateKafkaCtlCommand()
groupName := GetPrefixedName(groupPrefix)

Expand All @@ -117,7 +122,6 @@ func CreateConsumerGroup(t *testing.T, groupPrefix string, topics ...string) str
}

func ProduceMessage(t *testing.T, topic, key, value string, expectedPartition, expectedOffset int64) {

kafkaCtl := CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("produce", topic, "--key", key, "--value", value); err != nil {
Expand All @@ -128,7 +132,6 @@ func ProduceMessage(t *testing.T, topic, key, value string, expectedPartition, e
}

func ProduceMessageOnPartition(t *testing.T, topic, key, value string, partition int32, expectedOffset int64) {

kafkaCtl := CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("produce", topic, "--key", key, "--value", value, "--partition", strconv.FormatInt(int64(partition), 10)); err != nil {
Expand All @@ -139,12 +142,10 @@ func ProduceMessageOnPartition(t *testing.T, topic, key, value string, partition
}

func VerifyGroupExists(t *testing.T, group string) {

kafkaCtl := CreateKafkaCtlCommand()

findConsumerGroup := func(_ uint) error {
_, err := kafkaCtl.Execute("get", "cg", "-o", "compact")

if err != nil {
return err
}
Expand All @@ -160,7 +161,6 @@ func VerifyGroupExists(t *testing.T, group string) {
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("could not find group %s: %v", group, err)
}
Expand All @@ -171,14 +171,12 @@ func VerifyGroupExists(t *testing.T, group string) {
}

func VerifyConsumerGroupOffset(t *testing.T, group, topic string, expectedConsumerOffset int) {

kafkaCtl := CreateKafkaCtlCommand()

consumerOffsetRegex, _ := regexp.Compile(`consumerOffset: (\d)`)

verifyConsumerOffset := func(_ uint) error {
_, err := kafkaCtl.Execute("describe", "cg", group, "--topic", topic, "-o", "yaml")

if err != nil {
return err
}
Expand All @@ -200,21 +198,18 @@ func VerifyConsumerGroupOffset(t *testing.T, group, topic string, expectedConsum
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("failed to verify offset for group=%s topic=%s: %v", group, topic, err)
}
}

func VerifyTopicNotInConsumerGroup(t *testing.T, group, topic string) {

kafkaCtl := CreateKafkaCtlCommand()

emptyTopicsRegex, _ := regexp.Compile(`topics: \[]`)

verifyTopicNotInGroup := func(_ uint) error {
_, err := kafkaCtl.Execute("describe", "cg", group, "--topic", topic, "-o", "yaml")

if err != nil {
return err
}
Expand All @@ -230,7 +225,6 @@ func VerifyTopicNotInConsumerGroup(t *testing.T, group, topic string) {
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("failed to verify topic=%s not in group=%s: %v", topic, group, err)
}
Expand Down
0