Minor Improvements to ZStream#tapSink by adamgfraser · Pull Request #6211 · zio/zio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Minor Improvements to ZStream#tapSink #6211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/shared/src/main/scala/zio/ZHub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,8 @@ object ZHub {
ZIO
.whenZIO(shutdownHook.succeed(())) {
ZIO.foreachPar(unsafePollAll(pollers))(_.interruptAs(fiberId)) *>
- ZIO.succeed(subscription.unsubscribe())
+ ZIO.succeed(subscription.unsubscribe()) *>
+ ZIO.succeed(strategy.unsafeOnHubEmptySpace(hub, subscribers))
}
.unit
}.uninterruptible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3319,7 +3319,7 @@ object ZStreamSpec extends ZIOBaseSpec {
for {
ref <- Ref.make(0)
sink = ZSink.foreach[Any, Nothing, Int](n => ref.update(_ + n))
- stream = ZStream(1, 1, 2, 3, 5, 8).tapSink(sink, 8)
+ stream = ZStream(1, 1, 2, 3, 5, 8).tapSink(sink)
elements <- stream.runCollect
done <- ref.get
} yield assertTrue(elements == Chunk(1, 1, 2, 3, 5, 8) && done == 20)
Expand All @@ -3328,7 +3328,7 @@ object ZStreamSpec extends ZIOBaseSpec {
for {
ref <- Ref.make(0)
sink = ZSink.take[Int](3).map(_.sum).mapZIO(n => ref.update(_ + n))
- stream = ZStream(1, 1, 2, 3, 5, 8).tapSink(sink, 8)
+ stream = ZStream(1, 1, 2, 3, 5, 8).tapSink(sink)
elements <- stream.runCollect
done <- ref.get
} yield assertTrue(elements == Chunk(1, 1, 2, 3, 5, 8) && done == 4)
Expand All @@ -3337,7 +3337,7 @@ object ZStreamSpec extends ZIOBaseSpec {
for {
ref <- Ref.make(0)
sink = ZSink.fail("error")
- stream = ZStream.never.tapSink(sink, 8)
+ stream = ZStream.never.tapSink(sink)
error <- stream.runCollect.flip
} yield assertTrue(error == "error")
}
Expand Down
13 changes: 7 additions & 6 deletions streams/shared/src/main/scala/zio/stream/ZStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2576,7 +2576,7 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A
)(implicit
trace: ZTraceElement
): ZStream[R1, E1, A] =
- self.mergeEither(that).collectLeft
+ self.merge(that.drain)

/**
* Merges this stream and the specified stream together, discarding the values
Expand All @@ -2588,7 +2588,7 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A
)(implicit
trace: ZTraceElement
): ZStream[R1, E1, A2] =
- self.mergeEither(that).collectRight
+ self.drain.merge(that)

/**
* Merges this stream and the specified stream together to a common element
Expand Down Expand Up @@ -3475,11 +3475,12 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A
* to emitting them.
*/
final def tapSink[R1 <: R, E1 >: E](
- sink: => ZSink[R1, E1, A, Any, Any],
- maximumLag: => Int
+ sink: => ZSink[R1, E1, A, Any, Any]
)(implicit trace: ZTraceElement): ZStream[R1, E1, A] =
- ZStream.managed(broadcast(2, maximumLag)).flatMap { streams =>
- streams(0).mergeLeft(ZStream.fromZIO(streams(1).run(sink)))
+ ZStream.managed(broadcastedQueues(2, 1)).flatMap { queues =>
+ val left = ZStream.fromQueue(queues(0)).flattenTake
+ val right = ZStream.fromQueueWithShutdown(queues(1)).flattenTake
+ left.merge(ZStream.execute(right.run(sink)))
}

/**
Expand Down
0