Description
Provide details of the setup you're running
2.9.0
Outline your question
I am trying to read only the S3 file metadata and stream just the file name to the topic, but the connector fails with the error below.
Error Logs:
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic XXXXXXXXX:
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"file_name","type":["null","string"],"default":null},{"name":"message","type":["null","bytes"],"default":null},{"name":"id","type":["null","string"],"default":null}]}
Connector configuration:
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"topic": "business.direct-banking.cross-product.production-management.production-status-kakfa-connector.file-uploaded",
"tasks.max": "1",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3BytesArrayInputReader",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",
"log4j.logger.io.streamthoughts.kafka.connect.filepulse": "DEBUG",
"log4j.logger.io.streamthoughts.kafka.connect.filepulse.offset": "TRACE",
"file.filter.regex.pattern": ".*",
"fs.listing.interval.ms": 30000,
"buffer.initial.bytes.size": 30000,
"s3.include.metadata": "true",
"read.max.wait.ms": 10000,
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"tasks.file.status.storage.bootstrap.servers": "XXXXXXX",
"tasks.file.status.storage.topic": "connect-status-l77e04b0ebb9a84018a6fbc1626bca578f",
"task.file.content.extract.enabled" : "false",
"value.converter.schemas.enable": "false",
"value.converter.auto.register.schemas": "false",
"errors.log.include.messages": "true",
"errors.log.enable": "true",
"filters":"AppendFilter1",
"filters.AppendFilter1.type" : "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.AppendFilter1.field" : "$.file_name",
"filters.AppendFilter1.value" : "abc.json",
"behavior.on.null.values":"ignore",
"value.converter.schema.registry.url": "XXXXX",
"log4j.logger.io.confluent.kafka.serializers":"DEBUG"
}
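Since "value.converter.auto.register.schemas" is set to "false", my understanding is that the schema shown in the Caused by line has to already exist in the Schema Registry before the converter can serialize records (with the default TopicNameStrategy, under the "<topic>-value" subject). Below is a minimal sketch of registering it manually with the Confluent Schema Registry client; the registry URL and subject name are placeholders, not values from my setup:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegisterFilePulseSchema {

    public static void main(String[] args) throws Exception {
        // Schema copied from the "Caused by" line of the stack trace above.
        String schemaJson = "{\"type\":\"record\",\"name\":\"ConnectDefault\","
                + "\"namespace\":\"io.confluent.connect.avro\",\"fields\":["
                + "{\"name\":\"file_name\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"message\",\"type\":[\"null\",\"bytes\"],\"default\":null},"
                + "{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

        // Placeholders: replace with the real Schema Registry URL and the value
        // subject of the connector topic (default TopicNameStrategy: "<topic>-value").
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 10);
        String subject = "<topic-name>-value";

        int schemaId = client.register(subject, new AvroSchema(schemaJson));
        System.out.println("Registered schema with id " + schemaId);
    }
}

I am mentioning this only in case pre-registering the schema is the expected workflow when auto-registration is disabled.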