I ran some tests around rebalancing, using KafkaConsumer.partitionsMapStream with Stream.switchMap to interrupt stream processing whenever a rebalance happens (to avoid processing buffered records whose partitions may since have been assigned to a different consumer). In one test, the interruption happened to occur at the moment some records were being (transactionally) produced. For each producer, there were n records to produce and (n - 1) exceptions like this one:
11:21:04.522 ERROR: Error executing user-provided callback on message for topic-partition 'test-pstate-output-1'
java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise.complete(Promise.scala:57)
at scala.concurrent.Promise.complete$(Promise.scala:56)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:104)
at scala.concurrent.Promise.failure(Promise.scala:109)
at scala.concurrent.Promise.failure$(Promise.scala:109)
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:104)
at fs2.kafka.KafkaProducer$.$anonfun$produceRecord$7(KafkaProducer.scala:221)
at scala.Option.foreach(Option.scala:437)
at fs2.kafka.KafkaProducer$.$anonfun$produceRecord$6(KafkaProducer.scala:221)
at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1567)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:199)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:1161)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:473)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:336)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
at java.lang.Thread.run(Thread.java:1575)
In KafkaProducer.produce there is a flag called failFastProduce, which should cancel producing records if there is an exception. For this reason, a Promise is created on line 188 and passed to KafkaProducer.produceRecord on line 182 for each record that should be produced. In my test case, each record production attempt completes this same promise with an exception, and each but the first one produces the IllegalStateException seen above.
I suppose all records had been sent to the output buffer of the org.apache.kafka.clients.producer.KafkaProducer, so n callbacks were waiting for the send to be acknowledged. The cancellation caused by switchMap interrupted the sending and invoked each callback with an exception (an InterruptedException, I guess). Each of those callbacks then tried to complete the promise with that exception, which led to an IllegalStateException in all but the first completion attempt.
Maybe it would be enough to use tryFailure instead of failure on line 221?
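To illustrate the difference outside of fs2-kafka, here is a minimal sketch that uses only scala.concurrent.Promise. The object SharedPromiseDemo and the helper countThrowingCallbacks are hypothetical names made up for this example; they are not part of the library. The sketch simulates n callbacks that all fail the same shared promise and counts how many of them throw:

```scala
import scala.concurrent.Promise
import scala.util.Try

object SharedPromiseDemo {

  /** Runs `n` simulated send callbacks against one shared promise and
    * returns how many of them threw while trying to complete it.
    * (Hypothetical helper, only for illustration.)
    */
  def countThrowingCallbacks(n: Int)(complete: (Promise[Unit], Throwable) => Any): Int = {
    val shared = Promise[Unit]()
    (1 to n).count { i =>
      val cause = new RuntimeException(s"send $i aborted")
      // Try captures the IllegalStateException raised by a second completion.
      Try(complete(shared, cause)).isFailure
    }
  }

  // `failure` throws IllegalStateException if the promise is already completed,
  // so every callback after the first one throws.
  val withFailure: Int = countThrowingCallbacks(5)((p, e) => p.failure(e))

  // `tryFailure` returns false instead of throwing when the promise is
  // already completed, so no callback throws.
  val withTryFailure: Int = countThrowingCallbacks(5)((p, e) => p.tryFailure(e))

  def main(args: Array[String]): Unit = {
    println(s"failure:    $withFailure of 5 callbacks threw")
    println(s"tryFailure: $withTryFailure of 5 callbacks threw")
  }
}
```

With failure, 4 of the 5 callbacks throw, which matches the (n - 1) exceptions I saw; with tryFailure, none do. I have not checked whether anything else in produceRecord relies on the exception being thrown, so this only shows the Promise behaviour, not a verified fix.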
Just in case you are curious, this is roughly what I did in my test (with multiple instances delayed by different amounts to force rebalances):