8000 Stream and Pipe benchmarks by johnhungerford · Pull Request #1300 · getkyo/kyo · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Stream and Pipe benchmarks #1300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions kyo-bench/src/main/scala/kyo/bench/arena/StreamPipeBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package kyo.bench.arena

import WarmupJITProfile.*
import kyo.AllowUnsafe
import kyo.bench.BaseBench
import org.openjdk.jmh.annotations.*

class StreamPipeBench extends BaseBench:
Copy link
Collaborator
10000
@fwbrasil fwbrasil Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the arena package is for comparative benchmarks, this new one should be under the bench package directly

val seq = (0 until 10000).toVector

import AllowUnsafe.embrace.danger

@Benchmark
def mapPureStreamBench() =
import kyo.*
Stream.init(seq)
.mapPure(_ + 1)
.fold(0)(_ + _)
.eval
end mapPureStreamBench

@Benchmark
def mapPureStreamAbstractBench() =
import kyo.*
Stream.init(seq)
.mapAbstractPure(_ + 1)
.fold(0)(_ + _)
.eval
end mapPureStreamAbstractBench

@Benchmark
def mapPurePipeBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.mapPure((_: Int) + 1))
.fold(0)(_ + _)
.eval
end mapPurePipeBench

@Benchmark
def mapPurePipeAbstractBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.mapAbstractPure((_: Int) + 1))
.fold(0)(_ + _)
.eval
end mapPurePipeAbstractBench

@Benchmark
def mapKyoStreamBench() =
import kyo.*
Stream.init(seq)
.map(v => Sync(v + 1))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end mapKyoStreamBench

@Benchmark
def mapKyoStreamAbstractBench() =
import kyo.*
Stream.init(seq)
.mapAbstract(v => Sync(v + 1))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end mapKyoStreamAbstractBench

@Benchmark
def mapKyoPipeBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.map((v: Int) => Sync(v + 1)))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end mapKyoPipeBench

@Benchmark
def mapKyoPipeAbstractBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.mapAbstract((v: Int) => Sync(v + 1)))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end mapKyoPipeAbstractBench

@Benchmark
def filterPureStreamBench() =
import kyo.*
Stream.init(seq)
.filterPure(_ % 2 == 0)
.fold(0)(_ + _)
.eval
end filterPureStreamBench

@Benchmark
def filterPureStreamAbstractBench() =
import kyo.*
Stream.init(seq)
.filterAbstractPure(_ % 2 == 0)
.fold(0)(_ + _)
.eval
end filterPureStreamAbstractBench

@Benchmark
def filterPurePipeBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.filterPure((_: Int) % 2 == 0))
.fold(0)(_ + _)
.eval
end filterPurePipeBench

@Benchmark
def filterPurePipeAbstractBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.filterAbstractPure((_: Int) % 2 == 0))
.fold(0)(_ + _)
.eval
end filterPurePipeAbstractBench

@Benchmark
def filterKyoStreamBench() =
import kyo.*
Stream.init(seq)
.filter(v => Sync(v % 2 == 0))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end filterKyoStreamBench

@Benchmark
def filterKyoStreamAbstractBench() =
import kyo.*
Stream.init(seq)
.filterAbstract(v => Sync(v % 2 == 0))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end filterKyoStreamAbstractBench

@Benchmark
def filterKyoPipeBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.filter((v: Int) => Sync(v % 2 == 0)))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end filterKyoPipeBench

@Benchmark
def filterKyoPipeAbstractBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.filterAbstract((v: Int) => Sync(v % 2 == 0)))
.fold(0)(_ + _)
.handle(Sync.Unsafe.evalOrThrow)
end filterKyoPipeAbstractBench

@Benchmark
def filterMapVarStreamBench() =
import kyo.*
Stream.init(seq)
.filter(v => Var.get[Int].map(i => ((i + v) % 2 > -1)))
.map(v => Var.update[Int](_ + 1).map(i => v + i))
.fold(0)(_ + _)
.handle(Var.run(0), Sync.Unsafe.evalOrThrow)
end filterMapVarStreamBench

