From 331747fc91435768dd2bbba62024b50824d4673b Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Thu, 22 Apr 2021 10:31:43 -0700 Subject: [PATCH 1/8] initial work --- .../src/main/scala/zio/ManagedAspect.scala | 5 ++ core/shared/src/main/scala/zio/ZAspect.scala | 57 +++++++++++++++++++ core/shared/src/main/scala/zio/ZIO.scala | 6 ++ core/shared/src/main/scala/zio/ZManaged.scala | 6 ++ .../main/scala/zio/stream/StreamAspect.scala | 17 ++++++ .../src/main/scala/zio/stream/ZStream.scala | 6 ++ 6 files changed, 97 insertions(+) create mode 100644 core/shared/src/main/scala/zio/ManagedAspect.scala create mode 100644 core/shared/src/main/scala/zio/ZAspect.scala create mode 100644 streams/shared/src/main/scala/zio/stream/StreamAspect.scala diff --git a/core/shared/src/main/scala/zio/ManagedAspect.scala b/core/shared/src/main/scala/zio/ManagedAspect.scala new file mode 100644 index 000000000000..e2468995abf1 --- /dev/null +++ b/core/shared/src/main/scala/zio/ManagedAspect.scala @@ -0,0 +1,5 @@ +package zio + +trait ManagedAspect[-R, +E] { + def apply[R1 <: R, E1 >: E, A](managed: ZManaged[R1, E1, A]): ZManaged[R1, E1, A] +} diff --git a/core/shared/src/main/scala/zio/ZAspect.scala b/core/shared/src/main/scala/zio/ZAspect.scala new file mode 100644 index 000000000000..b617a5b0da64 --- /dev/null +++ b/core/shared/src/main/scala/zio/ZAspect.scala @@ -0,0 +1,57 @@ +package zio + +import zio.internal.Executor +import scala.concurrent.ExecutionContext + +trait ZAspect[-R, +E] { + def apply[R1 <: R, E1 >: E, A](zio: ZIO[R1, E1, A]): ZIO[R1, E1, A] +} + +object ZAspect { + + /** + * An aspect that prints the results of effects to the console for debugging + * purposes. + */ + val debug: ZAspect[Any, Nothing] = + new ZAspect[Any, Nothing] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.debug + } + + /** + * As aspect that runs effects on the specified `Executor`. 
+ */ + def lock(executor: Executor): ZAspect[Any, Nothing] = + new ZAspect[Any, Nothing] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.lock(executor) + } + + /** + * As aspect that runs effects on the specified `ExecutionContext`. + */ + def on(ec: ExecutionContext): ZAspect[Any, Nothing] = + new ZAspect[Any, Nothing] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.on(ec) + } + + /** + * As aspect that enables tracing for effects. + */ + val traced: ZAspect[Any, Nothing] = + new ZAspect[Any, Nothing] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.traced + } + + /** + * As aspect that disables tracing for effects. + */ + val untraced: ZAspect[Any, Nothing] = + new ZAspect[Any, Nothing] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.untraced + } +} \ No newline at end of file diff --git a/core/shared/src/main/scala/zio/ZIO.scala b/core/shared/src/main/scala/zio/ZIO.scala index d0543be2055e..b19a87c5f9ef 100644 --- a/core/shared/src/main/scala/zio/ZIO.scala +++ b/core/shared/src/main/scala/zio/ZIO.scala @@ -51,6 +51,12 @@ import scala.util.{Failure, Success} */ sealed trait ZIO[-R, +E, +A] extends Serializable with ZIOPlatformSpecific[R, E, A] { self => + /** + * Syntax for adding aspects. + */ + final def @@[R1 <: R, E1 >: E](aspect: ZAspect[R1, E1]): ZIO[R1, E1, A] = + aspect(self) + /** * Sequentially zips this effect with the specified effect, combining the * results into a tuple. diff --git a/core/shared/src/main/scala/zio/ZManaged.scala b/core/shared/src/main/scala/zio/ZManaged.scala index 19d6ab2c6a51..512ff569faaa 100644 --- a/core/shared/src/main/scala/zio/ZManaged.scala +++ b/core/shared/src/main/scala/zio/ZManaged.scala @@ -57,6 +57,12 @@ sealed abstract class ZManaged[-R, +E, +A] extends Serializable { self => */ def zio: ZIO[(R, ZManaged.ReleaseMap), E, (ZManaged.Finalizer, A)] + /** + * Syntax for adding aspects. 
+ */ + final def @@[R1 <: R, E1 >: E](aspect: ManagedAspect[R1, E1]): ZManaged[R1, E1, A] = + aspect(self) + /** * Symbolic alias for zip. */ diff --git a/streams/shared/src/main/scala/zio/stream/StreamAspect.scala b/streams/shared/src/main/scala/zio/stream/StreamAspect.scala new file mode 100644 index 000000000000..ccdd4f852c79 --- /dev/null +++ b/streams/shared/src/main/scala/zio/stream/StreamAspect.scala @@ -0,0 +1,17 @@ +package zio.stream + +trait StreamAspect[-R, +E] { + def apply[R1 <: R, E1 >: E, A](stream: ZStream[R1, E1, A]): ZStream[R1, E1, A] +} + +object StreamAspect { + + /** + * An aspect that rechunks the stream into chunks of the specified size. + */ + def chunkN(n: Int): StreamAspect[Any, Nothing] = + new StreamAspect[Any, Nothing] { + def apply[R1, E1, A](stream: ZStream[R1,E1,A]): ZStream[R1,E1,A] = + stream.chunkN(n) + } +} \ No newline at end of file diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 0e2c55cea1cf..48908a347ec7 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -76,6 +76,12 @@ abstract class ZStream[-R, +E, +O](val process: ZManaged[R, Nothing, ZIO[R, Opti import ZStream.{BufferedPull, Pull, TerminationStrategy} + /** + * Syntax for adding aspects. + */ + final def @@[R1 <: R, E1 >: E](aspect: StreamAspect[R1, E1]): ZStream[R1, E1, O] = + aspect(self) + /** * Symbolic alias for [[ZStream#cross]]. 
*/ From 2e648f6a8bde7c8acd74c50e9ece2dc3a5996ccb Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Thu, 22 Apr 2021 16:21:22 -0700 Subject: [PATCH 2/8] format --- core/shared/src/main/scala/zio/ZAspect.scala | 24 +++++++++---------- .../main/scala/zio/stream/StreamAspect.scala | 8 +++---- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/core/shared/src/main/scala/zio/ZAspect.scala b/core/shared/src/main/scala/zio/ZAspect.scala index b617a5b0da64..a06caf844198 100644 --- a/core/shared/src/main/scala/zio/ZAspect.scala +++ b/core/shared/src/main/scala/zio/ZAspect.scala @@ -10,9 +10,9 @@ trait ZAspect[-R, +E] { object ZAspect { /** - * An aspect that prints the results of effects to the console for debugging - * purposes. - */ + * An aspect that prints the results of effects to the console for debugging + * purposes. + */ val debug: ZAspect[Any, Nothing] = new ZAspect[Any, Nothing] { def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = @@ -20,8 +20,8 @@ object ZAspect { } /** - * As aspect that runs effects on the specified `Executor`. - */ + * As aspect that runs effects on the specified `Executor`. + */ def lock(executor: Executor): ZAspect[Any, Nothing] = new ZAspect[Any, Nothing] { def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = @@ -29,8 +29,8 @@ object ZAspect { } /** - * As aspect that runs effects on the specified `ExecutionContext`. - */ + * As aspect that runs effects on the specified `ExecutionContext`. + */ def on(ec: ExecutionContext): ZAspect[Any, Nothing] = new ZAspect[Any, Nothing] { def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = @@ -38,8 +38,8 @@ object ZAspect { } /** - * As aspect that enables tracing for effects. - */ + * As aspect that enables tracing for effects. + */ val traced: ZAspect[Any, Nothing] = new ZAspect[Any, Nothing] { def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = @@ -47,11 +47,11 @@ object ZAspect { } /** - * As aspect that disables tracing for effects. 
- */ + * As aspect that disables tracing for effects. + */ val untraced: ZAspect[Any, Nothing] = new ZAspect[Any, Nothing] { def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = zio.untraced } -} \ No newline at end of file +} diff --git a/streams/shared/src/main/scala/zio/stream/StreamAspect.scala b/streams/shared/src/main/scala/zio/stream/StreamAspect.scala index ccdd4f852c79..deba63bb1000 100644 --- a/streams/shared/src/main/scala/zio/stream/StreamAspect.scala +++ b/streams/shared/src/main/scala/zio/stream/StreamAspect.scala @@ -7,11 +7,11 @@ trait StreamAspect[-R, +E] { object StreamAspect { /** - * An aspect that rechunks the stream into chunks of the specified size. - */ + * An aspect that rechunks the stream into chunks of the specified size. + */ def chunkN(n: Int): StreamAspect[Any, Nothing] = new StreamAspect[Any, Nothing] { - def apply[R1, E1, A](stream: ZStream[R1,E1,A]): ZStream[R1,E1,A] = + def apply[R1, E1, A](stream: ZStream[R1, E1, A]): ZStream[R1, E1, A] = stream.chunkN(n) } -} \ No newline at end of file +} From 57a33369dbceb9a2c31b0e7d7e608437d59e2b6e Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Thu, 20 May 2021 13:11:56 -0700 Subject: [PATCH 3/8] format --- core/shared/src/main/scala/zio/ZAspect.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/shared/src/main/scala/zio/ZAspect.scala b/core/shared/src/main/scala/zio/ZAspect.scala index a06caf844198..301db4d361e7 100644 --- a/core/shared/src/main/scala/zio/ZAspect.scala +++ b/core/shared/src/main/scala/zio/ZAspect.scala @@ -1,6 +1,7 @@ package zio import zio.internal.Executor + import scala.concurrent.ExecutionContext trait ZAspect[-R, +E] { From c60671f44d088fb89325d879790bb8adacce6347 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Thu, 20 May 2021 13:50:38 -0700 Subject: [PATCH 4/8] generalize --- .../src/main/scala/zio/ManagedAspect.scala | 5 -- core/shared/src/main/scala/zio/ZAspect.scala | 58 -------------- core/shared/src/main/scala/zio/ZIO.scala | 4 +- 
.../shared/src/main/scala/zio/ZIOAspect.scala | 78 +++++++++++++++++++ core/shared/src/main/scala/zio/ZManaged.scala | 4 +- .../src/main/scala/zio/ZManagedAspect.scala | 7 ++ .../main/scala/zio/stream/StreamAspect.scala | 17 ---- .../src/main/scala/zio/stream/ZStream.scala | 6 -- .../zio/stream/experimental/ZStream.scala | 8 ++ .../stream/experimental/ZStreamAspect.scala | 19 +++++ 10 files changed, 118 insertions(+), 88 deletions(-) delete mode 100644 core/shared/src/main/scala/zio/ManagedAspect.scala delete mode 100644 core/shared/src/main/scala/zio/ZAspect.scala create mode 100644 core/shared/src/main/scala/zio/ZIOAspect.scala create mode 100644 core/shared/src/main/scala/zio/ZManagedAspect.scala delete mode 100644 streams/shared/src/main/scala/zio/stream/StreamAspect.scala create mode 100644 streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala diff --git a/core/shared/src/main/scala/zio/ManagedAspect.scala b/core/shared/src/main/scala/zio/ManagedAspect.scala deleted file mode 100644 index e2468995abf1..000000000000 --- a/core/shared/src/main/scala/zio/ManagedAspect.scala +++ /dev/null @@ -1,5 +0,0 @@ -package zio - -trait ManagedAspect[-R, +E] { - def apply[R1 <: R, E1 >: E, A](managed: ZManaged[R1, E1, A]): ZManaged[R1, E1, A] -} diff --git a/core/shared/src/main/scala/zio/ZAspect.scala b/core/shared/src/main/scala/zio/ZAspect.scala deleted file mode 100644 index 301db4d361e7..000000000000 --- a/core/shared/src/main/scala/zio/ZAspect.scala +++ /dev/null @@ -1,58 +0,0 @@ -package zio - -import zio.internal.Executor - -import scala.concurrent.ExecutionContext - -trait ZAspect[-R, +E] { - def apply[R1 <: R, E1 >: E, A](zio: ZIO[R1, E1, A]): ZIO[R1, E1, A] -} - -object ZAspect { - - /** - * An aspect that prints the results of effects to the console for debugging - * purposes. 
- */ - val debug: ZAspect[Any, Nothing] = - new ZAspect[Any, Nothing] { - def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = - zio.debug - } - - /** - * As aspect that runs effects on the specified `Executor`. - */ - def lock(executor: Executor): ZAspect[Any, Nothing] = - new ZAspect[Any, Nothing] { - def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = - zio.lock(executor) - } - - /** - * As aspect that runs effects on the specified `ExecutionContext`. - */ - def on(ec: ExecutionContext): ZAspect[Any, Nothing] = - new ZAspect[Any, Nothing] { - def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = - zio.on(ec) - } - - /** - * As aspect that enables tracing for effects. - */ - val traced: ZAspect[Any, Nothing] = - new ZAspect[Any, Nothing] { - def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = - zio.traced - } - - /** - * As aspect that disables tracing for effects. - */ - val untraced: ZAspect[Any, Nothing] = - new ZAspect[Any, Nothing] { - def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = - zio.untraced - } -} diff --git a/core/shared/src/main/scala/zio/ZIO.scala b/core/shared/src/main/scala/zio/ZIO.scala index b19a87c5f9ef..8c3d38086648 100644 --- a/core/shared/src/main/scala/zio/ZIO.scala +++ b/core/shared/src/main/scala/zio/ZIO.scala @@ -54,7 +54,9 @@ sealed trait ZIO[-R, +E, +A] extends Serializable with ZIOPlatformSpecific[R, E, /** * Syntax for adding aspects. 
*/ - final def @@[R1 <: R, E1 >: E](aspect: ZAspect[R1, E1]): ZIO[R1, E1, A] = + final def @@[LowerR <: UpperR, UpperR <: R, LowerE >: E, UpperE >: LowerE, LowerA >: A, UpperA >: LowerA]( + aspect: ZIOAspect[LowerR, UpperR, LowerE, UpperE, LowerA, UpperA] + ): ZIO[UpperR, LowerE, LowerA] = aspect(self) /** diff --git a/core/shared/src/main/scala/zio/ZIOAspect.scala b/core/shared/src/main/scala/zio/ZIOAspect.scala new file mode 100644 index 000000000000..5f35782c82a9 --- /dev/null +++ b/core/shared/src/main/scala/zio/ZIOAspect.scala @@ -0,0 +1,78 @@ +package zio + +import zio.clock.Clock +import zio.duration._ +import zio.internal.Executor + +import scala.concurrent.ExecutionContext + +trait ZIOAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { + def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA](zio: ZIO[R, E, A]): ZIO[R, E, A] +} + +object ZAspect { + + /** + * An aspect that prints the results of effects to the console for debugging + * purposes. + */ + val debug: ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] = + new ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.debug + } + + /** + * An aspect that runs effects on the specified `Executor`. + */ + def lock(executor: Executor): ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] = + new ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.lock(executor) + } + + /** + * An aspect that runs effects on the specified `ExecutionContext`. + */ + def on(ec: ExecutionContext): ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] = + new ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.on(ec) + } + + /** + * An aspect that retries effects according to the specified schedule.
+ */ + def retry[R1 <: Clock, E1](schedule: Schedule[R1, E1, Any]): ZIOAspect[Nothing, R1, E1, E1, Nothing, Any] = + new ZIOAspect[Nothing, R1, E1, E1, Nothing, Any] { + def apply[R <: R1, E >: E1 <: E1, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.retry(schedule) + } + + /** + * An aspect that times out effects. + */ + def timeoutFail[E1](e: => E1)(d: Duration): ZIOAspect[Nothing, Clock, E1, Any, Nothing, Any] = + new ZIOAspect[Nothing, Clock, E1, Any, Nothing, Any] { + def apply[R <: Clock, E >: E1, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.timeoutFail(e)(d) + } + + /** + * An aspect that enables tracing for effects. + */ + val traced: ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] = + new ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.traced + } + + /** + * An aspect that disables tracing for effects. + */ + val untraced: ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] = + new ZIOAspect[Nothing, Any, Nothing, Any, Nothing, Any] { + def apply[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + zio.untraced + } +} diff --git a/core/shared/src/main/scala/zio/ZManaged.scala b/core/shared/src/main/scala/zio/ZManaged.scala index 512ff569faaa..7e5230647837 100644 --- a/core/shared/src/main/scala/zio/ZManaged.scala +++ b/core/shared/src/main/scala/zio/ZManaged.scala @@ -60,7 +60,9 @@ sealed abstract class ZManaged[-R, +E, +A] extends Serializable { self => /** * Syntax for adding aspects.
*/ - final def @@[R1 <: R, E1 >: E](aspect: ManagedAspect[R1, E1]): ZManaged[R1, E1, A] = + final def @@[LowerR <: UpperR, UpperR <: R, LowerE >: E, UpperE >: LowerE, LowerA >: A, UpperA >: LowerA]( + aspect: ZManagedAspect[LowerR, UpperR, LowerE, UpperE, LowerA, UpperA] + ): ZManaged[UpperR, LowerE, LowerA] = aspect(self) /** diff --git a/core/shared/src/main/scala/zio/ZManagedAspect.scala b/core/shared/src/main/scala/zio/ZManagedAspect.scala new file mode 100644 index 000000000000..9194d823307f --- /dev/null +++ b/core/shared/src/main/scala/zio/ZManagedAspect.scala @@ -0,0 +1,7 @@ +package zio + +trait ZManagedAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { + def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA]( + managed: ZManaged[R, E, A] + ): ZManaged[R, E, A] +} diff --git a/streams/shared/src/main/scala/zio/stream/StreamAspect.scala b/streams/shared/src/main/scala/zio/stream/StreamAspect.scala deleted file mode 100644 index deba63bb1000..000000000000 --- a/streams/shared/src/main/scala/zio/stream/StreamAspect.scala +++ /dev/null @@ -1,17 +0,0 @@ -package zio.stream - -trait StreamAspect[-R, +E] { - def apply[R1 <: R, E1 >: E, A](stream: ZStream[R1, E1, A]): ZStream[R1, E1, A] -} - -object StreamAspect { - - /** - * An aspect that rechunks the stream into chunks of the specified size. 
- */ - def chunkN(n: Int): StreamAspect[Any, Nothing] = - new StreamAspect[Any, Nothing] { - def apply[R1, E1, A](stream: ZStream[R1, E1, A]): ZStream[R1, E1, A] = - stream.chunkN(n) - } -} diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 48908a347ec7..0e2c55cea1cf 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -76,12 +76,6 @@ abstract class ZStream[-R, +E, +O](val process: ZManaged[R, Nothing, ZIO[R, Opti import ZStream.{BufferedPull, Pull, TerminationStrategy} - /** - * Syntax for adding aspects. - */ - final def @@[R1 <: R, E1 >: E](aspect: StreamAspect[R1, E1]): ZStream[R1, E1, O] = - aspect(self) - /** * Symbolic alias for [[ZStream#cross]]. */ diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala index e31f5e3a6432..9423445c3c10 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala @@ -14,6 +14,14 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A import ZStream.TerminationStrategy + /** + * Syntax for adding aspects. + */ + final def @@[LowerR <: UpperR, UpperR <: R, LowerE >: E, UpperE >: LowerE, LowerA >: A, UpperA >: LowerA]( + aspect: ZStreamAspect[LowerR, UpperR, LowerE, UpperE, LowerA, UpperA] + ): ZStream[UpperR, LowerE, LowerA] = + aspect(self) + /** * Symbolic alias for [[ZStream#cross]]. 
*/ diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala new file mode 100644 index 000000000000..56c62e34f830 --- /dev/null +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala @@ -0,0 +1,19 @@ +package zio.stream.experimental + +trait ZStreamAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { + def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA]( + stream: ZStream[R, E, A] + ): ZStream[R, E, A] +} + +object ZStreamAspect { + + /** + * An aspect that rechunks the stream into chunks of the specified size. + */ + def chunkN(n: Int): ZStreamAspect[Nothing, Any, Nothing, Any, Nothing, Any] = + new ZStreamAspect[Nothing, Any, Nothing, Any, Nothing, Any] { + def apply[R, E, A](stream: ZStream[R, E, A]): ZStream[R, E, A] = + stream.chunkN(n) + } +} From 3ac348f3791e8757453b68d8a06892e105748bd0 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Thu, 20 May 2021 16:37:03 -0700 Subject: [PATCH 5/8] reduce number of repetitions for flaky test --- .../shared/src/test/scala/zio/stream/ZStreamSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala index 6fcaa7642b46..fb1a2969c7a7 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala @@ -2037,7 +2037,7 @@ object ZStreamSpec extends ZIOBaseSpec { .run .map(_.interrupted) )(equalTo(false)) - } @@ nonFlaky(10) @@ TestAspect.jvmOnly, + } @@ TestAspect.jvmOnly, testM("interrupts pending tasks when one of the tasks fails") { for { interrupted <- Ref.make(0) From cc6245f96cd8fdafd09ed33460fadd3391e79f5d Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Fri, 21 May 2021 06:00:12 -0700 Subject: [PATCH 6/8] fix name --- 
core/shared/src/main/scala/zio/ZIOAspect.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/zio/ZIOAspect.scala b/core/shared/src/main/scala/zio/ZIOAspect.scala index 5f35782c82a9..12ddf21f8ab1 100644 --- a/core/shared/src/main/scala/zio/ZIOAspect.scala +++ b/core/shared/src/main/scala/zio/ZIOAspect.scala @@ -10,7 +10,7 @@ trait ZIOAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA](zio: ZIO[R, E, A]): ZIO[R, E, A] } -object ZAspect { +object ZIOAspect { /** * An aspect that prints the results of effects to the console for debugging From 3a780df64ddf68a5d3bd5625d0f3d2c75424ef26 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Fri, 21 May 2021 08:26:28 -0700 Subject: [PATCH 7/8] implement aspect composition operators --- .../shared/src/main/scala/zio/ZIOAspect.scala | 32 ++++++++++++++++++- .../src/main/scala/zio/ZManagedAspect.scala | 32 ++++++++++++++++++- .../stream/experimental/ZStreamAspect.scala | 32 ++++++++++++++++++- 3 files changed, 93 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/zio/ZIOAspect.scala b/core/shared/src/main/scala/zio/ZIOAspect.scala index 12ddf21f8ab1..5fa6abd161e6 100644 --- a/core/shared/src/main/scala/zio/ZIOAspect.scala +++ b/core/shared/src/main/scala/zio/ZIOAspect.scala @@ -6,8 +6,38 @@ import zio.internal.Executor import scala.concurrent.ExecutionContext -trait ZIOAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { +trait ZIOAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { self => + def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA](zio: ZIO[R, E, A]): ZIO[R, E, A] + + def >>>[ + LowerR1 >: LowerR, + UpperR1 <: UpperR, + LowerE1 >: LowerE, + UpperE1 <: UpperE, + LowerA1 >: LowerA, + UpperA1 <: UpperA + ]( + that: ZIOAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] + ): ZIOAspect[LowerR1, UpperR1, LowerE1, 
UpperE1, LowerA1, UpperA1] = + self.andThen(that) + + def andThen[ + LowerR1 >: LowerR, + UpperR1 <: UpperR, + LowerE1 >: LowerE, + UpperE1 <: UpperE, + LowerA1 >: LowerA, + UpperA1 <: UpperA + ]( + that: ZIOAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] + ): ZIOAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = + new ZIOAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] { + def apply[R >: LowerR1 <: UpperR1, E >: LowerE1 <: UpperE1, A >: LowerA1 <: UpperA1]( + zio: ZIO[R, E, A] + ): ZIO[R, E, A] = + that(self(zio)) + } } object ZIOAspect { diff --git a/core/shared/src/main/scala/zio/ZManagedAspect.scala b/core/shared/src/main/scala/zio/ZManagedAspect.scala index 9194d823307f..26a05626b653 100644 --- a/core/shared/src/main/scala/zio/ZManagedAspect.scala +++ b/core/shared/src/main/scala/zio/ZManagedAspect.scala @@ -1,7 +1,37 @@ package zio -trait ZManagedAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { +trait ZManagedAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { self => + def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA]( managed: ZManaged[R, E, A] ): ZManaged[R, E, A] + + def >>>[ + LowerR1 >: LowerR, + UpperR1 <: UpperR, + LowerE1 >: LowerE, + UpperE1 <: UpperE, + LowerA1 >: LowerA, + UpperA1 <: UpperA + ]( + that: ZManagedAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] + ): ZManagedAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = + self.andThen(that) + + def andThen[ + LowerR1 >: LowerR, + UpperR1 <: UpperR, + LowerE1 >: LowerE, + UpperE1 <: UpperE, + LowerA1 >: LowerA, + UpperA1 <: UpperA + ]( + that: ZManagedAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] + ): ZManagedAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = + new ZManagedAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] { + def apply[R >: LowerR1 <: UpperR1, E >: LowerE1 <: UpperE1, A >: LowerA1 <: UpperA1]( + managed: ZManaged[R, E, A] + ): 
ZManaged[R, E, A] = + that(self(managed)) + } } diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala index 56c62e34f830..55bed583cf2e 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala @@ -1,9 +1,39 @@ package zio.stream.experimental -trait ZStreamAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { +trait ZStreamAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { self => + def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA]( stream: ZStream[R, E, A] ): ZStream[R, E, A] + + def >>>[ + LowerR1 >: LowerR, + UpperR1 <: UpperR, + LowerE1 >: LowerE, + UpperE1 <: UpperE, + LowerA1 >: LowerA, + UpperA1 <: UpperA + ]( + that: ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] + ): ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = + self.andThen(that) + + def andThen[ + LowerR1 >: LowerR, + UpperR1 <: UpperR, + LowerE1 >: LowerE, + UpperE1 <: UpperE, + LowerA1 >: LowerA, + UpperA1 <: UpperA + ]( + that: ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] + ): ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = + new ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] { + def apply[R >: LowerR1 <: UpperR1, E >: LowerE1 <: UpperE1, A >: LowerA1 <: UpperA1]( + stream: ZStream[R, E, A] + ): ZStream[R, E, A] = + that(self(stream)) + } } object ZStreamAspect { From 827fc9cfa6d6c3ebb33561ce14d9f08f98806afa Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Fri, 21 May 2021 08:28:09 -0700 Subject: [PATCH 8/8] generalize signature of retry aspect --- core/shared/src/main/scala/zio/ZIOAspect.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/zio/ZIOAspect.scala 
b/core/shared/src/main/scala/zio/ZIOAspect.scala index 5fa6abd161e6..77cba8f8986f 100644 --- a/core/shared/src/main/scala/zio/ZIOAspect.scala +++ b/core/shared/src/main/scala/zio/ZIOAspect.scala @@ -73,9 +73,9 @@ object ZIOAspect { /** * An aspect that retries effects according to the specified schedule. */ - def retry[R1 <: Clock, E1](schedule: Schedule[R1, E1, Any]): ZIOAspect[Nothing, R1, E1, E1, Nothing, Any] = - new ZIOAspect[Nothing, R1, E1, E1, Nothing, Any] { - def apply[R <: R1, E >: E1 <: E1, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = + def retry[R1 <: Clock, E1](schedule: Schedule[R1, E1, Any]): ZIOAspect[Nothing, R1, Nothing, E1, Nothing, Any] = + new ZIOAspect[Nothing, R1, Nothing, E1, Nothing, Any] { + def apply[R <: R1, E <: E1, A](zio: ZIO[R, E, A]): ZIO[R, E, A] = zio.retry(schedule) }