Reading the S3 file metadata and stream only filename to the topic. · Issue #718 · streamthoughts/kafka-connect-file-pulse · GitHub
Reading the S3 file metadata and stream only filename to the topic. #718
Open
@saurabhpolshettiwar

Description

Search for existing answers
Many questions have been asked before. Please do a quick search to see if there is already an answer to your question.
The main resource for finding answers is:

  1. Existing FilePulse GitHub issues, including closed issues

If you can't find an answer to your question, please raise a new GitHub issue or Stack Overflow question.

Provide details of the setup you're running
2.9.0

Outline your question
Reading the S3 file metadata and streaming only the file name to the topic.

I am having trouble reading the S3 file metadata and streaming only the file name to the topic.

Error Logs:

```
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic XXXXXXXXX:
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"file_name","type":["null","string"],"default":null},{"name":"message","type":["null","bytes"],"default":null},{"name":"id","type":["null","string"],"default":null}]}
```
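To see exactly what the converter tried to register, the schema embedded in the `Caused by` line can be decoded. The minimal Python sketch below (the helper name `summarize_fields` is hypothetical, for illustration only) lists the record's fields. It shows that, at serialization time, the record value still carries `message` (bytes) and `id` fields in addition to `file_name`, so the value being sent is not filename-only.

```python
import json

# Avro schema copied verbatim from the "Caused by" line of the stack trace.
SCHEMA_FROM_LOG = (
    '{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro",'
    '"fields":[{"name":"file_name","type":["null","string"],"default":null},'
    '{"name":"message","type":["null","bytes"],"default":null},'
    '{"name":"id","type":["null","string"],"default":null}]}'
)


def summarize_fields(schema_json: str) -> list[tuple[str, str]]:
    """Return (field name, non-null type) pairs for a record schema whose
    fields are all declared as ["null", <type>] unions (hypothetical helper)."""
    schema = json.loads(schema_json)
    summary = []
    for field in schema["fields"]:
        # Each field type is a union such as ["null", "string"]; keep the non-null branch.
        non_null = [t for t in field["type"] if t != "null"]
        summary.append((field["name"], non_null[0]))
    return summary


if __name__ == "__main__":
    for name, avro_type in summarize_fields(SCHEMA_FROM_LOG):
        print(f"{name}: {avro_type}")
```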

Connector configuration:

```json
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "topic": "business.direct-banking.cross-product.production-management.production-status-kakfa-connector.file-uploaded",
  "tasks.max": "1",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3BytesArrayInputReader",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",

  "log4j.logger.io.streamthoughts.kafka.connect.filepulse": "DEBUG",
  "log4j.logger.io.streamthoughts.kafka.connect.filepulse.offset": "TRACE",

  "file.filter.regex.pattern": ".*",
  "fs.listing.interval.ms": 30000,
  "buffer.initial.bytes.size": 30000,
  "s3.include.metadata": "true",
  "read.max.wait.ms": 10000,

  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",

  "tasks.file.status.storage.bootstrap.servers": "XXXXXXX",
  "tasks.file.status.storage.topic": "connect-status-l77e04b0ebb9a84018a6fbc1626bca578f",
  "task.file.content.extract.enabled": "false",
  "value.converter.schemas.enable": "false",
  "value.converter.auto.register.schemas": "false",

  "errors.log.include.messages": "true",
  "errors.log.enable": "true",

  "filters": "AppendFilter1",
  "filters.AppendFilter1.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.AppendFilter1.field": "$.file_name",
  "filters.AppendFilter1.value": "abc.json",

  "behavior.on.null.values": "ignore",
  "value.converter.schema.registry.url": "XXXXX",
  "log4j.logger.io.confluent.kafka.serializers": "DEBUG"
}
```
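With `value.converter.auto.register.schemas` set to `false`, Confluent's Avro serializer is meant to look up the schema in Schema Registry rather than register it, so a matching schema would need to exist under the value subject already. Assuming the default `TopicNameStrategy` (value subject `<topic>-value`), the sketch below derives that subject for the configured topic and builds the Schema Registry REST URL (`GET /subjects/{subject}/versions`) that could be queried to check whether anything is registered. The registry URL `http://localhost:8081` is a placeholder, because the real one is redacted in this issue, and the helper names are hypothetical.

```python
from urllib.parse import quote

# Topic copied from the connector configuration above.
TOPIC = "business.direct-banking.cross-product.production-management.production-status-kakfa-connector.file-uploaded"


def value_subject(topic: str) -> str:
    """Subject for record values under the default TopicNameStrategy (assumed)."""
    return f"{topic}-value"


def versions_url(registry_url: str, subject: str) -> str:
    """Build the Schema Registry URL listing a subject's registered versions
    (GET /subjects/{subject}/versions); the subject is percent-encoded."""
    return f"{registry_url.rstrip('/')}/subjects/{quote(subject, safe='')}/versions"


if __name__ == "__main__":
    # Placeholder registry URL; the actual value is redacted in the issue.
    subject = value_subject(TOPIC)
    print(subject)
    print(versions_url("http://localhost:8081", subject))
```

An HTTP 404 from that URL would indicate the subject has no registered schema; this sketch only builds the URL and does not perform the request.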

Metadata

Assignees: No one assigned
Labels: question (Further information is requested)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests