Challenges with File Pulse Connector for Real-Time Log Streaming from Kubernetes: Meeting Requirements for Incremental Updates and Dynamic File Monitoring · Issue #661 · streamthoughts/kafka-connect-file-pulse
Open
@max533

Description

Provide details of the setup you're running

File Pulse Connector Version 2.14.1
Docker Version 20.10.13

Outline your question

I am using the File Pulse Connector to stream logs in real time from applications running in Kubernetes containers. However, I am having difficulty meeting the two requirements listed below at the same time; it appears that satisfying both may require a very large number of Kafka Connect workers.

I have also reviewed related discussions on Stack Overflow and GitHub regarding these issues.

However, to make sure I am not misconfiguring the connector out of unfamiliarity, I would like to confirm with someone experienced whether these requirements can be met, or whether Kafka Connect is simply not suitable for this use case.

Here are the requirements I need to achieve:

  1. Log files must be monitored continuously. New entries are appended at irregular intervals (for example, once a day, or not at all). When they arrive, the connector should send only the newly appended portion to the Kafka topic rather than re-sending the entire file content.
  2. The number of log files may increase significantly, potentially reaching hundreds or thousands. The system should be able to detect new files and automatically start monitoring them.
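For illustration only, the behavior described by these two requirements can be sketched in plain Python. This is not how File Pulse works internally; the class name `IncrementalTailer` and its `poll` method are hypothetical, and the sketch assumes append-only files (it does not handle rotation or truncation). It keeps one byte offset per file, emits only complete lines written since the previous poll, and picks up new files on each rescan of the directory tree.

```python
import os
import tempfile


class IncrementalTailer:
    """Hypothetical helper: remembers a byte offset per file and
    returns only the lines appended since the previous poll."""

    def __init__(self, directory, suffix=".log"):
        self.directory = directory
        self.suffix = suffix
        self.offsets = {}  # path -> number of bytes already emitted

    def poll(self):
        """Rescan the directory tree; return {path: [new lines]}."""
        new_lines = {}
        for root, _dirs, files in os.walk(self.directory):
            for name in sorted(files):
                if not name.endswith(self.suffix):
                    continue
                path = os.path.join(root, name)
                # Files seen for the first time start at offset 0.
                start = self.offsets.get(path, 0)
                with open(path, "rb") as handle:
                    handle.seek(start)
                    chunk = handle.read()
                # Emit complete lines only; a partial trailing line
                # is left in place and picked up by a later poll.
                end = chunk.rfind(b"\n") + 1
                if end:
                    new_lines[path] = chunk[:end].decode("utf-8").splitlines()
                self.offsets[path] = start + end
        return new_lines


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as log_dir:
        log_path = os.path.join(log_dir, "app.log")
        with open(log_path, "w") as log_file:
            log_file.write("first entry\n")

        tailer = IncrementalTailer(log_dir)
        print(tailer.poll())  # the whole file, since it is new
        with open(log_path, "a") as log_file:
            log_file.write("second entry\n")
        print(tailer.poll())  # only the appended line
```

Note that the cost of each poll in this sketch grows with the number of files, which is the scaling concern behind requirement 2.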

Below is my Kafka Connect File Pulse connector configuration:

{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "filters": "GroupMultilineException, ParseLog4jLog, AppendNode",
  "filters.GroupMultilineException.negate": "false",
  "filters.GroupMultilineException.pattern": "^[\\t]",
  "filters.GroupMultilineException.type": "io.streamthoughts.kafka.connect.filepulse.filter.MultiRowFilter",
  "filters.ParseLog4jLog.pattern": "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}",
  "filters.ParseLog4jLog.overwrite": "message",
  "filters.ParseLog4jLog.source": "message",
  "filters.ParseLog4jLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
  "filters.ParseLog4jLog.ignoreFailure": "true",
  "filters.AppendNode.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.AppendNode.field": "$.nodeName",
  "filters.AppendNode.value": "{{ extract_array( split($metadata.path, '/'), 3) }}",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on":"COMMITTED",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/mnt/log",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.listing.interval.ms": "10000",
  "file.filter.regex.pattern":".*\\.log$",
  "offset.policy.class":"io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "offset.attributes.string": "hash",
  "read.max.wait.ms": "86400000",
  "ignore.committed.offsets": "false",
  "topic": "connect-file-pulse-quickstart-log4j",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "tasks.file.status.storage.topic.partitions": 10,
  "tasks.file.status.storage.topic.replication.factor": 1,
  "tasks.max": 1
}
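For readers who want to reproduce the setup, a configuration like the one above is typically registered through the Kafka Connect REST API with `PUT /connectors/{name}/config`. The following is a minimal sketch, not taken from the issue: the worker URL `http://localhost:8083`, the connector name `file-pulse-k8s-logs`, and the helper `build_config_request` are assumptions, and only a few keys of the configuration are repeated for brevity.

```python
import json
import urllib.request


def build_config_request(connect_url, connector_name, config):
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    # Connector configs are string-to-string maps, so every value
    # is serialized as a string here.
    body = json.dumps({key: str(value) for key, value in config.items()})
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{connector_name}/config",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # Subset of the configuration shown above; the rest is omitted for brevity.
    config = {
        "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.listing.directory.path": "/mnt/log",
        "fs.listing.interval.ms": "10000",
        "read.max.wait.ms": "86400000",
        "topic": "connect-file-pulse-quickstart-log4j",
        "tasks.max": 1,
    }
    # Assumed worker URL and connector name; adjust to the actual deployment.
    request = build_config_request("http://localhost:8083", "file-pulse-k8s-logs", config)
    print(request.get_method(), request.full_url)
    # To actually submit it against a running worker:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status)
```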

Labels: question
