Closed
Description
With a certain setup, a stream appears to block for no reason once `interruptWhen(promise)`, given a completely unrelated promise, is added to another blocking stream, where the two streams are intertwined within `ZStream.unwrap()`. See the reproducer below.
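For contrast, here is a minimal sketch of the behavior one would normally expect from `interruptWhen`: the stream keeps running while the promise is incomplete and ends once the promise is completed. It is not part of the failing setup; the object name `InterruptWhenSketch`, the `stop` promise, and the timings are illustrative assumptions.

```scala
//> using dep dev.zio::zio:2.1.7
//> using dep dev.zio::zio-streams:2.1.7
import zio.*
import zio.stream.*

// Hypothetical example (names and timings are assumptions, not from the report).
object InterruptWhenSketch extends ZIOAppDefault {
  def run = for {
    // Promise that controls when the stream should stop
    stop  <- Promise.make[Throwable, Unit]
    // A stream that ticks periodically until `stop` is completed
    fiber <- ZStream
               .tick(100.millis)
               .tap(_ => ZIO.debug("tick"))
               .interruptWhen(stop)
               .runDrain
               .fork
    // Let the stream run for a while, then complete the promise
    _     <- ZIO.sleep(350.millis)
    _     <- stop.succeed(())
    // The stream is expected to terminate shortly after the promise completes
    _     <- fiber.join
    _     <- ZIO.debug("stream interrupted")
  } yield ()
}
```

With a promise that is never completed, as in the report, the same operator would be expected to have no visible effect on the stream it is attached to, let alone on a separately forked one.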
Reproducer
//> using dep dev.zio::zio:2.1.7
//> using dep dev.zio::zio-streams:2.1.7
import zio.*
import zio.stream.*

object Main extends ZIOAppDefault {
  def run = for {
    requestQueue            <- Queue.unbounded[String]
    totallyUnrelatedPromise <- Promise.make[Throwable, Unit]
    _ <- ZStream
           .unwrap(
             ZStream
               .fromQueue(requestQueue)
               .runForeach(msg => ZIO.debug(s"message received: $msg"))
               .fork
               .as(ZStream.succeed("") ++ ZStream.never)
           )
           // commenting and uncommenting the line below changes the behavior
           .interruptWhen(totallyUnrelatedPromise)
           .runDrain
           .fork
    _ <- requestQueue.offer("some message").delay(250.millis).forever
  } yield ()
}
@ scala-cli run repro.scala # When `.interruptWhen(...)` is commented out: works as intended
message received: some message
message received: some message
message received: some message
message received: some message
message received: some message
...
^C
@ scala-cli run repro.scala # When `.interruptWhen(...)` is NOT commented out: blocks indefinitely
^C
Related issues: