8000 GitHub - BurakG01/kafka-konsumer: Easy implementation of kafka consumer with built-in exception manager (kafka-cronsumer)
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

BurakG01/kafka-konsumer

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Kafka Konsumer 🔨Build And Test 🔨IntegrationTest

Description

Kafka Konsumer provides an easy implementation of Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).

Guide

Installation

go get github.com/Trendyol/kafka-konsumer@latest

Examples

You can find a number of ready-to-run examples at this directory.

After running docker-compose up command, you can run any application you want.

Simple Consumer
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        ConsumeFn:    consumeFn,
        RetryEnabled: false,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
Simple Consumer With Retry/Exception Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        ConsumeFn: consumeFn,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
With Batch Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
         
89E5
   GroupID: "standart-cg",
        },
        LogLevel:     kafka.LogLevelDebug,
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        BatchConfiguration: kafka.BatchConfiguration{
            MessageGroupLimit:    1000,
            MessageGroupDuration: time.Second,
            BatchConsumeFn:       batchConsumeFn,
        },
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

    consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
    fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
    return nil
}

With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can see the example by going to the with-grafana folder in the examples folder and running the infrastructure with docker compose up and then the application.

grafana

With SASL-PLAINTEXT Authentication

Under the examples - with-sasl-plaintext folder, you can find an example of a consumer integration with SASL/PLAIN mechanism. To try the example, you can run the command docker compose up under the specified folder and then start the application.

Configurations

config description default
reader Describes all segmentio kafka reader configurations
consumeFn Kafka consumer function, if retry enabled it, is also used to consume retriable messages
logLevel Describes log level; valid options are debug, info, warn, and error info
concurrency Number of goroutines used at listeners 1
retryEnabled Retry/Exception consumer is working or not false
commitInterval indicates the interval at which offsets are committed to the broker. 1s
rack see doc
clientId see doc
dial.Timeout see doc no timeout
dial.KeepAlive see doc not enabled
transport.DialTimeout see doc 5s
transport.IdleTimeout see doc 30s
transport.MetadataTTL see doc 6s
transport.MetadataTopics see doc all topics in cluster
retryConfiguration.clientId see doc
retryConfiguration.startTimeCron Cron expression when retry consumer (kafka-cronsumer) starts to work at
retryConfiguration.workDuration Work duration exception consumer actively consuming messages
retryConfiguration.topic Retry/Exception topic names
retryConfiguration.brokers Retry topic brokers urls
retryConfiguration.maxRetry Maximum retry value for attempting to retry a message 3
retryConfiguration.tls.rootCAPath see doc ""
retryConfiguration.tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
retryConfiguration.sasl.authType SCRAM or PLAIN
retryConfiguration.sasl.username SCRAM OR PLAIN username
retryConfiguration.sasl.password SCRAM OR PLAIN password
batchConfiguration.messageGroupLimit Maximum number of messages in a batch
batchConfiguration.messageGroupDuration Maximum time to wait for a batch
tls.rootCAPath see doc ""
tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
sasl.authType SCRAM or PLAIN
sasl.username SCRAM OR PLAIN username
sasl.password SCRAM OR PLAIN password
logger If you want to custom logger info
apiEnabled Enabled metrics false
apiConfiguration.port Set API port 8090
apiConfiguration.healtCheckPath Set Health check path healthcheck
metricConfiguration.path Set metric endpoint path /metrics

Monitoring

Kafka Konsumer offers an API that handles exposing several metrics.

Exposed Metrics

Metric Name Description Value Type
kafka_konsumer_processed_messages_total_current Total number of processed messages. Counter
kafka_konsumer_unprocessed_messages_total_current Total number of unprocessed messages. Counter
kafka_konsumer_processed_batch_messages_total_current Total number of processed batch messages. Counter
kafka_konsumer_unprocessed_batch_messages_total_current Total number of unprocessed batch messages. Counter

NOTE: kafka_konsumer_processed_batch_messages_total_current and kafka_konsumer_unprocessed_batch_messages_total_current will be deprecated in the next releases. Please use kafka_konsumer_processed_messages_total_current and kafka_konsumer_unprocessed_messages_total_current instead.

About

Easy implementation of kafka consumer with built-in exception manager (kafka-cronsumer)

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Go 97.5%
  • Makefile 2.5%
0