From d2cdba44b6637aa7a2d2d899a1ee4cad85a221dc Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sat, 30 Oct 2021 09:56:24 -0600 Subject: [PATCH 01/15] initial work --- .../zio/stream/experimental/ZPipeline.scala | 908 ++++++++++++------ 1 file changed, 609 insertions(+), 299 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 0a66a4880ff1..f4b9e43ffe00 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -1,94 +1,242 @@ package zio.stream.experimental import zio._ -import zio.stacktracer.TracingImplicits.disableAutoTrace /** - * A `ZPipeline[Env, Err, In, Out]` is a polymorphic stream transformer. Pipelines accept a stream - * as input, and return the transformed stream as output. + * A `ZPipeline` is a polymorphic stream transformer. Pipelines + * accept a stream as input, and return the transformed stream as output. * - * Pipelines can be thought of as a recipe for calling a bunch of methods on a source stream, to - * yield a new (transformed) stream. A nice mental model is the following type alias: + * Pipelines can be thought of as a recipe for calling a bunch of methods on a + * source stream, to yield a new (transformed) stream. A nice mental model is + * the following type alias: * * {{{ - * type ZPipeline[Env, Err, In, Out] = ZStream[Env, Err, In] => ZStream[Env, Err, Out] + * type ZPipeline[Env, Err, In, Out] = + * ZStream[Env, Err, In] => ZStream[Env, Err, Out] * }}} * - * This encoding of a pipeline with a type alias is not used because it does not infer well. - * In its place, this trait captures the polymorphism inherent to many pipelines, which can - * therefore be more flexible about the environment and error types of the streams they transform. 
+ * This encoding of a pipeline with a type alias is not used because it does + * not infer well. In its place, this trait captures the polymorphism inherent + * to many pipelines, which can therefore be more flexible about the + * environment and error types of the streams they transform. * - * There is no fundamental requirement for pipelines to exist, because everything pipelines do - * can be done directly on a stream. However, because pipelines separate the stream transformation - * from the source stream itself, it becomes possible to abstract over stream transformations at the - * level of values, creating, storing, and passing around reusable transformation pipelines that can - * be applied to many different streams. + * There is no fundamental requirement for pipelines to exist, because + * everything pipelines do can be done directly on a stream. However, because + * pipelines separate the stream transformation from the source stream itself, + * it becomes possible to abstract over stream transformations at the level of + * values, creating, storing, and passing around reusable transformation + * pipelines that can be applied to many different streams. * - * The most common way to create a pipeline is to convert a sink into a pipeline (in general, - * transforming elements of a stream requires the power of a sink). However, the companion object - * has lots of other pipeline constructors based on the methods of stream. + * The most common way to create a pipeline is to convert a sink into a + * pipeline (in general, transforming elements of a stream requires the power + * of a sink). However, the companion object has lots of other pipeline + * constructors based on the methods of stream. */ -trait ZPipeline[-Env, +Err, -In, +Out] { self => - - /** - * Composes two pipelines into one pipeline, by first applying the transformation of this - * pipeline, and then applying the transformation of the specified pipeline. 
- */ - final def >>>[Env1 <: Env, Err1 >: Err, Out2]( - that: ZPipeline[Env1, Err1, Out, Out2] - ): ZPipeline[Env1, Err1, In, Out2] = - new ZPipeline[Env1, Err1, In, Out2] { - def apply[Env0 <: Env1, Err0 >: Err1](stream: ZStream[Env0, Err0, In])(implicit - trace: ZTraceElement - ): ZStream[Env0, Err0, Out2] = - that(self(stream)) - } - - /** - * Composes two pipelines into one pipeline, by first applying the transformation of the - * specified pipeline, and then applying the transformation of this pipeline. - */ - final def <<<[Env1 <: Env, Err1 >: Err, In2]( - that: ZPipeline[Env1, Err1, In2, In] - ): ZPipeline[Env1, Err1, In2, Out] = - new ZPipeline[Env1, Err1, In2, Out] { - def apply[Env0 <: Env1, Err0 >: Err1](stream: ZStream[Env0, Err0, In2])(implicit - trace: ZTraceElement - ): ZStream[Env0, Err0, Out] = - self(that(stream)) - } - - /** - * A named version of the `>>>` operator. - */ - final def andThen[Env1 <: Env, Err1 >: Err, Out2]( - that: ZPipeline[Env1, Err1, Out, Out2] - ): ZPipeline[Env1, Err1, In, Out2] = - self >>> that - - def apply[Env1 <: Env, Err1 >: Err](stream: ZStream[Env1, Err1, In])(implicit +trait ZPipeline[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem] { self => + type OutEnv[Env] + type OutErr[Err] + type OutElem[Elem] + + def apply[Env >: LowerEnv <: UpperEnv, Err >: LowerErr <: UpperErr, Elem >: LowerElem <: UpperElem]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, Out] - - /** - * A named version of the `<<<` operator. - */ - final def compose[Env1 <: Env, Err1 >: Err, In2]( - that: ZPipeline[Env1, Err1, In2, In] - ): ZPipeline[Env1, Err1, In2, Out] = - self <<< that + ): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] } + object ZPipeline { - /** - * A shorter version of [[ZPipeline.identity]], which can facilitate more compact definition - * of pipelines. 
- * - * {{{ - * ZPipeline[Int].filter(_ % 2 != 0) - * }}} - */ - def apply[I]: ZPipeline[Any, Nothing, I, I] = identity[I] + type WithOut[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem, OutEnv0[Env], OutErr0[Err], Out0[ + Elem + ]] = + ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] { + type OutEnv[Env] = OutEnv0[Env] + type OutErr[Err] = OutErr0[Err] + type OutElem[Elem] = Out0[Elem] + } + + implicit final class ZPipelineSyntax[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem, OutEnv[ + Env + ], OutErr[Err], OutElem[Elem]]( + private val self: ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + OutEnv, + OutErr, + OutElem + ] + ) extends AnyVal { + + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of the specified pipeline, and then applying the + * transformation of this pipeline. + */ + def <<<[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[Err], OutElem2[ + In + ]]( + that: ZPipeline.WithOut[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv2, UpperEnv2, OutEnv2, LowerEnv, UpperEnv, OutEnv], + composeErr: Compose[LowerErr2, UpperErr2, OutErr2, LowerErr, UpperErr, OutErr], + composeElem: Compose[LowerElem2, UpperElem2, OutElem2, LowerElem, UpperElem, OutElem] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + that >>> self + + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of this pipeline, and then applying the transformation of + * the specified pipeline. 
+ */ + def >>>[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[Err], OutElem2[ + In + ]]( + that: ZPipeline.WithOut[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, OutEnv2], + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, OutErr2], + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, OutElem2] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + new ZPipeline[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper + ] { + type OutEnv[Env] = composeEnv.Out[Env] + type OutErr[Err] = composeErr.Out[Err] + type OutElem[Elem] = composeElem.Out[Elem] + def apply[ + Env >: composeEnv.Lower <: composeEnv.Upper, + Err >: composeErr.Lower <: composeErr.Upper, + Elem >: composeElem.Lower <: composeElem.Upper + ]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { + val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) + val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](left) + right.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] + } + } + + /** + * A named version of the `>>>` operator. 
+ */ + def andThen[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[ + Err + ], OutElem2[ + In + ]]( + that: ZPipeline.WithOut[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, OutEnv2], + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, OutErr2], + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, OutElem2] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + self >>> that + + /** + * A named version of the `<<<` operator. + */ + def compose[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[ + Err + ], OutElem2[ + In + ]]( + that: ZPipeline.WithOut[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv2, UpperEnv2, OutEnv2, LowerEnv, UpperEnv, OutEnv], + composeErr: Compose[LowerErr2, UpperErr2, OutErr2, LowerErr, UpperErr, OutErr], + composeElem: Compose[LowerElem2, UpperElem2, OutElem2, LowerElem, UpperElem, OutElem] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + self <<< that + } /** * Creates a pipeline that collects elements with the specified partial function. 
@@ -97,24 +245,29 @@ object ZPipeline { * ZPipeline.collect[Option[Int], Int] { case Some(v) => v } * }}} */ - def collect[In, Out](f: PartialFunction[In, Out]): ZPipeline[Any, Nothing, In, Out] = - new ZPipeline[Any, Nothing, In, Out] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, In])(implicit + def collect[In, Out]( + f: PartialFunction[In, Out] + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Out })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Out + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, Out] = + ): ZStream[Env, Err, Out] = stream.collect(f) } - /** - * Creates a transducer that always dies with the specified exception. - * - * {{{ - * ZPipeline.die(new IllegalStateException) - * }}} - */ - def die(e: => Throwable): ZPipeline[Any, Nothing, Any, Nothing] = - ZPipeline.failCause(Cause.die(e)) - /** * Creates a pipeline that drops elements until the specified predicate evaluates to true. 
* @@ -122,11 +275,26 @@ object ZPipeline { * ZPipeline.dropUntil[Int](_ > 100) * }}} */ - def dropUntil[In](f: In => Boolean): ZPipeline[Any, Nothing, In, In] = - new ZPipeline[Any, Nothing, In, In] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, In])(implicit + def dropUntil[In]( + f: In => Boolean + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, In] = + ): ZStream[Env, Err, Elem] = stream.dropUntil(f) } @@ -137,258 +305,400 @@ object ZPipeline { * ZPipeline.dropWhile[Int](_ <= 100) * }}} */ - def dropWhile[In](f: In => Boolean): ZPipeline[Any, Nothing, In, In] = - new ZPipeline[Any, Nothing, In, In] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, In])(implicit + def dropWhile[In]( + f: In => Boolean + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, In] = + ): ZStream[Env, Err, Elem] = stream.dropWhile(f) } - /** - * Creates a pipeline that always fails with the specified value. - */ - def fail[E](e: => E): ZPipeline[Any, E, Any, Nothing] = - ZPipeline.failCause(Cause.fail(e)) - - /** - * Creates a transducer that always dies with the specified exception. 
- */ - def failCause[E](cause: => Cause[E]): ZPipeline[Any, E, Any, Nothing] = - new ZPipeline[Any, E, Any, Nothing] { - def apply[Env1 <: Any, Err1 >: E](stream: ZStream[Env1, Err1, Any])(implicit - trace: ZTraceElement - ): ZStream[Env1, Err1, Nothing] = - ZStream.failCause(cause) - } - /** * Creates a pipeline that filters elements according to the specified predicate. */ - def filter[In](f: In => Boolean): ZPipeline[Any, Nothing, In, In] = - new ZPipeline[Any, Nothing, In, In] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, In])(implicit + def filter[In]( + f: In => Boolean + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, In] = + ): ZStream[Env, Err, Elem] = stream.filter(f) } - /** - * Creates a pipeline from the provided fold function, which operates on the state and the - * elements of the source stream. - * - * {{{ - * val counter = ZPipeline.foldLeft(0)((count, _) => count + 1) - * }}} - */ - def foldLeft[In, Out](z: Out)(f: (Out, In) => Out): ZPipeline[Any, Nothing, In, Out] = - fold(z)(_ => true)(f) - - /** - * Creates a transducer by effectfully folding over a structure of type `O`. The transducer will - * fold the inputs until the stream ends, resulting in a stream with one element. - */ - @deprecated("use foldLeftZIO", "2.0.0") - def foldLeftM[R, E, I, O](z: O)(f: (O, I) => ZIO[R, E, O]): ZPipeline[R, E, I, O] = - foldLeftZIO(z)(f) - - /** - * Creates a transducer by effectfully folding over a structure of type `O`. The transducer will - * fold the inputs until the stream ends, resulting in a stream with one element. 
- */ - def foldLeftZIO[R, E, I, O](z: O)(f: (O, I) => ZIO[R, E, O]): ZPipeline[R, E, I, O] = - foldZIO(z)(_ => true)(f) - - /** - * A stateful fold that will emit the state and reset to the starting state every time the - * specified predicate returns false. - */ - def fold[I, O](out0: O)(contFn: O => Boolean)(f: (O, I) => O): ZPipeline[Any, Nothing, I, O] = - new ZPipeline[Any, Nothing, I, O] { - def apply[Env1 <: Any, Err1 >: Nothing]( - stream: ZStream[Env1, Err1, I] - )(implicit trace: ZTraceElement): ZStream[Env1, Err1, O] = - stream - .mapAccum(out0) { case (o0, i) => - val o = f(o0, i) - - if (contFn(o)) (o, Some(o)) - else (out0, None) - } - .collect { case Some(v) => v } - } - - /** - * A stateful fold that will emit the state and reset to the starting state every time the - * specified predicate returns false. - */ - @deprecated("use foldZIO", "2.0.0") - def foldM[R, E, I, O](out0: O)(contFn: O => Boolean)(f: (O, I) => ZIO[R, E, O]): ZPipeline[R, E, I, O] = - foldZIO(out0)(contFn)(f) - - /** - * Creates a transducer that folds elements of type `I` into a structure - * of type `O` until `max` elements have been folded. - * - * Like foldWeighted, but with a constant cost function of 1. - */ - def foldUntil[I, O](z: O, max: Long)(f: (O, I) => O): ZPipeline[Any, Nothing, I, O] = - fold[I, (O, Long)]((z, 0))(_._2 < max) { case ((o, count), i) => - (f(o, i), count + 1) - } >>> ZPipeline.map(_._1) - - /** - * Creates a transducer that effectfully folds elements of type `I` into a structure - * of type `O` until `max` elements have been folded. - * - * Like foldWeightedM, but with a constant cost function of 1. - */ - @deprecated("use foldUntilZIO", "2.0.0") - def foldUntilM[R, E, I, O](z: O, max: Long)(f: (O, I) => ZIO[R, E, O])(implicit - trace: ZTraceElement - ): ZPipeline[R, E, I, O] = - foldUntilZIO(z, max)(f) - - /** - * Creates a transducer that effectfully folds elements of type `I` into a structure - * of type `O` until `max` elements have been folded. 
- * - * Like foldWeightedM, but with a constant cost function of 1. - */ - def foldUntilZIO[R, E, I, O](z: O, max: Long)(f: (O, I) => ZIO[R, E, O])(implicit - trace: ZTraceElement - ): ZPipeline[R, E, I, O] = - foldZIO[R, E, I, (O, Long)]((z, 0))(_._2 < max) { case ((o, count), i) => - f(o, i).map((_, count + 1)) - } >>> ZPipeline.map(_._1) - - /** - * A stateful fold that will emit the state and reset to the starting state every time the - * specified predicate returns false. - */ - def foldZIO[R, E, I, O](out0: O)(contFn: O => Boolean)(f: (O, I) => ZIO[R, E, O]): ZPipeline[R, E, I, O] = - new ZPipeline[R, E, I, O] { - def apply[Env1 <: R, Err1 >: E]( - stream: ZStream[Env1, Err1, I] - )(implicit trace: ZTraceElement): ZStream[Env1, Err1, O] = - stream - .mapAccumZIO(out0) { case (o, i) => - f(o, i).map { o => - if (contFn(o)) (o, Some(o)) - else (out0, None) - } - } - .collect { case Some(v) => v } - } - - /** - * Creates a pipeline that effectfully maps elements to the specified effectfully-computed - * value. - */ - @deprecated("use fromZIO", "2.0.0") - def fromEffect[R, E, A](zio: ZIO[R, E, A]): ZPipeline[R, E, Any, A] = - fromZIO(zio) - - /** - * Creates a pipeline that effectfully maps elements to the specified effectfully-computed - * value. - */ - def fromZIO[R, E, A](zio: ZIO[R, E, A]): ZPipeline[R, E, Any, A] = - new ZPipeline[R, E, Any, A] { - def apply[Env1 <: R, Err1 >: E](stream: ZStream[Env1, Err1, Any])(implicit - trace: ZTraceElement - ): ZStream[Env1, Err1, A] = - stream.mapZIO(_ => zio) - } - - /** - * Creates a pipeline from a sink, by transforming input streams with [[ZStream#transduce]]. 
- */ - // TODO -// def fromSink[Env, Err, In, Out](sink: ZSink[Env, Err, In, In, Out]): ZPipeline[Env, Err, In, Out] = -// new ZPipeline[Env, Err, In, Out] { -// def apply[Env1 <: Env, Err1 >: Err](stream: ZStream[Env1, Err1, In]): ZStream[Env1, Err1, Out] = -// stream.transduce(sink) -// } - - /** - * Creates a transducer that always dies with the specified exception. - */ - @deprecated("use failCause", "2.0.0") - def halt[E](cause: => Cause[E]): ZPipeline[Any, E, Any, Nothing] = - failCause(cause) - /** * The identity pipeline, which does not modify streams in any way. */ - def identity[A]: ZPipeline[Any, Nothing, A, A] = - new ZPipeline[Any, Nothing, A, A] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, A])(implicit + val identity: ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + Any, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, A] = + ): ZStream[Env, Err, Elem] = stream } /** * Creates a pipeline that maps elements with the specified function. 
*/ - def map[In, Out](f: In => Out): ZPipeline[Any, Nothing, In, Out] = - new ZPipeline[Any, Nothing, In, Out] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, In])(implicit + def map[In, Out]( + f: In => Out + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Out })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Out + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, Out] = + ): ZStream[Env, Err, Out] = stream.map(f) } /** - * Creates a pipeline that maps elements with the specified effectful function. - */ - @deprecated("use mapZIO", "2.0.0") - def mapM[Env0, Err0, In, Out](f: In => ZIO[Env0, Err0, Out]): ZPipeline[Env0, Err0, In, Out] = - mapZIO(f) - - /** - * Creates a pipeline that maps elements with the specified effectful function. - */ - def mapZIO[Env0, Err0, In, Out](f: In => ZIO[Env0, Err0, Out]): ZPipeline[Env0, Err0, In, Out] = - new ZPipeline[Env0, Err0, In, Out] { - def apply[Env1 <: Env0, Err1 >: Err0](stream: ZStream[Env1, Err1, In])(implicit + * Creates a pipeline that provides the specified environment. 
+ */ + def provide[Env]( + env: Env + ): ZPipeline.WithOut[ + Env, + Any, + Nothing, + Any, + Nothing, + Any, + ({ type OutEnv[Env] = Any })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Env, Any, Nothing, Any, Nothing, Any] { + type OutEnv[Env] = Any + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env1 >: Env, Err, In](stream: ZStream[Env1, Err, In])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, Out] = - stream.mapZIO(f) + ): ZStream[Any, Err, In] = + stream.provide(env) } /** - * Creates a pipeline that always succeeds with the specified value. - */ - def succeed[A](a: => A): ZPipeline[Any, Nothing, Any, A] = - new ZPipeline[Any, Nothing, Any, A] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, Any])(implicit + * Creates a pipeline that scans elements with the specified function. + */ + def scan[In, Out]( + s: Out + )( + f: (Out, In) => Out + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Out })#OutElem + ] = + scanZIO(s)((out, in) => ZIO.succeedNow(f(out, in))) + + /** + * Creates a pipeline that scans elements with the specified function. 
+ */ + def scanZIO[Env, Err, In, Out]( + s: Out + )( + f: (Out, In) => ZIO[Env, Err, Out] + ): ZPipeline.WithOut[ + Nothing, + Env, + Err, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Out })#OutElem + ] = + new ZPipeline[Nothing, Env, Err, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Out + def apply[Env1 <: Env, Err1 >: Err, Elem <: In](stream: ZStream[Env1, Err1, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, A] = - ZStream.succeed(a) + ): ZStream[Env1, Err1, Out] = + stream.scanZIO(s)(f) } /** * Creates a pipeline that takes elements until the specified predicate evaluates to true. */ - def takeUntil[In](f: In => Boolean): ZPipeline[Any, Nothing, In, In] = - new ZPipeline[Any, Nothing, In, In] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, In])(implicit + def takeUntil[In]( + f: In => Boolean + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, In] = + ): ZStream[Env, Err, Elem] = stream.takeUntil(f) } /** * Creates a pipeline that takes elements while the specified predicate evaluates to true. 
*/ - def takeWhile[In](f: In => Boolean): ZPipeline[Any, Nothing, In, In] = - new ZPipeline[Any, Nothing, In, In] { - def apply[Env1 <: Any, Err1 >: Nothing](stream: ZStream[Env1, Err1, In])(implicit + def takeWhile[In]( + f: In => Boolean + ): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit trace: ZTraceElement - ): ZStream[Env1, Err1, In] = + ): ZStream[Env, Err, Elem] = stream.takeWhile(f) } +} + +trait Compose[-LeftLower, -LeftUpper, LeftOut[In], -RightLower, -RightUpper, RightOut[In]] { + type Lower + type Upper + type Out[In] +} +object Compose extends ComposeLowPriorityImplicits { + type WithOut[LeftLower, LeftUpper, LeftOut[In], RightLower, RightUpper, RightOut[In], Lower0, Upper0, Out0[In]] = + Compose[LeftLower, LeftUpper, LeftOut, RightLower, RightUpper, RightOut] { + type Lower = Lower0 + type Upper = Upper0 + type Out[In] = Out0[In] + } + + implicit def compose[ + LeftLower, + LeftUpper, + LeftOut >: RightLower <: RightUpper, + RightLower, + RightUpper, + RightOut + ]: Compose.WithOut[ + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out, + LeftLower, + LeftUpper, + ({ type Out[In] = RightOut })#Out + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out + ] { + type Lower = LeftLower + type Upper = LeftUpper + type Out[In] = RightOut + } + + implicit def identity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper]: Compose.WithOut[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type 
Out[In] = In })#Out, + RightLower, + LeftUpper with RightUpper, + ({ type Out[In] = In })#Out + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = In })#Out + ] { + type Lower = RightLower + type Upper = LeftUpper with RightUpper + type Out[In] = In + } + + implicit def leftIdentity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper, RightOut]: Compose.WithOut[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out, + RightLower, + LeftUpper with RightUpper, + ({ type Out[In] = RightOut })#Out + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out + ] { + type Lower = RightLower + type Upper = LeftUpper with RightUpper + type Out[In] = RightOut + } + + implicit def rightIdentity[LeftLower, LeftUpper, LeftOut >: RightLower <: RightUpper, RightLower, RightUpper] + : Compose.WithOut[ + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out, + RightLower, + RightUpper, + ({ type Out[In] = In })#Out, + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out, + RightLower, + RightUpper, + ({ type Out[In] = In })#Out + ] { + type Lower = LeftLower + type Upper = LeftUpper + type Out[In] = LeftOut + } +} + +trait ComposeLowPriorityImplicits { + + implicit def identityLowPriority[LeftLowerElem, LeftUpperElem, RightLowerElem <: LeftLowerElem, RightUpperElem] + : Compose.WithOut[ + LeftLowerElem, + LeftUpperElem, + ({ type Out[In] = In })#Out, + RightLowerElem, + RightUpperElem, + ({ type Out[In] = In })#Out, + LeftLowerElem, + LeftUpperElem with RightUpperElem, + ({ type Out[In] = In })#Out + ] = + new Compose[ + LeftLowerElem, + LeftUpperElem, + ({ type Out[In] = In })#Out, + RightLowerElem, + RightUpperElem, + ({ type Out[In] = In })#Out + ] { + type Lower 
= LeftLowerElem + type Upper = LeftUpperElem with RightUpperElem + type Out[In] = In + } + + implicit def leftIdentityLowPriority[LeftLower, LeftUpper, RightLower <: LeftLower, RightUpper, RightOut] + : Compose.WithOut[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out, + LeftLower, + LeftUpper with RightUpper, + ({ type Out[In] = RightOut })#Out + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out + ] { + type Lower = LeftLower + type Upper = LeftUpper with RightUpper + type Out[In] = RightOut + } } From 6d5b688f13c0a26511511de7133e66f80617b994 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 08:40:08 -0600 Subject: [PATCH 02/15] resolve version specific issues --- .../ZPipelineCompanionVersionSpecific.scala | 21 + .../ZPipelineVersionSpecific.scala | 234 +++++++++ .../ZPipelineCompanionVersionSpecific.scala | 182 +++++++ .../ZPipelineVersionSpecific.scala | 19 + .../zio/stream/experimental/ZPipeline.scala | 482 ++++++------------ 5 files changed, 614 insertions(+), 324 deletions(-) create mode 100644 streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala create mode 100644 streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala create mode 100644 streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala create mode 100644 streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala diff --git a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala new file mode 100644 index 000000000000..ddb9120a890d --- /dev/null +++ 
b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2020-2021 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.stream.experimental + +import zio._ + +trait ZPipelineCompanionVersionSpecific diff --git a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala new file mode 100644 index 000000000000..0cdaa97cd863 --- /dev/null +++ b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala @@ -0,0 +1,234 @@ +/* + * Copyright 2020-2021 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zio.stream.experimental + +import zio._ + +import scala.annotation.unchecked.uncheckedVariance + +trait ZPipelineVersionSpecific[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem] { + self: ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] => + import ZPipeline._ + + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of the specified pipeline, and then applying the + * transformation of this pipeline. + */ + def <<<[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv2, UpperEnv2, that.OutEnv, LowerEnv, UpperEnv, OutEnv] @uncheckedVariance, + composeErr: Compose[LowerErr2, UpperErr2, that.OutErr, LowerErr, UpperErr, OutErr] @uncheckedVariance, + composeElem: Compose[LowerElem2, UpperElem2, that.OutElem, LowerElem, UpperElem, OutElem] @uncheckedVariance + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + new ZPipeline[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper + ] { + type OutEnv[Env] = composeEnv.Out[Env] + type OutErr[Err] = composeErr.Out[Err] + type OutElem[Elem] = composeElem.Out[Elem] + def apply[ + Env >: composeEnv.Lower <: composeEnv.Upper, + Err >: composeErr.Lower <: composeErr.Upper, + Elem >: composeElem.Lower <: composeElem.Upper + ]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { + val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) + val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](right) + 
left.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] + } + } + + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of this pipeline, and then applying the transformation of + * the specified pipeline. + */ + def >>>[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, that.OutEnv] @uncheckedVariance, + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, that.OutErr] @uncheckedVariance, + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, that.OutElem] @uncheckedVariance + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + new ZPipeline[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper + ] { + type OutEnv[Env] = composeEnv.Out[Env] + type OutErr[Err] = composeErr.Out[Err] + type OutElem[Elem] = composeElem.Out[Elem] + def apply[ + Env >: composeEnv.Lower <: composeEnv.Upper, + Err >: composeErr.Lower <: composeErr.Upper, + Elem >: composeElem.Lower <: composeElem.Upper + ]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { + val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) + val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](left) + right.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] + } + } + + /** + * A named version of the `>>>` operator. 
+ */ + def andThen[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, that.OutEnv] @uncheckedVariance, + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, that.OutErr] @uncheckedVariance, + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, that.OutElem] @uncheckedVariance + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + new ZPipeline[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper + ] { + type OutEnv[Env] = composeEnv.Out[Env] + type OutErr[Err] = composeErr.Out[Err] + type OutElem[Elem] = composeElem.Out[Elem] + def apply[ + Env >: composeEnv.Lower <: composeEnv.Upper, + Err >: composeErr.Lower <: composeErr.Upper, + Elem >: composeElem.Lower <: composeElem.Upper + ]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { + val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) + val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](left) + right.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] + } + } + + /** + * A named version of the `<<<` operator. 
+ */ + def compose[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv2, UpperEnv2, that.OutEnv, LowerEnv, UpperEnv, OutEnv] @uncheckedVariance, + composeErr: Compose[LowerErr2, UpperErr2, that.OutErr, LowerErr, UpperErr, OutErr] @uncheckedVariance, + composeElem: Compose[LowerElem2, UpperElem2, that.OutElem, LowerElem, UpperElem, OutElem] @uncheckedVariance + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + new ZPipeline[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper + ] { + type OutEnv[Env] = composeEnv.Out[Env] + type OutErr[Err] = composeErr.Out[Err] + type OutElem[Elem] = composeElem.Out[Elem] + def apply[ + Env >: composeEnv.Lower <: composeEnv.Upper, + Err >: composeErr.Lower <: composeErr.Upper, + Elem >: composeElem.Lower <: composeElem.Upper + ]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { + val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) + val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](right) + left.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] + } + } +} diff --git a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala new file mode 100644 index 000000000000..edad6a6030a8 --- /dev/null +++ b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala @@ -0,0 +1,182 @@ +/* + * Copyright 2020-2021 John A. 
De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.stream.experimental + +import zio._ + +trait ZPipelineCompanionVersionSpecific { + import ZPipeline._ + + implicit final class ZPipelineSyntax[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem, OutEnv[ + Env + ], OutErr[Err], OutElem[Elem]]( + private val self: ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + OutEnv, + OutErr, + OutElem + ] + ) { + + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of the specified pipeline, and then applying the + * transformation of this pipeline. 
+ */ + def <<<[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv2, UpperEnv2, that.OutEnv, LowerEnv, UpperEnv, OutEnv], + composeErr: Compose[LowerErr2, UpperErr2, that.OutErr, LowerErr, UpperErr, OutErr], + composeElem: Compose[LowerElem2, UpperElem2, that.OutElem, LowerElem, UpperElem, OutElem] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + that >>> self + + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of this pipeline, and then applying the transformation of + * the specified pipeline. + */ + def >>>[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, that.OutEnv], + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, that.OutErr], + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, that.OutElem] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + new ZPipeline[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper + ] { + type OutEnv[Env] = composeEnv.Out[Env] + type OutErr[Err] = composeErr.Out[Err] + type OutElem[Elem] = composeElem.Out[Elem] + def apply[ + Env >: composeEnv.Lower <: composeEnv.Upper, + Err >: composeErr.Lower <: composeErr.Upper, + Elem >: composeElem.Lower <: composeElem.Upper + ]( + stream: ZStream[Env, Err, 
Elem] + )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { + val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) + val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](left) + right.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] + } + } + + /** + * A named version of the `>>>` operator. + */ + def andThen[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, that.OutEnv], + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, that.OutErr], + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, that.OutElem] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + self >>> that + + /** + * A named version of the `<<<` operator. 
+ */ + def compose[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv2, UpperEnv2, that.OutEnv, LowerEnv, UpperEnv, OutEnv], + composeErr: Compose[LowerErr2, UpperErr2, that.OutErr, LowerErr, UpperErr, OutErr], + composeElem: Compose[LowerElem2, UpperElem2, that.OutElem, LowerElem, UpperElem, OutElem] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + self <<< that + } +} diff --git a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala new file mode 100644 index 000000000000..c49c732e842d --- /dev/null +++ b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala @@ -0,0 +1,19 @@ +/* + * Copyright 2020-2021 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package zio.stream.experimental + +trait ZPipelineVersionSpecific[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem] diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index f4b9e43ffe00..8c290c96ed96 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2020-2021 John A. De Goes and the ZIO Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package zio.stream.experimental import zio._ @@ -32,7 +48,8 @@ import zio._ * of a sink). However, the companion object has lots of other pipeline * constructors based on the methods of stream. 
*/ -trait ZPipeline[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem] { self => +trait ZPipeline[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem] + extends ZPipelineVersionSpecific[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] { self => type OutEnv[Env] type OutErr[Err] type OutElem[Elem] @@ -44,7 +61,7 @@ trait ZPipeline[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperEl ): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] } -object ZPipeline { +object ZPipeline extends ZPipelineCompanionVersionSpecific { type WithOut[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem, OutEnv0[Env], OutErr0[Err], Out0[ Elem @@ -55,189 +72,6 @@ object ZPipeline { type OutElem[Elem] = Out0[Elem] } - implicit final class ZPipelineSyntax[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem, OutEnv[ - Env - ], OutErr[Err], OutElem[Elem]]( - private val self: ZPipeline.WithOut[ - LowerEnv, - UpperEnv, - LowerErr, - UpperErr, - LowerElem, - UpperElem, - OutEnv, - OutErr, - OutElem - ] - ) extends AnyVal { - - /** - * Composes two pipelines into one pipeline, by first applying the - * transformation of the specified pipeline, and then applying the - * transformation of this pipeline. 
- */ - def <<<[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[Err], OutElem2[ - In - ]]( - that: ZPipeline.WithOut[ - LowerEnv2, - UpperEnv2, - LowerErr2, - UpperErr2, - LowerElem2, - UpperElem2, - OutEnv2, - OutErr2, - OutElem2 - ] - )(implicit - composeEnv: Compose[LowerEnv2, UpperEnv2, OutEnv2, LowerEnv, UpperEnv, OutEnv], - composeErr: Compose[LowerErr2, UpperErr2, OutErr2, LowerErr, UpperErr, OutErr], - composeElem: Compose[LowerElem2, UpperElem2, OutElem2, LowerElem, UpperElem, OutElem] - ): ZPipeline.WithOut[ - composeEnv.Lower, - composeEnv.Upper, - composeErr.Lower, - composeErr.Upper, - composeElem.Lower, - composeElem.Upper, - composeEnv.Out, - composeErr.Out, - composeElem.Out - ] = - that >>> self - - /** - * Composes two pipelines into one pipeline, by first applying the - * transformation of this pipeline, and then applying the transformation of - * the specified pipeline. - */ - def >>>[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[Err], OutElem2[ - In - ]]( - that: ZPipeline.WithOut[ - LowerEnv2, - UpperEnv2, - LowerErr2, - UpperErr2, - LowerElem2, - UpperElem2, - OutEnv2, - OutErr2, - OutElem2 - ] - )(implicit - composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, OutEnv2], - composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, OutErr2], - composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, OutElem2] - ): ZPipeline.WithOut[ - composeEnv.Lower, - composeEnv.Upper, - composeErr.Lower, - composeErr.Upper, - composeElem.Lower, - composeElem.Upper, - composeEnv.Out, - composeErr.Out, - composeElem.Out - ] = - new ZPipeline[ - composeEnv.Lower, - composeEnv.Upper, - composeErr.Lower, - composeErr.Upper, - composeElem.Lower, - composeElem.Upper - ] { - type OutEnv[Env] = composeEnv.Out[Env] - type OutErr[Err] = composeErr.Out[Err] - type OutElem[Elem] = composeElem.Out[Elem] - def apply[ - Env >: 
composeEnv.Lower <: composeEnv.Upper, - Err >: composeErr.Lower <: composeErr.Upper, - Elem >: composeElem.Lower <: composeElem.Upper - ]( - stream: ZStream[Env, Err, Elem] - )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { - val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) - val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](left) - right.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] - } - } - - /** - * A named version of the `>>>` operator. - */ - def andThen[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[ - Err - ], OutElem2[ - In - ]]( - that: ZPipeline.WithOut[ - LowerEnv2, - UpperEnv2, - LowerErr2, - UpperErr2, - LowerElem2, - UpperElem2, - OutEnv2, - OutErr2, - OutElem2 - ] - )(implicit - composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, OutEnv2], - composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, OutErr2], - composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, OutElem2] - ): ZPipeline.WithOut[ - composeEnv.Lower, - composeEnv.Upper, - composeErr.Lower, - composeErr.Upper, - composeElem.Lower, - composeElem.Upper, - composeEnv.Out, - composeErr.Out, - composeElem.Out - ] = - self >>> that - - /** - * A named version of the `<<<` operator. 
- */ - def compose[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[ - Err - ], OutElem2[ - In - ]]( - that: ZPipeline.WithOut[ - LowerEnv2, - UpperEnv2, - LowerErr2, - UpperErr2, - LowerElem2, - UpperElem2, - OutEnv2, - OutErr2, - OutElem2 - ] - )(implicit - composeEnv: Compose[LowerEnv2, UpperEnv2, OutEnv2, LowerEnv, UpperEnv, OutEnv], - composeErr: Compose[LowerErr2, UpperErr2, OutErr2, LowerErr, UpperErr, OutErr], - composeElem: Compose[LowerElem2, UpperElem2, OutElem2, LowerElem, UpperElem, OutElem] - ): ZPipeline.WithOut[ - composeEnv.Lower, - composeEnv.Upper, - composeErr.Lower, - composeErr.Upper, - composeElem.Lower, - composeElem.Upper, - composeEnv.Out, - composeErr.Out, - composeElem.Out - ] = - self <<< that - } - /** * Creates a pipeline that collects elements with the specified partial function. * @@ -529,176 +363,176 @@ object ZPipeline { ): ZStream[Env, Err, Elem] = stream.takeWhile(f) } -} -trait Compose[-LeftLower, -LeftUpper, LeftOut[In], -RightLower, -RightUpper, RightOut[In]] { - type Lower - type Upper - type Out[In] -} + trait Compose[LeftLower, LeftUpper, LeftOut[In], RightLower, RightUpper, RightOut[In]] { + type Lower + type Upper + type Out[In] + } -object Compose extends ComposeLowPriorityImplicits { - type WithOut[LeftLower, LeftUpper, LeftOut[In], RightLower, RightUpper, RightOut[In], Lower0, Upper0, Out0[In]] = - Compose[LeftLower, LeftUpper, LeftOut, RightLower, RightUpper, RightOut] { - type Lower = Lower0 - type Upper = Upper0 - type Out[In] = Out0[In] - } + object Compose extends ComposeLowPriorityImplicits { + type WithOut[LeftLower, LeftUpper, LeftOut[In], RightLower, RightUpper, RightOut[In], Lower0, Upper0, Out0[In]] = + Compose[LeftLower, LeftUpper, LeftOut, RightLower, RightUpper, RightOut] { + type Lower = Lower0 + type Upper = Upper0 + type Out[In] = Out0[In] + } - implicit def compose[ - LeftLower, - LeftUpper, - LeftOut >: RightLower <: RightUpper, - RightLower, - RightUpper, - 
RightOut - ]: Compose.WithOut[ - LeftLower, - LeftUpper, - ({ type Out[In] = LeftOut })#Out, - RightLower, - RightUpper, - ({ type Out[In] = RightOut })#Out, - LeftLower, - LeftUpper, - ({ type Out[In] = RightOut })#Out - ] = - new Compose[ + implicit def compose[ LeftLower, LeftUpper, - ({ type Out[In] = LeftOut })#Out, + LeftOut >: RightLower <: RightUpper, RightLower, RightUpper, - ({ type Out[In] = RightOut })#Out - ] { - type Lower = LeftLower - type Upper = LeftUpper - type Out[In] = RightOut - } - - implicit def identity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper]: Compose.WithOut[ - LeftLower, - LeftUpper, - ({ type Out[In] = In })#Out, - RightLower, - RightUpper, - ({ type Out[In] = In })#Out, - RightLower, - LeftUpper with RightUpper, - ({ type Out[In] = In })#Out - ] = - new Compose[ + RightOut + ]: Compose.WithOut[ LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, + ({ type Out[In] = LeftOut })#Out, RightLower, RightUpper, - ({ type Out[In] = In })#Out - ] { - type Lower = RightLower - type Upper = LeftUpper with RightUpper - type Out[In] = In - } - - implicit def leftIdentity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper, RightOut]: Compose.WithOut[ - LeftLower, - LeftUpper, - ({ type Out[In] = In })#Out, - RightLower, - RightUpper, - ({ type Out[In] = RightOut })#Out, - RightLower, - LeftUpper with RightUpper, - ({ type Out[In] = RightOut })#Out - ] = - new Compose[ + ({ type Out[In] = RightOut })#Out, LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, - RightLower, - RightUpper, ({ type Out[In] = RightOut })#Out - ] { - type Lower = RightLower - type Upper = LeftUpper with RightUpper - type Out[In] = RightOut - } + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out + ] { + type Lower = LeftLower + type Upper = LeftUpper + type Out[In] = RightOut + } - implicit def rightIdentity[LeftLower, LeftUpper, LeftOut >: RightLower <: 
RightUpper, RightLower, RightUpper] - : Compose.WithOut[ + implicit def identity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper]: Compose.WithOut[ LeftLower, LeftUpper, - ({ type Out[In] = LeftOut })#Out, - RightLower, - RightUpper, ({ type Out[In] = In })#Out, - LeftLower, - LeftUpper, - ({ type Out[In] = LeftOut })#Out - ] = - new Compose[ - LeftLower, - LeftUpper, - ({ type Out[In] = LeftOut })#Out, RightLower, RightUpper, - ({ type Out[In] = In })#Out - ] { - type Lower = LeftLower - type Upper = LeftUpper - type Out[In] = LeftOut - } -} - -trait ComposeLowPriorityImplicits { - - implicit def identityLowPriority[LeftLowerElem, LeftUpperElem, RightLowerElem <: LeftLowerElem, RightUpperElem] - : Compose.WithOut[ - LeftLowerElem, - LeftUpperElem, - ({ type Out[In] = In })#Out, - RightLowerElem, - RightUpperElem, ({ type Out[In] = In })#Out, - LeftLowerElem, - LeftUpperElem with RightUpperElem, + RightLower, + LeftUpper with RightUpper, ({ type Out[In] = In })#Out ] = - new Compose[ - LeftLowerElem, - LeftUpperElem, - ({ type Out[In] = In })#Out, - RightLowerElem, - RightUpperElem, - ({ type Out[In] = In })#Out - ] { - type Lower = LeftLowerElem - type Upper = LeftUpperElem with RightUpperElem - type Out[In] = In - } + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = In })#Out + ] { + type Lower = RightLower + type Upper = LeftUpper with RightUpper + type Out[In] = In + } - implicit def leftIdentityLowPriority[LeftLower, LeftUpper, RightLower <: LeftLower, RightUpper, RightOut] - : Compose.WithOut[ + implicit def leftIdentity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper, RightOut]: Compose.WithOut[ LeftLower, LeftUpper, ({ type Out[In] = In })#Out, RightLower, RightUpper, ({ type Out[In] = RightOut })#Out, - LeftLower, + RightLower, LeftUpper with RightUpper, ({ type Out[In] = RightOut })#Out ] = - new Compose[ - LeftLower, - LeftUpper, - ({ type Out[In] = In })#Out, - 
RightLower, - RightUpper, - ({ type Out[In] = RightOut })#Out - ] { - type Lower = LeftLower - type Upper = LeftUpper with RightUpper - type Out[In] = RightOut - } + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out + ] { + type Lower = RightLower + type Upper = LeftUpper with RightUpper + type Out[In] = RightOut + } + + implicit def rightIdentity[LeftLower, LeftUpper, LeftOut >: RightLower <: RightUpper, RightLower, RightUpper] + : Compose.WithOut[ + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out, + RightLower, + RightUpper, + ({ type Out[In] = In })#Out, + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] = LeftOut })#Out, + RightLower, + RightUpper, + ({ type Out[In] = In })#Out + ] { + type Lower = LeftLower + type Upper = LeftUpper + type Out[In] = LeftOut + } + } + + trait ComposeLowPriorityImplicits { + + implicit def identityLowPriority[LeftLowerElem, LeftUpperElem, RightLowerElem <: LeftLowerElem, RightUpperElem] + : Compose.WithOut[ + LeftLowerElem, + LeftUpperElem, + ({ type Out[In] = In })#Out, + RightLowerElem, + RightUpperElem, + ({ type Out[In] = In })#Out, + LeftLowerElem, + LeftUpperElem with RightUpperElem, + ({ type Out[In] = In })#Out + ] = + new Compose[ + LeftLowerElem, + LeftUpperElem, + ({ type Out[In] = In })#Out, + RightLowerElem, + RightUpperElem, + ({ type Out[In] = In })#Out + ] { + type Lower = LeftLowerElem + type Upper = LeftUpperElem with RightUpperElem + type Out[In] = In + } + + implicit def leftIdentityLowPriority[LeftLower, LeftUpper, RightLower <: LeftLower, RightUpper, RightOut] + : Compose.WithOut[ + LeftLower, + LeftUpper, + ({ type Out[In] = In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out, + LeftLower, + LeftUpper with RightUpper, + ({ type Out[In] = RightOut })#Out + ] = + new Compose[ + LeftLower, + LeftUpper, + ({ type Out[In] 
= In })#Out, + RightLower, + RightUpper, + ({ type Out[In] = RightOut })#Out + ] { + type Lower = LeftLower + type Upper = LeftUpper with RightUpper + type Out[In] = RightOut + } + } } From 67997b8df787107f7ad2055a1ae3d0f75857edbe Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 09:49:28 -0600 Subject: [PATCH 03/15] add tests --- .../stream/experimental/ZPipelineSpec.scala | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 streams-tests/shared/src/test/scala/zio/stream/experimental/ZPipelineSpec.scala diff --git a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZPipelineSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZPipelineSpec.scala new file mode 100644 index 000000000000..0cc97df48b47 --- /dev/null +++ b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZPipelineSpec.scala @@ -0,0 +1,67 @@ +package zio.stream.experimental + +import zio._ +import zio.test._ + +object ZPipelineSpec extends ZIOBaseSpec { + + def spec = suite("ZPipelineSpec")( + test("pipelines are polymorphic") { + val pipeline = ZPipeline.identity + val stream1 = ZStream(1, 2, 3) + val stream2 = ZStream("foo", "bar", "baz") + for { + result1 <- pipeline(stream1).runCollect + result2 <- pipeline(stream2).runCollect + } yield assertTrue(result1 == Chunk(1, 2, 3)) && + assertTrue(result2 == Chunk("foo", "bar", "baz")) + }, + test("polymorphic pipelines can be composed") { + val pipeline1 = ZPipeline.identity + val pipeline2 = ZPipeline.identity + val pipeline3 = pipeline1 >>> pipeline2 + val stream = ZStream(1, 2, 3) + for { + result <- pipeline3(stream).runCollect + } yield assertTrue(result == Chunk(1, 2, 3)) + }, + test("monomorphic pipelines can be composed") { + val pipeline1 = ZPipeline.map[String, Double](_.toDouble) + val pipeline2 = ZPipeline.map[Double, Int](_.toInt) + val pipeline3 = pipeline1 >>> pipeline2 + val stream = ZStream("1", "2", "3") + for { + result <- pipeline3(stream).runCollect + } 
yield assertTrue(result == Chunk(1, 2, 3)) + }, + test("monomorphic and polymorphic pipelines can be composed") { + val pipeline1 = ZPipeline.map[String, Double](_.toDouble) + val pipeline2 = ZPipeline.map[Double, Int](_.toInt) + val pipeline3 = pipeline1 >>> pipeline2 + val pipeline4 = ZPipeline.identity + val pipeline5 = pipeline3 >>> pipeline4 + val stream = ZStream("1", "2", "3") + for { + result <- pipeline5(stream).runCollect + } yield assertTrue(result == Chunk(1, 2, 3)) + }, + test("polymorphic and monomorphic pipelines can be composed") { + val pipeline1 = ZPipeline.map[String, Double](_.toDouble) + val pipeline2 = ZPipeline.map[Double, Int](_.toInt) + val pipeline3 = pipeline1 >>> pipeline2 + val pipeline4 = ZPipeline.identity + val pipeline5 = pipeline4 >>> pipeline3 + val stream = ZStream("1", "2", "3") + for { + result <- pipeline5(stream).runCollect + } yield assertTrue(result == Chunk(1, 2, 3)) + }, + test("pipelines can provide the environment") { + val pipeline = ZPipeline.provide(42) + val stream = ZStream.environment[Int] + for { + result <- pipeline(stream).runCollect + } yield assertTrue(result == Chunk(42)) + } + ) +} From dd4bf366140eddc1894dbe092ec5183263a48cf9 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 10:12:20 -0600 Subject: [PATCH 04/15] remove groupadjactby for now --- .../zio/stream/experimental/ZPipeline.scala | 39 ------------------- 1 file changed, 39 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index cac38df88112..8c290c96ed96 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -188,45 +188,6 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { stream.filter(f) } - /** - * Creates a pipeline that groups on adjacent keys, calculated by function f. 
- */ - def groupAdjacentBy[I, K]( - f: I => K - )(implicit trace: ZTraceElement): ZPipeline[Any, Nothing, I, (K, NonEmptyChunk[I])] = - new ZPipeline[Any, Nothing, I, (K, NonEmptyChunk[I])] { - type O = (K, NonEmptyChunk[I]) - def go(in: Chunk[I], state: Option[O]): (Chunk[O], Option[O]) = - in.foldLeft[(Chunk[O], Option[O])]((Chunk.empty, state)) { - case ((os, None), i) => - (os, Some((f(i), NonEmptyChunk(i)))) - case ((os, Some(agg @ (k, aggregated))), i) => - val k2 = f(i) - if (k == k2) - (os, Some((k, aggregated :+ i))) - else - (os :+ agg, Some((k2, NonEmptyChunk(i)))) - } - - def apply[R, E](stream: ZStream[R, E, I])(implicit trace: ZTraceElement): ZStream[R, E, O] = { - def chunkAdjacent(buffer: Option[O]): ZChannel[R, E, Chunk[I], Any, E, Chunk[O], Unit] = - ZChannel.readWithCause[R, E, Chunk[I], Any, E, Chunk[O], Unit]( - in = chunk => { - val (outputs, newBuffer) = go(chunk, buffer) - ZChannel.write(outputs) *> chunkAdjacent(newBuffer) - }, - halt = ZChannel.failCause(_), - done = _ => - buffer match { - case Some(o) => ZChannel.write(Chunk.single(o)) - case None => ZChannel.unit - } - ) - - new ZStream(stream.channel >>> chunkAdjacent(None)) - } - } - /** * The identity pipeline, which does not modify streams in any way. 
*/ From 15da89f1f2387e193bd8dd2a7845a020bf31965f Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 12:54:28 -0600 Subject: [PATCH 05/15] fix scala 3 compiler crash --- .../ZPipelineCompanionVersionSpecific.scala | 68 ++++++++++++------- .../zio/stream/experimental/ZPipeline.scala | 36 +++++----- 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala index edad6a6030a8..87e581801590 100644 --- a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala +++ b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala @@ -42,19 +42,24 @@ trait ZPipelineCompanionVersionSpecific { * transformation of the specified pipeline, and then applying the * transformation of this pipeline. */ - def <<<[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( - that: ZPipeline[ + def <<<[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[Err], OutElem2[ + Elem + ]]( + that: ZPipeline.WithOut[ LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, - UpperElem2 + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 ] )(implicit - composeEnv: Compose[LowerEnv2, UpperEnv2, that.OutEnv, LowerEnv, UpperEnv, OutEnv], - composeErr: Compose[LowerErr2, UpperErr2, that.OutErr, LowerErr, UpperErr, OutErr], - composeElem: Compose[LowerElem2, UpperElem2, that.OutElem, LowerElem, UpperElem, OutElem] + composeEnv: Compose[LowerEnv2, UpperEnv2, OutEnv2, LowerEnv, UpperEnv, OutEnv], + composeErr: Compose[LowerErr2, UpperErr2, OutErr2, LowerErr, UpperErr, OutErr], + composeElem: Compose[LowerElem2, UpperElem2, OutElem2, LowerElem, UpperElem, OutElem] ): ZPipeline.WithOut[ composeEnv.Lower, composeEnv.Upper, @@ -73,19 +78,24 @@ trait 
ZPipelineCompanionVersionSpecific { * transformation of this pipeline, and then applying the transformation of * the specified pipeline. */ - def >>>[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( - that: ZPipeline[ + def >>>[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[Err], OutElem2[ + Elem + ]]( + that: ZPipeline.WithOut[ LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, - UpperElem2 + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 ] )(implicit - composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, that.OutEnv], - composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, that.OutErr], - composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, that.OutElem] + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, OutEnv2], + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, OutErr2], + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, OutElem2] ): ZPipeline.WithOut[ composeEnv.Lower, composeEnv.Upper, @@ -124,19 +134,24 @@ trait ZPipelineCompanionVersionSpecific { /** * A named version of the `>>>` operator. 
*/ - def andThen[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( - that: ZPipeline[ + def andThen[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[ + Err + ], OutElem2[Elem]]( + that: ZPipeline.WithOut[ LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, - UpperElem2 + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 ] )(implicit - composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, that.OutEnv], - composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, that.OutErr], - composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, that.OutElem] + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, OutEnv2], + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, OutErr2], + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, OutElem2] ): ZPipeline.WithOut[ composeEnv.Lower, composeEnv.Upper, @@ -153,19 +168,24 @@ trait ZPipelineCompanionVersionSpecific { /** * A named version of the `<<<` operator. 
*/ - def compose[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( - that: ZPipeline[ + def compose[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[ + Err + ], OutElem2[Elem]]( + that: ZPipeline.WithOut[ LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, - UpperElem2 + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 ] )(implicit - composeEnv: Compose[LowerEnv2, UpperEnv2, that.OutEnv, LowerEnv, UpperEnv, OutEnv], - composeErr: Compose[LowerErr2, UpperErr2, that.OutErr, LowerErr, UpperErr, OutErr], - composeElem: Compose[LowerElem2, UpperElem2, that.OutElem, LowerElem, UpperElem, OutElem] + composeEnv: Compose[LowerEnv2, UpperEnv2, OutEnv2, LowerEnv, UpperEnv, OutEnv], + composeErr: Compose[LowerErr2, UpperErr2, OutErr2, LowerErr, UpperErr, OutErr], + composeElem: Compose[LowerElem2, UpperElem2, OutElem2, LowerElem, UpperElem, OutElem] ): ZPipeline.WithOut[ composeEnv.Lower, composeEnv.Upper, diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 8c290c96ed96..e1c22d39dc3e 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -72,6 +72,8 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { type OutElem[Elem] = Out0[Elem] } + type Identity[A] = A + /** * Creates a pipeline that collects elements with the specified partial function. 
* @@ -364,7 +366,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { stream.takeWhile(f) } - trait Compose[LeftLower, LeftUpper, LeftOut[In], RightLower, RightUpper, RightOut[In]] { + trait Compose[+LeftLower, -LeftUpper, LeftOut[In], +RightLower, -RightUpper, RightOut[In]] { type Lower type Upper type Out[In] @@ -412,21 +414,21 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { implicit def identity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper]: Compose.WithOut[ LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, + Identity, RightLower, RightUpper, - ({ type Out[In] = In })#Out, + Identity, RightLower, LeftUpper with RightUpper, - ({ type Out[In] = In })#Out + Identity ] = new Compose[ LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, + Identity, RightLower, RightUpper, - ({ type Out[In] = In })#Out + Identity ] { type Lower = RightLower type Upper = LeftUpper with RightUpper @@ -436,7 +438,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { implicit def leftIdentity[LeftLower <: RightLower, LeftUpper, RightLower, RightUpper, RightOut]: Compose.WithOut[ LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, + Identity, RightLower, RightUpper, ({ type Out[In] = RightOut })#Out, @@ -447,7 +449,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { new Compose[ LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, + Identity, RightLower, RightUpper, ({ type Out[In] = RightOut })#Out @@ -464,7 +466,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { ({ type Out[In] = LeftOut })#Out, RightLower, RightUpper, - ({ type Out[In] = In })#Out, + Identity, LeftLower, LeftUpper, ({ type Out[In] = LeftOut })#Out @@ -475,7 +477,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { ({ type Out[In] = LeftOut })#Out, RightLower, RightUpper, - ({ type Out[In] = In })#Out + Identity ] { type Lower = LeftLower type Upper = LeftUpper @@ -489,21 +491,21 @@ object ZPipeline extends 
ZPipelineCompanionVersionSpecific { : Compose.WithOut[ LeftLowerElem, LeftUpperElem, - ({ type Out[In] = In })#Out, + Identity, RightLowerElem, RightUpperElem, - ({ type Out[In] = In })#Out, + Identity, LeftLowerElem, LeftUpperElem with RightUpperElem, - ({ type Out[In] = In })#Out + Identity ] = new Compose[ LeftLowerElem, LeftUpperElem, - ({ type Out[In] = In })#Out, + Identity, RightLowerElem, RightUpperElem, - ({ type Out[In] = In })#Out + Identity ] { type Lower = LeftLowerElem type Upper = LeftUpperElem with RightUpperElem @@ -514,7 +516,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { : Compose.WithOut[ LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, + Identity, RightLower, RightUpper, ({ type Out[In] = RightOut })#Out, @@ -525,7 +527,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { new Compose[ LeftLower, LeftUpper, - ({ type Out[In] = In })#Out, + Identity, RightLower, RightUpper, ({ type Out[In] = RightOut })#Out From afa7183a75176f024e9168df62f03815e9c8508c Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 14:21:41 -0600 Subject: [PATCH 06/15] remove some other newly added operators for now --- .../zio/stream/experimental/ZStreamSpec.scala | 85 ------------------- .../zio/stream/experimental/ZPipeline.scala | 6 -- .../zio/stream/experimental/ZStream.scala | 54 ------------ 3 files changed, 145 deletions(-) diff --git a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala index 57e01be26d02..e758764740de 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala @@ -367,91 +367,6 @@ object ZStreamSpec extends ZIOBaseSpec { } yield assert(leftAssoc -> rightAssoc)(equalTo(true -> true)) ) ), - suite("branchAfter")( - test("switches pipelines") { - check(Gen.chunkOf(Gen.int)) { data => - 
val test = - ZStream - .fromChunk(0 +: data) - .branchAfter(1) { values => - values.toList match { - case 0 :: Nil => ZPipeline[Int] - case _ => ZPipeline.fail("boom") - } - } - .runCollect - assertM(test.exit)(succeeds(equalTo(data))) - } - }, - // test("finalizes transducers") { - // check(Gen.chunkOf(Gen.int)) { data => - // val test = - // Ref.make(0).flatMap { ref => - // ZStream - // .fromChunk(data) - // .branchAfter(1) { values => - // values.toList match { - // case _ => - // ZTransducer { - // Managed.acquireReleaseWith( - // ref - // .update(_ + 1) - // .as[Option[Chunk[Int]] => UIO[Chunk[Int]]] { - // case None => ZIO.succeedNow(Chunk.empty) - // case Some(c) => ZIO.succeedNow(c) - // } - // )(_ => ref.update(_ - 1)) - // } - // } - // } - // .runDrain *> ref.get - // } - // assertM(test.exit)(succeeds(equalTo(0))) - // } - // }, - // test("finalizes transducers - inner transducer fails") { - // check(Gen.chunkOf(Gen.int)) { data => - // val test = - // Ref.make(0).flatMap { ref => - // ZStream - // .fromChunk(data) - // .branchAfter(1) { values => - // values.toList match { - // case _ => - // ZTransducer { - // Managed.acquireReleaseWith( - // ref - // .update(_ + 1) - // .as[Option[Chunk[Int]] => IO[String, Chunk[Int]]] { case _ => - // ZIO.fail("boom") - // } - // )(_ => ref.update(_ - 1)) - // } - // } - // } - // .runDrain - // .ignore *> ref.get - // } - // assertM(test.exit)(succeeds(equalTo(0))) - // } - // }, - test("emits data if less than n are collected") { - val gen = - for { - data <- Gen.chunkOf(Gen.int) - n <- Gen.int.filter(_ > data.length) - } yield (data, n) - - check(gen) { case (data, n) => - val test = - ZStream - .fromChunk(data) - .branchAfter(n)(ZPipeline.prepend) - .runCollect - assertM(test.exit)(succeeds(equalTo(data))) - } - } - ), suite("broadcast")( test("Values") { ZStream diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala 
b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 4f3b50435714..e1c22d39dc3e 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -74,12 +74,6 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { type Identity[A] = A - def branchAfter[R, E, I](n: Int)(f: Chunk[I] => ZPipeline[R, E, I, I]): ZPipeline[R, E, I, I] = - new ZPipeline[R, E, I, I] { - def apply[R1 <: R, E1 >: E](stream: ZStream[R1, E1, I])(implicit trace: ZTraceElement): ZStream[R1, E1, I] = - stream.branchAfter(n)(f) - } - /** * Creates a pipeline that collects elements with the specified partial function. * diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala index 6bf3981960bf..f32800178aaa 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala @@ -73,14 +73,6 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A def >>=[R1 <: R, E1 >: E, A2](f0: A => ZStream[R1, E1, A2])(implicit trace: ZTraceElement): ZStream[R1, E1, A2] = flatMap(f0) - /** - * Symbolic alias for [[ZStream#via]]. - */ - def >>>[R1 <: R, E1 >: E, A2 >: A, A3](pipeline: ZPipeline[R1, E1, A2, A3])(implicit - trace: ZTraceElement - ): ZStream[R1, E1, A3] = - via(pipeline) - /** * Symbolic alias for [[[zio.stream.ZStream!.run[R1<:R,E1>:E,B]*]]]. */ @@ -247,44 +239,6 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A def as[A2](A2: => A2)(implicit trace: ZTraceElement): ZStream[R, E, A2] = map(_ => A2) - /** - * Reads the first n values from the stream and uses them to choose the pipeline that will be - * used for the remainder of the stream. 
- */ - def branchAfter[R1 <: R, E1 >: E, A1 >: A]( - n: Int - )(f: Chunk[A1] => ZPipeline[R1, E1, A1, A1])(implicit trace: ZTraceElement): ZStream[R1, E1, A1] = { - def collecting(buf: Chunk[A1]): ZChannel[R1, E1, Chunk[A1], Any, E1, Chunk[A1], Any] = - ZChannel.readWithCause( - (chunk: Chunk[A1]) => { - val newBuf = buf ++ chunk - if (newBuf.length >= n) { - val (is, is1) = newBuf.splitAt(n) - val pipeline = f(is) - pipeline(ZStream.fromChunk(is1)).channel *> emitting(pipeline) - } else - collecting(newBuf) - }, - (cause: Cause[E1]) => ZChannel.failCause(cause), - (_: Any) => - if (buf.isEmpty) - ZChannel.unit - else { - val pipeline = f(buf) - pipeline(ZStream.empty).channel - } - ) - - def emitting(pipeline: ZPipeline[R1, E1, A1, A1]): ZChannel[R1, E1, Chunk[A1], Any, E1, Chunk[A1], Any] = - ZChannel.readWithCause( - (chunk: Chunk[A1]) => pipeline(ZStream.fromChunk(chunk)).channel *> emitting(pipeline), - (cause: Cause[E1]) => ZChannel.failCause(cause), - (_: Any) => ZChannel.unit - ) - - new ZStream(self.channel >>> collecting(Chunk.empty)) - } - /** * Returns a stream whose failure and success channels have been mapped by * the specified pair of functions, `f` and `g`. @@ -3855,14 +3809,6 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A ): ZStream[R2, E2, A2] = f(self) - /** - * Threads the stream through a transformation pipeline. - */ - final def via[R2 <: R, E2 >: E, A2 >: A, A3](pipeline: ZPipeline[R2, E2, A2, A3])(implicit - trace: ZTraceElement - ): ZStream[R2, E2, A3] = - pipeline(self) - /** * Returns this stream if the specified condition is satisfied, otherwise returns an empty stream. 
*/ From d367fae820d58eabd0d13b8b4daa7df0bc0c0e86 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 16:42:34 -0600 Subject: [PATCH 07/15] delete stream aspect --- .../ZPipelineVersionSpecific.scala | 53 +++++++++++++ .../ZPipelineCompanionVersionSpecific.scala | 36 +++++++++ .../zio/stream/experimental/ZPipeline.scala | 24 ++++++ .../zio/stream/experimental/ZStream.scala | 39 ++++++++-- .../stream/experimental/ZStreamAspect.scala | 75 ------------------- 5 files changed, 144 insertions(+), 83 deletions(-) delete mode 100644 streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala diff --git a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala index 0cdaa97cd863..e26e2b5833ad 100644 --- a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala +++ b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala @@ -130,6 +130,59 @@ trait ZPipelineVersionSpecific[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +Lowe } } + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of this pipeline, and then applying the transformation of + * the specified pipeline. 
+ */ + def @@[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2]( + that: ZPipeline[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, that.OutEnv] @uncheckedVariance, + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, that.OutErr] @uncheckedVariance, + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, that.OutElem] @uncheckedVariance + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + new ZPipeline[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper + ] { + type OutEnv[Env] = composeEnv.Out[Env] + type OutErr[Err] = composeErr.Out[Err] + type OutElem[Elem] = composeElem.Out[Elem] + def apply[ + Env >: composeEnv.Lower <: composeEnv.Upper, + Err >: composeErr.Lower <: composeErr.Upper, + Elem >: composeElem.Lower <: composeElem.Upper + ]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]] = { + val left = self.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](stream) + val right = that.asInstanceOf[ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any]](left) + right.asInstanceOf[ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]] + } + } + /** * A named version of the `>>>` operator. 
*/ diff --git a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala index 87e581801590..10ada8fa7e1c 100644 --- a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala +++ b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala @@ -131,6 +131,42 @@ trait ZPipelineCompanionVersionSpecific { } } + /** + * Composes two pipelines into one pipeline, by first applying the + * transformation of this pipeline, and then applying the transformation of + * the specified pipeline. + */ + def @@[LowerEnv2, UpperEnv2, LowerErr2, UpperErr2, LowerElem2, UpperElem2, OutEnv2[Env], OutErr2[Err], OutElem2[ + Elem + ]]( + that: ZPipeline.WithOut[ + LowerEnv2, + UpperEnv2, + LowerErr2, + UpperErr2, + LowerElem2, + UpperElem2, + OutEnv2, + OutErr2, + OutElem2 + ] + )(implicit + composeEnv: Compose[LowerEnv, UpperEnv, OutEnv, LowerEnv2, UpperEnv2, OutEnv2], + composeErr: Compose[LowerErr, UpperErr, OutErr, LowerErr2, UpperErr2, OutErr2], + composeElem: Compose[LowerElem, UpperElem, OutElem, LowerElem2, UpperElem2, OutElem2] + ): ZPipeline.WithOut[ + composeEnv.Lower, + composeEnv.Upper, + composeErr.Lower, + composeErr.Upper, + composeElem.Lower, + composeElem.Upper, + composeEnv.Out, + composeErr.Out, + composeElem.Out + ] = + self >>> that + /** * A named version of the `>>>` operator. 
*/ diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index e1c22d39dc3e..c62a9b754eb9 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -266,6 +266,30 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { stream.provide(env) } + /** + * A pipeline that rechunks the stream into chunks of the specified size. + */ + def rechunk(n: Int): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + Any, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, Any] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem](stream: ZStream[Env, Err, Elem])(implicit + trace: ZTraceElement + ): ZStream[Env, Err, Elem] = + stream.rechunk(n) + } + /** * Creates a pipeline that scans elements with the specified function. */ diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala index f32800178aaa..dc123e561bef 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala @@ -16,14 +16,6 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A import ZStream.TerminationStrategy - /** - * Syntax for adding aspects. - */ - final def @@[LowerR <: UpperR, UpperR <: R, LowerE >: E, UpperE >: LowerE, LowerA >: A, UpperA >: LowerA]( - aspect: ZStreamAspect[LowerR, UpperR, LowerE, UpperE, LowerA, UpperA] - )(implicit trace: ZTraceElement): ZStream[UpperR, LowerE, LowerA] = - aspect(self) - /** * Symbolic alias for [[ZStream#cross]]. 
*/ @@ -6151,4 +6143,35 @@ object ZStream extends ZStreamPlatformSpecificConstructors { self.combineChunks[R1, E1, State, (K, B), (K, C)](that)(PullBoth)(pull) } } + + /** + * Provides syntax for applying pipelines to streams. + */ + implicit class PipelineSyntax[Env, Err, Elem](private val self: ZStream[Env, Err, Elem]) extends AnyVal { + + /** + * Symbolic alias for [[ZStream#via]]. + */ + @deprecated("use @@", "2.0.0") + def >>>[LowerEnv <: Env, UpperEnv >: Env, LowerErr <: Err, UpperErr >: Err, LowerElem <: Elem, UpperElem >: Elem]( + pipeline: ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] + )(implicit trace: ZTraceElement): ZStream[pipeline.OutEnv[Env], pipeline.OutErr[Err], pipeline.OutElem[Elem]] = + pipeline(self) + + /** + * Syntax for applying pipelines. + */ + def @@[LowerEnv <: Env, UpperEnv >: Env, LowerErr <: Err, UpperErr >: Err, LowerElem <: Elem, UpperElem >: Elem]( + pipeline: ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] + )(implicit trace: ZTraceElement): ZStream[pipeline.OutEnv[Env], pipeline.OutErr[Err], pipeline.OutElem[Elem]] = + pipeline(self) + + /** + * Threads the stream through a transformation pipeline. 
+ */ + def via[LowerEnv <: Env, UpperEnv >: Env, LowerErr <: Err, UpperErr >: Err, LowerElem <: Elem, UpperElem >: Elem]( + pipeline: ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] + )(implicit trace: ZTraceElement): ZStream[pipeline.OutEnv[Env], pipeline.OutErr[Err], pipeline.OutElem[Elem]] = + pipeline(self) + } } diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala deleted file mode 100644 index b018eac626cd..000000000000 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStreamAspect.scala +++ /dev/null @@ -1,75 +0,0 @@ -package zio.stream.experimental - -import zio.ZTraceElement -import zio.stacktracer.TracingImplicits.disableAutoTrace - -trait ZStreamAspect[+LowerR, -UpperR, +LowerE, -UpperE, +LowerA, -UpperA] { self => - - def apply[R >: LowerR <: UpperR, E >: LowerE <: UpperE, A >: LowerA <: UpperA]( - stream: ZStream[R, E, A] - )(implicit trace: ZTraceElement): ZStream[R, E, A] - - def >>>[ - LowerR1 >: LowerR, - UpperR1 <: UpperR, - LowerE1 >: LowerE, - UpperE1 <: UpperE, - LowerA1 >: LowerA, - UpperA1 <: UpperA - ]( - that: ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] - ): ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = - self.andThen(that) - - /** - * Returns a new aspect that represents the sequential composition of this - * aspect with the specified one. 
- */ - def @@[ - LowerR1 >: LowerR, - UpperR1 <: UpperR, - LowerE1 >: LowerE, - UpperE1 <: UpperE, - LowerA1 >: LowerA, - UpperA1 <: UpperA - ]( - that: ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] - ): ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = - self >>> that - - def andThen[ - LowerR1 >: LowerR, - UpperR1 <: UpperR, - LowerE1 >: LowerE, - UpperE1 <: UpperE, - LowerA1 >: LowerA, - UpperA1 <: UpperA - ]( - that: ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] - ): ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] = - new ZStreamAspect[LowerR1, UpperR1, LowerE1, UpperE1, LowerA1, UpperA1] { - def apply[R >: LowerR1 <: UpperR1, E >: LowerE1 <: UpperE1, A >: LowerA1 <: UpperA1]( - stream: ZStream[R, E, A] - )(implicit trace: ZTraceElement): ZStream[R, E, A] = - that(self(stream)) - } -} - -object ZStreamAspect { - - /** - * An aspect that rechunks the stream into chunks of the specified size. - */ - @deprecated("use rechunk", "2.0.0") - def chunkN(n: Int): ZStreamAspect[Nothing, Any, Nothing, Any, Nothing, Any] = - rechunk(n) - - /** - * An aspect that rechunks the stream into chunks of the specified size. 
- */ - def rechunk(n: Int): ZStreamAspect[Nothing, Any, Nothing, Any, Nothing, Any] = - new ZStreamAspect[Nothing, Any, Nothing, Any, Nothing, Any] { - def apply[R, E, A](stream: ZStream[R, E, A])(implicit trace: ZTraceElement): ZStream[R, E, A] = - stream.rechunk(n) - } -} From 05637ade641bb19e2015c6ed7f97029796e3de2d Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 18:00:50 -0600 Subject: [PATCH 08/15] implement more operators --- .../zio/stream/experimental/ZStreamSpec.scala | 85 +++++++++++++++++++ .../zio/stream/experimental/ZPipeline.scala | 49 +++++++++++ .../zio/stream/experimental/ZStream.scala | 69 +++++++++++++++ 3 files changed, 203 insertions(+) diff --git a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala index e758764740de..cccab270aa2b 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala @@ -367,6 +367,91 @@ object ZStreamSpec extends ZIOBaseSpec { } yield assert(leftAssoc -> rightAssoc)(equalTo(true -> true)) ) ), + suite("branchAfter")( + test("switches pipelines") { + check(Gen.chunkOf(Gen.int)) { data => + val test = + ZStream + .fromChunk(0 +: data) + .branchAfter(1) { values => + values.toList match { + case 0 :: Nil => ZPipeline.identity + case _ => ??? 
+ } + } + .runCollect + assertM(test.exit)(succeeds(equalTo(data))) + } + }, + // test("finalizes transducers") { + // check(Gen.chunkOf(Gen.int)) { data => + // val test = + // Ref.make(0).flatMap { ref => + // ZStream + // .fromChunk(data) + // .branchAfter(1) { values => + // values.toList match { + // case _ => + // ZTransducer { + // Managed.acquireReleaseWith( + // ref + // .update(_ + 1) + // .as[Option[Chunk[Int]] => UIO[Chunk[Int]]] { + // case None => ZIO.succeedNow(Chunk.empty) + // case Some(c) => ZIO.succeedNow(c) + // } + // )(_ => ref.update(_ - 1)) + // } + // } + // } + // .runDrain *> ref.get + // } + // assertM(test.exit)(succeeds(equalTo(0))) + // } + // }, + // test("finalizes transducers - inner transducer fails") { + // check(Gen.chunkOf(Gen.int)) { data => + // val test = + // Ref.make(0).flatMap { ref => + // ZStream + // .fromChunk(data) + // .branchAfter(1) { values => + // values.toList match { + // case _ => + // ZTransducer { + // Managed.acquireReleaseWith( + // ref + // .update(_ + 1) + // .as[Option[Chunk[Int]] => IO[String, Chunk[Int]]] { case _ => + // ZIO.fail("boom") + // } + // )(_ => ref.update(_ - 1)) + // } + // } + // } + // .runDrain + // .ignore *> ref.get + // } + // assertM(test.exit)(succeeds(equalTo(0))) + // } + // }, + test("emits data if less than n are collected") { + val gen = + for { + data <- Gen.chunkOf(Gen.int) + n <- Gen.int.filter(_ > data.length) + } yield (data, n) + + check(gen) { case (data, n) => + val test = + ZStream + .fromChunk(data) + .branchAfter(n)(ZPipeline.prepend) + .runCollect + assertM(test.exit)(succeeds(equalTo(data))) + } + } + ), suite("broadcast")( test("Values") { ZStream diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index c62a9b754eb9..11e3af3e7edd 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ 
b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -190,6 +190,31 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { stream.filter(f) } + /** + * Creates a pipeline that groups on adjacent keys, calculated by the + * specified keying function. + */ + def groupAdjacentBy[In, Key](f: In => Key): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + Nothing, + In, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = (Key, NonEmptyChunk[Elem]) })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, Nothing, In] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = (Key, NonEmptyChunk[Elem]) + def apply[Env, Err, Elem <: In](stream: ZStream[Env, Err, Elem])(implicit + trace: ZTraceElement + ): ZStream[Env, Err, (Key, NonEmptyChunk[Elem])] = + stream.groupAdjacentBy(f) + } + /** * The identity pipeline, which does not modify streams in any way. */ @@ -240,6 +265,30 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { stream.map(f) } + /** + * Emits the provided chunk before emitting any other value. + */ + def prepend[In](values: Chunk[In]): ZPipeline.WithOut[ + Nothing, + Any, + Nothing, + Any, + In, + Any, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[Nothing, Any, Nothing, Any, In, Any] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env, Err, Elem >: In](stream: ZStream[Env, Err, Elem])(implicit + trace: ZTraceElement + ): ZStream[Env, Err, Elem] = + ZStream.fromChunk(values) ++ stream + } + /** * Creates a pipeline that provides the specified environment. 
*/ diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala index dc123e561bef..2e616a78c80a 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala @@ -6166,6 +6166,75 @@ object ZStream extends ZStreamPlatformSpecificConstructors { )(implicit trace: ZTraceElement): ZStream[pipeline.OutEnv[Env], pipeline.OutErr[Err], pipeline.OutElem[Elem]] = pipeline(self) + /** + * Reads the first n values from the stream and uses them to choose the pipeline that will be + * used for the remainder of the stream. + */ + def branchAfter[ + LowerEnv <: Env, + UpperEnv >: Env, + LowerErr <: Err, + UpperErr >: Err, + LowerElem <: Elem, + UpperElem >: Elem + ]( + n: Int + )( + f: Chunk[Elem] => ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] + )(implicit trace: ZTraceElement): ZStream[Env, Err, Elem] = { + def collecting(buf: Chunk[Elem]): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[Elem], Any] = + ZChannel.readWithCause( + (chunk: Chunk[Elem]) => { + val newBuf = buf ++ chunk + if (newBuf.length >= n) { + val (is, is1) = newBuf.splitAt(n) + val pipeline = f(is) + pipeline(ZStream.fromChunk(is1)).channel *> emitting(pipeline) + } else + collecting(newBuf) + }, + (cause: Cause[Err]) => ZChannel.failCause(cause), + (_: Any) => + if (buf.isEmpty) + ZChannel.unit + else { + val pipeline = f(buf) + pipeline(ZStream.empty).channel + } + ) + + def emitting( + pipeline: ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] + ): ZChannel[Env, Err, Chunk[Elem], Any, Err, 
Chunk[Elem], Any] = + ZChannel.readWithCause( + (chunk: Chunk[Elem]) => pipeline(ZStream.fromChunk(chunk)).channel *> emitting(pipeline), + (cause: Cause[Err]) => ZChannel.failCause(cause), + (_: Any) => ZChannel.unit + ) + + new ZStream(self.channel >>> collecting(Chunk.empty)) + } + /** * Threads the stream through a transformation pipeline. */ From 2651e83971c08d273dc8e69083382ee5e3646e84 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 20:45:33 -0600 Subject: [PATCH 09/15] generalize branchafter --- .../zio/stream/experimental/ZPipeline.scala | 34 +++++++++++++++++++ .../zio/stream/experimental/ZStream.scala | 15 ++++---- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 11e3af3e7edd..901b5d066266 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -17,6 +17,7 @@ package zio.stream.experimental import zio._ +import zio.internal.stacktracer.Tracer /** * A `ZPipeline` is a polymorphic stream transformer. 
Pipelines @@ -74,6 +75,39 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { type Identity[A] = A + def branchAfter[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem, OutElem0[Elem]](n: Int)( + f: Chunk[UpperElem] => ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + OutElem0 + ] + ): ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + OutElem0 + ] = + new ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = OutElem0[Elem] + def apply[Env >: LowerEnv <: UpperEnv, Err >: LowerErr <: UpperErr, Elem >: LowerElem <: UpperElem]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[Env, Err, OutElem[Elem]] = + stream.branchAfter(n)(f) + } + /** * Creates a pipeline that collects elements with the specified partial function. 
* diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala index 2e616a78c80a..98932dabd1b0 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala @@ -6176,7 +6176,8 @@ object ZStream extends ZStreamPlatformSpecificConstructors { LowerErr <: Err, UpperErr >: Err, LowerElem <: Elem, - UpperElem >: Elem + UpperElem >: Elem, + OutElem[Elem] ]( n: Int )( @@ -6189,10 +6190,10 @@ object ZStream extends ZStreamPlatformSpecificConstructors { UpperElem, ({ type OutEnv[Env] = Env })#OutEnv, ({ type OutErr[Err] = Err })#OutErr, - ({ type OutElem[Elem] = Elem })#OutElem + OutElem ] - )(implicit trace: ZTraceElement): ZStream[Env, Err, Elem] = { - def collecting(buf: Chunk[Elem]): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[Elem], Any] = + )(implicit trace: ZTraceElement): ZStream[Env, Err, OutElem[Elem]] = { + def collecting(buf: Chunk[Elem]): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[OutElem[Elem]], Any] = ZChannel.readWithCause( (chunk: Chunk[Elem]) => { val newBuf = buf ++ chunk @@ -6209,7 +6210,7 @@ object ZStream extends ZStreamPlatformSpecificConstructors { ZChannel.unit else { val pipeline = f(buf) - pipeline(ZStream.empty).channel + pipeline[UpperEnv, LowerErr, Elem](ZStream.empty).channel } ) @@ -6223,9 +6224,9 @@ object ZStream extends ZStreamPlatformSpecificConstructors { UpperElem, ({ type OutEnv[Env] = Env })#OutEnv, ({ type OutErr[Err] = Err })#OutErr, - ({ type OutElem[Elem] = Elem })#OutElem + OutElem ] - ): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[Elem], Any] = + ): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[OutElem[Elem]], Any] = ZChannel.readWithCause( (chunk: Chunk[Elem]) => pipeline(ZStream.fromChunk(chunk)).channel *> emitting(pipeline), (cause: Cause[Err]) => ZChannel.failCause(cause), From bc6d3ebd88c03b7c16c3a6842a9ce07e25df3822 Mon Sep 17 
00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 21:05:16 -0600 Subject: [PATCH 10/15] add back some tests --- .../zio/stream/experimental/ZStreamSpec.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala index cccab270aa2b..337d7b0c81fc 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala @@ -1752,6 +1752,39 @@ object ZStreamSpec extends ZIOBaseSpec { sum <- ref.get } yield assert(sum)(equalTo(10)) }, + suite("groupAdjacentBy")( + test("groupAdjacentBy 1")(check(Gen.chunkOf(Gen.chunkOf(Gen.int))) { iss => + val keyFn = (x: Int) => x % 2 == 0 + + ZStream.fromChunk(iss).flattenChunks.groupAdjacentBy(keyFn).runCollect.map { oss => + val splat = oss.foldLeft[Chunk[Int]](Chunk.empty) { case (acc, (_, is)) => acc ++ is.toChunk } + + def verifyInside(in: Chunk[(Boolean, NonEmptyChunk[Int])]) = + in.map { case (k, xs) => xs.forall(keyFn(_) == k) } + + def verifyAdjacentKeys(in: Chunk[(Boolean, Any)]): Boolean = + in.sliding(2, 1) + .foldLeft(true)((res, chunk) => res && (chunk.length == 1 || chunk(0)._1 != chunk(1)._1)) + + assert(splat)(equalTo(iss.flatten)) && + assert(verifyInside(oss))(forall(isTrue)) && + assert(verifyAdjacentKeys(oss))(isTrue) + } + }), + test("groupAdjacentBy 2") { + assertM( + ZStream((1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (1, 4)).groupAdjacentBy(_._1).runCollect + )( + equalTo( + Chunk( + (1, NonEmptyChunk((1, 1), (1, 2), (1, 3))), + (2, NonEmptyChunk((2, 1), (2, 2))), + (1, NonEmptyChunk((1, 4))) + ) + ) + ) + } + ), suite("groupBy")( test("values XYZ") { val words = List.fill(100)(0 to 100).flatten.map(_.toString()) From f27bd7c6b5da9fe80a82de42680623e50d3875c2 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 21:53:00 -0600 
Subject: [PATCH 11/15] fix compilation error --- .../scala/zio/stream/experimental/ZPipeline.scala | 8 ++++---- .../scala/zio/stream/experimental/ZStream.scala | 15 +++++++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 901b5d066266..744440d769a6 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -85,7 +85,7 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { UpperElem, ({ type OutEnv[Env] = Env })#OutEnv, ({ type OutErr[Err] = Err })#OutErr, - OutElem0 + ({ type OutElem[Elem] = Elem })#OutElem ] ): ZPipeline.WithOut[ LowerEnv, @@ -96,15 +96,15 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { UpperElem, ({ type OutEnv[Env] = Env })#OutEnv, ({ type OutErr[Err] = Err })#OutErr, - OutElem0 + ({ type OutElem[Elem] = Elem })#OutElem ] = new ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] { type OutEnv[Env] = Env type OutErr[Err] = Err - type OutElem[Elem] = OutElem0[Elem] + type OutElem[Elem] = Elem def apply[Env >: LowerEnv <: UpperEnv, Err >: LowerErr <: UpperErr, Elem >: LowerElem <: UpperElem]( stream: ZStream[Env, Err, Elem] - )(implicit trace: ZTraceElement): ZStream[Env, Err, OutElem[Elem]] = + )(implicit trace: ZTraceElement): ZStream[Env, Err, Elem] = stream.branchAfter(n)(f) } diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala index 98932dabd1b0..2e616a78c80a 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZStream.scala @@ -6176,8 +6176,7 @@ object ZStream extends ZStreamPlatformSpecificConstructors { LowerErr <: Err, UpperErr >: Err, 
LowerElem <: Elem, - UpperElem >: Elem, - OutElem[Elem] + UpperElem >: Elem ]( n: Int )( @@ -6190,10 +6189,10 @@ object ZStream extends ZStreamPlatformSpecificConstructors { UpperElem, ({ type OutEnv[Env] = Env })#OutEnv, ({ type OutErr[Err] = Err })#OutErr, - OutElem + ({ type OutElem[Elem] = Elem })#OutElem ] - )(implicit trace: ZTraceElement): ZStream[Env, Err, OutElem[Elem]] = { - def collecting(buf: Chunk[Elem]): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[OutElem[Elem]], Any] = + )(implicit trace: ZTraceElement): ZStream[Env, Err, Elem] = { + def collecting(buf: Chunk[Elem]): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[Elem], Any] = ZChannel.readWithCause( (chunk: Chunk[Elem]) => { val newBuf = buf ++ chunk @@ -6210,7 +6209,7 @@ object ZStream extends ZStreamPlatformSpecificConstructors { ZChannel.unit else { val pipeline = f(buf) - pipeline[UpperEnv, LowerErr, Elem](ZStream.empty).channel + pipeline(ZStream.empty).channel } ) @@ -6224,9 +6223,9 @@ object ZStream extends ZStreamPlatformSpecificConstructors { UpperElem, ({ type OutEnv[Env] = Env })#OutEnv, ({ type OutErr[Err] = Err })#OutErr, - OutElem + ({ type OutElem[Elem] = Elem })#OutElem ] - ): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[OutElem[Elem]], Any] = + ): ZChannel[Env, Err, Chunk[Elem], Any, Err, Chunk[Elem], Any] = ZChannel.readWithCause( (chunk: Chunk[Elem]) => pipeline(ZStream.fromChunk(chunk)).channel *> emitting(pipeline), (cause: Cause[Err]) => ZChannel.failCause(cause), From 8be138bb71b312217958d32e7a559b961daef4e7 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 22:26:20 -0600 Subject: [PATCH 12/15] add pipeline version of branchafter --- .../zio/stream/experimental/ZPipeline.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 11e3af3e7edd..99ca863d901a 100644 --- 
a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -74,6 +74,39 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific { type Identity[A] = A + def branchAfter[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem, OutElem0[Elem]](n: Int)( + f: Chunk[UpperElem] => ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] + ): ZPipeline.WithOut[ + LowerEnv, + UpperEnv, + LowerErr, + UpperErr, + LowerElem, + UpperElem, + ({ type OutEnv[Env] = Env })#OutEnv, + ({ type OutErr[Err] = Err })#OutErr, + ({ type OutElem[Elem] = Elem })#OutElem + ] = + new ZPipeline[LowerEnv, UpperEnv, LowerErr, UpperErr, LowerElem, UpperElem] { + type OutEnv[Env] = Env + type OutErr[Err] = Err + type OutElem[Elem] = Elem + def apply[Env >: LowerEnv <: UpperEnv, Err >: LowerErr <: UpperErr, Elem >: LowerElem <: UpperElem]( + stream: ZStream[Env, Err, Elem] + )(implicit trace: ZTraceElement): ZStream[Env, Err, Elem] = + stream.branchAfter(n)(f) + } + /** * Creates a pipeline that collects elements with the specified partial function. 
* From b9a2fcc44d462ffa931bd9d87bf7e3d89c01d8df Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 22:41:44 -0600 Subject: [PATCH 13/15] remove unused import --- .../src/main/scala/zio/stream/experimental/ZPipeline.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 744440d769a6..99ca863d901a 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -17,7 +17,6 @@ package zio.stream.experimental import zio._ -import zio.internal.stacktracer.Tracer /** * A `ZPipeline` is a polymorphic stream transformer. Pipelines From 992fdcd44537c85c98af3d77709f500d875e190e Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Sun, 31 Oct 2021 22:54:32 -0600 Subject: [PATCH 14/15] delete test --- .../zio/stream/experimental/ZStreamSpec.scala | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala index 337d7b0c81fc..cccab270aa2b 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/experimental/ZStreamSpec.scala @@ -1752,39 +1752,6 @@ object ZStreamSpec extends ZIOBaseSpec { sum <- ref.get } yield assert(sum)(equalTo(10)) }, - suite("groupAdjacentBy")( - test("groupAdjacentBy 1")(check(Gen.chunkOf(Gen.chunkOf(Gen.int))) { iss => - val keyFn = (x: Int) => x % 2 == 0 - - ZStream.fromChunk(iss).flattenChunks.groupAdjacentBy(keyFn).runCollect.map { oss => - val splat = oss.foldLeft[Chunk[Int]](Chunk.empty) { case (acc, (_, is)) => acc ++ is.toChunk } - - def verifyInside(in: Chunk[(Boolean, NonEmptyChunk[Int])]) = - in.map { case (k, xs) => xs.forall(keyFn(_) == k) } 
- - def verifyAdjacentKeys(in: Chunk[(Boolean, Any)]): Boolean = - in.sliding(2, 1) - .foldLeft(true)((res, chunk) => res && (chunk.length == 1 || chunk(0)._1 != chunk(1)._1)) - - assert(splat)(equalTo(iss.flatten)) && - assert(verifyInside(oss))(forall(isTrue)) && - assert(verifyAdjacentKeys(oss))(isTrue) - } - }), - test("groupAdjacentBy 2") { - assertM( - ZStream((1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (1, 4)).groupAdjacentBy(_._1).runCollect - )( - equalTo( - Chunk( - (1, NonEmptyChunk((1, 1), (1, 2), (1, 3))), - (2, NonEmptyChunk((2, 1), (2, 2))), - (1, NonEmptyChunk((1, 4))) - ) - ) - ) - } - ), suite("groupBy")( test("values XYZ") { val words = List.fill(100)(0 to 100).flatten.map(_.toString()) From 2ce970db20f662e5ef57252a26260f515a7c2135 Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Mon, 1 Nov 2021 00:07:03 -0600 Subject: [PATCH 15/15] disable automatic tracing --- .../stream/experimental/ZPipelineCompanionVersionSpecific.scala | 2 +- .../zio/stream/experimental/ZPipelineVersionSpecific.scala | 1 + .../stream/experimental/ZPipelineCompanionVersionSpecific.scala | 1 + .../zio/stream/experimental/ZPipelineVersionSpecific.scala | 2 ++ .../src/main/scala/zio/stream/experimental/ZPipeline.scala | 1 + 5 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala index ddb9120a890d..d203ed54803f 100644 --- a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala +++ b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala @@ -16,6 +16,6 @@ package zio.stream.experimental -import zio._ +import zio.stacktracer.TracingImplicits.disableAutoTrace trait ZPipelineCompanionVersionSpecific diff --git 
a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala index e26e2b5833ad..663f7e3549ad 100644 --- a/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala +++ b/streams/shared/src/main/scala-2.11-2.12/zio/stream/experimental/ZPipelineVersionSpecific.scala @@ -17,6 +17,7 @@ package zio.stream.experimental import zio._ +import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.annotation.unchecked.uncheckedVariance diff --git a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala index 10ada8fa7e1c..54c148679a33 100644 --- a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala +++ b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineCompanionVersionSpecific.scala @@ -17,6 +17,7 @@ package zio.stream.experimental import zio._ +import zio.stacktracer.TracingImplicits.disableAutoTrace trait ZPipelineCompanionVersionSpecific { import ZPipeline._ diff --git a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala index c49c732e842d..b53d0551858e 100644 --- a/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala +++ b/streams/shared/src/main/scala-2.13+/zio/stream/experimental/ZPipelineVersionSpecific.scala @@ -16,4 +16,6 @@ package zio.stream.experimental +import zio.stacktracer.TracingImplicits.disableAutoTrace + trait ZPipelineVersionSpecific[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem] diff --git a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala 
b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala index 99ca863d901a..07c3b2f0657e 100644 --- a/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/experimental/ZPipeline.scala @@ -17,6 +17,7 @@ package zio.stream.experimental import zio._ +import zio.stacktracer.TracingImplicits.disableAutoTrace /** * A `ZPipeline` is a polymorphic stream transformer. Pipelines