Promise[Throwable] fulfilled multiple times in KafkaProducer · Issue #1391 · fd4s/fs2-kafka · GitHub

Promise[Throwable] fulfilled multiple times in KafkaProducer #1391

Closed
ex-ratt opened this issue Feb 27, 2025 · 1 comment

ex-ratt (Contributor) commented Feb 27, 2025

I ran some tests around rebalancing, using KafkaConsumer.partitionsMapStream with Stream.switchMap to interrupt the processing of streams whenever a rebalance happens (to prevent processing buffered records that may have been reassigned to a different consumer). In one test, the interruption happened to occur at the exact moment some records were being (transactionally) produced. For each producer, there were n records to produce and (n - 1) exceptions like the following:

11:21:04.522 ERROR: Error executing user-provided callback on message for topic-partition 'test-pstate-output-1'
java.lang.IllegalStateException: Promise already completed.
    at scala.concurrent.Promise.complete(Promise.scala:57)
    at scala.concurrent.Promise.complete$(Promise.scala:56)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:104)
    at scala.concurrent.Promise.failure(Promise.scala:109)
    at scala.concurrent.Promise.failure$(Promise.scala:109)
    at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:104)
    at fs2.kafka.KafkaProducer$.$anonfun$produceRecord$7(KafkaProducer.scala:221)
    at scala.Option.foreach(Option.scala:437)
    at fs2.kafka.KafkaProducer$.$anonfun$produceRecord$6(KafkaProducer.scala:221)
    at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1567)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:199)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:1161)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:473)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:336)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
    at java.lang.Thread.run(Thread.java:1575)

In KafkaProducer.produce there is a flag called failFastProduce, which is meant to cancel the production of the remaining records when an exception occurs. To that end, a Promise is created on line 188 and passed to KafkaProducer.produceRecord on line 182 for each record to be produced. In my test case, each record production attempt completes this same promise with an exception, and every attempt but the first one triggers the IllegalStateException seen above.

I suppose all records were sent to the output buffer of the org.apache.kafka.clients.producer.KafkaProducer, so n callbacks were waiting for the send to be acknowledged. The cancellation caused by switchMap interrupted the sending and invoked each callback with an InterruptedException (I guess). Each of those callbacks then completed the promise with the exception, which led to an IllegalStateException on all but the first completion attempt.
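
The sketch below (plain Scala with hypothetical names, not the actual fs2-kafka internals) reproduces this failure mode in isolation: a single Promise shared by all records of one produce call, with every completion callback trying to fail it.

import scala.concurrent.Promise

// One promise guards fail-fast cancellation for the whole batch,
// but every record's completion callback tries to fail it.
object SharedPromiseRace extends App {
  val cancelOnError = Promise[Unit]()
  val interrupted   = new InterruptedException("send aborted")

  // Stand-in for the n callbacks fired when the batch is aborted:
  val callbacks = List.fill(3)(() => cancelOnError.failure(interrupted))

  callbacks.foreach { cb =>
    try cb()
    catch {
      // Every completion attempt after the first ends up here.
      case e: IllegalStateException => println(s"callback failed: $e")
    }
  }
}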

Maybe it would be enough to use tryFailure instead of failure on line 221?
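
For reference, a minimal demonstration of the difference (plain Scala): tryFailure is idempotent and merely reports whether the call won the race, so later callbacks become no-ops instead of throwing.

import scala.concurrent.Promise

object TryFailureDemo extends App {
  val promise = Promise[Unit]()
  val error   = new InterruptedException("send aborted")

  println(promise.tryFailure(error)) // true  -- the first callback completes the promise
  println(promise.tryFailure(error)) // false -- later attempts are ignored, no exception
}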

Just in case you are curious, this is roughly what I did in my test (with multiple instances delayed by different amounts to force rebalances):

Stream.resource(consumer(groupId))
  .evalTap(_.subscribeTo(inputTopic))
  .flatMap(_.partitionsMapStream)
  .switchMap { assignedStreams =>
    val streams = assignedStreams.map { case (tp, inputStream) =>
      Stream.resource(producer(s"$transactionId-${tp.partition()}"))
        .flatMap { producer =>
          inputStream
            .metered(1.second)
            .map(processRecord)
            .groupWithin(chunkSize, chunkTimeout)
            .evalMap(producer.produce)
            .void
        }
        .delayBy(100.milliseconds)
    }
    streams.toList.parJoinUnbounded
  }
  .interruptAfter(duration)
  .delayBy(delay)
  .compile
  .drain

aartigao (Contributor) commented Mar 9, 2025

Maybe it would be enough to use tryFailure instead of failure on line 221?

Yes, that seems to me the best solution 👍🏽
