8000 Fast path interpreter by jdegoes · Pull Request #6076 · zio/zio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Fast path interpreter #6076

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Nov 23, 2021
32 changes: 23 additions & 9 deletions benchmarks/src/main/scala/zio/UnsafeRunBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,29 @@ import java.util.concurrent.TimeUnit

/**
* {{{
* [info] Benchmark (depth) Mode Cnt Score Error Units
* [info] UnsafeRunBenchmark.zioLeft 1 thrpt 5 2527835.063 � 20547.193 ops/s
* [info] UnsafeRunBenchmark.zioLeft 2 thrpt 5 2192859.126 � 21557.848 ops/s
* [info] UnsafeRunBenchmark.zioLeft 4 thrpt 5 1861759.677 � 21811.809 ops/s
* [info] UnsafeRunBenchmark.zioLeft 8 thrpt 5 1478009.404 � 13764.737 ops/s
* [info] UnsafeRunBenchmark.zioRight 1 thrpt 5 2434178.854 � 23297.673 ops/s
* [info] UnsafeRunBenchmark.zioRight 2 thrpt 5 2197319.901 � 13466.396 ops/s
* [info] UnsafeRunBenchmark.zioRight 4 thrpt 5 1981274.470 � 15435.194 ops/s
* [info] UnsafeRunBenchmark.zioRight 8 thrpt 5 1561479.601 � 42674.608 ops/s
* [info] UnsafeRunBenchmark.zioLeft 1 thrpt 10 2924072.908 � 207535.037 ops/s
* [info] UnsafeRunBenchmark.zioLeft' 1 thrpt 10 264234984.929 � 27407427.777 ops/s
*
* [info] UnsafeRunBenchmark.zioLeft 2 thrpt 10 2619085.083 � 181905.105 ops/s
* [info] UnsafeRunBenchmark.zioLeft' 2 thrpt 10 109613072.462 � 526748.448 ops/s
*
* [info] UnsafeRunBenchmark.zioLeft 4 thrpt 10 2252902.513 � 71522.781 ops/s
* [info] UnsafeRunBenchmark.zioLeft' 4 thrpt 10 44569924.478 � 1060326.379 ops/s
*
* [info] UnsafeRunBenchmark.zioLeft 8 thrpt 10 1782754.491 � 59228.031 ops/s
* [info] UnsafeRunBenchmark.zioLeft' 8 thrpt 10 15689424.820 � 1487690.562 ops/s
*
* [info] UnsafeRunBenchmark.zioRight 1 thrpt 10 2673482.726 � 24960.009 ops/s
* [info] UnsafeRunBenchmark.zioRight' 1 thrpt 10 270720926.073 � 537049.199 ops/s
*
* [info] UnsafeRunBenchmark.zioRight 2 thrpt 10 2644805.378 � 58712.959 ops/s
* [info] UnsafeRunBenchmark.zioRight' 2 thrpt 10 98828477.442 � 493812.270 ops/s
*
* [info] UnsafeRunBenchmark.zioRight 4 thrpt 10 2434308.062 � 22470.067 ops/s
* [info] UnsafeRunBenchmark.zioRight' 4 thrpt 10 35925377.327 � 1732444.668 ops/s
*
* [info] UnsafeRunBenchmark.zioRight 8 thrpt 10 2035601.943 � 69795.749 ops/s
* [info] UnsafeRunBenchmark.zioRight' 8 thrpt 10 15740140.258 � 672817.959 ops/s
* }}}
*/
@State(Scope.Thread)
Expand Down
63 changes: 63 additions & 0 deletions core-tests/shared/src/test/scala/zio/RuntimeSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package zio

import zio.test._
import zio.test.TestAspect.jvmOnly

