Hello,
I have Kafka Connect with 2 workers and a FilePulse connector with 4 tasks. During a rolling restart of the workers, the connector stops cleaning up files from S3. However, once the restart is finished, new files are processed and cleaned up successfully.
All files that were not cleaned up have the status "COMMITTED" in the status topic.
Here is my connector config:
aws.s3.region: "us-east-1"
aws.s3.bucket.name: "test-xxxxx-data-us-east-1"
aws.s3.bucket.prefix: "emails/"
topic: "test-xxxxx-data"
fs.listing.class: io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing
fs.listing.interval.ms: 5000
fs.cleanup.policy.class: io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy
fs.cleanup.policy.move.success.aws.prefix.path: "success"
fs.cleanup.policy.move.failure.aws.prefix.path: "failure"
allow.tasks.reconfiguration.after.timeout.ms: 120000
tasks.reader.class: io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3BytesArrayInputReader
tasks.file.status.storage.bootstrap.servers: kafka-cluster-kafka-bootstrap:9093
tasks.file.status.storage.topic: test-xxxxx-data-status-internal
tasks.file.status.storage.topic.partitions: 10
tasks.file.status.storage.topic.replication.factor: 3
There are no errors in the logs.
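For illustration, here is a minimal sketch of how the stuck entries can be inspected by consuming the status topic directly (the topic name and bootstrap servers are taken from the config above; reading keys and values as plain strings and matching on the substring "COMMITTED" is a simplification, and any TLS/SASL client settings are omitted):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StatusTopicInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster-kafka-bootstrap:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "filepulse-status-inspector");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-xxxxx-data-status-internal"));
            // Poll a few times and print every record whose value still reports COMMITTED.
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    if (value != null && value.contains("COMMITTED")) {
                        System.out.println(record.key() + " -> " + value);
                    }
                }
            }
        }
    }
}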
Just a thought: would it be possible, and would it make sense, for AmazonS3MoveCleanupPolicy to retrospectively check the file status and clean up any file that is already in the "COMMITTED" state?
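To make the suggestion concrete, here is a rough, hypothetical sketch of what such a retrospective pass could do for a file that is still reported as COMMITTED: move the object under the configured success prefix and delete the original, which is essentially what the move cleanup policy does on success. The class and method names below are made up for illustration and are not FilePulse APIs; only the AWS SDK calls are real.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class RetrospectiveCommittedCleanup {

    private final AmazonS3 s3 = AmazonS3ClientBuilder.standard()
            .withRegion("us-east-1")
            .build();

    // Hypothetical helper: if a file is still in the COMMITTED state, apply the same
    // "move to success prefix" behaviour that the cleanup policy applies on success.
    public void cleanUpIfCommitted(String bucket, String key, String status) {
        if (!"COMMITTED".equals(status)) {
            return;
        }
        String destinationKey = "success/" + key;
        s3.copyObject(bucket, key, bucket, destinationKey);
        s3.deleteObject(bucket, key);
    }
}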