From f4a4031118ff8b2b57b0584cc445b43a40a787a4 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sat, 7 Sep 2024 18:15:52 +0300 Subject: [PATCH 1/2] Inherit children created by stream merging --- core/shared/src/main/scala/zio/Fiber.scala | 7 +++++++ .../scala/zio/internal/FiberRuntime.scala | 6 ++++++ .../test/scala/zio/stream/ZStreamSpec.scala | 21 ++++++++++++++++++- .../src/main/scala/zio/stream/ZChannel.scala | 9 ++++++-- 4 files changed, 40 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/zio/Fiber.scala b/core/shared/src/main/scala/zio/Fiber.scala index eca2a9c0e823..f8d9141c051b 100644 --- a/core/shared/src/main/scala/zio/Fiber.scala +++ b/core/shared/src/main/scala/zio/Fiber.scala @@ -667,6 +667,13 @@ object Fiber extends FiberPlatformSpecific { */ private[zio] def transferChildren(scope: FiberScope): Unit + /** + * Transfers all children of the fiber executing the effect to this fiber + * + * '''NOTE''': This method must be invoked at the end of the child fiber's + * execution + */ + private[zio] def transplantChildren(implicit trace: Trace): UIO[Unit] } private[zio] object Runtime { diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index b1a8b51cfdae..2b15c6c5e4b4 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -1477,6 +1477,12 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } } + private[zio] def transplantChildren(implicit trace: Trace): UIO[Unit] = + ZIO.withFiberRuntime[Any, Nothing, Unit] { (currentFiber, _) => + currentFiber.transferChildren(self.scope) + Exit.unit + } + /** * Updates a fiber ref belonging to this fiber by using the provided update * function. diff --git a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala index 8d1234a89933..d831da40983c 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala @@ -2371,7 +2371,26 @@ object ZStreamSpec extends ZIOBaseSpec { s3 = s1.zipLatest(s2).interruptWhen(ZIO.never).take(3) _ <- s3.runDrain } yield assertCompletes - } @@ exceptJS(nonFlaky) + } @@ exceptJS(nonFlaky), + test("i9091 - forked children are not interrupted early by interruptWhen") { + for { + requestQueue <- Queue.unbounded[String] + counter <- Ref.make(0) + _ <- ZStream + .unwrapScoped( + ZStream + .fromQueue(requestQueue) + .runForeach(_ => counter.update(_ + 1)) + .fork + .as(ZStream.succeed("") ++ ZStream.never) + ) + .interruptWhen(ZIO.never) + .runDrain + .fork + _ <- requestQueue.offer("some message").forever.fork + _ <- counter.get.repeatUntil(_ >= 10) + } yield assertCompletes + } @@ exceptJS(nonFlaky) @@ TestAspect.timeout(10.seconds) ), suite("interruptAfter")( test("interrupts after given duration") { diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index cfd8345a61c4..12936fb48ce3 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -968,8 +968,13 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon } } - ZChannel - .fromZIO(pullL.forkIn(scope).zipWith(pullR.forkIn(scope))(BothRunning(_, _, true): MergeState)) + ZChannel.fromZIO { + ZIO.withFiberRuntime[Env1, Nothing, MergeState] { (parent, _) => + val fL = pullL.ensuring(parent.transplantChildren).forkIn(scope) + val fR = pullR.ensuring(parent.transplantChildren).forkIn(scope) + fL.zipWith(fR)(BothRunning(_, _, true)) + } + } .flatMap(go) .embedInput(input) } From ab921ef4caa28779e5f9f684fe6afd15648b10bc Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Sat, 7 Sep 2024 18:20:10 +0300 Subject: [PATCH 2/2] Cleanup --- core/shared/src/main/scala/zio/Fiber.scala | 7 ------- .../src/main/scala/zio/internal/FiberRuntime.scala | 6 ------ .../shared/src/main/scala/zio/stream/ZChannel.scala | 12 ++++++++++-- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/core/shared/src/main/scala/zio/Fiber.scala b/core/shared/src/main/scala/zio/Fiber.scala index f8d9141c051b..eca2a9c0e823 100644 --- a/core/shared/src/main/scala/zio/Fiber.scala +++ b/core/shared/src/main/scala/zio/Fiber.scala @@ -667,13 +667,6 @@ object Fiber extends FiberPlatformSpecific { */ private[zio] def transferChildren(scope: FiberScope): Unit - /** - * Transfers all children of the fiber executing the effect to this fiber - * - * '''NOTE''': This method must be invoked at the end of the child fiber's - * execution - */ - private[zio] def transplantChildren(implicit trace: Trace): UIO[Unit] } private[zio] object Runtime { diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 2b15c6c5e4b4..b1a8b51cfdae 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -1477,12 +1477,6 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } } - private[zio] def transplantChildren(implicit trace: Trace): UIO[Unit] = - ZIO.withFiberRuntime[Any, Nothing, Unit] { (currentFiber, _) => - currentFiber.transferChildren(self.scope) - Exit.unit - } - /** * Updates a fiber ref belonging to this fiber by using the provided update * function. diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index 12936fb48ce3..532e8d93443f 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -4,6 +4,7 @@ import zio.{ZIO, _} import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.internal.{AsyncInputConsumer, AsyncInputProducer, ChannelExecutor, SingleProducerAsyncInput} import ChannelExecutor.ChannelState +import zio.internal.FiberScope /** * A `ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]` is a nexus @@ -968,10 +969,17 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon } } + def inheritChildren(parentScope: FiberScope): UIO[Unit] = + ZIO.withFiberRuntime[Any, Nothing, Unit] { (state, _) => + state.transferChildren(parentScope) + Exit.unit + } + ZChannel.fromZIO { ZIO.withFiberRuntime[Env1, Nothing, MergeState] { (parent, _) => - val fL = pullL.ensuring(parent.transplantChildren).forkIn(scope) - val fR = pullR.ensuring(parent.transplantChildren).forkIn(scope) + val inherit = inheritChildren(parent.scope) + val fL = pullL.ensuring(inherit).forkIn(scope) + val fR = pullR.ensuring(inherit).forkIn(scope) fL.zipWith(fR)(BothRunning(_, _, true)) } }