8000 Exposing the compression channels as pipelines by vigoo · Pull Request #5879 · zio/zio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Exposing the compression channels as pipelines #5879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package zio.stream.experimental

import zio._
import zio.stream.compression.TestData._
import zio.test._
import zio.test.Assertion._

import java.util.zip.Deflater

object ZPipelinePlatformSpecificSpec extends ZIOBaseSpec {

def spec: ZSpec[Environment, Failure] = suite("ZPipeline JVM experimental")(
suite("Constructors")(
suite("Deflate")(
test("JDK inflates what was deflated")(
check(Gen.listOfBounded(0, `1K`)(Gen.byte).zip(Gen.int(1, `1K`)).zip(Gen.int(1, `1K`))) {
case (input, n, bufferSize) =>
assertM(for {
deflated <-
ZStream
.fromIterable(input)
.rechunk(n)
.via(ZPipeline.deflate[Any, Nothing](bufferSize))
.runCollect
inflated <- jdkInflate(deflated, noWrap = false)
} yield inflated)(equalTo(input))
}
)
),
suite("Inflate")(
test("inflate what JDK deflated")(
check(Gen.listOfBounded(0, `1K`)(Gen.byte).zip(Gen.int(1, `1K`)).zip(Gen.int(1, `1K`))) {
case (chunk, n, bufferSize) =>
assertM(for {
out <-
ZStream
.fromIterable(jdkDeflate(chunk.toArray, new Deflater()))
.rechunk(n)
.via(ZPipeline.inflate[Any](bufferSize))
.runCollect
} yield out.toList)(equalTo(chunk))
}
)
),
suite("Gunzip")(
test("gunzip what JDK gzipped, nowrap")(
check(Gen.listOfBounded(0, `1K`)(Gen.byte).zip(Gen.int(1, `1K`)).zip(Gen.int(1, `1K`))) {
case (chunk, n, bufferSize) =>
assertM(for {
out <- ZStream
.fromIterable(jdkGzip(chunk.toArray))
.rechunk(n)
.via(ZPipeline.gunzip[Any](bufferSize))
.runCollect
} yield out.toList)(equalTo(chunk))
}
)
),
suite("Gzip")(
test("JDK gunzips what was gzipped")(
check(Gen.listOfBounded(0, `1K`)(Gen.byte).zip(Gen.int(1, `1K`)).zip(Gen.int(1, `1K`))) {
case (input, n, bufferSize) =>
assertM(for {
gzipped <- ZStream
.fromIterable(input)
.rechunk(n)
.via(ZPipeline.gzip[Any, Nothing](bufferSize))
.runCollect
inflated <- jdkGunzip(gzipped)
} yield inflated)(equalTo(input))
}
)
)
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,5 @@ trait ZStreamPlatformSpecificConstructors {
}

trait ZSinkPlatformSpecificConstructors

trait ZPipelinePlatformSpecificConstructors
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package zio.stream.experimental

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace
import zio.stream.compression.{CompressionException, CompressionLevel, CompressionStrategy, FlushMode}

import java.io._
import java.net.InetSocketAddress
Expand Down Expand Up @@ -565,3 +565,87 @@ trait ZSinkPlatformSpecificConstructors {
}

}

trait ZPipelinePlatformSpecificConstructors {
def deflate[Env, Err](
bufferSize: Int = 64 * 1024,
noWrap: Boolean = false,
level: CompressionLevel = CompressionLevel.DefaultCompression,
strategy: CompressionStrategy = CompressionStrategy.DefaultStrategy,
flushMode: FlushMode = FlushMode.NoFlush
)(implicit trace: ZTraceElement): ZPipeline.WithOut[
Env,
Env,
Err,
Err,
Byte,
Byte,
({ type OutEnv[Env0] = Env })#OutEnv,
({ type OutErr[Err0] = Err })#OutErr,
({ type OutElem[Elem] = Byte })#OutElem
] =
ZPipeline.fromChannel(
Deflate.makeDeflater(
bufferSize,
noWrap,
level,
strategy,
flushMode
)
)

def inflate[Env](
bufferSize: Int = 64 * 1024,
noWrap: Boolean = false
)(implicit trace: ZTraceElement): ZPipeline.WithOut[
Env,
Env,
CompressionException,
CompressionException,
Byte,
Byte,
({ type OutEnv[Env0] = Env })#OutEnv,
({ type OutErr[Err0] = CompressionException })#OutErr,
({ type OutElem[Elem] = Byte })#OutElem
] =
ZPipeline.fromChannel(
Inflate.makeInflater(bufferSize, noWrap)
)

def gzip[Env, Err](
bufferSize: Int = 64 * 1024,
level: CompressionLevel = CompressionLevel.DefaultCompression,
strategy: CompressionStrategy = CompressionStrategy.DefaultStrategy,
flushMode: FlushMode = FlushMode.NoFlush
)(implicit trace: ZTraceElement): ZPipeline.WithOut[
Env,
Env,
Err,
Err,
Byte,
Byte,
({ type OutEnv[Env0] = Env })#OutEnv,
({ type OutErr[Err0] = Err })#OutErr,
({ type OutElem[Elem] = Byte })#OutElem
] =
ZPipeline.fromChannel(
Gzip.makeGzipper(bufferSize, level, strategy, flushMode)
)

def gunzip[Env](bufferSize: Int = 64 * 1024)(implicit
trace: ZTraceElement
): ZPipeline.WithOut[
Env,
Env,
CompressionException,
CompressionException,
Byte,
Byte,
({ type OutEnv[Env0] = Env })#OutEnv,
({ type OutErr[Err0] = CompressionException })#OutErr,
({ type OutElem[Elem] = Byte })#OutElem
] =
ZPipeline.fromChannel(
Gunzip.makeGunzipper(bufferSize)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,5 @@ trait ZStreamPlatformSpecificConstructors {
}

trait ZSinkPlatformSpecificConstructors

trait ZPipelinePlatformSpecificConstructors
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ trait ZPipeline[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperEl
): ZStream[OutEnv[Env], OutErr[Err], OutElem[Elem]]
}

object ZPipeline extends ZPipelineCompanionVersionSpecific {
object ZPipeline extends ZPipelineCompanionVersionSpecific with ZPipelinePlatformSpecificConstructors {

type WithOut[+LowerEnv, -UpperEnv, +LowerErr, -UpperErr, +LowerElem, -UpperElem, OutEnv0[Env], OutErr0[Err], Out0[
Elem
Expand Down Expand Up @@ -249,6 +249,32 @@ object ZPipeline extends ZPipelineCompanionVersionSpecific {
stream.groupAdjacentBy(f)
}

/**
* Creates a pipeline that sends all the elements through the given channel
*/
def fromChannel[InEnv, OutEnv0, InErr, OutErr0, In, Out](
channel: ZChannel[OutEnv0, InErr, Chunk[In], Any, OutErr0, Chunk[Out], Any]
): ZPipeline.WithOut[
OutEnv0,
InEnv,
InErr,
InErr,
In,
In,
({ type OutEnv[Env] = OutEnv0 })#OutEnv,
({ type OutErr[Err] = OutErr0 })#OutErr,
({ type OutElem[Elem] = Out })#OutElem
] = new ZPipeline[OutEnv0, InEnv, InErr, InErr, In, In] {
override type OutEnv[Env] = OutEnv0
override type OutErr[Err] = OutErr0
override type OutElem[Elem] = Out

override def apply[Env >: OutEnv0 <: InEnv, Err >: InErr <: InErr, Elem >: In <: In](
stream: ZStream[Env, Err, Elem]
)(implicit trace: ZTraceElement): ZStream[OutEnv0, OutErr0, Out] =
stream.pipeThroughChannel(channel)
}

/**
* The identity pipeline, which does not modify streams in any way.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2743,6 +2743,14 @@ class ZStream[-R, +E, +A](val channel: ZChannel[R, Any, Any, Any, E, Chunk[A], A
): ZStream[R1, E2, L] =
new ZStream(self.channel >>> sink.channel)

/**
* Pipes all the values from this stream through the provided channel
*/
def pipeThroughChannel[R1 <: R, E2, A2](channel: ZChannel[R1, E, Chunk[A], Any, E2, Chunk[A2], Any])(implicit
trace: ZTraceElement
): ZStream[R1, E2, A2] =
new ZStream(self.channel >>> channel)

/**
* Provides the stream with its required environment, which eliminates
* its dependency on `R`.
Expand Down Expand Up @@ -6300,12 +6308,12 @@ object ZStream extends ZStreamPlatformSpecificConstructors {
new ZStream(self.channel >>> collecting(Chunk.empty))
}

/**
* Threads the stream through the transformation function `f`.
*/
def via[OutEnv, OutErr, OutElem](f: ZStream[Env, Err, Elem] => ZStream[OutEnv, OutErr, OutElem])(implicit
trace: ZTraceElement
): ZStream[OutEnv, OutErr, OutElem] = f(self)
// /**
// * Threads the stream through the transformation function `f`.
// */
// def via[OutEnv, OutErr, OutElem](f: ZStream[Env, Err, Elem] => ZStream[OutEnv, OutErr, OutElem])(implicit
// trace: ZTraceElement
// ): ZStream[OutEnv, OutErr, OutElem] = f(self)

/**
* Threads the stream through a transformation pipeline.
Expand Down
0