@Benchmark
def filterMapVarStreamAbstractBench() =
import kyo.*
Stream.init(seq)
.filterAbstract(v => Var.get[Int].map(i => ((i + v) % 2 > -1)))
.mapAbstract(v => Var.update[Int](_ + 1).map(i => v + i))
.fold(0)(_ + _)
.handle(Var.run(0), Sync.Unsafe.evalOrThrow)
end filterMapVarStreamAbstractBench

@Benchmark
def filterMapVarPipeBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.filter((v: Int) => Var.get[Int].map(i => ((i + v) % 2 > -1))))
.into(Pipe.map((v: Int) => Var.update[Int](_ + 1).map(i => v + i)))
.fold(0)(_ + _)
.handle(Var.run(0), Sync.Unsafe.evalOrThrow)
end filterMapVarPipeBench

@Benchmark
def filterMapVarPipeAbstractBench() =
import kyo.*
Stream.init(seq)
.into(Pipe.filterAbstract((v: Int) => Var.get[Int].map(i => ((i + v) % 2 > -1))))
.into(Pipe.mapAbstract((v: Int) => Var.update[Int](_ + 1).map(i => v + i)))
.fold(0)(_ + _)
.handle(Var.run(0), Sync.Unsafe.evalOrThrow)
end filterMapVarPipeAbstractBench
end StreamPipeBench
43 changes: 43 additions & 0 deletions kyo-prelude/shared/src/main/scala/kyo/Pipe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,34 @@ object Pipe:
Kyo.foreach(c)(f).map: c1 =>
Emit.valueWith(c1)(Loop.continue)

def mapAbstract[A](using
Tag[A]
)[B, S](f: A => B < S)(
using
Tag[Poll[Chunk[A]]],
Tag[Emit[Chunk[B]]],
Frame
): Pipe[A, B, S] =
Pipe:
Loop.foreach:
Poll.andMap[Chunk[A]]:
case Absent => Loop.done
case Present(c) => StreamTransformations.handleMap(c, Loop.continue)(f)

def mapAbstractPure[A](using
Tag[A]
)[B](f: A => B)(
using
Tag[Poll[Chunk[A]]],
Tag[Emit[Chunk[B]]],
Frame
): Pipe[A, B, Any] =
Pipe:
Loop.foreach:
Poll.andMap[Chunk[A]]:
case Absent => Loop.done
case Present(c) => StreamTransformations.handleMapPure(c, Loop.continue)(f)

/** A pipe that transforms each chunk of a stream with a pure function.
*
* @see
Expand Down Expand Up @@ -503,6 +531,13 @@ object Pipe:
if c1.isEmpty then Loop.continue
else Emit.valueWith(c1)(Loop.continue)

def filterAbstractPure[A](using Tag[Poll[Chunk[A]]], Tag[Emit[Chunk[A]]])(f: A => Boolean)(using Frame): Pipe[A, A, Any] =
Pipe:
Loop.foreach:
Poll.andMap[Chunk[A]]:
case Absent => Loop.done
case Present(c) => StreamTransformations.handleFilterPure(c, Loop.continue)(f)

/** A pipe whose output skips input elements that do not satisfy the provided effectful predicate.
*
* @see
Expand All @@ -521,6 +556,14 @@ object Pipe:
if c1.isEmpty then Loop.continue
else Emit.valueWith(c1)(Loop.continue)

def filterAbstract[A](using Tag[Poll[Chunk[A]]], Tag[Emit[Chunk[A]]])[S](f: A => Boolean < S)(using Frame): Pipe[A, A, S] =
Pipe:
Loop.foreach:
Poll.andMap[Chunk[A]]:
case Absent => Loop.done
case Present(c) =>
StreamTransformations.handleFilter(c, Loop.continue)(f)

/** A pipe that filters and transforms an input stream using a pure function.
*
* @see
Expand Down
52 changes: 52 additions & 0 deletions kyo-prelude/shared/src/main/scala/kyo/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,40 @@ sealed abstract class Stream[+V, -S] extends Serializable:
def map[VV >: V, V2, S2](f: VV => V2 < S2)(using Tag[Emit[Chunk[VV]]], Tag[Emit[Chunk[V2]]], Frame): Stream[V2, S & S2] =
mapChunk[VV, V2, S2](c => Kyo.foreach(c)(f))

/** Transforms each value in the stream using the given effectful function.
*
* @param f
* The function to apply to each value
* @return
* A new stream with transformed values
*/
def mapAbstract[VV >: V, V2, S2](f: VV => V2 < S2)(using
t1: Tag[Emit[Chunk[VV]]],
t2: Tag[Emit[Chunk[V2]]],
fr: Frame
): Stream[V2, S & S2] =
Stream(
ArrowEffect.handleLoop(t1, emit)(
[C] =>
(input, cont) =>
StreamTransformations.handleMap(input, Loop.continue(cont(())))(f)
)
)
end mapAbstract