object RuntimeSpec extends ZIOBaseSpec {
val r = Runtime.default

def foo =
for {
_ <- ZIO.succeed(42)
_ <- ZIO.fail("Uh oh!")
} yield ()

def bar =
for {
_ <- ZIO.succeed(92)
a <- foo
} yield a

def buz =
for {
_ <- UIO.async[Int](k => k(ZIO.succeed(42)))
a <- bar
} yield a

def traceOf(exit: Exit[Any, Any]): Chunk[String] =
exit.fold[ZTrace](_.trace, _ => ZTrace.none).stackTrace.map(_.toString)

def fastPath[E, A](zio: ZIO[ZEnv, E, A]): Exit[E, A] =
r.unsafeRunSyncFast(zio)

def slowPath[E, A](zio: ZIO[ZEnv, E, A]): Task[Exit[E, A]] =
Task.attemptBlocking(r.defaultUnsafeRunSync(zio))

def spec = suite("RuntimeSpec") {
suite("primitives") {
test("ZIO.succeed") {
assertTrue(fastPath(ZIO.succeed(42)) == Exit.succeed(42))
}
} +
suite("fallback") {
test("merged traces") {

for {
exit <- slowPath(buz)
trace = traceOf(exit)
} yield assertTrue(
trace.exists(_.contains("foo")) && trace.exists(_.contains("bar")) && trace.exists(_.contains("buz"))
)
} @@ jvmOnly
} +
suite("traces") {
test("depth of 2") {
val exit = fastPath(bar)

val trace = traceOf(exit)

assertTrue(trace.exists(_.contains("foo")) && trace.exists(_.contains("bar")))
}
}
}
}
116 changes: 67 additions & 49 deletions core/shared/src/main/scala/zio/Cause.scala
Original file line number Diff line number Diff line change
Expand Up @@ -363,50 +363,7 @@ sealed abstract class Cause[+E] extends Product with Serializable { self =>
* Returns a `String` with the cause pretty-printed.
*/
final def prettyPrint: String = {
final case class Unified(fiberId: FiberId, className: String, message: String, trace: Chunk[StackTraceElement])

def renderFiberId(fiberId: FiberId): String = s"zio-fiber-${fiberId.ids.mkString(", ")}"

def unify(cause: Cause[E]): List[Unified] = {
@tailrec
def loop(
causes: List[Cause[E]],
fiberId: FiberId,
stackless: Boolean,
result: List[Unified]
): List[Unified] = {
def unifyFail(fail: Cause.Fail[E]): Unified =
Unified(fail.trace.fiberId, fail.value.getClass.getName(), fail.value.toString(), fail.trace.toJava)

def unifyDie(die: Cause.Die): Unified = {
val extra =
if (stackless) Chunk.empty else Chunk.fromArray(die.value.getStackTrace())

Unified(die.trace.fiberId, die.value.getClass.getName(), die.value.getMessage(), extra ++ die.trace.toJava)
}

def unifyInterrupt(interrupt: Cause.Interrupt): Unified = {
val message = "Interrupted by thread \"" + renderFiberId(fiberId) + "\""

Unified(interrupt.trace.fiberId, classOf[InterruptedException].getName(), message, interrupt.trace.toJava)
}

causes match {
case Nil => result

case Empty :: more => loop(more, fiberId, stackless, result)
case Both(left, right) :: more => loop(left :: right :: more, fiberId, stackless, result)
case Stackless(cause, stackless) :: more => loop(cause :: more, fiberId, stackless, result)
case Then(left, right) :: more => loop(left :: right :: more, fiberId, stackless, result)
case (cause @ Fail(_, _)) :: more => loop(more, fiberId, stackless, unifyFail(cause) :: result)
case (cause @ Die(_, _)) :: more => loop(more, fiberId, stackless, unifyDie(cause) :: result)
case (cause @ Interrupt(_, _)) :: more =>
loop(more, fiberId, stackless, unifyInterrupt(cause) :: result)
}
}

loop(cause :: Nil, FiberId.None, false, Nil).reverse
}
import Cause.Unified

def renderCause(cause: Cause[E]): String = {
def renderUnified(indent: Int, prefix: String, unified: Unified) = {
Expand All @@ -417,9 +374,9 @@ sealed abstract class Cause[+E] extends Product with Serializable { self =>
unified.trace.map(trace => s"${traceIndent}at ${trace}").mkString("\n")
}

unify(cause).zipWithIndex.map {
cause.unified.zipWithIndex.map {
case (unified, 0) =>
renderUnified(0, "Exception in thread \"" + renderFiberId(unified.fiberId) + "\" ", unified)
renderUnified(0, "Exception in thread \"" + unified.fiberId.threadName + "\" ", unified)
case (unified, n) => renderUnified(n, s"Suppressed: ", unified)
}.mkString("\n")
}
Expand Down Expand Up @@ -509,6 +466,12 @@ sealed abstract class Cause[+E] extends Product with Serializable { self =>
(causeOption, stackless) => causeOption.map(Stackless(_, stackless))
)

/**
* Grabs a complete, linearized trace for the cause. Note: This linearization
* may be misleading in the presence of parallel errors.
*/
def trace: ZTrace = traces.fold(ZTrace.none)(_ ++ _)

/**
* Grabs a list of execution traces from the cause.
*/
Expand All @@ -528,6 +491,52 @@ sealed abstract class Cause[+E] extends Product with Serializable { self =>
final def traced(trace: ZTrace): Cause[E] =
mapTrace(_ ++ trace)

/**
* Returns a homogenized list of failures for the cause. This homogenization
* process throws away key information, but it is useful for interop with
* traditional stack traces.
*/
final def unified: List[Unified] = {
@tailrec
def loop(
causes: List[Cause[E]],
fiberId: FiberId,
stackless: Boolean,
result: List[Unified]
): List[Unified] = {
def unifyFail(fail: Cause.Fail[E]): Unified =
Unified(fail.trace.fiberId, fail.value.getClass.getName(), fail.value.toString(), fail.trace.toJava)

def unifyDie(die: Cause.Die): Unified = {
val extra =
if (stackless) Chunk.empty else Chunk.fromArray(die.value.getStackTrace())

Unified(die.trace.fiberId, die.value.getClass.getName(), die.value.getMessage(), extra ++ die.trace.toJava)
}

def unifyInterrupt(interrupt: Cause.Interrupt): Unified = {
val message = "Interrupted by thread \"" + fiberId.threadName + "\""

Unified(interrupt.trace.fiberId, classOf[InterruptedException].getName(), message, interrupt.trace.toJava)
}

causes match {
case Nil => result

case Empty :: more => loop(more, fiberId, stackless, result)
case Both(left, right) :: more => loop(left :: right :: more, fiberId, stackless, result)
case Stackless(cause, stackless) :: more => loop(cause :: more, fiberId, stackless, result)
case Then(left, right) :: more => loop(left :: right :: more, fiberId, stackless, result)
case (cause @ Fail(_, _)) :: more => loop(more, fiberId, stackless, unifyFail(cause) :: result)
case (cause @ Die(_, _)) :: more => loop(more, fiberId, stackless, unifyDie(cause) :: result)
case (cause @ Interrupt(_, _)) :: more =>
loop(more, fiberId, stackless, unifyInterrupt(cause) :: result)
}
}

loop(self :: Nil, FiberId.None, false, Nil).reverse
}

/**
* Returns a `Cause` that has been stripped of all tracing information.
*/
Expand All @@ -549,6 +558,15 @@ object Cause extends Serializable {
def stack[E](cause: Cause[E]): Cause[E] = Stackless(cause, false)
def stackless[E](cause: Cause[E]): Cause[E] = Stackless(cause, true)

final case class Unified(fiberId: FiberId, className: String, message: String, trace: Chunk[StackTraceElement]) {
def toThrowable: Throwable =
new Throwable(null, null, false, false) {
override final def getMessage(): String = message

override final def getStackTrace(): Array[StackTraceElement] = trace.toArray
}
}

/**
* Converts the specified `Cause[Option[E]]` to an `Option[Cause[E]]` by
* recursively stripping out any failures with the error `None`.
Expand Down Expand Up @@ -585,7 +603,7 @@ object Cause extends Serializable {
}
}

final case class Fail[+E](value: E, trace: ZTrace) extends Cause[E] {
final case class Fail[+E](value: E, override val trace: ZTrace) extends Cause[E] {
override def equals(that: Any): Boolean = that match {
case fail: Fail[_] => value == fail.value
case c @ Then(_, _) => sym(empty)(this, c)
Expand All @@ -595,7 +613,7 @@ object Cause extends Serializable {
}
}

final case class Die(value: Throwable, trace: ZTrace) extends Cause[Nothing] {
final case class Die(value: Throwable, override val trace: ZTrace) extends Cause[Nothing] {
override def equals(that: Any): Boolean = that match {
case die: Die => value == die.value
case c @ Then(_, _) => sym(empty)(this, c)
Expand All @@ -605,7 +623,7 @@ object Cause extends Serializable {
}
}

final case class Interrupt(fiberId: FiberId, trace: ZTrace) extends Cause[Nothing] {
final case class Interrupt(fiberId: FiberId, override val trace: ZTrace) extends Cause[Nothing] {
override def equals(that: Any): Boolean =
(this eq that.asInstanceOf[AnyRef]) || (that match {
case interrupt: Interrupt => fiberId == interrupt.fiberId
Expand Down
21 changes: 21 additions & 0 deletions core/shared/src/main/scala/zio/Exit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ sealed abstract class Exit[+E, +A] extends Product with Serializable { self =>
final def bimap[E1, A1](f: E => E1, g: A => A1): Exit[E1, A1] =
mapBoth(f, g)

/**
* Returns an option of the cause of failure.
*/
final def causeOption: Option[Cause[E]] =
self match {
case Failure(cause) => Some(cause)
case _ => None
}

final def exists(p: A => Boolean): Boolean =
fold(_ => false, p)

Expand Down Expand Up @@ -103,6 +112,9 @@ sealed abstract class Exit[+E, +A] extends Product with Serializable { self =>
case e @ Failure(_) => ZIO.succeedNow(e)
}

/**
* Flattens an Exit of an Exit into a single Exit value.
*/
final def flatten[E1 >: E, B](implicit ev: A <:< Exit[E1, B]): Exit[E1, B] =
Exit.flatten(self.map(ev))

Expand Down Expand Up @@ -235,6 +247,15 @@ sealed abstract class Exit[+E, +A] extends Product with Serializable { self =>
case Failure(cause) => Left(FiberFailure(cause))
}

/**
* Converts the `Exit` to a `ZIO` effect.
*/
final def toZIO(implicit trace: ZTraceElement): IO[E, A] =
self match {
case Exit.Failure(cause) => ZIO.failCause(cause)
case Exit.Success(value) => ZIO.succeedNow(value)
}

/**
* Discards the value.
*/
Expand Down
5 changes: 0 additions & 5 deletions core/shared/src/main/scala/zio/Fiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -913,11 +913,6 @@ object Fiber extends FiberPlatformSpecific {
def unsafeCurrentFiber(): Option[Fiber[Any, Any]] =
Option(_currentFiber.get)

private[zio] def newFiberId(): FiberId.Runtime =
FiberId.Runtime((java.lang.System.currentTimeMillis / 1000).toInt, _fiberCounter.getAndIncrement())

private[zio] val _currentFiber: ThreadLocal[internal.FiberContext[_, _]] =
new ThreadLocal[internal.FiberContext[_, _]]()

private[zio] val _fiberCounter = new java.util.concurrent.atomic.AtomicInteger(0)
}
10 changes: 9 additions & 1 deletion core/shared/src/main/scala/zio/FiberFailure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,13 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace
* better integrate with Scala exception handling.
*/
final case class FiberFailure(cause: Cause[Any]) extends Throwable(null, null, true, false) {
override def getMessage: String = cause.prettyPrint
override def getMessage: String = cause.unified.headOption.fold("<unknown>")(_.message)

override def getStackTrace(): Array[StackTraceElement] =
cause.unified.headOption.fold[Chunk[StackTraceElement]](Chunk.empty)(_.trace).toArray

def unsafeInitSuppressed(): Unit =
if (getSuppressed().length == 0) {
cause.unified.iterator.drop(1).foreach(unified => addSuppressed(unified.toThrowable))
}
}
27 changes: 25 additions & 2 deletions core/shared/src/main/scala/zio/FiberId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.annotation.tailrec
sealed trait FiberId extends Serializable { self =>
import FiberId._

def combine(that: FiberId): FiberId =
final def combine(that: FiberId): FiberId =
(self, that) match {
case (None, that) => that
case (that, None) => that
Expand All @@ -37,12 +37,30 @@ sealed trait FiberId extends Serializable { self =>
case (self @ Runtime(_, _), that @ Runtime(_, _)) => Composite(Set(self, that))
}

def ids: Set[Int] =
final def getOrElse(that: => FiberId): FiberId = if (isNone) that else self

final def ids: Set[Int] =
self match {
case None => Set.empty
case Runtime(id, _) => Set(id)
case Composite(fiberIds) => fiberIds.map(_.id)
}

final def isNone: Boolean =
self match {
case None => true
case Composite(set) => set.forall(_.isNone)
case _ => false
}

final def threadName: String = s"zio-fiber-${self.ids.mkString(",")}"

final def toOption: Option[FiberId] =
self match {
case None => Option.empty[FiberId]
case Composite(set) => set.map(_.toOption).collect { case Some(fiberId) => fiberId }.reduceOption(_.combine(_))
case other => Some(other)
}
}

object FiberId {
Expand All @@ -53,6 +71,11 @@ object FiberId {
def combineAll(fiberIds: Set[FiberId]): FiberId =
fiberIds.foldLeft[FiberId](FiberId.None)(_ combine _)

private[zio] def unsafeMake(): FiberId.Runtime =
FiberId.Runtime((java.lang.System.currentTimeMillis / 1000).toInt, _fiberCounter.getAndIncrement())

private[zio] val _fiberCounter = new java.util.concurrent.atomic.AtomicInteger(0)

case object None extends FiberId
final case class Runtime(id: Int, startTimeSeconds: Int) extends FiberId
final case class Composite(fiberIds: Set[FiberId.Runtime]) extends FiberId
Expand Down
Loading
0