Description
Provide details of the setup you're running
File Pulse Connector Version 2.14.1
Docker Version 20.10.13
Outline your question
I am using File Pulse Connector to stream logs in real time from applications running in Kubernetes containers. However, I am having difficulty meeting the requirements listed below at the same time, or it appears that a very large number of Kafka Connect workers would be needed to do so.
I have also reviewed related discussions on Stack Overflow and GitHub regarding these issues:
- Schedule new files while task is not completed. #195
- RowFileInputReader does not tail log files #81
- https://stackoverflow.com/questions/70558402/filepulse-sourceconnector
- https://stackoverflow.com/questions/65533732/kafka-connector-not-behaving-as-expected
To rule out a configuration error on my part, I'd like to confirm with an expert whether these requirements can be met, or whether Kafka Connect is not suitable for this use case.
Here are the requirements I need to achieve:
- Log files must be continuously monitored. When new entries are appended to a log file, which may happen at irregular intervals (e.g., once a day or not at all), the connector should send only the newly added portion to the Kafka topic rather than re-sending the entire file content.
- The number of log files may increase significantly, potentially reaching hundreds or thousands. The system should be able to detect new files and automatically start monitoring them.
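To make these two requirements concrete, here is a minimal Python sketch of the behavior I am after. It is not File Pulse code: `read_new_lines`, `poll_directory`, and the in-memory `offsets` dict are hypothetical names used only for illustration, and a real connector would persist offsets rather than keep them in memory.

```python
import os


def read_new_lines(path, offsets):
    """Return the complete lines appended to `path` since the previous call.

    `offsets` maps a file path to the byte position already consumed
    (hypothetical in-memory state, for illustration only).
    """
    start = offsets.get(path, 0)
    with open(path, "rb") as f:
        f.seek(start)
        data = f.read()
    # Consume only up to the last newline so a half-written line is held back.
    end = data.rfind(b"\n") + 1
    offsets[path] = start + end
    return data[:end].decode("utf-8").split("\n")[:-1]


def poll_directory(directory, offsets, suffix=".log"):
    """Scan `directory` recursively and return {path: new_lines}.

    Files that appeared since the last scan are picked up automatically,
    and files with no new complete lines are omitted from the result.
    """
    result = {}
    for root, _dirs, files in os.walk(directory):
        for name in sorted(files):
            if name.endswith(suffix):
                path = os.path.join(root, name)
                lines = read_new_lines(path, offsets)
                if lines:
                    result[path] = lines
    return result
```

Calling `poll_directory("/mnt/log", offsets)` repeatedly with the same `offsets` dict would return only what was appended between calls, regardless of how long a file stays idle. The sketch ignores truncation and rotation, which a real solution would also have to handle.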
Below is my File Pulse connector configuration:
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "filters": "GroupMultilineException, ParseLog4jLog, AppendNode",
  "filters.GroupMultilineException.negate": "false",
  "filters.GroupMultilineException.pattern": "^[\\t]",
  "filters.GroupMultilineException.type": "io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter",
  "filters.ParseLog4jLog.pattern": "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}",
  "filters.ParseLog4jLog.overwrite": "message",
  "filters.ParseLog4jLog.source": "message",
  "filters.ParseLog4jLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
  "filters.ParseLog4jLog.ignoreFailure": "true",
  "filters.AppendNode.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.AppendNode.field": "$.nodeName",
  "filters.AppendNode.value": "{{ extract_array( split($metadata.path, '/'), 3) }}",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on": "COMMITTED",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/mnt/log",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.listing.interval.ms": "10000",
  "file.filter.regex.pattern": ".*\\.log$",
  "offset.policy.class": "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "offset.attributes.string": "hash",
  "read.max.wait.ms": "86400000",
  "ignore.committed.offsets": "false",
  "topic": "connect-file-pulse-quickstart-log4j",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "tasks.file.status.storage.topic.partitions": 10,
  "tasks.file.status.storage.topic.replication.factor": 1,
  "tasks.max": 1
}