def mapAbstractPure[VV >: V, V2](f: VV => V2)(using
t1: Tag[Emit[Chunk[VV]]],
t2: Tag[Emit[Chunk[V2]]],
fr: Frame
): Stream[V2, S] =
Stream(
ArrowEffect.handleLoop(t1, emit)(
[C] =>
(input, cont) =>
StreamTransformations.handleMapPure(input, Loop.continue(cont(())))(f)
)
)

/** Transforms each chunk in the stream using the given pure function.
*
* @param f
Expand Down Expand Up @@ -386,6 +420,24 @@ sealed abstract class Stream[+V, -S] extends Serializable:
)
)

def filterAbstract[VV >: V, S2](f: VV => Boolean < S2)(using tag: Tag[Emit[Chunk[VV]]], frame: Frame): Stream[VV, S & S2] =
Stream(
ArrowEffect.handleLoop(tag, emit)(
[C] =>
(input, cont) =>
StreamTransformations.handleFilter(input, Loop.continue(cont(())))(f)
)
)

def filterAbstractPure[VV >: V](f: VV => Boolean)(using tag: Tag[Emit[Chunk[VV]]], frame: Frame): Stream[VV, S] =
Stream(
ArrowEffect.handleLoop(tag, emit)(
[C] =>
(input, cont) =>
StreamTransformations.handleFilterPure(input, Loop.continue(cont(())))(f)
)
)

def filterPure[VV >: V](f: VV => Boolean)(using tag: Tag[Emit[Chunk[VV]]], frame: Frame): Stream[VV, S] =
Stream(
ArrowEffect.handleLoop(tag, emit)(
Expand Down
39 changes: 39 additions & 0 deletions kyo-prelude/shared/src/main/scala/kyo/StreamTransformations.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kyo

private[kyo] object StreamTransformations:
inline def handleMap[V1, B, V2, S, S2](chunk: Chunk[V1], cont: => B < S)(f: V1 => V2 < S2)(using
Tag[Emit[Chunk[V2]]],
Frame
): B < (S & S2 & Emit[Chunk[V2]]) =
Kyo.foreach(chunk)(f).map: newChunk =>
if newChunk.isEmpty then cont
else Emit.valueWith(newChunk)(cont)

inline def handleMapPure[V1, B, V2, S](chunk: Chunk[V1], cont: => B < S)(f: V1 => V2)(using
Tag[Emit[Chunk[V2]]],
Frame
): B < (S & Emit[Chunk[V2]]) =
val newChunk = chunk.map(f)
if newChunk.isEmpty then cont
else Emit.valueWith(newChunk)(cont)
end handleMapPure

inline def handleFilter[V1, B, S, S2](chunk: Chunk[V1], cont: => B < S)(f: V1 => Boolean < S2)(
using
Tag[Emit[Chunk[V1]]],
Frame
): B < (S & S2 & Emit[Chunk[V1]]) =
Kyo.filter(chunk)(f).map: newChunk =>
if newChunk.isEmpty then cont
else Emit.valueWith(newChunk)(cont)

inline def handleFilterPure[V1, B, S](chunk: Chunk[V1], cont: => B < S)(f: V1 => Boolean)(
using
Tag[Emit[Chunk[V1]]],
Frame
): B < (S & Emit[Chunk[V1]]) =
val newChunk = chunk.filter(f)
if newChunk.isEmpty then cont
else Emit.valueWith(newChunk)(cont)
end handleFilterPure
end StreamTransformations
Loading
0