diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index db9963ed66f8..80cd34c4e6e6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,25 +18,27 @@ on: jobs: lint: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest timeout-minutes: 60 if: ${{ ((github.event_name != 'repository_dispatch') && (github.event.action != 'update-docs')) }} steps: - name: Checkout current branch uses: actions/checkout@v4.2.2 - name: Setup Java - uses: actions/setup-java@v4.7.0 + uses: actions/setup-java@v4.7.1 with: distribution: temurin java-version: 17 check-latest: true + - name: Setup sbt + uses: sbt/setup-sbt@v1 - name: Cache scala dependencies uses: coursier/cache-action@v6 - name: Lint code run: sbt "++2.13; check; ++3.3; check" publishLocal: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest timeout-minutes: 60 if: ${{ ((github.event_name != 'repository_dispatch') && (github.event.action != 'update-docs')) }} env: @@ -51,11 +53,13 @@ jobs: with: fetch-depth: 0 - name: Setup Java - uses: actions/setup-java@v4.7.0 + uses: actions/setup-java@v4.7.1 with: distribution: temurin java-version: 11 check-latest: true + - name: Setup sbt + uses: sbt/setup-sbt@v1 - name: Cache scala dependencies uses: coursier/cache-action@v6 - name: publishLocal 2.12 @@ -73,20 +77,23 @@ jobs: run: sbt --client shutdown build-website: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest if: ${{ ((github.event_name != 'repository_dispatch') && (github.event.action != 'update-docs')) }} steps: - name: Checkout Current Branch uses: actions/checkout@v4.2.2 with: fetch-depth: 0 - - name: Setup Scala and Java - uses: actions/setup-java@v4.7.0 + - name: Setup Java + uses: actions/setup-java@v4.7.1 with: distribution: temurin java-version: 17 check-latest: true + - name: Setup sbt + uses: sbt/setup-sbt@v1 + - name: Install NodeJS uses: actions/setup-node@v4 with: @@ -115,7 +122,7 @@ jobs: run: find ./website/build -print test: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest timeout-minutes: 60 if: ${{ ((github.event_name != 'repository_dispatch') && (github.event.action != 'update-docs')) }} strategy: @@ -128,11 +135,13 @@ jobs: - name: Checkout current branch uses: actions/checkout@v4.2.2 - name: Setup Java - uses: actions/setup-java@v4.7.0 + uses: actions/setup-java@v4.7.1 with: distribution: temurin java-version: ${{ matrix.java }} check-latest: true + - name: Setup sbt + uses: sbt/setup-sbt@v1 - name: Cache scala dependencies uses: coursier/cache-action@v6 - name: Run tests @@ -174,7 +183,7 @@ jobs: overwrite: true testJvms: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest timeout-minutes: 60 if: ${{ ((github.event_name != 'repository_dispatch') && (github.event.action != 'update-docs')) }} strategy: @@ -186,11 +195,13 @@ jobs: - name: Checkout current branch uses: actions/checkout@v4.2.2 - name: Setup Java - uses: actions/setup-java@v4.7.0 + uses: actions/setup-java@v4.7.1 with: distribution: temurin java-version: ${{ matrix.java }} check-latest: true + - name: Setup sbt + uses: sbt/setup-sbt@v1 - name: Cache scala dependencies uses: coursier/cache-action@v6 - name: Test on different JVM versions @@ -205,7 +216,7 @@ jobs: overwrite: true testPlatforms: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest timeout-minutes: 60 if: ${{ ((github.event_name != 'repository_dispatch') && (github.event.action != 'update-docs')) }} strategy: @@ -220,11 +231,13 @@ jobs: if: ${{ startsWith(matrix.platform, 'Native') }} run: sudo apt-get update && sudo apt-get install -y libgc-dev - name: Setup Java - uses: actions/setup-java@v4.7.0 + uses: actions/setup-java@v4.7.1 with: distribution: temurin java-version: ${{ matrix.java }} check-latest: true + - name: Setup sbt + uses: sbt/setup-sbt@v1 - name: Cache scala dependencies uses: coursier/cache-action@v6 - name: Set Swap Space @@ -236,7 +249,7 @@ jobs: ci: if: always() - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest needs: [lint, publishLocal, build-website, test, testJvms, testPlatforms] steps: - uses: re-actors/alls-green@release/v1 @@ -244,7 +257,7 @@ jobs: jobs: ${{ toJSON(needs) }} publish: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest timeout-minutes: 60 needs: [ci] if: github.event_name != 'pull_request' @@ -254,11 +267,13 @@ jobs: with: fetch-depth: 0 - name: Setup Java - uses: actions/setup-java@v4.7.0 + uses: actions/setup-java@v4.7.1 with: distribution: temurin java-version: 11 check-latest: true + - name: Setup sbt + uses: sbt/setup-sbt@v1 - name: Release run: sbt -v ci-release env: @@ -271,7 +286,7 @@ jobs: publish-website: needs: [ci] if: github.event_name != 'pull_request' - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: Checkout uses: actions/checkout@v4.2.2 @@ -309,7 +324,7 @@ jobs: git commit -m "$commit_message" || echo "No changes to commit" - name: Generate a token id: generate-tokens - uses: actions/create-github-app-token@v1.12.0 + uses: actions/create-github-app-token@v2.0.6 with: app-id: ${{ secrets.APP_ID }} private-key: ${{ secrets.APP_SECRET }} diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml index f40ad21f05e9..34e045ac2cb5 100644 --- a/.github/workflows/release-drafter.yml +++ b/.github/workflows/release-drafter.yml @@ -7,7 +7,7 @@ on: jobs: update_release_draft: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - uses: release-drafter/release-drafter@v6 env: diff --git a/.jvmopts b/.jvmopts index 89c8775d9d2d..10beed360be7 100644 --- a/.jvmopts +++ b/.jvmopts @@ -1 +1,5 @@ -Dcats.effect.stackTracingMode=full +-Dfile.encoding=UTF8 +-Xms256M +-Xmx6G +-XX:+UseG1GC diff --git a/.vscode/settings.json b/.vscode/settings.json index b4bdcc7fa826..052d27309198 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -7,5 +7,5 @@ ], "files.watcherExclude": { "**/target": true - } +} } \ No newline at end of file diff --git a/benchmarks/src/main/scala/zio/BenchmarkUtil.scala b/benchmarks/src/main/scala/zio/BenchmarkUtil.scala index fb5d72171165..45b224363268 100644 --- a/benchmarks/src/main/scala/zio/BenchmarkUtil.scala +++ b/benchmarks/src/main/scala/zio/BenchmarkUtil.scala @@ -39,9 +39,12 @@ object BenchmarkUtil extends Runtime[Any] { self => Unsafe.unsafe(implicit unsafe => rt.unsafe.run(zio).getOrThrowFiberFailure()) } + override val unsafe = super.unsafe + private object NoFiberRootsRuntime extends Runtime[Any] { - val environment = Runtime.default.environment - val fiberRefs = Runtime.default.fiberRefs - val runtimeFlags = RuntimeFlags(RuntimeFlag.CooperativeYielding, RuntimeFlag.Interruption) + override val unsafe = super.unsafe + val environment = Runtime.default.environment + val fiberRefs = Runtime.default.fiberRefs + val runtimeFlags = RuntimeFlags(RuntimeFlag.CooperativeYielding, RuntimeFlag.Interruption) } } diff --git a/benchmarks/src/main/scala/zio/PromiseBenchmarks.scala b/benchmarks/src/main/scala/zio/PromiseBenchmarks.scala index 16ab08e815c3..41a0ae690d58 100644 --- a/benchmarks/src/main/scala/zio/PromiseBenchmarks.scala +++ b/benchmarks/src/main/scala/zio/PromiseBenchmarks.scala @@ -1,8 +1,11 @@ package zio import cats.effect.kernel.Deferred +import cats.syntax.traverse._ +import cats.instances.list._ import cats.effect.unsafe.implicits.global import cats.effect.{IO => CIO} +import cats.syntax.foldable._ import org.openjdk.jmh.annotations.{Scope => JScope, _} import zio.BenchmarkUtil._ @@ -11,35 +14,109 @@ import java.util.concurrent.TimeUnit @State(JScope.Thread) @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) -@Measurement(iterations = 5, timeUnit = TimeUnit.SECONDS, time = 3) -@Warmup(iterations = 5, timeUnit = TimeUnit.SECONDS, time = 3) +@Measurement(iterations = 5, timeUnit = TimeUnit.SECONDS, time = 10) +@Warmup(iterations = 5, timeUnit = TimeUnit.SECONDS, time = 10) @Fork(value = 3) class PromiseBenchmarks { - val size = 100000 + val n = 100000 + val waiters: Int = 8 - val ints: List[Int] = List.range(0, size) + def createWaitersZIO(promise: Promise[Nothing, Unit]): ZIO[Any, Nothing, Seq[Fiber[Nothing, Unit]]] = + ZIO.foreach(Vector.range(0, waiters))(_ => promise.await.forkDaemon) + + def createWaitersCats(promise: Deferred[CIO, Unit]) = + List.range(0, waiters).traverse(_ => promise.get.start) @Benchmark - def zioPromiseAwaitDone(): Unit = { + def zioPromiseDoneAwait(): Unit = { - val io = ZIO.foreachDiscard(ints) { _ => - Promise.make[Nothing, Unit].flatMap { promise => - promise.succeed(()) *> promise.await - } - } + val io = + Promise + .make[Nothing, Unit] + .flatMap { promise => + promise.done(Exit.unit) *> promise.await + } + .repeatN(n) unsafeRun(io) } @Benchmark - def catsPromiseAwaitDone(): Unit = { + def catsPromiseDoneAwait(): Unit = { - val io = catsForeachDiscard(List.range(1, size)) { _ => + val io = Deferred[CIO, Unit].flatMap { promise => promise.complete(()).flatMap(_ => promise.get) + }.replicateA_(n) + + io.unsafeRunSync() + } + + @Benchmark + def zioPromiseMultiAwaitDone(): Unit = { + val io = Promise + .make[Nothing, Unit] + .flatMap { promise => + for { + fibers <- createWaitersZIO(promise) + _ <- promise.done(Exit.unit) + _ <- ZIO.foreachDiscard(fibers)(_.await) + } yield () } - } + .repeatN(1023) + + unsafeRun(io) + } + + @Benchmark + def catsPromiseMultiAwaitDone(): Unit = { + val io = + Deferred[CIO, Unit].flatMap { promise => + for { + fibers <- createWaitersCats(promise) + _ <- promise.complete(()) + _ <- fibers.traverse_(_.join) + } yield () + }.replicateA_(1023) + + io.unsafeRunSync() + } + + @Benchmark + def zioPromiseMultiAwaitMultiDone(): Unit = { + def createCompleters(promise: Promise[Nothing, Unit], latch: Promise[Nothing, Unit]) = + ZIO.foreach(Vector.range(0, waiters))(_ => (latch.await *> promise.done(Exit.unit)).forkDaemon) + + val io = { + for { + latch <- Promise.make[Nothing, Unit] + promise <- Promise.make[Nothing, Unit] + waiters <- createWaitersZIO(promise) + fibers <- createCompleters(promise, latch) + _ <- latch.done(Exit.unit) + result <- promise.await + } yield result + }.repeatN(1023) + + unsafeRun(io) + } + + @Benchmark + def catsPromiseMultiAwaitMultiDone(): Unit = { + def createCompleters(promise: Deferred[CIO, Unit], latch: Deferred[CIO, Unit]) = + List.range(0, waiters).traverse(_ => (latch.get *> promise.complete(())).start) + + val io = { + for { + latch <- Deferred[CIO, Unit] + promise <- Deferred[CIO, Unit] + waiters <- createWaitersCats(promise) + fibers <- createCompleters(promise, latch) + _ <- latch.complete(()) + result <- promise.get + } yield result + }.replicateA_(1023) io.unsafeRunSync() } diff --git a/build.sbt b/build.sbt index 79845a46ba70..f04890ac92fa 100644 --- a/build.sbt +++ b/build.sbt @@ -214,7 +214,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) .settings(stdSettings("zio")) .settings(crossProjectSettings) .settings(buildInfoSettings("zio")) - .settings(libraryDependencies += "dev.zio" %%% "izumi-reflect" % IzumiReflectVersion) + .settings( + libraryDependencies ++= List( + "dev.zio" %%% "izumi-reflect" % IzumiReflectVersion, + "org.scala-lang.modules" %%% "scala-collection-compat" % ScalaCollectionCompatVersion + ) + ) .enablePlugins(BuildInfoPlugin) .settings(macroDefinitionSettings) .settings(scalacOptions += "-Wconf:msg=[zio.stacktracer.TracingImplicits.disableAutoTrace]:silent") @@ -538,8 +543,8 @@ lazy val commonJunitTestSettings = Seq( "org.apache.maven" % "maven-compat" % "3.9.9" % Test, "com.google.inject" % "guice" % "6.0.0" % Test, "org.eclipse.sisu" % "org.eclipse.sisu.inject" % "0.3.5" % Test, - "org.apache.maven.resolver" % "maven-resolver-connector-basic" % "1.9.22" % Test, - "org.apache.maven.resolver" % "maven-resolver-transport-http" % "1.9.22" % Test, + "org.apache.maven.resolver" % "maven-resolver-connector-basic" % "1.9.23" % Test, + "org.apache.maven.resolver" % "maven-resolver-transport-http" % "1.9.23" % Test, "org.codehaus.plexus" % "plexus-component-annotations" % "2.2.0" % Test, "org.slf4j" % "slf4j-simple" % "2.0.17" % Test ) @@ -568,12 +573,7 @@ lazy val testJunitRunnerTests = project.module lazy val testJunitEngine = project.module .in(file("test-junit-engine")) .settings(stdSettings("zio-test-junit-engine")) - .settings( - libraryDependencies ++= Seq( - "org.junit.platform" % "junit-platform-engine" % JunitPlatformEngineVersion, - "org.scala-lang.modules" %% "scala-collection-compat" % ScalaCollectionCompatVersion - ) - ) + .settings(libraryDependencies += "org.junit.platform" % "junit-platform-engine" % JunitPlatformEngineVersion) .dependsOn(tests.jvm) lazy val testJunitEngineTests = project.module @@ -652,14 +652,14 @@ lazy val benchmarks = project.module "com.twitter" %% "util-core" % "24.2.0", "com.typesafe.akka" %% "akka-stream" % "2.8.8", "io.github.timwspence" %% "cats-stm" % "0.13.4", - "io.projectreactor" % "reactor-core" % "3.7.4", + "io.projectreactor" % "reactor-core" % "3.7.6", "io.reactivex.rxjava2" % "rxjava" % "2.2.21", "org.jctools" % "jctools-core" % "4.0.5", "org.typelevel" %% "cats-effect" % CatsEffectVersion, "org.scalacheck" %% "scalacheck" % ScalaCheckVersion, - "qa.hedgehog" %% "hedgehog-core" % "0.11.0", + "qa.hedgehog" %% "hedgehog-core" % "0.12.0", "com.github.japgolly.nyaya" %% "nyaya-gen" % nyanaVersion, - "org.springframework" % "spring-core" % "6.2.5" + "org.springframework" % "spring-core" % "6.2.6" ) }, excludeDependencies ++= { diff --git a/concurrent/src/main/scala/zio/concurrent/ConcurrentMap.scala b/concurrent/src/main/scala/zio/concurrent/ConcurrentMap.scala index 7ceb2bfff797..ea8f79182230 100644 --- a/concurrent/src/main/scala/zio/concurrent/ConcurrentMap.scala +++ b/concurrent/src/main/scala/zio/concurrent/ConcurrentMap.scala @@ -3,10 +3,8 @@ package zio.concurrent import zio.{Chunk, ChunkBuilder, UIO, ZIO} import java.util.concurrent.ConcurrentHashMap - import java.util.function.BiConsumer -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** * Wrapper over `java.util.concurrent.ConcurrentHashMap`. @@ -111,7 +109,7 @@ final class ConcurrentMap[K, V] private (private val underlying: ConcurrentHashM * Adds all new key-value pairs */ def putAll(keyValues: (K, V)*): UIO[Unit] = - ZIO.succeed(underlying.putAll(keyValues.toMap.asJava): @nowarn("msg=JavaConverters")) + ZIO.succeed(underlying.putAll(keyValues.toMap.asJava)) /** * Adds a new key-value pair, unless the key is already bound to some other diff --git a/concurrent/src/main/scala/zio/concurrent/ConcurrentSet.scala b/concurrent/src/main/scala/zio/concurrent/ConcurrentSet.scala index 72814ee61f6b..b70b712e08e8 100644 --- a/concurrent/src/main/scala/zio/concurrent/ConcurrentSet.scala +++ b/concurrent/src/main/scala/zio/concurrent/ConcurrentSet.scala @@ -4,8 +4,7 @@ import zio.{UIO, ZIO} import java.util.concurrent.ConcurrentHashMap import java.util.function.{Consumer, Predicate} -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ /** * A `ConcurrentSet` is a Set wrapper over @@ -24,7 +23,7 @@ final class ConcurrentSet[A] private (private val underlying: ConcurrentHashMap. * Adds all new values. */ def addAll(xs: Iterable[A]): UIO[Boolean] = - ZIO.succeed(underlying.addAll(xs.asJavaCollection): @nowarn("msg=JavaConverters")) + ZIO.succeed(underlying.addAll(xs.asJavaCollection)) /** * Removes all elements. @@ -139,7 +138,7 @@ final class ConcurrentSet[A] private (private val underlying: ConcurrentHashMap. * existing element. */ def removeAll(xs: Iterable[A]): UIO[Boolean] = - ZIO.succeed(underlying.removeAll(xs.asJavaCollection): @nowarn("msg=JavaConverters")) + ZIO.succeed(underlying.removeAll(xs.asJavaCollection)) /** * Removes all elements which satisfy the given predicate. @@ -152,7 +151,7 @@ final class ConcurrentSet[A] private (private val underlying: ConcurrentHashMap. * existing element. */ def retainAll(xs: Iterable[A]): UIO[Boolean] = - ZIO.succeed(underlying.retainAll(xs.asJavaCollection): @nowarn("msg=JavaConverters")) + ZIO.succeed(underlying.retainAll(xs.asJavaCollection)) /** * Removes all elements which do not satisfy the given predicate. @@ -170,12 +169,11 @@ final class ConcurrentSet[A] private (private val underlying: ConcurrentHashMap. * Convert the ConcurrentSet to Set. */ def toSet: UIO[Set[A]] = - ZIO.succeed(underlying.asScala.toSet: @nowarn("msg=JavaConverters")) + ZIO.succeed(underlying.asScala.toSet) /** * Transform all elements of the ConcurrentSet using the given function. */ - @nowarn("msg=JavaConverters") def transform(f: A => A): UIO[Unit] = ZIO.succeed { val set = underlying.asScala.toSet diff --git a/core-tests/jvm/src/test/scala/zio/interop/JavaSpec.scala b/core-tests/jvm/src/test/scala/zio/interop/JavaSpec.scala index 34d5ee87f9f1..0557e9216cd3 100644 --- a/core-tests/jvm/src/test/scala/zio/interop/JavaSpec.scala +++ b/core-tests/jvm/src/test/scala/zio/interop/JavaSpec.scala @@ -9,6 +9,7 @@ import java.net.InetSocketAddress import java.nio.ByteBuffer import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel} import java.util.concurrent.{CompletableFuture, CompletionStage, Future} +import java.util.function.BiFunction object JavaSpec extends ZIOBaseSpec { @@ -99,7 +100,31 @@ object JavaSpec extends ZIOBaseSpec { _ <- task _ <- task } yield assert(n)(equalTo(2)) - } + }, + test("interrupt completion stage only once") { + ZIO.suspendSucceedUnsafe { implicit u => + @volatile var cancellations = 0 + val promise = Promise.unsafe.make[Nothing, Unit](FiberId.None) + + ZIO + .fromCompletionStage(new CompletableFuture[Unit]() { + override def handle[U](fn: BiFunction[_ >: Unit, Throwable, _ <: U]): CompletableFuture[U] = { + promise.unsafe.succeedUnit + super.handle(fn) + } + override def cancel(mayInterruptIfRunning: Boolean): Boolean = { + cancellations += 1 + true + } + }) + .fork + .zipLeft(promise.await) + .flatMap(_.interrupt) + .as { + assertTrue(cancellations == 1) + } + } + } @@ TestAspect.nonFlaky ) @@ zioTag(future), suite("`ZIO.toCompletableFuture` must")( test("produce always a successful `IO` of `Future`") { diff --git a/core-tests/shared/src/test/scala-3/zio/ProvideSpec.scala b/core-tests/shared/src/test/scala-3/zio/ProvideSpec.scala new file mode 100644 index 000000000000..53d9fe55299e --- /dev/null +++ b/core-tests/shared/src/test/scala-3/zio/ProvideSpec.scala @@ -0,0 +1,36 @@ +package zio + +import zio.test._ + +object ProvideSpec extends ZIOBaseSpec { + + def spec = suite("ProvideSpec")( + suite("provideSomeAuto")( + test("Should infer the environment type") { + class A(scope: Scope, string: String, int: Int) + object A { + val layer = ZLayer.derive[A] + } + + class B(a: A) + object B { + val layer = ZLayer.derive[B] + } + + val program = (for { + _ <- ZIO.service[B] + } yield ()) + .provideSomeAuto( + A.layer, + B.layer + ) + + // Verify correct type + val p: ZIO[zio.Scope & String & Int, Any, Unit] = program + + assertCompletes + } + ) + ) + +} diff --git a/core-tests/shared/src/test/scala/zio/FiberRuntimeSpec.scala b/core-tests/shared/src/test/scala/zio/FiberRuntimeSpec.scala index be5d6972efcc..ebc2ffc1e235 100644 --- a/core-tests/shared/src/test/scala/zio/FiberRuntimeSpec.scala +++ b/core-tests/shared/src/test/scala/zio/FiberRuntimeSpec.scala @@ -1,6 +1,9 @@ package zio +import zio.Random.RandomLive import zio.internal.FiberScope +import zio.metrics.Metric +import zio.test.TestAspect.{nonFlaky, timeout} import zio.test._ import java.util.concurrent.atomic.AtomicInteger @@ -62,6 +65,76 @@ object FiberRuntimeSpec extends ZIOBaseSpec { } } } + ), + suite("async")( + test("async callback after interruption is ignored") { + ZIO.suspendSucceed { + val executed = Ref.unsafe.make(0) + val cb = Ref.unsafe.make[Option[ZIO[Any, Nothing, Unit] => Unit]](None) + val latch = Promise.unsafe.make[Nothing, Unit](FiberId.None) + val async = ZIO.async[Any, Nothing, Unit] { k => + cb.unsafe.set(Some(k)) + latch.unsafe.done(Exit.unit) + } + val increment = executed.update(_ + 1) + for { + fiber <- async.fork + _ <- latch.await + exit <- fiber.interrupt + callback <- cb.get.some + state1 <- fiber.poll + _ <- ZIO.succeed(callback(increment)) + state2 <- fiber.poll + executedBefore <- executed.get + _ <- ZIO.succeed(callback(increment)) + state3 <- fiber.poll + executedAfter <- executed.get + } yield assertTrue( + state1 == Some(exit), + state2 == Some(exit), + state3 == Some(exit), + executedBefore == 0, + executedAfter == 0, + exit.isInterrupted + ) + } + } @@ TestAspect.nonFlaky(10) + ), + suite("runtime metrics")( + test("Failures are counted once for the fiber that caused them and exits are not") { + val nullErrors = ZIO.foreachParDiscard(1 to 2)(_ => ZIO.attempt(throw new NullPointerException)) + + val customErrors = + ZIO.foreachParDiscard(1 to 5)(_ => ZIO.fail("Custom application error")) + + val exitErrors = + ZIO.foreachParDiscard(1 to 5)(_ => Exit.fail(new IllegalArgumentException("Foo"))) + + (nullErrors <&> exitErrors <&> customErrors).uninterruptible + .foldCauseZIO( + _ => + Metric.runtime.fiberFailureCauses.value + .map(_.occurrences) + // NOTE: Fibers in foreachParDiscard register metrics at the very end of the fiber's life which might be after we check them + // so we might need to retry until they are registered + .repeatUntil { oc => + oc.size >= 2 && + oc.getOrElse("java.lang.String", 0L) >= 5L && + oc.getOrElse("java.lang.NullPointerException", 0L) >= 2L + } + .map { oc => + assertTrue( + oc.size == 2, + oc.get("java.lang.String").contains(5), + oc.get("java.lang.NullPointerException").contains(2) + ) + }, + _ => ZIO.succeed(assertNever("Effect did not fail")) + ) + .provide(Runtime.enableRuntimeMetrics) @@ + // Need to tag them to extract metrics from this specific effect + ZIOAspect.tagged("FiberRuntimeSpec" -> RandomLive.unsafe.nextString(20)) + } @@ nonFlaky(1000) @@ timeout(10.seconds) ) ) diff --git a/core-tests/shared/src/test/scala/zio/PromiseSpec.scala b/core-tests/shared/src/test/scala/zio/PromiseSpec.scala index 04cf82c1f682..747a9024e6e6 100644 --- a/core-tests/shared/src/test/scala/zio/PromiseSpec.scala +++ b/core-tests/shared/src/test/scala/zio/PromiseSpec.scala @@ -7,6 +7,11 @@ object PromiseSpec extends ZIOBaseSpec { import ZIOTag._ + private def empty[E, A]: Promise.internal.Pending[E, A] = + Promise.internal.State.empty[E, A].asInstanceOf[Promise.internal.Pending[E, A]] + + val n = 10000 + def spec: Spec[Any, TestFailure[Any]] = suite("PromiseSpec")( test("complete a promise using succeed") { for { @@ -120,6 +125,72 @@ object PromiseSpec extends ZIOBaseSpec { _ <- p.fail("failure") d <- p.isDone } yield assert(d)(isTrue) - } @@ zioTag(errors) + } @@ zioTag(errors), + test("waiter stack safety") { + for { + p <- Promise.make[Nothing, Unit] + fibers <- ZIO.foreach(1 to n)(_ => p.await.forkDaemon) + _ <- p.complete(Exit.unit) + _ <- ZIO.foreach(fibers)(_.await) + } yield assertCompletes + }, + suite("State")( + suite("add")( + test("stack safety") { + (0 to 100000).foldLeft(empty[Nothing, Unit])((acc, _) => acc.add(_ => ())) + assertCompletes + } + ), + suite("complete")( + test("one") { + var increment = 0 + val state = empty[Nothing, Unit].add(_ => increment += 1) + state.complete(ZIO.unit) + assert(increment)(equalTo(1)) + }, + test("multiple") { + var increment = 0 + val state = (0 until n).foldLeft(empty[Nothing, Unit])((acc, _) => acc.add(_ => increment += 1)) + state.complete(ZIO.unit) + assert(increment)(equalTo(n)) + } + ), + suite("remove")( + test("one") { + var increment = 0 + val cb = (_: IO[Nothing, Unit]) => increment += 1 + val state = empty[Nothing, Unit].add(cb) + val removed = state.remove(cb) + removed.complete(ZIO.unit) + assert(removed)(equalTo(empty[Nothing, Unit])) && + assert(increment)(equalTo(0)) + }, + test("multiple") { + var fired = 0 + val cb = (_: IO[Nothing, Unit]) => () + val toRemove = (_: IO[Nothing, Unit]) => fired += 1 + val state = + (0 until n).foldLeft(empty[Nothing, Unit])((acc, i) => if (i < 5) acc.add(cb) else acc.add(toRemove)) + val removed = state.remove(toRemove) + removed.complete(ZIO.unit) + assert(removed.size)(equalTo(5)) && + assert(fired)(equalTo(0)) + } + ), + suite("complete")( + test("one") { + var completed = 0 + val state = empty[Nothing, Unit].add(_ => completed += 1) + state.complete(ZIO.unit) + assert(completed)(equalTo(1)) + }, + test("multiple") { + var completed = List.empty[Int] + val state = (0 until n).foldLeft(empty[Nothing, Unit])((acc, i) => acc.add(_ => completed = i :: completed)) + state.complete(ZIO.unit) + assert(completed)(equalTo(List.range(0, n))) + } + ) + ) ) } diff --git a/core-tests/shared/src/test/scala/zio/QueueSpec.scala b/core-tests/shared/src/test/scala/zio/QueueSpec.scala index 0eaeec3cf2e4..d5e6987b86f0 100644 --- a/core-tests/shared/src/test/scala/zio/QueueSpec.scala +++ b/core-tests/shared/src/test/scala/zio/QueueSpec.scala @@ -864,7 +864,25 @@ object QueueSpec extends ZIOBaseSpec { _ <- queue.offerAll(expected).fork actual <- queue.take.replicateZIO(100).map(Chunk.fromIterable) } yield assertTrue(actual == expected) - } @@ exceptJS(nonFlaky(1000)) + } @@ exceptJS(nonFlaky(1000)), + suite("suspension of take methods") { + + def testSuspension(takeF: Queue[?] => UIO[Any]): UIO[TestResult] = + for { + q <- Queue.unbounded[String] + f <- takeF(q).fork + _ <- f.status.repeatUntil(_.isSuspended) + _ <- f.interrupt + } yield assertCompletes + + List( + test("take")(testSuspension(_.take)), + test("takeN(1)")(testSuspension(_.takeN(1))), + test("takeN(>1)")(testSuspension(_.takeN(10))), + test("takeBetween with min 1")(testSuspension(_.takeBetween(1, 10))), + test("takeBetween with min >= 1")(testSuspension(_.takeBetween(5, 10))) + ) + } @@ TestAspect.timeout(5.seconds) ) } diff --git a/core-tests/shared/src/test/scala/zio/SemaphoreSpec.scala b/core-tests/shared/src/test/scala/zio/SemaphoreSpec.scala index 8fd5f58c48b0..78076c00ef76 100644 --- a/core-tests/shared/src/test/scala/zio/SemaphoreSpec.scala +++ b/core-tests/shared/src/test/scala/zio/SemaphoreSpec.scala @@ -32,6 +32,52 @@ object SemaphoreSpec extends ZIOBaseSpec { permits <- semaphore.available } yield assertTrue(permits == 2L) }, + test("tryWithPermits acquires and releases same number of permits") { + for { + sem <- Semaphore.make(3L) + ans <- sem.tryWithPermits(2L)(ZIO.unit) + permits <- sem.available + } yield assertTrue(permits == 3L && ans.isDefined) + }, + test("tryWithPermits if 0 permits requested") { + for { + sem <- Semaphore.make(3L) + ans <- sem.tryWithPermits(0L)(ZIO.succeed("I got executed")) + permits <- sem.available + } yield assertTrue(permits == 3L && ans.contains("I got executed")) + }, + test("tryWithPermits returns None if no permits available") { + for { + sem <- Semaphore.make(3L) + ans <- sem.tryWithPermits(4L)(ZIO.succeed("Shouldn't get executed")) + permits <- sem.available + } yield assertTrue(permits == 3L && ans.isEmpty) + }, + test("tryWithPermit acquires and releases same number of permits") { + for { + sem <- Semaphore.make(3L) + ans <- sem.tryWithPermit(ZIO.unit) + permits <- sem.available + } yield assertTrue(permits == 3L && ans.isDefined) + }, + test("tryWithPermits fails if requested permits in negative number") { + for { + sem <- Semaphore.make(3L) + ans <- sem.tryWithPermits(-1L)(ZIO.unit).exit + } yield assert(ans)(dies(isSubtype[IllegalArgumentException](anything))) + }, + test("tryWithPermits restores permits after failure") { + for { + sem <- Semaphore.make(3L) + failure = ZIO.fail("exception") + result <- sem.tryWithPermits(2L)(failure).exit + permits <- sem.available + } yield assertTrue( + permits == 3L, + result.isFailure, + result == Exit.fail("exception") + ) + }, test("awaiting returns the count of waiting fibers") { for { semaphore <- Semaphore.make(1) diff --git a/core-tests/shared/src/test/scala/zio/StackTracesSpec.scala b/core-tests/shared/src/test/scala/zio/StackTracesSpec.scala index 16719e2da03d..bfb65b8289f2 100644 --- a/core-tests/shared/src/test/scala/zio/StackTracesSpec.scala +++ b/core-tests/shared/src/test/scala/zio/StackTracesSpec.scala @@ -1,8 +1,11 @@ package zio -import zio.test.Assertion.{containsString, matchesRegex} -import zio.test.{TestResult, assert, assertTrue} -import zio.test.TestAspect.sequential +import zio.test.Assertion._ +import zio.test.TestAspect._ +import zio.test._ + +import java.io.{ByteArrayOutputStream, PrintStream} +import scala.collection.AbstractIterator object StackTracesSpec extends ZIOBaseSpec { @@ -12,12 +15,13 @@ object StackTracesSpec extends ZIOBaseSpec { for { _ <- ZIO.succeed(25) value = ZIO.fail("Oh no!") - trace <- matchPrettyPrintCause(value) - } yield { - assertHasExceptionInThreadZioFiber(trace)("java.lang.String: Oh no!") && - assertHasStacktraceFor(trace)("matchPrettyPrintCause") && - assertTrue(!trace.contains("Suppressed")) - } + cause <- value.cause + } yield assert(cause)(causeHasTrace { + """java.lang.String: Oh no! + | at zio.StackTracesSpec.spec.value + | at zio.StackTracesSpec.spec + |""".stripMargin + }) } ), suite("captureMultiMethod")( @@ -37,16 +41,21 @@ object StackTracesSpec extends ZIOBaseSpec { for { _ <- ZIO.succeed(25) value = underlyingFailure - trace <- matchPrettyPrintCause(value) - } yield { - assertHasExceptionInThreadZioFiber(trace)("java.lang.String: Oh no!") && - assertHasStacktraceFor(trace)("spec.deepUnderlyingFailure") && - assertHasStacktraceFor(trace)("spec.underlyingFailure") && - assertHasStacktraceFor(trace)("matchPrettyPrintCause") && - assert(trace)(containsString("Suppressed: java.lang.RuntimeException: deep failure")) && - assert(trace)(containsString("Suppressed: java.lang.RuntimeException: other failure")) && - assertTrue(numberOfOccurrences("Suppressed")(trace) == 2) - } + cause <- value.cause + } yield assert(cause)(causeHasTrace { + """java.lang.String: Oh no! + | at zio.StackTracesSpec.spec.deepUnderlyingFailure + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + | Suppressed: java.lang.RuntimeException: deep failure + | at zio.StackTracesSpec.spec.deepUnderlyingFailure + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + | Suppressed: java.lang.RuntimeException: other failure + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + |""".stripMargin + }) }, test("captures a deep embedded failure without suppressing the underlying cause") { val deepUnderlyingFailure = @@ -64,15 +73,18 @@ object StackTracesSpec extends ZIOBaseSpec { for { _ <- ZIO.succeed(25) value = underlyingFailure - trace <- matchPrettyPrintCause(value) - } yield { - assertHasExceptionInThreadZioFiber(trace)("java.lang.String: Oh no!") && - assertHasStacktraceFor(trace)("spec.deepUnderlyingFailure") && - assertHasStacktraceFor(trace)("spec.underlyingFailure") && - assertHasStacktraceFor(trace)("matchPrettyPrintCause") && - assert(trace)(containsString("Suppressed: java.lang.RuntimeException: deep failure")) && - assertTrue(numberOfOccurrences("Suppressed")(trace) == 1) - } + cause <- value.cause + } yield assert(cause)(causeHasTrace { + """java.lang.String: Oh no! + | at zio.StackTracesSpec.spec.deepUnderlyingFailure + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + | Suppressed: java.lang.RuntimeException: deep failure + | at zio.StackTracesSpec.spec.deepUnderlyingFailure + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + |""".stripMargin + }) }, test("captures the embedded failure") { val underlyingFailure = @@ -84,14 +96,16 @@ object StackTracesSpec extends ZIOBaseSpec { for { _ <- ZIO.succeed(25) value = underlyingFailure - trace <- matchPrettyPrintCause(value) - } yield { - assertHasExceptionInThreadZioFiber(trace)("java.lang.String: Oh no!") && - assertHasStacktraceFor(trace)("spec.underlyingFailure") && - assertHasStacktraceFor(trace)("matchPrettyPrintCause") && - assert(trace)(containsString("Suppressed: java.lang.RuntimeException: other failure")) && - assertTrue(numberOfOccurrences("Suppressed")(trace) == 1) - } + cause <- value.cause + } yield assert(cause)(causeHasTrace { + """java.lang.String: Oh no! + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + | Suppressed: java.lang.RuntimeException: other failure + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + |""".stripMargin + }) }, test("captures a die failure") { val underlyingFailure = @@ -99,39 +113,212 @@ object StackTracesSpec extends ZIOBaseSpec { .succeed("ITEM") .map(_ => List.empty.head) - for { - trace <- matchPrettyPrintCause(underlyingFailure) - } yield { - assertHasExceptionInThreadZioFiber(trace)("java.util.NoSuchElementException: head of empty list") && - assertHasStacktraceFor(trace)("spec.underlyingFailure") && - assertHasStacktraceFor(trace)("matchPrettyPrintCause") + for (cause <- underlyingFailure.cause) + yield assert(cause)(causeHasTrace { + if (TestVersion.isScala2) + """java.util.NoSuchElementException: head of empty list + | at scala.collection.immutable.Nil$.head + | at zio.StackTracesSpec$.$anonfun$spec + | at zio.ZIO.$anonfun$map + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + |""".stripMargin + else + """java.util.NoSuchElementException: head of empty list + | at scala.collection.immutable.Nil$.head + | at zio.StackTracesSpec$.$anonfun + | at zio.ZIO.map$$anonfun + | at zio.StackTracesSpec.spec.underlyingFailure + | at zio.StackTracesSpec.spec + |""".stripMargin + }) + } @@ jvmOnly @@ ignore + ), + suite("getOrThrowFiberFailure")( + test("fills in the external stack trace") { + def call(): Unit = subcall() + def subcall2() = ZIO.fail("boom") + def subcall(): Unit = Unsafe.unsafe { implicit unsafe => + Runtime.default.unsafe + .run(subcall2()) + .getOrThrowFiberFailure() } + + assertThrows(call())(exceptionHasTrace { + if (TestVersion.isScala2) + """java.lang.String: boom + | at zio.StackTracesSpec.spec.subcall2 + | at zio.StackTracesSpec.spec.subcall + | at zio.StackTracesSpec$.$anonfun$spec + | at zio.Unsafe$.unsafe + | at zio.StackTracesSpec$.subcall + | at zio.StackTracesSpec$.call + | at zio.StackTracesSpec$.$anonfun$spec + | at scala.runtime.java8.JFunction0$mcV$sp.apply + | at zio.StackTracesSpec$.assertThrows + | at zio.StackTracesSpec$.$anonfun$spec + |""".stripMargin + else + """java.lang.String: boom + | at zio.StackTracesSpec.spec.subcall2 + | at zio.StackTracesSpec.spec.subcall + | at zio.StackTracesSpec$.subcall$1$$anonfun + | at zio.Unsafe$.unsafe + | at zio.StackTracesSpec$.subcall + | at zio.StackTracesSpec$.call + | at zio.StackTracesSpec$.spec$$anonfun$6$$anonfun + | at zio.StackTracesSpec$.spec$$anonfun$6$$anonfun$adapted + | at zio.StackTracesSpec$.assertThrows + | at zio.StackTracesSpec$.spec$$anonfun + | at zio.test.TestConstructor$.apply$$anonfun$1$$anonfun + |""".stripMargin + }) + } + ) @@ jvmOnly, + suite("getOrThrow")( + test("fills in the external stack trace (as suppressed)") { + def call(): Unit = subcall() + def subcall2() = ZIO.die(new RuntimeException("boom")) + def subcall(): Unit = Unsafe.unsafe { implicit unsafe => + Runtime.default.unsafe + .run(subcall2()) + .getOrThrow() + } + + assertThrows(call())(exceptionHasTrace { + if (TestVersion.isScala2) + """java.lang.RuntimeException: boom + | at zio.StackTracesSpec$.$anonfun$spec + | at zio.ZIO$.$anonfun$die + | at zio.ZIO$.$anonfun$failCause + | at zio.internal.FiberRuntime.runLoop + | at zio.internal.FiberRuntime.evaluateEffect + | at zio.internal.FiberRuntime.start + | at zio.Runtime$UnsafeAPIV1.runOrFork + | at zio.Runtime$UnsafeAPIV1.run + | at zio.StackTracesSpec$.$anonfun$spec + | at zio.Unsafe$.unsafe + | at zio.StackTracesSpec$.subcall + | at zio.StackTracesSpec$.call + | at zio.StackTracesSpec$.$anonfun$spec + | at scala.runtime.java8.JFunction0$mcV$sp.apply + | at zio.StackTracesSpec$.assertThrows + | at zio.StackTracesSpec$.$anonfun$spec + | at zio.internal.FiberRuntime.runLoop + | at zio.internal.FiberRuntime.evaluateEffect + | at zio.internal.FiberRuntime.evaluateMessageWhileSuspended + | at zio.internal.FiberRuntime.drainQueueOnCurrentThread + | at zio.internal.FiberRuntime.run + | at zio.internal.ZScheduler$$anon$3.run + | Suppressed: zio.Cause$FiberTrace: java.lang.RuntimeException: boom + | at zio.StackTracesSpec.spec.subcall2 + | at zio.StackTracesSpec.spec.subcall + | at zio.StackTracesSpec$.$anonfun$spec + | at zio.Unsafe$.unsafe + | at zio.StackTracesSpec$.subcall + | at zio.StackTracesSpec$.call + | at zio.StackTracesSpec$.$anonfun$spec + | at scala.runtime.java8.JFunction0$mcV$sp.apply + | at zio.StackTracesSpec$.assertThrows + | at zio.StackTracesSpec$.$anonfun$spec + |""".stripMargin + else + """java.lang.RuntimeException: boom + | at zio.StackTracesSpec$.subcall2$2$$anonfun + | at zio.ZIO$.die$$anonfun + | at zio.ZIO$.failCause$$anonfun + | at zio.internal.FiberRuntime.runLoop + | at zio.internal.FiberRuntime.evaluateEffect + | at zio.internal.FiberRuntime.start + | at zio.Runtime$UnsafeAPIV1.runOrFork + | at zio.Runtime$UnsafeAPIV1.run + | at zio.StackTracesSpec$.subcall$2$$anonfun + | at zio.Unsafe$.unsafe + | at zio.StackTracesSpec$.subcall + | at zio.StackTracesSpec$.call + | at zio.StackTracesSpec$.spec$$anonfun$7$$anonfun + | at zio.StackTracesSpec$.spec$$anonfun$7$$anonfun$adapted + | at zio.StackTracesSpec$.assertThrows + | at zio.StackTracesSpec$.spec$$anonfun + | at zio.test.TestConstructor$.apply$$anonfun$1$$anonfun + | at zio.internal.FiberRuntime.runLoop + | at zio.internal.FiberRuntime.evaluateEffect + | at zio.internal.FiberRuntime.evaluateMessageWhileSuspended + | at zio.internal.FiberRuntime.drainQueueOnCurrentThread + | at zio.internal.FiberRuntime.run + | at zio.internal.ZScheduler$$anon$3.run + | Suppressed: zio.Cause$FiberTrace: java.lang.RuntimeException: boom + | at zio.StackTracesSpec.spec.subcall2 + | at zio.StackTracesSpec.spec.subcall + | at zio.StackTracesSpec$.subcall$2$$anonfun + | at zio.Unsafe$.unsafe + | at zio.StackTracesSpec$.subcall + | at zio.StackTracesSpec$.call + | at zio.StackTracesSpec$.spec$$anonfun$7$$anonfun + | at zio.StackTracesSpec$.spec$$anonfun$7$$anonfun$adapted + | at zio.StackTracesSpec$.assertThrows + | at zio.StackTracesSpec$.spec$$anonfun + | at zio.test.TestConstructor$.apply$$anonfun$1$$anonfun + |""".stripMargin + }) } - ) + ) @@ jvmOnly ) @@ sequential // set to true to print traces private val debug = false - private def show(trace: => Cause[Any]): Unit = - if (debug) { - println("*****") - println(trace.prettyPrint) + private val locationRegex = + """(\$\d+)?\((\w|\$)+\.(scala|java):\d+\)$""".r + + private def assertThrows[T](task: => T)(expected: Assertion[Throwable]) = + try { + val value = task + assert(value)(assertion("expected an exception")(_ => false)) + } catch { + case exception: Throwable => + assert(exception)(expected) } - private def assertHasExceptionInThreadZioFiber(trace: String): String => TestResult = - errorMessage => assert(trace)(matchesRegex(s"""(?s)^Exception in thread\\s"zio-fiber-\\d*"\\s$errorMessage.*""")) + def strip(trace: String) = { + if (debug) println(trace) + val stripped = trace.linesIterator.map(locationRegex.replaceAllIn(_, "")) + dedupe(stripped.filterNot(_.isEmpty)).mkString("", "\n", "\n") + } + + def causeHasTrace(expected: String): Assertion[Cause[Any]] = + hasField("stackTrace", strippedStackTrace, equalTo(expected)) + + def exceptionHasTrace(expected: String): Assertion[Throwable] = + hasField("stackTrace", strippedStackTrace, equalTo(expected)) + + def strippedStackTrace(cause: Cause[Any]): String = + strip(cause.prettyPrint) - private def assertHasStacktraceFor(trace: String): String => TestResult = subject => - assert(trace)(matchesRegex(s"""(?s).*at zio\\.StackTracesSpec.?\\.$subject.*\\(.*:\\d*\\).*""")) + def strippedStackTrace(exception: Throwable): String = { + val buffer = new ByteArrayOutputStream + exception.printStackTrace(new PrintStream(buffer)) + strip(buffer.toString) + } + + def dedupe[A](it: Iterator[A]): Iterator[A] = new AbstractIterator[A] { + private var current = + if (it.hasNext) Some(it.next()) else None - private def numberOfOccurrences(text: String): String => Int = stack => - (stack.length - stack.replace(text, "").length) / text.length + override def hasNext = + current.isDefined - private val matchPrettyPrintCause: ZIO[Any, String, Nothing] => ZIO[Any, Throwable, String] = { - case fail: IO[String, Nothing] => - fail.catchAllCause { cause => - ZIO.succeed(show(cause)) *> ZIO.attempt(cause.prettyPrint) + override def next(): A = { + while (it.hasNext) { + val next = it.next() + if (!current.contains(next)) { + try return current.get + finally current = Some(next) + } } + + try current.get + finally current = None + } } } diff --git a/core-tests/shared/src/test/scala/zio/ZIOSpec.scala b/core-tests/shared/src/test/scala/zio/ZIOSpec.scala index 93e4c9d7f7f4..5ee8388418b8 100644 --- a/core-tests/shared/src/test/scala/zio/ZIOSpec.scala +++ b/core-tests/shared/src/test/scala/zio/ZIOSpec.scala @@ -266,7 +266,14 @@ object ZIOSpec extends ZIOBaseSpec { _ <- startWaiting.await _ <- callFiber.interrupt } yield assert(first)(equalTo(false)) - } + }, + test("does not cache interruption") { + for { + effect <- ZIO.sleep(Duration.fromSeconds(1)).cached(Duration.Infinity) + res1 <- effect.timeout(Duration.fromMillis(1)).exit + res2 <- effect.exit + } yield assertTrue(res1.isSuccess, !res1.isInterrupted, res2.isSuccess, !res2.isInterrupted) + } @@ TestAspect.withLiveClock ), suite("catchNonFatalOrDie")( test("recovers from NonFatal") { @@ -3746,6 +3753,168 @@ object ZIOSpec extends ZIOBaseSpec { assertZIO(zio.provide(ZLayer.succeed(0)))(equalTo(3)) } ), + suite("fromFunction")( + test("1 arg") { + ZIO + .fromFunction((int: Int) => int + 1) + .map(n => assertTrue(n == 2)) + .provide(ZLayer.succeed(1)) + }, + test("2 arg") { + ZIO.fromFunction { (int: Int, s: String) => + assertTrue(int == 1, s == "a") + } + .provide(ZLayer.succeed(1) ++ ZLayer.succeed("a")) + }, + test("3 arg") { + ZIO.fromFunction { (int: Int, s: String, d: Double) => + assertTrue(int == 1, s == "a", d == 2.0) + } + .provide(ZLayer.succeed(1) ++ ZLayer.succeed("a") ++ ZLayer.succeed(2.0)) + }, + test("4 arg") { + ZIO.fromFunction { (int: Int, s: String, d: Double, b: Byte) => + assertTrue(int == 1, s == "a", d == 2.0, b == 3) + } + .provide(ZLayer.succeed(1) ++ ZLayer.succeed("a") ++ ZLayer.succeed(2.0) ++ ZLayer.succeed(3.toByte)) + }, + test("5 arg") { + ZIO.fromFunction { (int: Int, s: String, d: Double, b: Byte, c: Char) => + assertTrue(int == 1, s == "a", d == 2.0, b == 3, c == 'c') + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("a") ++ + ZLayer.succeed(2.0) ++ + ZLayer.succeed(3.toByte) ++ + ZLayer.succeed('c') + ) + }, + test("6 arg") { + ZIO.fromFunction { (int: Int, s: String, d: Double, b: Byte, c: Char, li: List[Int]) => + assertTrue(int == 1, s == "a", d == 2.0, b == 3, c == 'c', li == List(1, 2)) + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("a") ++ + ZLayer.succeed(2.0) ++ + ZLayer.succeed(3.toByte) ++ + ZLayer.succeed('c') ++ + ZLayer.succeed(List(1, 2)) + ) + }, + test("7 arg") { + ZIO.fromFunction { (int: Int, s: String, d: Double, b: Byte, c: Char, li: List[Int], ls: List[String]) => + assertTrue(int == 1, s == "a", d == 2.0, b == 3, c == 'c', li == List(1, 2), ls == List("a", "b")) + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("a") ++ + ZLayer.succeed(2.0) ++ + ZLayer.succeed(3.toByte) ++ + ZLayer.succeed('c') ++ + ZLayer.succeed(List(1, 2)) ++ + ZLayer.succeed(List("a", "b")) + ) + } + ), + suite("fromFunctionZIO")( + test("1 arg") { + ZIO.fromFunctionZIO { (int: Int) => + ZIO.succeed(assertTrue(int == 1)) + } + .provide(ZLayer.succeed(1)) + }, + test("fromFunction works with environment") { + trait MyService { + def n: Int + } + + ZIO.fromFunctionZIO { (int: Int) => + ZIO + .serviceWith[MyService](_.n + int) + .map(n => assertTrue(n == 3)) + } + .provide(ZLayer.succeed(2) ++ ZLayer.succeed(new MyService { def n = 1 })) + }, + test("2 arg") { + ZIO.fromFunctionZIO { (int: Int, string: String) => + ZIO.succeed(assertTrue(int == 1, string == "ab")) + } + .provide(ZLayer.succeed(1) ++ ZLayer.succeed("ab")) + }, + test("3 arg") { + ZIO.fromFunctionZIO { (int: Int, string: String, double: Double) => + ZIO.succeed(assertTrue(int == 1, string == "ab", double == 3)) + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("ab") ++ + ZLayer.succeed(3.toDouble) + ) + }, + test("4 arg") { + ZIO.fromFunctionZIO { (int: Int, string: String, double: Double, byte: Byte) => + ZIO.succeed(assertTrue(int == 1, string == "ab", double == 3, byte == 4)) + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("ab") ++ + ZLayer.succeed(3.toDouble) ++ + ZLayer.succeed(4.toByte) + ) + }, + test("5 arg") { + ZIO.fromFunctionZIO { (int: Int, string: String, double: Double, byte: Byte, c: Char) => + ZIO.succeed(assertTrue(int == 1, string == "ab", double == 3, byte == 4, c == 'c')) + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("ab") ++ + ZLayer.succeed(3.toDouble) ++ + ZLayer.succeed(4.toByte) ++ + ZLayer.succeed('c') + ) + }, + test("6 arg") { + ZIO.fromFunctionZIO { (int: Int, string: String, double: Double, byte: Byte, c: Char, li: List[Int]) => + ZIO.succeed(assertTrue(int == 1, string == "ab", double == 3, byte == 4, c == 'c', li == List(1, 2))) + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("ab") ++ + ZLayer.succeed(3.toDouble) ++ + ZLayer.succeed(4.toByte) ++ + ZLayer.succeed('c') ++ + ZLayer.succeed(List(1, 2)) + ) + }, + test("7 arg") { + ZIO.fromFunctionZIO { + (int: Int, string: String, double: Double, byte: Byte, c: Char, li: List[Int], ls: List[String]) => + ZIO.succeed( + assertTrue( + int == 1, + string == "ab", + double == 3, + byte == 4, + c == 'c', + li == List(1, 2), + ls == List("a", "b") + ) + ) + } + .provide( + ZLayer.succeed(1) ++ + ZLayer.succeed("ab") ++ + ZLayer.succeed(3.toDouble) ++ + ZLayer.succeed(4.toByte) ++ + ZLayer.succeed('c') ++ + ZLayer.succeed(List(1, 2)) ++ + ZLayer.succeed(List("a", "b")) + ) + } + ), suite("schedule")( test("runs effect for each recurrence of the schedule") { for { @@ -4546,7 +4715,7 @@ object ZIOSpec extends ZIOBaseSpec { for { future <- ZIO.fail(new Throwable(new IllegalArgumentException)).toFuture result <- ZIO.fromFuture(_ => future).either - } yield assert(result)(isLeft(hasSuppressed(exists(hasMessage(containsString("zio-fiber")))))) + } yield assert(result)(isLeft(hasSuppressed(exists(hasMessage(containsString("IllegalArgumentException")))))) } ) @@ zioTag(future), suite("resurrect")( diff --git a/core-tests/shared/src/test/scala/zio/autowire/AutoWireSpec.scala b/core-tests/shared/src/test/scala/zio/autowire/AutoWireSpec.scala index 0141cd3b0ac5..ef7d5f659b71 100644 --- a/core-tests/shared/src/test/scala/zio/autowire/AutoWireSpec.scala +++ b/core-tests/shared/src/test/scala/zio/autowire/AutoWireSpec.scala @@ -248,6 +248,30 @@ object AutoWireSpec extends ZIOBaseSpec { ZLayer.succeed(true) ++ ZLayer.succeed(100.1) >>> layer ) assertZIO(provided)(equalTo(128)) + }, + test("displays error message when remainder type does not match") { + + val checked = typeCheck( + """ + class Engine + class Wheels + class Car + + val carLayer: ZLayer[Engine with Wheels, Nothing, Car] = ??? + val wheelsLayer: ZLayer[Any, Nothing, Wheels] = ??? + val layer = ZLayer.makeSome[String, Car](carLayer, wheelsLayer) + """ + ) + + assertZIO(checked)( + isLeft( + containsStringWithoutAnsi("Please provide a layer for the following type:") && + containsStringWithoutAnsi("Required by carLayer") && + containsStringWithoutAnsi("1. Engine") && + containsStringWithoutAnsi("Alternatively, you may add them to the remainder type ascription:") && + containsStringWithoutAnsi("provideSome[Engine]") + ) + ) } ) ) diff --git a/core-tests/shared/src/test/scala/zio/stm/TSemaphoreSpec.scala b/core-tests/shared/src/test/scala/zio/stm/TSemaphoreSpec.scala index c3c1d4cf01cb..1452a2c7bfb1 100644 --- a/core-tests/shared/src/test/scala/zio/stm/TSemaphoreSpec.scala +++ b/core-tests/shared/src/test/scala/zio/stm/TSemaphoreSpec.scala @@ -137,6 +137,83 @@ object TSemaphoreSpec extends ZIOBaseSpec { assertTrue(remaining == 3L) } } + ), + suite("tryAcquire, tryAcquireN, tryWithPermit and tryWithPermits")( + test("tryAcquire should succeed when a permit is available") { + for { + sem <- TSemaphore.makeCommit(1L) + res <- sem.tryAcquire.commit + } yield assert(res)(isTrue) + }, + test("tryAcquire should fail when no permits are available") { + for { + sem <- TSemaphore.makeCommit(0L) + res <- sem.tryAcquire.commit + } yield assert(res)(isFalse) + }, + test("tryAcquire should decrease the permit count when successful") { + for { + sem <- TSemaphore.makeCommit(1L) + _ <- sem.tryAcquire.commit + avail <- sem.available.commit + } yield assert(avail)(equalTo(0L)) + }, + test("tryAcquireN should acquire permits if enough are available") { + for { + sem <- TSemaphore.makeCommit(5L) + res <- sem.tryAcquireN(3L).commit + } yield assert(res)(isTrue) + }, + test("tryAcquireN should fail if not enough permits are available") { + for { + sem <- TSemaphore.makeCommit(2L) + res <- sem.tryAcquireN(3L).commit + } yield assert(res)(isFalse) + }, + test("tryAcquireN should decrease the permit count when successful") { + for { + sem <- TSemaphore.makeCommit(5L) + _ <- sem.tryAcquireN(3L).commit + avail <- sem.available.commit + } yield assert(avail)(equalTo(2L)) + }, + test("tryAcquireN should not change permit count when unsuccessful") { + for { + sem <- TSemaphore.makeCommit(2L) + _ <- sem.tryAcquireN(3L).commit + avail <- sem.available.commit + } yield assert(avail)(equalTo(2L)) + }, + test("tryWithPermits should acquire a permit and release it") { + for { + sem <- TSemaphore.makeCommit(2L) + result <- sem.tryWithPermits(1L)(ZIO.succeed(2)) + avail <- sem.available.commit + } yield assertTrue(result.contains(2) && avail == 2L) + }, + test("tryWithPermits should return None if no permits available") { + for { + sem <- TSemaphore.makeCommit(0L) + result <- sem.tryWithPermits(1L)(ZIO.succeed(2)) + avail <- sem.available.commit + } yield assertTrue(result.isEmpty && avail == 0L) + }, + test( + "tryWithPermits should return None if requested amount of permits is greater than available amount of permits" + ) { + for { + sem <- TSemaphore.makeCommit(3L) + result <- sem.tryWithPermits(5L)(ZIO.succeed(2)) + avail <- sem.available.commit + } yield assertTrue(result.isEmpty && avail == 3L) + }, + test("tryWithPermit should acquire a permit and release it") { + for { + sem <- TSemaphore.makeCommit(3L) + result <- sem.tryWithPermit(ZIO.succeed(2)) + avail <- sem.available.commit + } yield assertTrue(result.contains(2) && avail == 3L) + } ) ) diff --git a/core/js/src/main/scala/zio/ClockPlatformSpecific.scala b/core/js/src/main/scala/zio/ClockPlatformSpecific.scala index 66a696e768a8..25d72da023b3 100644 --- a/core/js/src/main/scala/zio/ClockPlatformSpecific.scala +++ b/core/js/src/main/scala/zio/ClockPlatformSpecific.scala @@ -21,7 +21,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.scalajs.js private[zio] trait ClockPlatformSpecific { - private[zio] val globalScheduler = new Scheduler { + private[zio] val globalScheduler: Scheduler = new Scheduler.Internal { import Scheduler.CancelToken private[this] val ConstFalse = () => false diff --git a/core/js/src/main/scala/zio/Scheduler.scala b/core/js/src/main/scala/zio/Scheduler.scala index d49b959255ed..d61ee3c18f25 100644 --- a/core/js/src/main/scala/zio/Scheduler.scala +++ b/core/js/src/main/scala/zio/Scheduler.scala @@ -21,13 +21,15 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace import java.util.concurrent.{ScheduledExecutorService, TimeUnit} -abstract class Scheduler { +sealed abstract class Scheduler { def schedule(task: Runnable, duration: Duration)(implicit unsafe: Unsafe): CancelToken } object Scheduler { type CancelToken = () => Boolean + private[zio] abstract class Internal extends Scheduler + def fromScheduledExecutorService(service: ScheduledExecutorService): Scheduler = new Scheduler { val ConstFalse = () => false diff --git a/core/jvm-native/src/main/scala/zio/ClockPlatformSpecific.scala b/core/jvm-native/src/main/scala/zio/ClockPlatformSpecific.scala index 701317470100..bdf2c62908bf 100644 --- a/core/jvm-native/src/main/scala/zio/ClockPlatformSpecific.scala +++ b/core/jvm-native/src/main/scala/zio/ClockPlatformSpecific.scala @@ -24,7 +24,7 @@ import java.util.concurrent._ private[zio] trait ClockPlatformSpecific { import Scheduler.CancelToken - private[zio] val globalScheduler = new Scheduler { + private[zio] val globalScheduler: Scheduler = new Scheduler.Internal { private[this] val service = makeService() diff --git a/core/jvm-native/src/main/scala/zio/Scheduler.scala b/core/jvm-native/src/main/scala/zio/Scheduler.scala index a73d1010f902..238214b87be5 100644 --- a/core/jvm-native/src/main/scala/zio/Scheduler.scala +++ b/core/jvm-native/src/main/scala/zio/Scheduler.scala @@ -21,7 +21,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace import java.util.concurrent.{ScheduledExecutorService, TimeUnit} -abstract class Scheduler { +sealed abstract class Scheduler { def asScheduledExecutorService: ScheduledExecutorService def schedule(task: Runnable, duration: Duration)(implicit unsafe: Unsafe): CancelToken } @@ -29,6 +29,8 @@ abstract class Scheduler { object Scheduler { type CancelToken = () => Boolean + private[zio] abstract class Internal extends Scheduler + def fromScheduledExecutorService(service: ScheduledExecutorService): Scheduler = new Scheduler { val ConstFalse = () => false diff --git a/core/jvm-native/src/main/scala/zio/SystemPlatformSpecific.scala b/core/jvm-native/src/main/scala/zio/SystemPlatformSpecific.scala index 9ac989aebf8d..7cfd90a741c8 100644 --- a/core/jvm-native/src/main/scala/zio/SystemPlatformSpecific.scala +++ b/core/jvm-native/src/main/scala/zio/SystemPlatformSpecific.scala @@ -20,8 +20,7 @@ import zio.internal.stacktracer.Tracer import zio.stacktracer.TracingImplicits.disableAutoTrace import java.lang.{System => JSystem} -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ private[zio] trait SystemPlatformSpecific { self: System.type => @@ -29,7 +28,6 @@ private[zio] trait SystemPlatformSpecific { self: System.type => override def env(variable: String): Option[String] = Option(JSystem.getenv(variable)) - @nowarn("msg=JavaConverters") override def envs: Map[String, String] = JSystem.getenv().asScala.toMap } diff --git a/core/jvm-native/src/main/scala/zio/ZIOAppPlatformSpecific.scala b/core/jvm-native/src/main/scala/zio/ZIOAppPlatformSpecific.scala index 3c6c1c2deafc..484178de776b 100644 --- a/core/jvm-native/src/main/scala/zio/ZIOAppPlatformSpecific.scala +++ b/core/jvm-native/src/main/scala/zio/ZIOAppPlatformSpecific.scala @@ -24,20 +24,16 @@ private[zio] trait ZIOAppPlatformSpecific { self: ZIOApp => } yield result).provideLayer(newLayer.tapErrorCause(ZIO.logErrorCause(_))) runtime.unsafe.run { - ZIO.uninterruptibleMask { restore => + ZIO.uninterruptible { for { fiberId <- ZIO.fiberId - p <- Promise.make[Nothing, Set[FiberId.Runtime]] - fiber <- restore(workflow).onExit { exit0 => - val exitCode = if (exit0.isSuccess) ExitCode.success else ExitCode.failure - val interrupt = interruptRootFibers(p) - // If we're shutting down due to an external signal, the shutdown hook will fulfill the promise - // Otherwise it means we're shutting down due to normal completion and we need to fulfill the promise - ZIO.unless(shuttingDown.get())(p.succeed(Set(fiberId))) *> interrupt *> exit(exitCode) + fiber <- workflow.interruptible.exitWith { exit0 => + val exitCode = if (exit0.isSuccess) ExitCode.success else ExitCode.failure + interruptRootFibers(fiberId).as(exitCode) }.fork _ <- ZIO.succeed(Platform.addShutdownHook { () => - if (!shuttingDown.getAndSet(true)) { + if (shuttingDown.compareAndSet(false, true)) { if (FiberRuntime.catastrophicFailure.get) { println( @@ -48,28 +44,26 @@ private[zio] trait ZIOAppPlatformSpecific { self: ZIOApp => "Check the logs for more details and consider overriding `Runtime.reportFatal` to capture context." ) } else { + // NOTE: try-catch likely not needed, + // but guarding against cases where the submission of the task fails spuriously try { - val completePromise = ZIO.fiberIdWith(fid2 => p.succeed(Set(fiberId, fid2))) - runtime.unsafe.run(completePromise *> fiber.interrupt) + fiber.tellInterrupt(Cause.interrupt(fiberId)) } catch { case _: Throwable => } } - - () } }) result <- fiber.join - } yield result + _ <- exit(result) + } yield () } }.getOrThrowFiberFailure() } - private def interruptRootFibers(p: Promise[Nothing, Set[FiberId.Runtime]])(implicit trace: Trace): UIO[Unit] = + private def interruptRootFibers(mainFiberId: FiberId)(implicit trace: Trace): UIO[Unit] = for { - ignoredIds <- p.await - roots <- Fiber.roots - _ <- Fiber.interruptAll(roots.view.filter(fiber => fiber.isAlive() && !ignoredIds(fiber.id))) + roots <- Fiber.roots + _ <- Fiber.interruptAll(roots.view.filter(fiber => fiber.isAlive() && (fiber.id != mainFiberId))) } yield () - } diff --git a/core/jvm/src/main/scala/zio/FiberPlatformSpecific.scala b/core/jvm/src/main/scala/zio/FiberPlatformSpecific.scala index 4f6b7036ccd8..1a7f09766a48 100644 --- a/core/jvm/src/main/scala/zio/FiberPlatformSpecific.scala +++ b/core/jvm/src/main/scala/zio/FiberPlatformSpecific.scala @@ -37,17 +37,16 @@ private[zio] trait FiberPlatformSpecific { if (cf.isDone) { ZIO .isFatalWith(isFatal => javaz.unwrapDone(isFatal)(cf)) - .fold(Exit.fail, Exit.succeed) - .map(Some(_)) + .fold(e => Some(Exit.fail(e)), v => Some(Exit.succeed(v))) } else { - ZIO.succeed(None) + Exit.none } } def id: FiberId = FiberId.None final def interruptAsFork(id: FiberId)(implicit trace: Trace): UIO[Unit] = - ZIO.succeed(cs.toCompletableFuture.cancel(false)).unit + ZIO.succeed(cs.toCompletableFuture.cancel(false): Unit) final def inheritAll(implicit trace: Trace): UIO[Unit] = ZIO.unit } @@ -72,17 +71,16 @@ private[zio] trait FiberPlatformSpecific { if (ftr.isDone) { ZIO .isFatalWith(isFatal => javaz.unwrapDone(isFatal)(ftr)) - .fold(Exit.fail, Exit.succeed) - .map(Some(_)) + .fold(e => Some(Exit.fail(e)), v => Some(Exit.succeed(v))) } else { - ZIO.none + Exit.none } } def id: FiberId = FiberId.None def interruptAsFork(id: FiberId)(implicit trace: Trace): UIO[Unit] = - ZIO.succeed(ftr.cancel(false)).unit + ZIO.succeed(ftr.cancel(false): Unit) def inheritAll(implicit trace: Trace): UIO[Unit] = ZIO.unit } diff --git a/core/jvm/src/main/scala/zio/interop/javaz.scala b/core/jvm/src/main/scala/zio/interop/javaz.scala index 449524af44a8..741847a16971 100644 --- a/core/jvm/src/main/scala/zio/interop/javaz.scala +++ b/core/jvm/src/main/scala/zio/interop/javaz.scala @@ -73,18 +73,17 @@ private[zio] object javaz { if (cf.isDone) { unwrapDone(isFatal)(cf) } else { - val cancel = ZIO.succeed(cf.cancel(false)) restore { ZIO.asyncInterrupt[Any, Throwable, A] { cb => - val _ = cs.handle[Unit] { (v: A, t: Throwable) => + cs.handle[Unit] { (v: A, t: Throwable) => val io = if (t eq null) Exit.succeed(v) else catchFromGet(isFatal).applyOrElse(t, (d: Throwable) => ZIO.die(d)) cb(io) } - Left(cancel) + Left(ZIO.succeed(cf.cancel(false))) } - }.onInterrupt(cancel) + } } } } @@ -102,8 +101,11 @@ private[zio] object javaz { unwrapDone(isFatal)(future) } else { restore { - ZIO.blocking(ZIO.suspend(unwrapDone(isFatal)(future))) - }.onInterrupt(ZIO.succeed(future.cancel(false))) + ZIO.blocking(unwrapDone(isFatal)(future)) + }.catchAllCause { c => + if (c.isInterruptedOnly) future.cancel(false) + Exit.failCause(c) + } } } } diff --git a/core/jvm/src/main/scala/zio/metrics/jvm/BufferPools.scala b/core/jvm/src/main/scala/zio/metrics/jvm/BufferPools.scala index 149da05d4733..9175de6d2553 100644 --- a/core/jvm/src/main/scala/zio/metrics/jvm/BufferPools.scala +++ b/core/jvm/src/main/scala/zio/metrics/jvm/BufferPools.scala @@ -4,8 +4,7 @@ import zio._ import zio.metrics._ import java.lang.management.{BufferPoolMXBean, ManagementFactory} -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ final case class BufferPools( bufferPoolUsedBytes: PollingMetric[Any, Throwable, Chunk[MetricState.Gauge]], @@ -14,7 +13,6 @@ final case class BufferPools( ) object BufferPools { - @nowarn("msg=JavaConverters") val live: ZLayer[JvmMetricsSchedule, Throwable, Reloadable[BufferPools]] = ZLayer.scoped { for { diff --git a/core/jvm/src/main/scala/zio/metrics/jvm/GarbageCollector.scala b/core/jvm/src/main/scala/zio/metrics/jvm/GarbageCollector.scala index 9d12fe949c3d..df6d7b70090a 100644 --- a/core/jvm/src/main/scala/zio/metrics/jvm/GarbageCollector.scala +++ b/core/jvm/src/main/scala/zio/metrics/jvm/GarbageCollector.scala @@ -4,8 +4,7 @@ import zio._ import zio.metrics._ import java.lang.management.ManagementFactory -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ final case class GarbageCollector( gcCollectionSecondsSum: PollingMetric[Any, Throwable, Chunk[MetricState.Gauge]], @@ -13,7 +12,6 @@ final case class GarbageCollector( ) object GarbageCollector { - @nowarn("msg=JavaConverters") val live: ZLayer[JvmMetricsSchedule, Throwable, GarbageCollector] = ZLayer.scoped { for { diff --git a/core/jvm/src/main/scala/zio/metrics/jvm/MemoryAllocation.scala b/core/jvm/src/main/scala/zio/metrics/jvm/MemoryAllocation.scala index e0c1c189e787..27497884ad03 100644 --- a/core/jvm/src/main/scala/zio/metrics/jvm/MemoryAllocation.scala +++ b/core/jvm/src/main/scala/zio/metrics/jvm/MemoryAllocation.scala @@ -8,9 +8,8 @@ import zio.metrics._ import java.lang.management.{GarbageCollectorMXBean, ManagementFactory} import javax.management.openmbean.CompositeData import javax.management.{Notification, NotificationEmitter, NotificationListener} -import scala.annotation.nowarn -import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.jdk.CollectionConverters._ final case class MemoryAllocation(listener: NotificationListener, garbageCollectorMXBeans: List[GarbageCollectorMXBean]) @@ -26,7 +25,6 @@ object MemoryAllocation { private class Listener(runtime: Runtime[Any]) extends NotificationListener { private val lastMemoryUsage: mutable.Map[String, Long] = mutable.HashMap.empty - @nowarn("msg=JavaConverters") override def handleNotification(notification: Notification, handback: Any): Unit = { val info = GarbageCollectionNotificationInfo.from(notification.getUserData.asInstanceOf[CompositeData]) @@ -72,7 +70,6 @@ object MemoryAllocation { } } - @nowarn("msg=JavaConverters") val live: ZLayer[Any, Throwable, MemoryAllocation] = ZLayer.scoped { ZIO diff --git a/core/jvm/src/main/scala/zio/metrics/jvm/MemoryPools.scala b/core/jvm/src/main/scala/zio/metrics/jvm/MemoryPools.scala index a908fde0d751..99e20477102e 100644 --- a/core/jvm/src/main/scala/zio/metrics/jvm/MemoryPools.scala +++ b/core/jvm/src/main/scala/zio/metrics/jvm/MemoryPools.scala @@ -1,12 +1,10 @@ package zio.metrics.jvm import zio._ -import zio.metrics.MetricKeyType.Gauge import zio.metrics._ import java.lang.management.{ManagementFactory, MemoryPoolMXBean, MemoryUsage} -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ final case class MemoryPools( memoryBytesUsed: PollingMetric[Any, Throwable, Chunk[MetricState.Gauge]], @@ -106,7 +104,6 @@ object MemoryPools { jvmMemoryPoolInitBytes = "jvm_memory_pool_bytes_init" ) - @nowarn("msg=JavaConverters") private def withNames( jvmMemoryCommittedBytes: String, jvmMemoryInitBytes: String, diff --git a/core/shared/src/main/scala-2.12/zio/ChunkLike.scala b/core/shared/src/main/scala-2.12/zio/ChunkLike.scala index 15e0266b5cba..414eadf55313 100644 --- a/core/shared/src/main/scala-2.12/zio/ChunkLike.scala +++ b/core/shared/src/main/scala-2.12/zio/ChunkLike.scala @@ -33,7 +33,7 @@ import scala.reflect.ClassTag * * Note that `IndexedSeq` is not a referentially transparent interface in that * it exposes methods that are partial (e.g. `apply`), allocate mutable state - * (e.g. `iterator`), or are purely side effecting (e.g. `foreach`). `Chunk` + * (e.g. `iterator`), or are purely side effecting (e.g. `foreach`). `ChunkLike` * extends `IndexedSeq` to provide interoperability with Scala's collection * library but users should avoid these methods whenever possible. */ diff --git a/core/shared/src/main/scala-2.13+/zio/ChunkLike.scala b/core/shared/src/main/scala-2.13+/zio/ChunkLike.scala index b36437088bc5..740161b89e65 100644 --- a/core/shared/src/main/scala-2.13+/zio/ChunkLike.scala +++ b/core/shared/src/main/scala-2.13+/zio/ChunkLike.scala @@ -32,7 +32,7 @@ import scala.reflect.ClassTag * * Note that `IndexedSeq` is not a referentially transparent interface in that * it exposes methods that are partial (e.g. `apply`), allocate mutable state - * (e.g. `iterator`), or are purely side effecting (e.g. `foreach`). `Chunk` + * (e.g. `iterator`), or are purely side effecting (e.g. `foreach`). `ChunkLike` * extends `IndexedSeq` to provide interoperability with Scala's collection * library but users should avoid these methods whenever possible. */ diff --git a/core/shared/src/main/scala-2/zio/ZIOCompanionVersionSpecific.scala b/core/shared/src/main/scala-2/zio/ZIOCompanionVersionSpecific.scala index adcbec9e4fbe..0a0b20ca4879 100644 --- a/core/shared/src/main/scala-2/zio/ZIOCompanionVersionSpecific.scala +++ b/core/shared/src/main/scala-2/zio/ZIOCompanionVersionSpecific.scala @@ -4,6 +4,7 @@ import zio.ZIO.Async import zio.stacktracer.TracingImplicits.disableAutoTrace import java.io.IOException +import java.util.concurrent.atomic.AtomicReference private[zio] trait ZIOCompanionVersionSpecific { @@ -44,22 +45,15 @@ private[zio] trait ZIOCompanionVersionSpecific { blockingOn: => FiberId = FiberId.None )(implicit trace: Trace): ZIO[R, E, A] = ZIO.suspendSucceed { - val cancelerRef = new java.util.concurrent.atomic.AtomicReference[URIO[R, Any]](ZIO.unit) - - ZIO - .Async[R, E, A]( - trace, - { k => - val result = register(k(_)) - - result match { - case Left(canceler) => cancelerRef.set(canceler); null.asInstanceOf[ZIO[R, E, A]] - case Right(done) => done - } - }, - () => blockingOn - ) - .onInterrupt(cancelerRef.get()) + val state = new AtomicReference[URIO[R, Any]](Exit.unit) with ((ZIO[R, E, A] => Unit) => ZIO[R, E, A]) { + def apply(k: ZIO[R, E, A] => Unit): ZIO[R, E, A] = + register(k(_)) match { + case Left(canceler) => set(canceler); null.asInstanceOf[ZIO[R, E, A]] + case Right(done) => done + } + } + + ZIO.Async[R, E, A](trace, state, () => blockingOn).onInterrupt(state.get()) } /** diff --git a/core/shared/src/main/scala-2/zio/internal/macros/LayerMacroUtils.scala b/core/shared/src/main/scala-2/zio/internal/macros/LayerMacroUtils.scala index bc027fbad24b..43ea9ad8cdc0 100644 --- a/core/shared/src/main/scala-2/zio/internal/macros/LayerMacroUtils.scala +++ b/core/shared/src/main/scala-2/zio/internal/macros/LayerMacroUtils.scala @@ -110,7 +110,7 @@ private[zio] trait LayerMacroUtils { val builder = LayerBuilder[Type, LayerExpr]( target0 = getRequirements[R], - remainder = getRequirements[R0], + remainder = RemainderMethod.Provided(getRequirements[R0]), providedLayers0 = layers.toList, layerToDebug = debugMap, sideEffectType = definitions.UnitTpe, diff --git a/core/shared/src/main/scala-3/zio/ZIOCompanionVersionSpecific.scala b/core/shared/src/main/scala-3/zio/ZIOCompanionVersionSpecific.scala index c9f766ea9953..070c8198dce9 100644 --- a/core/shared/src/main/scala-3/zio/ZIOCompanionVersionSpecific.scala +++ b/core/shared/src/main/scala-3/zio/ZIOCompanionVersionSpecific.scala @@ -4,6 +4,7 @@ import zio.ZIO.Async import zio.stacktracer.TracingImplicits.disableAutoTrace import java.io.IOException +import java.util.concurrent.atomic.AtomicReference import scala.annotation.targetName private[zio] transparent trait ZIOCompanionVersionSpecific { @@ -51,22 +52,15 @@ private[zio] transparent trait ZIOCompanionVersionSpecific { blockingOn: => FiberId = FiberId.None )(implicit trace: Trace): ZIO[R, E, A] = ZIO.suspendSucceed { - val cancelerRef = new java.util.concurrent.atomic.AtomicReference[URIO[R, Any]](ZIO.unit) - - ZIO - .Async[R, E, A]( - trace, - { k => - val result = register(using Unsafe)(k(_)) - - result match { - case Left(canceler) => cancelerRef.set(canceler); null.asInstanceOf[ZIO[R, E, A]] - case Right(done) => done - } - }, - () => blockingOn - ) - .onInterrupt(cancelerRef.get()) + val state = new AtomicReference[URIO[R, Any]](Exit.unit) with ((ZIO[R, E, A] => Unit) => ZIO[R, E, A]) { + def apply(k: ZIO[R, E, A] => Unit): ZIO[R, E, A] = + register(using Unsafe)(k(_)) match { + case Left(canceler) => set(canceler); null.asInstanceOf[ZIO[R, E, A]] + case Right(done) => done + } + } + + ZIO.Async[R, E, A](trace, state, () => blockingOn).onInterrupt(state.get()) } /** diff --git a/core/shared/src/main/scala-3/zio/ZIOVersionSpecific.scala b/core/shared/src/main/scala-3/zio/ZIOVersionSpecific.scala index 172685e2bf2f..8c6e5aff779d 100644 --- a/core/shared/src/main/scala-3/zio/ZIOVersionSpecific.scala +++ b/core/shared/src/main/scala-3/zio/ZIOVersionSpecific.scala @@ -19,16 +19,35 @@ private[zio] transparent trait ZIOVersionSpecific[-R, +E, +A] { self: ZIO[R, E, def provideSome[R0] = new ProvideSomePartiallyApplied[R0, R, E, A](self) + /** + * Equivalent to [[provideSome]], but does not require providing the remainder + * type + * + * {{{ + * val clockLayer: ZLayer[Any, Nothing, Clock] = ??? + * + * val zio: ZIO[Clock with Random, Nothing, Unit] = ??? + * + * val zio2 = zio.provideSome(clockLayer) // Inferred type is ZIO[Random, Nothing, Unit] + * }}} + * + * Note for Intellij users: By default, Intellij will not show correct type on + * hover. To fix this enable `Use types reported by Scala compiler + * (experimental)` in `Settings | Languages & Frameworks | Scala | Editor` + */ + inline transparent def provideSomeAuto[E1 >: E](inline layer: ZLayer[_, E1, _]*): ZIO[_, E1, A] = + ${ LayerMacros.provideDynamicImpl[R, E1, A]('self, 'layer) } + /** * Automatically assembles a layer for the ZIO effect, which translates it to * another level. */ inline def provide[E1 >: E](inline layer: ZLayer[_, E1, _]*): ZIO[Any, E1, A] = - ${ LayerMacros.provideImpl[Any, R, E1, A]('self, 'layer) } + ${ LayerMacros.provideStaticImpl[Any, R, E1, A]('self, 'layer) } } final class ProvideSomePartiallyApplied[R0, -R, +E, +A](val self: ZIO[R, E, A]) extends AnyVal { inline def apply[E1 >: E](inline layer: ZLayer[_, E1, _]*): ZIO[R0, E1, A] = - ${ LayerMacros.provideImpl[R0, R, E1, A]('self, 'layer) } + ${ LayerMacros.provideSomeStaticImpl[R0, R, E1, A]('self, 'layer) } } diff --git a/core/shared/src/main/scala-3/zio/ZLayerCompanionVersionSpecific.scala b/core/shared/src/main/scala-3/zio/ZLayerCompanionVersionSpecific.scala index 8c9044e23a8d..cfdc819aa6f5 100644 --- a/core/shared/src/main/scala-3/zio/ZLayerCompanionVersionSpecific.scala +++ b/core/shared/src/main/scala-3/zio/ZLayerCompanionVersionSpecific.scala @@ -5,12 +5,12 @@ import scala.deriving._ final class WirePartiallyApplied[R](val dummy: Boolean = true) extends AnyVal { inline def apply[E](inline layer: ZLayer[_, E, _]*): ZLayer[Any, E, R] = - ${ LayerMacros.constructLayer[Any, R, E]('layer) } + ${ LayerMacros.constructStaticProvideLayer[Any, R, E]('layer) } } final class WireSomePartiallyApplied[R0, R](val dummy: Boolean = true) extends AnyVal { inline def apply[E](inline layer: ZLayer[_, E, _]*): ZLayer[R0, E, R] = - ${ LayerMacros.constructLayer[R0, R, E]('layer) } + ${ LayerMacros.constructStaticProvideSomeLayer[R0, R, E]('layer) } } private[zio] transparent trait ZLayerCompanionVersionSpecific { diff --git a/core/shared/src/main/scala-3/zio/internal/macros/LayerMacroUtils.scala b/core/shared/src/main/scala-3/zio/internal/macros/LayerMacroUtils.scala index dbcc7237080b..220f8a42ae17 100644 --- a/core/shared/src/main/scala-3/zio/internal/macros/LayerMacroUtils.scala +++ b/core/shared/src/main/scala-3/zio/internal/macros/LayerMacroUtils.scala @@ -16,12 +16,40 @@ private[zio] object LayerMacroUtils { )(using Trace): ZLayer[R1, E, O2] = lhs >>> rhs - def constructLayer[R0: Type, R: Type, E: Type](using Quotes)( + def constructStaticLayer[R0: Type, R: Type, E: Type](using Quotes)( layers: Seq[LayerExpr[E]], provideMethod: ProvideMethod ): Expr[ZLayer[R0, E, R]] = { import quotes.reflect._ + constructTypelessLayer[R0, R, E](layers, provideMethod, false).asExprOf[ZLayer[R0, E, R]] + } + + def constructStaticSomeLayer[R0: Type, R: Type, E: Type](using Quotes)( + layers: Seq[LayerExpr[E]], + provideMethod: ProvideMethod + ): Expr[ZLayer[R0, E, _]] = { + import quotes.reflect._ + + constructTypelessLayer[R0, R, E](layers, provideMethod, false).asExprOf[ZLayer[R0, E, _]] + } + + def constructDynamicLayer[R: Type, E: Type](using Quotes)( + layers: Seq[LayerExpr[E]], + provideMethod: ProvideMethod + ): Expr[ZLayer[_, E, R]] = { + import quotes.reflect._ + + constructTypelessLayer[Nothing, R, E](layers, provideMethod, true).asExprOf[ZLayer[_, E, R]] + } + + private def constructTypelessLayer[R0: Type, R: Type, E: Type](using Quotes)( + layers: Seq[LayerExpr[E]], + provideMethod: ProvideMethod, + inferRemainder: Boolean + ): Expr[ZLayer[_, _, _]] = { + import quotes.reflect._ + def renderExpr[A](expr: Expr[A]): String = scala.util.Try(expr.asTerm.pos.sourceCode).toOption.flatten.getOrElse(expr.show) @@ -88,9 +116,15 @@ private[zio] object LayerMacroUtils { .asExprOf[ZLayer[_, E, _]] } + val remainder = if (inferRemainder) { + RemainderMethod.Inferred + } else { + RemainderMethod.Provided(getRequirements[R0]) + } + val builder = LayerBuilder[TypeRepr, LayerExpr[E]]( target0 = getRequirements[R], - remainder = getRequirements[R0], + remainder = remainder, providedLayers0 = layers.toList, layerToDebug = layerToDebug, typeEquals = _ <:< _, @@ -106,7 +140,7 @@ private[zio] object LayerMacroUtils { reportError = report.errorAndAbort ) - builder.build.asTerm.asExprOf[ZLayer[R0, E, R]] + builder.build.asTerm.asExprOf[ZLayer[_, _, _]] } } } diff --git a/core/shared/src/main/scala-3/zio/internal/macros/LayerMacros.scala b/core/shared/src/main/scala-3/zio/internal/macros/LayerMacros.scala index 36b7333bb66a..e02e9621d3dd 100644 --- a/core/shared/src/main/scala-3/zio/internal/macros/LayerMacros.scala +++ b/core/shared/src/main/scala-3/zio/internal/macros/LayerMacros.scala @@ -11,26 +11,78 @@ import java.util.Base64 import LayerMacroUtils._ object LayerMacros { - def constructLayer[R0: Type, R: Type, E: Type]( + def constructStaticProvideLayer[R0: Type, R: Type, E: Type]( layers: Expr[Seq[ZLayer[_, E, _]]] )(using Quotes): Expr[ZLayer[R0, E, R]] = layers match { case Varargs(layers) => - LayerMacroUtils.constructLayer[R0, R, E](layers, ProvideMethod.Provide) + LayerMacroUtils.constructStaticLayer[R0, R, E](layers, ProvideMethod.Provide) } - def provideImpl[R0: Type, R: Type, E: Type, A: Type](zio: Expr[ZIO[R, E, A]], layer: Expr[Seq[ZLayer[_, E, _]]])(using + def constructStaticProvideSomeLayer[R0: Type, R: Type, E: Type]( + layers: Expr[Seq[ZLayer[_, E, _]]] + )(using Quotes): Expr[ZLayer[R0, E, R]] = + layers match { + case Varargs(layers) => + LayerMacroUtils.constructStaticLayer[R0, R, E](layers, ProvideMethod.ProvideSome) + } + + def constructStaticProvideSomeSharedLayer[R0: Type, R: Type, E: Type]( + layers: Expr[Seq[ZLayer[_, E, _]]] + )(using Quotes): Expr[ZLayer[R0, E, _]] = + layers match { + case Varargs(layers) => + LayerMacroUtils.constructStaticSomeLayer[R0, R, E](layers, ProvideMethod.ProvideSomeShared) + } + + def constructDynamicLayer[R: Type, E: Type]( + layers: Expr[Seq[ZLayer[_, E, _]]] + )(using Quotes): Expr[ZLayer[_, E, R]] = + layers match { + case Varargs(layers) => + LayerMacroUtils.constructDynamicLayer[R, E](layers, ProvideMethod.Provide) + } + + def provideStaticImpl[R0: Type, R: Type, E: Type, A: Type]( + zio: Expr[ZIO[R, E, A]], + layer: Expr[Seq[ZLayer[_, E, _]]] + )(using Quotes ): Expr[ZIO[R0, E, A]] = { - val layerExpr = constructLayer[R0, R, E](layer) + val layerExpr = constructStaticProvideLayer[R0, R, E](layer) '{ $zio.provideLayer($layerExpr) } } + def provideSomeStaticImpl[R0: Type, R: Type, E: Type, A: Type]( + zio: Expr[ZIO[R, E, A]], + layer: Expr[Seq[ZLayer[_, E, _]]] + )(using + Quotes + ): Expr[ZIO[R0, E, A]] = { + val layerExpr = constructStaticProvideSomeLayer[R0, R, E](layer) + '{ $zio.provideLayer($layerExpr) } + } + + def provideDynamicImpl[R: Type, E: Type, A: Type]( + zio: Expr[ZIO[R, E, A]], + layer: Expr[Seq[ZLayer[_, E, _]]] + )(using + Quotes + ): Expr[ZIO[_, E, A]] = { + val layerExpr = constructDynamicLayer[R, E](layer) + + // https://github.com/scala/scala3/issues/22886 + layerExpr match { + case '{ $layer: ZLayer[in, e, out] } => + '{ $zio.provideLayer($layer) } + } + } + def runWithImpl[R: Type, E: Type]( layer: Expr[ZLayer[R, E, Unit]], deps: Expr[Seq[ZLayer[_, E, _]]] )(using Quotes) = { - val layerExpr = constructLayer[Any, R, E](deps) + val layerExpr = constructStaticProvideLayer[Any, R, E](deps) '{ ZIO.scoped($layer.build).provideLayer($layerExpr).unit } } diff --git a/core/shared/src/main/scala/zio/Cached.scala b/core/shared/src/main/scala/zio/Cached.scala index 8028d3ca7f0e..010eb663bb4f 100644 --- a/core/shared/src/main/scala/zio/Cached.scala +++ b/core/shared/src/main/scala/zio/Cached.scala @@ -19,7 +19,7 @@ package zio * A [[Cached]] is a possibly resourceful value that is loaded into memory, and * which can be refreshed either manually or automatically. */ -trait Cached[+Error, +Resource] { +sealed trait Cached[+Error, +Resource] { /** * Retrieves the current value stored in the cache. diff --git a/core/shared/src/main/scala/zio/Cause.scala b/core/shared/src/main/scala/zio/Cause.scala index 9c62a7c7fc83..ee64370179e2 100644 --- a/core/shared/src/main/scala/zio/Cause.scala +++ b/core/shared/src/main/scala/zio/Cause.scala @@ -19,6 +19,7 @@ package zio import zio.Cause.Both import zio.stacktracer.TracingImplicits.disableAutoTrace +import java.io.PrintWriter import scala.annotation.tailrec import scala.runtime.AbstractFunction2 @@ -497,35 +498,38 @@ sealed abstract class Cause[+E] extends Product with Serializable { self => final def nonEmpty: Boolean = !isEmpty - /** - * Returns a `String` with the cause pretty-printed. - */ + /** Returns a `String` with the cause pretty-printed. */ final def prettyPrint: String = { - import Cause.Unified + val builder = new StringBuilder + prettyPrintWith(builder.append(_).append('\n'))(Unsafe.unsafe) + builder.result() + } - val builder = ChunkBuilder.make[String]() - var size = 0 + /** Pretty-prints this cause with the provided `append` function. */ + private[zio] final def prettyPrintWith(append: String => Unit)(implicit unsafe: Unsafe): Unit = { + import Cause.Unified - def append(string: String): Unit = + var size = 0 + def appendLine(line: String): Unit = if (size <= 1024) { - builder += string + append(line) size += 1 } def appendCause(cause: Cause[E]): Unit = cause.unified.zipWithIndex.foreach { case (unified, 0) => - appendUnified(0, "Exception in thread \"" + unified.fiberId.threadName + "\" ", unified) + appendUnified(0, "", unified) case (unified, n) => - appendUnified(n, s"Suppressed: ", unified) + appendUnified(n, "Suppressed: ", unified) } def appendUnified(indent: Int, prefix: String, unified: Unified): Unit = { val baseIndent = "\t" * indent val traceIndent = baseIndent + "\t" - append(s"${baseIndent}${prefix}${unified.className}: ${unified.message}") - unified.trace.foreach(trace => append(s"${traceIndent}at ${trace}")) + appendLine(s"$baseIndent$prefix${unified.className}: ${unified.message}") + unified.trace.foreach(trace => appendLine(s"${traceIndent}at $trace")) } val (die, fail, interrupt) = @@ -541,7 +545,6 @@ sealed abstract class Cause[+E] extends Product with Serializable { self => die.foreach(appendCause) fail.foreach(appendCause) interrupt.foreach(appendCause) - builder.result.mkString("\n") } def size: Int = self.foldContext(())(Cause.Folder.Size) diff --git a/core/shared/src/main/scala/zio/Chunk.scala b/core/shared/src/main/scala/zio/Chunk.scala index 83bf9e9a7294..d1b217e4fb65 100644 --- a/core/shared/src/main/scala/zio/Chunk.scala +++ b/core/shared/src/main/scala/zio/Chunk.scala @@ -26,10 +26,10 @@ import scala.math.log import scala.reflect.{ClassTag, classTag} /** - * A `Chunk[A]` represents a chunk of values of type `A`. Chunks are designed - * are usually backed by arrays, but expose a purely functional, safe interface - * to the underlying elements, and they become lazy on operations that would be - * costly with arrays, such as repeated concatenation. + * A `Chunk[A]` represents a chunk of values of type `A`. Chunks are usually + * backed by arrays, but expose a purely functional, safe interface to the + * underlying elements, and they become lazy on operations that would be costly + * with arrays, such as repeated concatenation. * * The implementation of balanced concatenation is based on the one for * Conc-Trees in "Conc-Trees for Functional and Parallel Programming" by @@ -460,25 +460,25 @@ sealed abstract class Chunk[+A] extends ChunkLike[A] with Serializable { self => * Returns the first element that satisfies the effectful predicate. */ final def findZIO[R, E](f: A => ZIO[R, E, Boolean])(implicit trace: Trace): ZIO[R, E, Option[A]] = - ZIO.suspendSucceed { - val iterator = self.chunkIterator - var index = 0 + if (self.isEmpty) ZIO.none + else + ZIO.suspendSucceed { + val iterator = self.chunkIterator + var index = 0 - def loop(iterator: Chunk.ChunkIterator[A]): ZIO[R, E, Option[A]] = - if (iterator.hasNextAt(index)) { - val a = iterator.nextAt(index) - index += 1 + def loop(iterator: Chunk.ChunkIterator[A]): ZIO[R, E, Option[A]] = + if (iterator.hasNextAt(index)) { + val a = iterator.nextAt(index) + index += 1 - f(a).flatMap { - if (_) ZIO.succeed(Some(a)) - else loop(iterator) - } - } else { - ZIO.succeed(None) - } + f(a).flatMap { + if (_) Exit.succeed(Some(a)) + else loop(iterator) + } + } else Exit.none - loop(iterator) - } + loop(iterator) + } /** * Get the element at the specified index. diff --git a/core/shared/src/main/scala/zio/Dequeue.scala b/core/shared/src/main/scala/zio/Dequeue.scala index 807544059d1d..c4b1bf8451ac 100644 --- a/core/shared/src/main/scala/zio/Dequeue.scala +++ b/core/shared/src/main/scala/zio/Dequeue.scala @@ -19,7 +19,7 @@ package zio /** * A queue that can only be dequeued. */ -trait Dequeue[+A] extends Serializable { +sealed trait Dequeue[+A] extends Serializable { /** * Waits until the queue is shutdown. The `IO` returned by this method will @@ -85,27 +85,20 @@ trait Dequeue[+A] extends Serializable { * maximum. If there are fewer than the minimum number of elements available, * suspends until at least the minimum number of elements have been collected. */ - final def takeBetween(min: Int, max: Int)(implicit trace: Trace): UIO[Chunk[A]] = - ZIO.suspendSucceed { - - def takeRemainder(min: Int, max: Int, acc: Chunk[A]): UIO[Chunk[A]] = - if (max < min) ZIO.succeed(acc) - else - takeUpTo(max).flatMap { bs => - val remaining = min - bs.length - if (remaining == 1) - take.map(b => acc ++ bs :+ b) - else if (remaining > 1) { - take.flatMap { b => - takeRemainder(remaining - 1, max - bs.length - 1, acc ++ bs :+ b) - - } - } else - ZIO.succeed(acc ++ bs) - } - - takeRemainder(min, max, Chunk.empty) - } + final def takeBetween(min: Int, max: Int)(implicit trace: Trace): UIO[Chunk[A]] = { + def takeRemainder(min: Int, max: Int, acc: Chunk[A]): UIO[Chunk[A]] = + if (max < min) ZIO.succeed(acc) + else + takeUpTo(max).flatMap { bs => + val remaining = min - bs.length + + if (remaining <= 0) Exit.succeed(acc ++ bs) + else if (remaining == 1) take.map(b => acc ++ bs :+ b) + else take.flatMap(b => takeRemainder(remaining - 1, max - bs.length - 1, acc ++ bs :+ b)) + } + + takeRemainder(min, max, Chunk.empty) + } /** * Takes the specified number of elements from the queue. If there are fewer @@ -121,3 +114,6 @@ trait Dequeue[+A] extends Serializable { def poll(implicit trace: Trace): UIO[Option[A]] = takeUpTo(1).map(_.headOption) } +private[zio] object Dequeue { + private[zio] abstract class Internal[+A] extends Dequeue[A] +} diff --git a/core/shared/src/main/scala/zio/Enqueue.scala b/core/shared/src/main/scala/zio/Enqueue.scala index a3e0dd9ae3ad..17733d8a9947 100644 --- a/core/shared/src/main/scala/zio/Enqueue.scala +++ b/core/shared/src/main/scala/zio/Enqueue.scala @@ -19,7 +19,7 @@ package zio /** * A queue that can only be enqueued. */ -trait Enqueue[-A] extends Serializable { +sealed trait Enqueue[-A] extends Serializable { /** * Waits until the queue is shutdown. The `IO` returned by this method will @@ -87,3 +87,6 @@ trait Enqueue[-A] extends Serializable { def isFull(implicit trace: Trace): UIO[Boolean] = size.map(_ >= capacity) } +private[zio] object Enqueue { + private[zio] trait Internal[-A] extends Enqueue[A] +} diff --git a/core/shared/src/main/scala/zio/FiberFailure.scala b/core/shared/src/main/scala/zio/FiberFailure.scala index 972f87bba7d2..d2a01604e2fc 100644 --- a/core/shared/src/main/scala/zio/FiberFailure.scala +++ b/core/shared/src/main/scala/zio/FiberFailure.scala @@ -18,6 +18,8 @@ package zio import zio.stacktracer.TracingImplicits.disableAutoTrace +import java.io.{PrintStream, PrintWriter} + /** * Represents a failure in a fiber. This could be caused by some non- * recoverable error, such as a defect or system error, by some typed error, or @@ -27,21 +29,27 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace * better integrate with Scala exception handling. */ final case class FiberFailure(cause: Cause[Any]) extends Throwable(null, null, true, false) { - override def getMessage: String = cause.unified.headOption.fold("")(_.message) - - override def getStackTrace(): Array[StackTraceElement] = - cause.unified.headOption.fold[Chunk[StackTraceElement]](Chunk.empty)(_.trace).toArray + override def getMessage: String = + cause.unified.headOption.fold("")(_.message) - override def getCause(): Throwable = - cause.find { case Cause.Die(throwable, _) => throwable } - .orElse(cause.find { case Cause.Fail(value: Throwable, _) => value }) - .orNull + override def getStackTrace: Array[StackTraceElement] = + cause.trace.toJava.toArray - def fillSuppressed()(implicit unsafe: Unsafe): Unit = - if (getSuppressed().length == 0) { - cause.unified.iterator.drop(1).foreach(unified => addSuppressed(unified.toThrowable)) - } + override def getCause: Throwable = + cause.dieOption.orElse(cause.failureOption.collect { case t: Throwable => t }).orNull - override def toString = + // Note: unlike standard Java exceptions, this includes the stack trace. + override def toString: String = cause.prettyPrint + + override def printStackTrace(s: PrintStream): Unit = + cause.prettyPrintWith(s.println)(Unsafe.unsafe) + + override def printStackTrace(s: PrintWriter): Unit = + cause.prettyPrintWith(s.println)(Unsafe.unsafe) + + def fillSuppressed()(implicit unsafe: Unsafe): Unit = + if (getSuppressed.length == 0) + for (unified <- cause.unified.iterator.drop(1)) + addSuppressed(unified.toThrowable) } diff --git a/core/shared/src/main/scala/zio/FiberRef.scala b/core/shared/src/main/scala/zio/FiberRef.scala index 1e2117597753..6d733bf1861d 100644 --- a/core/shared/src/main/scala/zio/FiberRef.scala +++ b/core/shared/src/main/scala/zio/FiberRef.scala @@ -53,7 +53,7 @@ import zio.metrics.MetricLabel * Here `value` will be 2 as the value in the joined fiber is lower and we * specified `max` as our combining function. */ -trait FiberRef[A] extends Serializable { self => +sealed trait FiberRef[A] extends Serializable { self => /** * The type of the value of the `FiberRef`. diff --git a/core/shared/src/main/scala/zio/FunctionConstructor.scala b/core/shared/src/main/scala/zio/FunctionConstructor.scala new file mode 100644 index 000000000000..7d5b2f895b6a --- /dev/null +++ b/core/shared/src/main/scala/zio/FunctionConstructor.scala @@ -0,0 +1,103 @@ +package zio + +/** + * A `FunctionConstructor[Input]` knows how to construct a `ZIO` value from a + * function of type `In`. This allows the type of the `ZIO` value constructed to + * depend on `In`. + */ +sealed abstract class FunctionConstructor[In] { + + /** + * The type of the `ZIO` value. + */ + type Out + + /** + * Constructs a `ZIO` value from the specified input. + */ + def apply(in: In)(implicit trace: Trace): Out +} + +object FunctionConstructor { + + type WithOut[In, Out0] = FunctionConstructor[In] { type Out = Out0 } + + implicit def function1Constructor[A: Tag, Z]: FunctionConstructor.WithOut[A => Z, ZIO[A, Nothing, Z]] = + new FunctionConstructor[A => Z] { + type Out = ZIO[A, Nothing, Z] + def apply(f: A => Z)(implicit trace: Trace): ZIO[A, Nothing, Z] = + ZIO.serviceWith[A](f) + } + + implicit def function2Constructor[A: Tag, B: Tag, Z] + : FunctionConstructor.WithOut[(A, B) => Z, ZIO[A with B, Nothing, Z]] = + new FunctionConstructor[(A, B) => Z] { + type Out = ZIO[A with B, Nothing, Z] + def apply(f: (A, B) => Z)(implicit trace: Trace): ZIO[A with B, Nothing, Z] = + ZIO.environmentWith[A with B](env => f(env.get[A], env.get[B])) + } + + implicit def function3Constructor[A: Tag, B: Tag, C: Tag, Z] + : FunctionConstructor.WithOut[(A, B, C) => Z, ZIO[A with B with C, Nothing, Z]] = + new FunctionConstructor[(A, B, C) => Z] { + type Out = ZIO[A with B with C, Nothing, Z] + def apply(f: (A, B, C) => Z)(implicit trace: Trace): ZIO[A with B with C, Nothing, Z] = + ZIO.environmentWith[A with B with C](env => f(env.get[A], env.get[B], env.get[C])) + } + + implicit def function4Constructor[A: Tag, B: Tag, C: Tag, D: Tag, Z] + : FunctionConstructor.WithOut[(A, B, C, D) => Z, ZIO[A with B with C with D, Nothing, Z]] = + new FunctionConstructor[(A, B, C, D) => Z] { + type Out = ZIO[A with B with C with D, Nothing, Z] + def apply(f: (A, B, C, D) => Z)(implicit trace: Trace): ZIO[A with B with C with D, Nothing, Z] = + ZIO.environmentWith[A with B with C with D](env => f(env.get[A], env.get[B], env.get[C], env.get[D])) + } + + implicit def function5Constructor[A: Tag, B: Tag, C: Tag, D: Tag, F: Tag, Z] + : FunctionConstructor.WithOut[(A, B, C, D, F) => Z, ZIO[ + A with B with C with D with F, + Nothing, + Z + ]] = + new FunctionConstructor[(A, B, C, D, F) => Z] { + type Out = ZIO[A with B with C with D with F, Nothing, Z] + def apply( + f: (A, B, C, D, F) => Z + )(implicit trace: Trace): ZIO[A with B with C with D with F, Nothing, Z] = + ZIO.environmentWith[A with B with C with D with F](env => + f(env.get[A], env.get[B], env.get[C], env.get[D], env.get[F]) + ) + } + + implicit def function6Constructor[A: Tag, B: Tag, C: Tag, D: Tag, F: Tag, G: Tag, Z] + : FunctionConstructor.WithOut[(A, B, C, D, F, G) => Z, ZIO[ + A with B with C with D with F with G, + Nothing, + Z + ]] = + new FunctionConstructor[(A, B, C, D, F, G) => Z] { + type Out = ZIO[A with B with C with D with F with G, Nothing, Z] + def apply( + f: (A, B, C, D, F, G) => Z + )(implicit trace: Trace): ZIO[A with B with C with D with F with G, Nothing, Z] = + ZIO.environmentWith[A with B with C with D with F with G](env => + f(env.get[A], env.get[B], env.get[C], env.get[D], env.get[F], env.get[G]) + ) + } + + implicit def function7Constructor[A: Tag, B: Tag, C: Tag, D: Tag, F: Tag, G: Tag, H: Tag, Z] + : FunctionConstructor.WithOut[(A, B, C, D, F, G, H) => Z, ZIO[ + A with B with C with D with F with G with H, + Nothing, + Z + ]] = + new FunctionConstructor[(A, B, C, D, F, G, H) => Z] { + type Out = ZIO[A with B with C with D with F with G with H, Nothing, Z] + def apply( + f: (A, B, C, D, F, G, H) => Z + )(implicit trace: Trace): ZIO[A with B with C with D with F with G with H, Nothing, Z] = + ZIO.environmentWith[A with B with C with D with F with G with H](env => + f(env.get[A], env.get[B], env.get[C], env.get[D], env.get[F], env.get[G], env.get[H]) + ) + } +} diff --git a/core/shared/src/main/scala/zio/Hub.scala b/core/shared/src/main/scala/zio/Hub.scala index fded13b2e99f..76de187de686 100644 --- a/core/shared/src/main/scala/zio/Hub.scala +++ b/core/shared/src/main/scala/zio/Hub.scala @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean * A `Hub` is an asynchronous message hub. Publishers can offer messages to the * hub and subscribers can subscribe to take messages from the hub. */ -abstract class Hub[A] extends Enqueue[A] { +sealed abstract class Hub[A] extends Enqueue.Internal[A] { /** * Publishes a message to the hub, returning whether the message was published @@ -213,7 +213,7 @@ object Hub { shutdownFlag: AtomicBoolean, strategy: Strategy[A] ): Dequeue[A] = - new Dequeue[A] { self => + new Dequeue.Internal[A] { self => def awaitShutdown(implicit trace: Trace): UIO[Unit] = shutdownHook.await val capacity: Int = diff --git a/core/shared/src/main/scala/zio/Promise.scala b/core/shared/src/main/scala/zio/Promise.scala index 1436e13c7159..9e54ef47bf85 100644 --- a/core/shared/src/main/scala/zio/Promise.scala +++ b/core/shared/src/main/scala/zio/Promise.scala @@ -16,7 +16,6 @@ package zio -import zio.Promise.internal._ import zio.stacktracer.TracingImplicits.disableAutoTrace import java.util.concurrent.atomic.AtomicReference @@ -38,10 +37,8 @@ import java.util.concurrent.atomic.AtomicReference * } yield value * }}} */ -final class Promise[E, A] private ( - private val state: AtomicReference[Promise.internal.State[E, A]], - blockingOn: FiberId -) extends Serializable { +final class Promise[E, A] private (blockingOn: FiberId) extends Serializable { + import Promise.internal._ /** * Retrieves the value of the promise, suspending the fiber running the action @@ -50,32 +47,24 @@ final class Promise[E, A] private ( def await(implicit trace: Trace): IO[E, A] = ZIO.suspendSucceed { state.get match { - case Done(value) => - value - case _ => + case Done(value) => value + case pending => ZIO.asyncInterrupt[Any, E, A]( k => { - var result = null.asInstanceOf[Either[UIO[Any], IO[E, A]]] - var retry = true - - while (retry) { - val oldState = state.get - - val newState = oldState match { - case Pending(joiners) => - result = Left(interruptJoiner(k)) - - Pending(k :: joiners) - case s @ Done(value) => - result = Right(value) - - s + @annotation.tailrec + def loop(current: State[E, A]): Unit = + current match { + case pending: Pending[?, ?] => + if (state.compareAndSet(pending, pending.add(k))) () + else loop(state.get) + case Done(value) => k(value) } + loop(pending) - retry = !state.compareAndSet(oldState, newState) - } - - result + Left(ZIO.succeed(state.updateAndGet { + case pending: Pending[?, ?] => pending.remove(k) + case completed => completed + })) }, blockingOn ) @@ -87,14 +76,14 @@ final class Promise[E, A] private ( * fibers waiting on the value of the promise. */ def die(e: Throwable)(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.die(e)(trace, Unsafe.unsafe)) + ZIO.succeed(unsafe.die(e)(trace, Unsafe)) /** * Exits the promise with the specified exit, which will be propagated to all * fibers waiting on the value of the promise. */ def done(e: Exit[E, A])(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.completeWith(e)(Unsafe.unsafe)) + ZIO.succeed(unsafe.completeWith(e)(Unsafe)) /** * Completes the promise with the result of the specified effect. If the @@ -118,21 +107,21 @@ final class Promise[E, A] private ( * promise with the result of an effect see [[Promise.complete]]. */ def completeWith(io: IO[E, A])(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.completeWith(io)(Unsafe.unsafe)) + ZIO.succeed(unsafe.completeWith(io)(Unsafe)) /** * Fails the promise with the specified error, which will be propagated to all * fibers waiting on the value of the promise. */ def fail(e: E)(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.fail(e)(trace, Unsafe.unsafe)) + ZIO.succeed(unsafe.fail(e)(trace, Unsafe)) /** * Fails the promise with the specified cause, which will be propagated to all * fibers waiting on the value of the promise. */ def failCause(e: Cause[E])(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.failCause(e)(trace, Unsafe.unsafe)) + ZIO.succeed(unsafe.failCause(e)(trace, Unsafe)) /** * Completes the promise with interruption. This will interrupt all fibers @@ -146,21 +135,21 @@ final class Promise[E, A] private ( * waiting on the value of the promise as by the specified fiber. */ def interruptAs(fiberId: FiberId)(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.interruptAs(fiberId)(trace, Unsafe.unsafe)) + ZIO.succeed(unsafe.interruptAs(fiberId)(trace, Unsafe)) /** * Checks for completion of this Promise. Produces true if this promise has * already been completed with a value or an error and false otherwise. */ def isDone(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.isDone(Unsafe.unsafe)) + ZIO.succeed(unsafe.isDone(Unsafe)) /** * Checks for completion of this Promise. Returns the result effect if this * promise has already been completed or a `None` otherwise. */ def poll(implicit trace: Trace): UIO[Option[IO[E, A]]] = - ZIO.succeed(unsafe.poll(Unsafe.unsafe)) + ZIO.succeed(unsafe.poll(Unsafe)) /** * Fails the promise with the specified cause, which will be propagated to all @@ -168,13 +157,13 @@ final class Promise[E, A] private ( * to the cause. */ def refailCause(e: Cause[E])(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.refailCause(e)(trace, Unsafe.unsafe)) + ZIO.succeed(unsafe.refailCause(e)(trace, Unsafe)) /** * Completes the promise with the specified value. */ def succeed(a: A)(implicit trace: Trace): UIO[Boolean] = - ZIO.succeed(unsafe.succeed(a)(trace, Unsafe.unsafe)) + ZIO.succeed(unsafe.succeed(a)(trace, Unsafe)) /** * Internally, you can use this method instead of calling @@ -185,24 +174,6 @@ final class Promise[E, A] private ( private[zio] def succeedUnit(implicit ev0: A =:= Unit, trace: Trace): UIO[Boolean] = ZIO.succeed(unsafe.succeedUnit(ev0, trace, Unsafe)) - private def interruptJoiner(joiner: IO[E, A] => Any)(implicit trace: Trace): UIO[Any] = ZIO.succeed { - var retry = true - - while (retry) { - val oldState = state.get - - val newState = oldState match { - case Pending(joiners) => - Pending(joiners.filter(j => !j.eq(joiner))) - - case _ => - oldState - } - - retry = !state.compareAndSet(oldState, newState) - } - } - private[zio] trait UnsafeAPI extends Serializable { def completeWith(io: IO[E, A])(implicit unsafe: Unsafe): Boolean def die(e: Throwable)(implicit trace: Trace, unsafe: Unsafe): Boolean @@ -217,112 +188,159 @@ final class Promise[E, A] private ( def succeedUnit(implicit ev0: A =:= Unit, trace: Trace, unsafe: Unsafe): Boolean } - private[zio] val unsafe: UnsafeAPI = - new UnsafeAPI { - def completeWith(io: IO[E, A])(implicit unsafe: Unsafe): Boolean = { - var action: () => Boolean = null.asInstanceOf[() => Boolean] - var retry = true + @deprecated("Kept for binary compatibility only. Do not use", "2.1.16") + private[zio] def state: AtomicReference[Promise.internal.State[E, A]] = + unsafe.asInstanceOf[AtomicReference[Promise.internal.State[E, A]]] + private[zio] val unsafe: UnsafeAPI = new AtomicReference(Promise.internal.State.empty[E, A]) with UnsafeAPI { state => + def completeWith(io: IO[E, A])(implicit unsafe: Unsafe): Boolean = { + @annotation.tailrec + def loop(): Boolean = + state.get match { + case pending: Pending[?, ?] => + if (state.compareAndSet(pending, Done(io))) { + pending.complete(io) + true + } else { + loop() + } + case _ => false + } + loop() + } - while (retry) { - val oldState = state.get + def die(e: Throwable)(implicit trace: Trace, unsafe: Unsafe): Boolean = + completeWith(ZIO.die(e)) - val newState = oldState match { - case Pending(joiners) => - action = () => { joiners.foreach(_(io)); true } + def done(io: IO[E, A])(implicit unsafe: Unsafe): Unit = completeWith(io) - Done(io) + def fail(e: E)(implicit trace: Trace, unsafe: Unsafe): Boolean = + completeWith(ZIO.fail(e)) - case _ => - action = Promise.ConstFalse + def failCause(e: Cause[E])(implicit trace: Trace, unsafe: Unsafe): Boolean = + completeWith(ZIO.failCause(e)) - oldState - } + def interruptAs(fiberId: FiberId)(implicit trace: Trace, unsafe: Unsafe): Boolean = + completeWith(ZIO.interruptAs(fiberId)) - retry = !state.compareAndSet(oldState, newState) - } + def isDone(implicit unsafe: Unsafe): Boolean = + state.get().isInstanceOf[Done[?, ?]] - action() + def poll(implicit unsafe: Unsafe): Option[IO[E, A]] = + state.get() match { + case Done(value) => Some(value) + case _ => None } - def die(e: Throwable)(implicit trace: Trace, unsafe: Unsafe): Boolean = - completeWith(ZIO.die(e)) + def refailCause(e: Cause[E])(implicit trace: Trace, unsafe: Unsafe): Boolean = + completeWith(Exit.failCause(e)) - def done(io: IO[E, A])(implicit unsafe: Unsafe): Unit = { - var retry: Boolean = true - var joiners: List[IO[E, A] => Any] = null + def succeed(a: A)(implicit trace: Trace, unsafe: Unsafe): Boolean = + completeWith(Exit.succeed(a)) - while (retry) { - val oldState = state.get - - val newState = oldState match { - case Pending(js) => - joiners = js - Done(io) - case _ => oldState - } + override def succeedUnit(implicit ev0: A =:= Unit, trace: Trace, unsafe: Unsafe): Boolean = + completeWith(Exit.unit.asInstanceOf[IO[E, A]]) + } - retry = !state.compareAndSet(oldState, newState) +} +object Promise { + private[zio] object internal { + sealed abstract class State[E, A] extends Serializable + final case class Done[E, A](value: IO[E, A]) extends State[E, A] + sealed abstract class Pending[E, A] extends State[E, A] { self => + def complete(io: IO[E, A]): Unit + def add(waiter: IO[E, A] => Any): Pending[E, A] + def remove(waiter: IO[E, A] => Any): Pending[E, A] + def size: Int + } + private case object Empty extends Pending[Nothing, Nothing] { self => + override def complete(io: IO[Nothing, Nothing]): Unit = () + def size = 0 + def add(waiter: IO[Nothing, Nothing] => Any): Pending[Nothing, Nothing] = + new Link[Nothing, Nothing](waiter, self) { + override def size = 1 } - - if (joiners ne null) joiners.foreach(_(io)) + def remove(waiter: IO[Nothing, Nothing] => Any): Pending[Nothing, Nothing] = self + } + private sealed abstract class Link[E, A](final val waiter: IO[E, A] => Any, final val ws: Pending[E, A]) + extends Pending[E, A] { + self => + final def add(waiter: IO[E, A] => Any): Pending[E, A] = new Link(waiter, self) { + override val size = self.size + 1 } - - def fail(e: E)(implicit trace: Trace, unsafe: Unsafe): Boolean = - completeWith(ZIO.fail(e)) - - def failCause(e: Cause[E])(implicit trace: Trace, unsafe: Unsafe): Boolean = - completeWith(ZIO.failCause(e)) - - def interruptAs(fiberId: FiberId)(implicit trace: Trace, unsafe: Unsafe): Boolean = - completeWith(ZIO.interruptAs(fiberId)) - - def isDone(implicit unsafe: Unsafe): Boolean = - state.get().isInstanceOf[Done[?, ?]] - - def poll(implicit unsafe: Unsafe): Option[IO[E, A]] = - state.get() match { - case _: Pending[?, ?] => None - case Done(io) => Some(io) + final def complete(io: IO[E, A]): Unit = + if (size == 1) waiter(io) + else { + var current: Pending[E, A] = self + while (current ne Empty) { + current match { + case link: Link[?, ?] => + link.waiter(io) + current = link.ws + case _ => // Empty + current = Empty.asInstanceOf[Pending[E, A]] + } + } } - def refailCause(e: Cause[E])(implicit trace: Trace, unsafe: Unsafe): Boolean = - completeWith(Exit.failCause(e)) - - def succeed(a: A)(implicit trace: Trace, unsafe: Unsafe): Boolean = - completeWith(Exit.succeed(a)) - - override def succeedUnit(implicit ev0: A =:= Unit, trace: Trace, unsafe: Unsafe): Boolean = - completeWith(Exit.unit.asInstanceOf[IO[E, A]]) + final def remove(waiter: IO[E, A] => Any): Pending[E, A] = + if (size == 1) if (waiter eq self.waiter) ws else self + else { + val arr = Link.materialize(self, size) + var i = size - 1 + var acc: Pending[E, A] = Empty.asInstanceOf[Pending[E, A]] + + while (i >= 0) { + if (arr(i) ne waiter) { + acc = acc.add(arr(i)) + } + i -= 1 + } + acc + } } -} -object Promise { - private val ConstFalse: () => Boolean = () => false - - private[zio] object internal { - sealed abstract class State[E, A] extends Serializable with Product - final case class Pending[E, A](joiners: List[IO[E, A] => Any]) extends State[E, A] - final case class Done[E, A](value: IO[E, A]) extends State[E, A] + private object Link { + + /** + * Materializes the pending state into an array of waiters in reverse + * order. + */ + def materialize[E, A](pending: Pending[E, A], size: Int): Array[IO[E, A] => Any] = { + val array = new Array[IO[E, A] => Any](size) + var current = pending + var i = size - 1 + + while (i >= 0) { + current match { + case link: Link[?, ?] => + array(i) = link.waiter + current = link.ws + case _ => () // Empty + } + i -= 1 + } + array + } + } object State { - private val _pending: State[Nothing, Nothing] = Pending(Nil) - def pending[E, A]: State[E, A] = _pending.asInstanceOf[State[E, A]] + def empty[E, A]: State[E, A] = Empty.asInstanceOf[State[E, A]] } } /** * Makes a new promise to be completed by the fiber creating the promise. */ - def make[E, A](implicit trace: Trace): UIO[Promise[E, A]] = ZIO.fiberIdWith(makeAs(_)) + def make[E, A](implicit trace: Trace): UIO[Promise[E, A]] = + ZIO.fiberIdWith(id => Exit.succeed(unsafe.make(id)(Unsafe))) /** * Makes a new promise to be completed by the fiber with the specified id. */ def makeAs[E, A](fiberId: => FiberId)(implicit trace: Trace): UIO[Promise[E, A]] = - ZIO.succeed(unsafe.make(fiberId)(Unsafe.unsafe)) + ZIO.succeed(unsafe.make(fiberId)(Unsafe)) object unsafe { - def make[E, A](fiberId: FiberId)(implicit unsafe: Unsafe): Promise[E, A] = - new Promise[E, A](new AtomicReference[State[E, A]](internal.State.pending[E, A]), fiberId) + def make[E, A](fiberId: FiberId)(implicit unsafe: Unsafe): Promise[E, A] = new Promise[E, A](fiberId) } } diff --git a/core/shared/src/main/scala/zio/Queue.scala b/core/shared/src/main/scala/zio/Queue.scala index 7cd9973904be..0e7341eb2e32 100644 --- a/core/shared/src/main/scala/zio/Queue.scala +++ b/core/shared/src/main/scala/zio/Queue.scala @@ -26,7 +26,7 @@ import scala.annotation.tailrec * A `Queue` is a lightweight, asynchronous queue into which values can be * enqueued and of which elements can be dequeued. */ -abstract class Queue[A] extends Dequeue[A] with Enqueue[A] { +sealed abstract class Queue[A] extends Dequeue.Internal[A] with Enqueue.Internal[A] { /** * Checks whether the queue is currently empty. @@ -42,6 +42,7 @@ abstract class Queue[A] extends Dequeue[A] with Enqueue[A] { } object Queue extends QueuePlatformSpecific { + private[zio] abstract class Internal[A] extends Queue[A] /** * Makes a new bounded queue. When the capacity of the queue is reached, any @@ -150,14 +151,22 @@ object Queue extends QueuePlatformSpecific { shutdownHook: Promise[Nothing, Unit], shutdownFlag: AtomicBoolean, strategy: Strategy[A] - ): Queue[A] = new Queue[A] { + ): Queue[A] = new QueueImpl[A](queue, takers, shutdownHook, shutdownFlag, strategy) + + private final class QueueImpl[A]( + queue: MutableConcurrentQueue[A], + takers: ConcurrentDeque[Promise[Nothing, A]], + shutdownHook: Promise[Nothing, Unit], + shutdownFlag: AtomicBoolean, + strategy: Strategy[A] + ) extends Queue[A] { private def removeTaker(taker: Promise[Nothing, A])(implicit trace: Trace): UIO[Unit] = ZIO.succeed(takers.remove(taker)) - val capacity: Int = queue.capacity + override def capacity: Int = queue.capacity - def offer(a: A)(implicit trace: Trace): UIO[Boolean] = + override def offer(a: A)(implicit trace: Trace): UIO[Boolean] = ZIO.suspendSucceed { if (shutdownFlag.get) ZIO.interrupt else { @@ -186,7 +195,7 @@ object Queue extends QueuePlatformSpecific { } } - def offerAll[A1 <: A](as: Iterable[A1])(implicit trace: Trace): UIO[Chunk[A1]] = + override def offerAll[A1 <: A](as: Iterable[A1])(implicit trace: Trace): UIO[Chunk[A1]] = ZIO.suspendSucceed { if (shutdownFlag.get) ZIO.interrupt else { @@ -212,9 +221,9 @@ object Queue extends QueuePlatformSpecific { } } - def awaitShutdown(implicit trace: Trace): UIO[Unit] = shutdownHook.await + override def awaitShutdown(implicit trace: Trace): UIO[Unit] = shutdownHook.await - def size(implicit trace: Trace): UIO[Int] = + override def size(implicit trace: Trace): UIO[Int] = ZIO.suspendSucceed { if (shutdownFlag.get) ZIO.interrupt @@ -222,18 +231,23 @@ object Queue extends QueuePlatformSpecific { Exit.succeed(queue.size() - takers.size() + strategy.surplusSize) } - def shutdown(implicit trace: Trace): UIO[Unit] = + override def shutdown(implicit trace: Trace): UIO[Unit] = ZIO.fiberIdWith { fiberId => - shutdownFlag.set(true) - - ZIO.whenDiscard(shutdownHook.unsafe.completeWith(Exit.unit)(Unsafe))( - ZIO.foreachParDiscard(unsafePollAll(takers))(_.interruptAs(fiberId)) *> strategy.shutdown - ) + if (shutdownFlag.compareAndSet(false, true)) { + implicit val unsafe: Unsafe = Unsafe + shutdownHook.unsafe.succeedUnit + val it = unsafePollAll(takers).iterator + while (it.hasNext) { + it.next().unsafe.interruptAs(fiberId) + } + strategy.shutdown(fiberId) + } + Exit.unit }.uninterruptible - def isShutdown(implicit trace: Trace): UIO[Boolean] = ZIO.succeed(shutdownFlag.get) + override def isShutdown(implicit trace: Trace): UIO[Boolean] = ZIO.succeed(shutdownFlag.get) - def take(implicit trace: Trace): UIO[A] = + override def take(implicit trace: Trace): UIO[A] = ZIO.fiberIdWith { fiberId => if (shutdownFlag.get) ZIO.interrupt else { @@ -258,13 +272,13 @@ object Queue extends QueuePlatformSpecific { } } - def takeAll(implicit trace: Trace): UIO[Chunk[A]] = + override def takeAll(implicit trace: Trace): UIO[Chunk[A]] = ZIO.suspendSucceed { if (shutdownFlag.get) ZIO.interrupt else { val as = unsafePollAll(queue) - if (as.nonEmpty) { + if (!as.isEmpty) { strategy.unsafeOnQueueEmptySpace(queue, takers) Exit.succeed(as) } else { @@ -273,13 +287,13 @@ object Queue extends QueuePlatformSpecific { } } - def takeUpTo(max: Int)(implicit trace: Trace): UIO[Chunk[A]] = + override def takeUpTo(max: Int)(implicit trace: Trace): UIO[Chunk[A]] = ZIO.suspendSucceed { if (shutdownFlag.get) ZIO.interrupt else { val as = unsafePollN(queue, max) - if (as.nonEmpty) { + if (!as.isEmpty) { strategy.unsafeOnQueueEmptySpace(queue, takers) Exit.succeed(as) } else { @@ -320,7 +334,7 @@ object Queue extends QueuePlatformSpecific { def surplusSize: Int - def shutdown(implicit trace: Trace): UIO[Unit] + def shutdown(fiberId: FiberId)(implicit trace: Trace, unsafe: Unsafe): Unit @tailrec final def unsafeCompleteTakers( @@ -438,12 +452,14 @@ object Queue extends QueuePlatformSpecific { def surplusSize: Int = putters.size() - def shutdown(implicit trace: Trace): UIO[Unit] = - for { - fiberId <- ZIO.fiberId - putters <- ZIO.succeed(unsafePollAll(putters)) - _ <- ZIO.foreachPar(putters) { case (_, p, lastItem) => if (lastItem) p.interruptAs(fiberId) else ZIO.unit } - } yield () + def shutdown(fiberId: FiberId)(implicit trace: Trace, unsafe: Unsafe): Unit = { + var next = putters.poll() + while (next ne null) { + val (_, promise, isLast) = next + if (isLast) promise.unsafe.interruptAs(fiberId) + next = putters.poll() + } + } } final case class Dropping[A]() extends Strategy[A] { @@ -453,7 +469,7 @@ object Queue extends QueuePlatformSpecific { queue: MutableConcurrentQueue[A], takers: ConcurrentDeque[Promise[Nothing, A]], isShutdown: AtomicBoolean - )(implicit trace: Trace): UIO[Boolean] = ZIO.succeed(false) + )(implicit trace: Trace): UIO[Boolean] = Exit.`false` def unsafeOnQueueEmptySpace( queue: MutableConcurrentQueue[A], @@ -462,7 +478,7 @@ object Queue extends QueuePlatformSpecific { def surplusSize: Int = 0 - def shutdown(implicit trace: Trace): UIO[Unit] = ZIO.unit + def shutdown(fiberId: FiberId)(implicit trace: Trace, unsafe: Unsafe): Unit = () } final case class Sliding[A]() extends Strategy[A] { @@ -473,7 +489,7 @@ object Queue extends QueuePlatformSpecific { isShutdown: AtomicBoolean )(implicit trace: Trace): UIO[Boolean] = { def unsafeSlidingOffer(as: Iterable[A]): Unit = - if (as.nonEmpty && queue.capacity > 0) { + if (!as.isEmpty && queue.capacity > 0) { val iterator = as.iterator var a = iterator.next() var loop = true @@ -503,7 +519,7 @@ object Queue extends QueuePlatformSpecific { def surplusSize: Int = 0 - def shutdown(implicit trace: Trace): UIO[Unit] = ZIO.unit + def shutdown(fiberId: FiberId)(implicit trace: Trace, unsafe: Unsafe): Unit = () } } diff --git a/core/shared/src/main/scala/zio/Ref.scala b/core/shared/src/main/scala/zio/Ref.scala index ffacf27c2f0f..341fa26861ea 100644 --- a/core/shared/src/main/scala/zio/Ref.scala +++ b/core/shared/src/main/scala/zio/Ref.scala @@ -203,7 +203,7 @@ object Ref extends Serializable { * semantically block other writers, while multiple readers can read * simultaneously. */ - abstract class Synchronized[A] extends Ref[A] { + sealed abstract class Synchronized[A] extends Ref[A] { /** * Reads the value from the `Ref`. @@ -295,6 +295,7 @@ object Ref extends Serializable { } object Synchronized { + private[zio] abstract class Internal[A] extends Synchronized[A] /** * Creates a new `Ref.Synchronized` with the specified value. diff --git a/core/shared/src/main/scala/zio/Runtime.scala b/core/shared/src/main/scala/zio/Runtime.scala index f2698bdcdd2d..fbf7d0fdfdd3 100644 --- a/core/shared/src/main/scala/zio/Runtime.scala +++ b/core/shared/src/main/scala/zio/Runtime.scala @@ -24,6 +24,7 @@ import scala.concurrent.Future /** * A `Runtime[R]` is capable of executing tasks within an environment `R`. */ +@deprecatedInheritance("Use Runtime.apply", since = "2.1.18") trait Runtime[+R] { self => /** diff --git a/core/shared/src/main/scala/zio/Scope.scala b/core/shared/src/main/scala/zio/Scope.scala index 3031890c940d..cbff11726798 100644 --- a/core/shared/src/main/scala/zio/Scope.scala +++ b/core/shared/src/main/scala/zio/Scope.scala @@ -27,7 +27,7 @@ import scala.collection.immutable.LongMap * to the scope, and `close`, which closes a scope and runs all finalizers that * have been added to the scope. */ -trait Scope extends Serializable { self => +sealed trait Scope extends Serializable { self => /** * Adds a finalizer to this scope. The finalizer is guaranteed to be run when diff --git a/core/shared/src/main/scala/zio/ScopedRef.scala b/core/shared/src/main/scala/zio/ScopedRef.scala index b14b81377a7c..98cd8ff04a52 100644 --- a/core/shared/src/main/scala/zio/ScopedRef.scala +++ b/core/shared/src/main/scala/zio/ScopedRef.scala @@ -23,7 +23,7 @@ package zio * resources). The reference itself takes care of properly releasing resources * for the old value whenever a new value is obtained. */ -trait ScopedRef[A] { +sealed trait ScopedRef[A] { /** * Sets the value of this reference to the specified resourcefully-created diff --git a/core/shared/src/main/scala/zio/Semaphore.scala b/core/shared/src/main/scala/zio/Semaphore.scala index fbad74f692c1..1dacb46b0eb8 100644 --- a/core/shared/src/main/scala/zio/Semaphore.scala +++ b/core/shared/src/main/scala/zio/Semaphore.scala @@ -45,6 +45,20 @@ sealed trait Semaphore extends Serializable { */ def awaiting(implicit trace: Trace): UIO[Long] = ZIO.succeed(0L) + /** + * Executes the effect, acquiring a permit if available and releasing it after + * execution. Returns `None` if no permits were available. + */ + final def tryWithPermit[R, E, A](zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, Option[A]] = + tryWithPermits(1L)(zio) + + /** + * Executes the effect, acquiring `n` permits if available and releasing them + * after execution. Returns `None` if no permits were available. + */ + def tryWithPermits[R, E, A](n: Long)(zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, Option[A]] = + ZIO.none + /** * Executes the specified workflow, acquiring a permit immediately before the * workflow begins execution and releasing it immediately after the workflow @@ -71,6 +85,7 @@ sealed trait Semaphore extends Serializable { * permits and releasing them when the scope is closed. */ def withPermitsScoped(n: Long)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] + } object Semaphore { @@ -110,13 +125,35 @@ object Semaphore { def withPermitsScoped(n: Long)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] = ZIO.acquireRelease(reserve(n))(_.release).flatMap(_.acquire) + override def tryWithPermits[R, E, A](n: Long)(zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, Option[A]] = + ZIO.acquireReleaseWith(tryReserve(n)) { + case Some(reservation) => reservation.release + case _ => Exit.unit + } { + case _: Some[?] => zio.asSome + case _ => Exit.none + } + case class Reservation(acquire: UIO[Unit], release: UIO[Any]) + object Reservation { + private[zio] val zero = Reservation(ZIO.unit, ZIO.unit) + } + + def tryReserve(n: Long)(implicit trace: Trace): UIO[Option[Reservation]] = + if (n < 0) ZIO.die(new IllegalArgumentException(s"Unexpected negative `$n` permits requested.")) + else if (n == 0L) ZIO.succeed(Some(Reservation.zero)) + else + ref.modify { + case Right(permits) if permits >= n => + Some(Reservation(ZIO.unit, releaseN(n))) -> Right(permits - n) + case other => None -> other + } def reserve(n: Long)(implicit trace: Trace): UIO[Reservation] = if (n < 0) ZIO.die(new IllegalArgumentException(s"Unexpected negative `$n` permits requested.")) else if (n == 0L) - ZIO.succeedNow(Reservation(ZIO.unit, ZIO.unit)) + ZIO.succeed(Reservation.zero) else Promise.make[Nothing, Unit].flatMap { promise => ref.modify { diff --git a/core/shared/src/main/scala/zio/System.scala b/core/shared/src/main/scala/zio/System.scala index 78d29b559932..e3af51e9db3b 100644 --- a/core/shared/src/main/scala/zio/System.scala +++ b/core/shared/src/main/scala/zio/System.scala @@ -19,8 +19,7 @@ package zio import zio.stacktracer.TracingImplicits.disableAutoTrace import java.lang.{System => JSystem} -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ trait System extends Serializable { self => def env(variable: => String)(implicit trace: Trace): IO[SecurityException, Option[String]] @@ -153,7 +152,6 @@ object System extends SystemPlatformSpecific { override def lineSeparator()(implicit unsafe: Unsafe): String = JSystem.lineSeparator - @nowarn("msg=JavaConverters") override def properties()(implicit unsafe: Unsafe): Map[String, String] = JSystem.getProperties.asScala.toMap diff --git a/core/shared/src/main/scala/zio/ThreadLocalBridge.scala b/core/shared/src/main/scala/zio/ThreadLocalBridge.scala index 4a0e124db43d..fe598c2649fb 100644 --- a/core/shared/src/main/scala/zio/ThreadLocalBridge.scala +++ b/core/shared/src/main/scala/zio/ThreadLocalBridge.scala @@ -5,7 +5,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace import java.util.concurrent.atomic.AtomicReference -trait ThreadLocalBridge { +sealed trait ThreadLocalBridge { def makeFiberRef[A](initialValue: A)(link: A => Unit): ZIO[Scope, Nothing, FiberRef[A]] } diff --git a/core/shared/src/main/scala/zio/ZIO.scala b/core/shared/src/main/scala/zio/ZIO.scala index 56e64674557f..84385bc64795 100644 --- a/core/shared/src/main/scala/zio/ZIO.scala +++ b/core/shared/src/main/scala/zio/ZIO.scala @@ -16,18 +16,17 @@ package zio -import zio.internal.{FiberScope, Platform} +import zio.internal.FiberScope import zio.metrics.{MetricLabel, Metrics} import zio.stacktracer.TracingImplicits.disableAutoTrace import java.io.IOException +import java.util.concurrent.ConcurrentLinkedQueue import java.util.function.IntFunction import scala.annotation.implicitNotFound -import scala.collection.mutable.{Builder, ListBuffer} +import scala.collection.mutable.ListBuffer import scala.concurrent.ExecutionContext import scala.reflect.ClassTag -import scala.util.control.NoStackTrace -import izumi.reflect.macrortti.LightTypeTag /** * A `ZIO[R, E, A]` value is an immutable value (called an "effect") that @@ -249,31 +248,27 @@ sealed trait ZIO[-R, +E, +A] final def cachedInvalidate( timeToLive0: => Duration )(implicit trace: Trace): ZIO[R, Nothing, (IO[E, A], UIO[Unit])] = - ZIO.suspendSucceed { - val timeToLive = timeToLive0 + ZIO.environmentWith[R] { r => + val timeToLive = timeToLive0.toNanos + val cache = Ref.Synchronized.unsafe.make[Option[(Long, Promise[E, A])]](None)(Unsafe) + + def compute(start: Long): ZIO[R, Nothing, Option[(Long, Promise[E, A])]] = + for { + p <- Promise.make[E, A] + _ <- self.intoPromise(p) + } yield Some((start + timeToLive, p)) - def get(cache: Ref.Synchronized[Option[(Long, Promise[E, A])]]): ZIO[R, E, A] = + val get: ZIO[R, E, A] = ZIO.uninterruptibleMask { restore => Clock.nanoTime.flatMap { time => - cache.modifyZIO { - case Some((end, p)) if end - time > 0 => - Exit.succeed(p.await -> Some((end, p))) - case _ => - Promise.make[E, A].map { p => - val effect = self.onExit(p.done(_)) - effect -> Some((time + timeToLive.toNanos, p)) - } - }.flatMap(restore(_)) + cache.updateSomeAndGetZIO { + case None => restore(compute(time)) + case Some((end, _)) if end - time <= 0 => restore(compute(time)) + }.flatMap(a => restore(a.get._2.await)) } } - def invalidate(cache: Ref.Synchronized[Option[(Long, Promise[E, A])]]): UIO[Unit] = - cache.set(None) - - for { - r <- ZIO.environment[R] - cache <- Ref.Synchronized.make[Option[(Long, Promise[E, A])]](None) - } yield (get(cache).provideEnvironment(r), invalidate(cache)) + get.provideEnvironment(r) -> cache.set(None) } /** @@ -3552,23 +3547,25 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific * Applies the function `f` to each element of the `Collection[A]` and returns * the result in a new `Collection[B]` using the specified execution strategy. */ - final def foreachExec[R, E, A, B, Collection[+Element] <: Iterable[Element]](as: Collection[A])( + def foreachExec[R, E, A, B, Collection[+Element] <: Iterable[Element]]( + as: Collection[A] + )( exec: => ExecutionStrategy )( f: A => ZIO[R, E, B] )(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): ZIO[R, E, Collection[B]] = - if (as.isEmpty) ZIO.succeed(bf.fromSpecific(as)(Nil)) - else - ZIO.suspendSucceed { - exec match { - case ExecutionStrategy.Parallel => - ZIO.withParallelismUnboundedMask(restore => ZIO.foreachPar(as)(a => restore(f(a)))) - case ExecutionStrategy.ParallelN(n) => - ZIO.withParallelismMask(n)(restore => ZIO.foreachPar(as)(a => restore(f(a)))) - case ExecutionStrategy.Sequential => - ZIO.foreach(as)(f) + as.size match { + case 0 => ZIO.succeed(bf.fromSpecific(as)(Nil)) + case 1 => ZIO.suspendSucceed(f(as.head)).map(b => bf.fromSpecific(as)(as.map(_ => b))) + case size => + ZIO.suspendSucceed { + exec match { + case ExecutionStrategy.Parallel => ZIO.foreachParUnbounded(as, size)(f) + case ExecutionStrategy.ParallelN(n) => ZIO.foreachPar(n)(as, size)(f) + case ExecutionStrategy.Sequential => ZIO.foreach(as)(f) + } } - } + } /** * Applies the function `f` to each element of the `Collection[A]` in @@ -3733,7 +3730,7 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific * fiber. */ def fromFiber[E, A](fiber: => Fiber[E, A])(implicit trace: Trace): IO[E, A] = - succeed(fiber).flatMap(_.join) + ZIO.suspendSucceed(fiber.join) /** * Creates a `ZIO` value that represents the exit value of the specified @@ -3866,16 +3863,16 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific * error channel, making it easier to compose in some scenarios. */ def fromOption[A](v: => Option[A])(implicit trace: Trace): IO[Option[Nothing], A] = - succeed(v).flatMap(_.fold[IO[Option[Nothing], A]](fail(None))(ZIO.successFn)) + ZIO.suspendSucceed(v.fold[IO[Option[Nothing], A]](fail(None))(ZIO.successFn)) /** * Lifts a `Try` into a `ZIO`. */ def fromTry[A](value: => scala.util.Try[A])(implicit trace: Trace): Task[A] = - attempt(value).flatMap { + ZIO.suspend(value match { case scala.util.Success(v) => Exit.succeed(v) case scala.util.Failure(t) => ZIO.fail(t) - } + }) /** * Returns a collection of all `FiberRef` values for the fiber running this @@ -4621,9 +4618,7 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific * consumes only a small amount of heap regardless of `n`. */ def replicate[R, E, A](n: Int)(effect: ZIO[R, E, A])(implicit trace: Trace): Iterable[ZIO[R, E, A]] = - new Iterable[ZIO[R, E, A]] { - override def iterator: Iterator[ZIO[R, E, A]] = Iterator.range(0, n).map(_ => effect) - } + Iterable.fill(n)(effect) /** * Performs this effect the specified number of times and collects the @@ -4771,6 +4766,18 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific def serviceWithZIO[Service]: ServiceWithZIOPartiallyApplied[Service] = new ServiceWithZIOPartiallyApplied[Service] + /** + * Builds a ZIO from the specified function. + */ + def fromFunction[In](in: In)(implicit constructor: FunctionConstructor[In], trace: Trace): constructor.Out = + constructor(in) + + /** + * Builds a ZIO from the specified function. + */ + def fromFunctionZIO[In](in: In)(implicit constructor: ZIOFunctionConstructor[In], trace: Trace): constructor.Out = + constructor(in) + /** * Returns an effect that shifts execution to the specified executor. This is * useful to specify a default executor that effects sequenced after this one @@ -6027,7 +6034,7 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific } } - trait ZIOConstructorLowPriority1 extends ZIOConstructorLowPriority2 { + sealed trait ZIOConstructorLowPriority1 extends ZIOConstructorLowPriority2 { /** * Constructs a `ZIO[Any, E, A]` from an `Either[E, A]`. @@ -6066,7 +6073,7 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific } } - trait ZIOConstructorLowPriority2 extends ZIOConstructorLowPriority3 { + sealed trait ZIOConstructorLowPriority2 extends ZIOConstructorLowPriority3 { /** * Constructs a `ZIO[Any, Throwable, A]` from an `A`. @@ -6292,39 +6299,50 @@ object ZIO extends ZIOCompanionPlatformSpecific with ZIOCompanionVersionSpecific ): ZIO[R, E, Unit] = foreachParUnboundedDiscard(as, size)(ZIO.identityFn) - private def foreachPar[R, E, A, B, Collection[+Element] <: Iterable[Element]](n: => Int)( + private def foreachPar[R, E, A, B, Collection[+Element] <: Iterable[Element]]( + parallelism: Int + )( as: Collection[A], size: Int )( fn: A => ZIO[R, E, B] )(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): ZIO[R, E, Collection[B]] = - ZIO.suspendSucceed { - val array = Array.ofDim[AnyRef](size) - val zioFunction: ((A, Int)) => ZIO[R, E, Any] = { case (a, i) => - fn(a).flatMap { b => array(i) = b.asInstanceOf[AnyRef]; Exit.unit } + if (parallelism <= 1) + foreach(as)(fn) + else if (parallelism >= size) + foreachParUnbounded(as, size)(fn) + else + ZIO.suspendSucceed { + val array = Array.ofDim[AnyRef](size) + val zioFunction: ((A, Int)) => ZIO[R, E, Any] = { case (a, i) => + fn(a).flatMap { b => array(i) = b.asInstanceOf[AnyRef]; Exit.unit } + } + foreachParDiscard(parallelism)(as.zipWithIndex, size)(zioFunction) + .as(bf.fromSpecific(as)(array.asInstanceOf[Array[B]])) } - foreachParDiscard(n)(as.zipWithIndex, size)(zioFunction) - .as(bf.fromSpecific(as)(array.asInstanceOf[Array[B]])) - } private def foreachParDiscard[R, E, A]( - n: Int + parallelism: Int )(as: Iterable[A], size: Int)(f: A => ZIO[R, E, Any])(implicit trace: Trace): ZIO[R, E, Unit] = - size match { - case 0 => Exit.unit - case 1 => f(as.head).unit - case size => - def worker(queue: Queue[A]): ZIO[R, E, Unit] = - queue.poll.flatMap { - case Some(a) => f(a) *> worker(queue) - case _ => Exit.unit - } - Queue.bounded[A](size).flatMap { queue => - val nWorkers = n.min(size) - val workers = ZIO.replicate(nWorkers)(worker(queue)) - queue.offerAll(as) *> ZIO.collectAllParUnboundedDiscard(workers, nWorkers) - } - } + if (parallelism <= 1) + foreachDiscard(as)(f) + else if (parallelism >= size) + foreachParUnboundedDiscard(as, size)(f) + else + ZIO.suspendSucceed { + import scala.jdk.CollectionConverters._ + + val queue = new ConcurrentLinkedQueue[A](as.asJavaCollection) + + lazy val worker: ZIO[R, E, Unit] = + ZIO.suspendSucceed(queue.poll() match { + case null => Exit.unit + case a => f(a) *> worker + }) + + val workers = ZIO.replicate(parallelism)(worker) + ZIO.collectAllParUnboundedDiscard(workers, parallelism) + } private def foreachParUnbounded[R, E, A, B, Collection[+Element] <: Iterable[Element]]( as: Collection[A], @@ -6529,10 +6547,15 @@ sealed trait Exit[+E, +A] extends ZIO[Any, E, A] { self => } final def getOrThrow()(implicit ev: E <:< Throwable, unsafe: Unsafe): A = - getOrElse(cause => throw cause.squashTrace) + getOrElse(cause => throw cause.traced(externalStackTrace).squashTrace) final def getOrThrowFiberFailure()(implicit unsafe: Unsafe): A = - getOrElse(c => throw FiberFailure(c)) + getOrElse(cause => throw FiberFailure(cause.traced(externalStackTrace))) + + private def externalStackTrace: StackTrace = { + val stackTrace = new Throwable().getStackTrace.dropWhile(_.getClassName.startsWith("zio.Exit")) + StackTrace.fromJava(FiberId.None, stackTrace)(Trace.empty) + } /** * Determines if the result is a failure. diff --git a/core/shared/src/main/scala/zio/ZIOApp.scala b/core/shared/src/main/scala/zio/ZIOApp.scala index 26e94384c817..ee403221bc23 100644 --- a/core/shared/src/main/scala/zio/ZIOApp.scala +++ b/core/shared/src/main/scala/zio/ZIOApp.scala @@ -72,7 +72,7 @@ trait ZIOApp extends ZIOAppPlatformSpecific with ZIOAppVersionSpecific { */ final def exit(code: ExitCode)(implicit trace: Trace): UIO[Unit] = ZIO.succeed { - if (!shuttingDown.getAndSet(true)) { + if (shuttingDown.compareAndSet(false, true)) { try Platform.exit(code.code)(Unsafe.unsafe) catch { case _: SecurityException => @@ -99,8 +99,8 @@ trait ZIOApp extends ZIOAppPlatformSpecific with ZIOAppVersionSpecific { def runtime: Runtime[Any] = Runtime.default protected def installSignalHandlers(runtime: Runtime[Any])(implicit trace: Trace): UIO[Any] = - ZIO.attempt { - if (!ZIOApp.installedSignals.getAndSet(true)) { + ZIO.ignore { + if (ZIOApp.installedSignals.compareAndSet(false, true)) { val dumpFibers = () => runtime.unsafe.run(Fiber.dumpAll)(trace, Unsafe.unsafe).getOrThrowFiberFailure()(Unsafe.unsafe) @@ -109,7 +109,7 @@ trait ZIOApp extends ZIOAppPlatformSpecific with ZIOAppVersionSpecific { Platform.addSignalHandler("USR1", dumpFibers)(Unsafe.unsafe) } } - }.ignore + } } object ZIOApp { diff --git a/core/shared/src/main/scala/zio/ZIOFunctionConstructor.scala b/core/shared/src/main/scala/zio/ZIOFunctionConstructor.scala new file mode 100644 index 000000000000..127dc3055035 --- /dev/null +++ b/core/shared/src/main/scala/zio/ZIOFunctionConstructor.scala @@ -0,0 +1,104 @@ +package zio + +/** + * A `ZIOFunctionConstructor[Input]` knows how to construct a `ZIO` value from a + * function of type `In`. This allows the type of the `ZIO` value constructed to + * depend on `In`. + */ +sealed abstract class ZIOFunctionConstructor[In] { + + /** + * The type of the `ZIO` value. + */ + type Out + + /** + * Constructs a `ZIO` value from the specified input. + */ + def apply(in: In)(implicit trace: Trace): Out +} + +object ZIOFunctionConstructor { + + type WithOut[In, Out0] = ZIOFunctionConstructor[In] { type Out = Out0 } + + implicit def function1Constructor[A: Tag, R, E, Z] + : ZIOFunctionConstructor.WithOut[A => ZIO[R, E, Z], ZIO[R with A, E, Z]] = + new ZIOFunctionConstructor[A => ZIO[R, E, Z]] { + type Out = ZIO[R with A, E, Z] + def apply(f: A => ZIO[R, E, Z])(implicit trace: Trace): ZIO[R with A, E, Z] = + ZIO.serviceWithZIO[A](f) + } + + implicit def function2Constructor[A: Tag, B: Tag, R, E, Z] + : ZIOFunctionConstructor.WithOut[(A, B) => ZIO[R, E, Z], ZIO[R with A with B, E, Z]] = + new ZIOFunctionConstructor[(A, B) => ZIO[R, E, Z]] { + type Out = ZIO[R with A with B, E, Z] + def apply(f: (A, B) => ZIO[R, E, Z])(implicit trace: Trace): ZIO[R with A with B, E, Z] = + ZIO.environmentWithZIO[A with B](env => f(env.get[A], env.get[B])) + } + + implicit def function3Constructor[A: Tag, B: Tag, C: Tag, R, E, Z] + : ZIOFunctionConstructor.WithOut[(A, B, C) => ZIO[R, E, Z], ZIO[R with A with B with C, E, Z]] = + new ZIOFunctionConstructor[(A, B, C) => ZIO[R, E, Z]] { + type Out = ZIO[R with A with B with C, E, Z] + def apply(f: (A, B, C) => ZIO[R, E, Z])(implicit trace: Trace): ZIO[R with A with B with C, E, Z] = + ZIO.environmentWithZIO[A with B with C](env => f(env.get[A], env.get[B], env.get[C])) + } + + implicit def function4Constructor[A: Tag, B: Tag, C: Tag, D: Tag, R, E, Z] + : ZIOFunctionConstructor.WithOut[(A, B, C, D) => ZIO[R, E, Z], ZIO[R with A with B with C with D, E, Z]] = + new ZIOFunctionConstructor[(A, B, C, D) => ZIO[R, E, Z]] { + type Out = ZIO[R with A with B with C with D, E, Z] + def apply(f: (A, B, C, D) => ZIO[R, E, Z])(implicit trace: Trace): ZIO[R with A with B with C with D, E, Z] = + ZIO.environmentWithZIO[A with B with C with D](env => f(env.get[A], env.get[B], env.get[C], env.get[D])) + } + + implicit def function5Constructor[A: Tag, B: Tag, C: Tag, D: Tag, F: Tag, R, E, Z] + : ZIOFunctionConstructor.WithOut[(A, B, C, D, F) => ZIO[R, E, Z], ZIO[ + R with A with B with C with D with F, + E, + Z + ]] = + new ZIOFunctionConstructor[(A, B, C, D, F) => ZIO[R, E, Z]] { + type Out = ZIO[R with A with B with C with D with F, E, Z] + def apply( + f: (A, B, C, D, F) => ZIO[R, E, Z] + )(implicit trace: Trace): ZIO[R with A with B with C with D with F, E, Z] = + ZIO.environmentWithZIO[A with B with C with D with F](env => + f(env.get[A], env.get[B], env.get[C], env.get[D], env.get[F]) + ) + } + + implicit def function6Constructor[A: Tag, B: Tag, C: Tag, D: Tag, F: Tag, G: Tag, R, E, Z] + : ZIOFunctionConstructor.WithOut[(A, B, C, D, F, G) => ZIO[R, E, Z], ZIO[ + R with A with B with C with D with F with G, + E, + Z + ]] = + new ZIOFunctionConstructor[(A, B, C, D, F, G) => ZIO[R, E, Z]] { + type Out = ZIO[R with A with B with C with D with F with G, E, Z] + def apply( + f: (A, B, C, D, F, G) => ZIO[R, E, Z] + )(implicit trace: Trace): ZIO[R with A with B with C with D with F with G, E, Z] = + ZIO.environmentWithZIO[A with B with C with D with F with G](env => + f(env.get[A], env.get[B], env.get[C], env.get[D], env.get[F], env.get[G]) + ) + } + + implicit def function7Constructor[A: Tag, B: Tag, C: Tag, D: Tag, F: Tag, G: Tag, H: Tag, R, E, Z] + : ZIOFunctionConstructor.WithOut[(A, B, C, D, F, G, H) => ZIO[R, E, Z], ZIO[ + R with A with B with C with D with F with G with H, + E, + Z + ]] = + new ZIOFunctionConstructor[(A, B, C, D, F, G, H) => ZIO[R, E, Z]] { + type Out = ZIO[R with A with B with C with D with F with G with H, E, Z] + def apply( + f: (A, B, C, D, F, G, H) => ZIO[R, E, Z] + )(implicit trace: Trace): ZIO[R with A with B with C with D with F with G with H, E, Z] = + ZIO.environmentWithZIO[A with B with C with D with F with G with H](env => + f(env.get[A], env.get[B], env.get[C], env.get[D], env.get[F], env.get[G], env.get[H]) + ) + } +} diff --git a/core/shared/src/main/scala/zio/ZInputStream.scala b/core/shared/src/main/scala/zio/ZInputStream.scala index db297754040d..04f1bd944f7f 100644 --- a/core/shared/src/main/scala/zio/ZInputStream.scala +++ b/core/shared/src/main/scala/zio/ZInputStream.scala @@ -20,7 +20,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace import java.io.IOException -trait ZInputStream { +sealed trait ZInputStream { def readN(n: Int)(implicit trace: Trace): IO[Option[IOException], Chunk[Byte]] def skip(n: Long)(implicit trace: Trace): IO[IOException, Long] def readAll(bufferSize: Int)(implicit trace: Trace): IO[Option[IOException], Chunk[Byte]] diff --git a/core/shared/src/main/scala/zio/ZKeyedPool.scala b/core/shared/src/main/scala/zio/ZKeyedPool.scala index 97f35445c899..60fcaf692379 100644 --- a/core/shared/src/main/scala/zio/ZKeyedPool.scala +++ b/core/shared/src/main/scala/zio/ZKeyedPool.scala @@ -18,7 +18,7 @@ package zio import zio.internal.Platform -trait ZKeyedPool[+Err, -Key, Item] { +sealed trait ZKeyedPool[+Err, -Key, Item] { /** * Retrieves an item from the pool belonging to the given key in a scoped diff --git a/core/shared/src/main/scala/zio/ZOutputStream.scala b/core/shared/src/main/scala/zio/ZOutputStream.scala index fd3400741124..31e06eca9684 100644 --- a/core/shared/src/main/scala/zio/ZOutputStream.scala +++ b/core/shared/src/main/scala/zio/ZOutputStream.scala @@ -20,7 +20,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace import java.io.IOException -trait ZOutputStream { +sealed trait ZOutputStream { def write(chunk: Chunk[Byte])(implicit trace: Trace): IO[IOException, Unit] } diff --git a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala index 3e81ec775d79..f3f31ad582cb 100644 --- a/core/shared/src/main/scala/zio/internal/FiberRuntime.scala +++ b/core/shared/src/main/scala/zio/internal/FiberRuntime.scala @@ -1312,9 +1312,12 @@ final class FiberRuntime[E, A](fiberId: FiberId.Runtime, fiberRefs0: FiberRefs, } if (runtimeMetricsEnabled) { - val tags = getFiberRef(FiberRef.currentTags) - Metric.runtime.fiberFailures.unsafe.update(1, tags)(Unsafe) - cause.foldContext(tags)(FiberRuntime.fiberFailureTracker) + val filteredCause = cause.filter(_.traces.exists(_.fiberId eq fiberId)) + if (!filteredCause.isEmpty) { + val tags = getFiberRef(FiberRef.currentTags) + Metric.runtime.fiberFailures.unsafe.update(1, tags)(Unsafe) + filteredCause.foldContext(tags)(FiberRuntime.fiberFailureTracker) + } } } catch { case throwable: Throwable => diff --git a/core/shared/src/main/scala/zio/internal/MutableConcurrentQueue.scala b/core/shared/src/main/scala/zio/internal/MutableConcurrentQueue.scala index 599aeb68fd07..81dbc0436aa9 100644 --- a/core/shared/src/main/scala/zio/internal/MutableConcurrentQueue.scala +++ b/core/shared/src/main/scala/zio/internal/MutableConcurrentQueue.scala @@ -31,8 +31,7 @@ private[zio] object MutableConcurrentQueue { if (capacity == 1) new OneElementConcurrentQueue() else RingBuffer[A](capacity) - def unbounded[A]: MutableConcurrentQueue[A] = - new LinkedQueue[A] + def unbounded[A]: MutableConcurrentQueue[A] = LinkedQueue[A](addMetrics = false) /** * Rounds up to the nearest power of 2 and subtracts 1. e.g., diff --git a/core/shared/src/main/scala/zio/internal/impls/LinkedQueue.scala b/core/shared/src/main/scala/zio/internal/impls/LinkedQueue.scala index 94f3281d80d5..9541f58cc23d 100644 --- a/core/shared/src/main/scala/zio/internal/impls/LinkedQueue.scala +++ b/core/shared/src/main/scala/zio/internal/impls/LinkedQueue.scala @@ -23,7 +23,19 @@ import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong import scala.annotation.nowarn -private[zio] final class LinkedQueue[A] extends MutableConcurrentQueue[A] with Serializable { +private[zio] final class LinkedQueue[A] private (addMetrics: Boolean) + extends MutableConcurrentQueue[A] + with Serializable { + + /** + * Public constructor to create a new LinkedQueue. + * + * Required for retrocompatibility. See https://github.com/zio/zio/pull/8784 + * + * Use [[LinkedQueue.apply]] to use a different `addMetrics` value. + */ + def this() = this(addMetrics = true) + override final val capacity = Int.MaxValue private[this] val jucConcurrentQueue = new ConcurrentLinkedQueue[A]() @@ -32,32 +44,32 @@ private[zio] final class LinkedQueue[A] extends MutableConcurrentQueue[A] with S * performance implications. Having a better solution would be * desirable. */ - private[this] val enqueuedCounter = new AtomicLong(0) - private[this] val dequeuedCounter = new AtomicLong(0) + private[this] val enqueuedCounter = if (addMetrics) new AtomicLong(0) else null + private[this] val dequeuedCounter = if (addMetrics) new AtomicLong(0) else null override def size(): Int = jucConcurrentQueue.size() - override def enqueuedCount(): Long = enqueuedCounter.get() + override def enqueuedCount(): Long = if (enqueuedCounter ne null) enqueuedCounter.get() else 0L - override def dequeuedCount(): Long = dequeuedCounter.get() + override def dequeuedCount(): Long = if (dequeuedCounter ne null) dequeuedCounter.get() else 0L override def offer(a: A): Boolean = { val success = jucConcurrentQueue.offer(a) - if (success) enqueuedCounter.incrementAndGet() + if ((enqueuedCounter ne null) && success) enqueuedCounter.incrementAndGet() success } override def offerAll[A1 <: A](as: Iterable[A1]): Chunk[A1] = { import collection.JavaConverters._ jucConcurrentQueue.addAll(as.asJavaCollection): @nowarn("msg=JavaConverters") - enqueuedCounter.addAndGet(as.size.toLong) + if (enqueuedCounter ne null) enqueuedCounter.addAndGet(as.size.toLong) Chunk.empty } override def poll(default: A): A = { val polled = jucConcurrentQueue.poll() if (polled != null) { - dequeuedCounter.incrementAndGet() + if (dequeuedCounter ne null) dequeuedCounter.incrementAndGet() polled } else default } @@ -66,3 +78,7 @@ private[zio] final class LinkedQueue[A] extends MutableConcurrentQueue[A] with S override def isFull(): Boolean = false } + +object LinkedQueue { + def apply[A](addMetrics: Boolean): LinkedQueue[A] = new LinkedQueue[A](addMetrics) +} diff --git a/core/shared/src/main/scala/zio/internal/macros/Graph.scala b/core/shared/src/main/scala/zio/internal/macros/Graph.scala index 17f673bbf9fc..74be1a76a469 100644 --- a/core/shared/src/main/scala/zio/internal/macros/Graph.scala +++ b/core/shared/src/main/scala/zio/internal/macros/Graph.scala @@ -2,7 +2,11 @@ package zio.internal.macros import zio.internal.macros.LayerTree._ -final case class Graph[Key, A](nodes: List[Node[Key, A]], keyEquals: (Key, Key) => Boolean) { +final case class Graph[Key, A]( + nodes: List[Node[Key, A]], + keyEquals: (Key, Key) => Boolean, + unknownLayerFactory: Key => Option[Node[Key, A]] +) { def buildComplete(outputs: List[Key]): Either[::[GraphError[Key, A]], LayerTree[A]] = forEach(outputs) { output => @@ -23,7 +27,7 @@ final case class Graph[Key, A](nodes: List[Node[Key, A]], keyEquals: (Key, Key) .map(_ >>> LayerTree.succeed(node.value)) def map[B](f: A => B): Graph[Key, B] = - Graph(nodes.map(_.map(f)), keyEquals) + Graph(nodes.map(_.map(f)), keyEquals, unknownLayerFactory(_).map(_.map(f))) private val nodeWithOutputCache = new java.util.HashMap[Key, Option[Node[Key, A]]] @@ -31,7 +35,7 @@ final case class Graph[Key, A](nodes: List[Node[Key, A]], keyEquals: (Key, Key) nodeWithOutputCache.computeIfAbsent(output, findNodeWithOutput).toRight(::(error, Nil)) private def findNodeWithOutput(output: Key): Option[Node[Key, A]] = - nodes.find(_.outputs.exists(keyEquals(_, output))) + nodes.find(_.outputs.exists(keyEquals(_, output))).orElse(unknownLayerFactory(output)) private def buildNode( node: Node[Key, A], diff --git a/core/shared/src/main/scala/zio/internal/macros/LayerBuilder.scala b/core/shared/src/main/scala/zio/internal/macros/LayerBuilder.scala index 61c606070448..c57a1088ca20 100644 --- a/core/shared/src/main/scala/zio/internal/macros/LayerBuilder.scala +++ b/core/shared/src/main/scala/zio/internal/macros/LayerBuilder.scala @@ -47,7 +47,7 @@ import scala.collection.{immutable, mutable} */ final case class LayerBuilder[Type, Expr]( target0: List[Type], - remainder: List[Type], + remainder: RemainderMethod[Type], providedLayers0: List[Expr], layerToDebug: PartialFunction[Expr, Debug], sideEffectType: Type, @@ -64,11 +64,11 @@ final case class LayerBuilder[Type, Expr]( ) { lazy val target = - if (method.isProvideSomeShared) target0.filterNot(t1 => remainder.exists(t2 => typeEquals(t1, t2))) + if (method.isProvideSomeShared) target0.filterNot(t1 => remainder.providedTypes.exists(t2 => typeEquals(t1, t2))) else target0 private lazy val remainderNodes: List[Node[Type, Expr]] = - remainder.map(typeToNode).distinct + remainder.providedTypes.map(typeToNode).distinct private val (providedLayers, maybeDebug): (List[Expr], Option[ZLayer.Debug]) = { val maybeDebug = providedLayers0.collectFirst(layerToDebug) @@ -79,9 +79,33 @@ final case class LayerBuilder[Type, Expr]( private val (sideEffectNodes, providedLayerNodes): (List[Node[Type, Expr]], List[Node[Type, Expr]]) = providedLayers.map(exprToNode).partition(_.outputs.exists(typeEquals(_, sideEffectType))) + /** + * Returns tree representing final `ZLayer[In, Err, Out]` + * + * ==If [[remainder]] is [[RemainderMethod.Inferred]]== + * - `In` will be some generated type + * - `Err` will be the union of all errors in the provided layers + * - `Out` will be AND'ed together types from [[target0]] + * + * ==If [[remainder]] is [[RemainderMethod.Provided]] and provide method is not [[ProvideMethod.ProvideSomeShared]]== + * - `In` will be AND'ed together types from [[remainder]] + * - `Err` will be the union of all errors in the provided layers + * - `Out` will be AND'ed together types from [[target0]] + * + * ==If [[remainder]] is [[RemainderMethod.Provided]] and provide method is [[ProvideMethod.ProvideSomeShared]]== + * - `In` will be AND'ed together types from [[remainder]] + * - `Err` will be the union of all errors in the provided layers + * - `Out` will be generated type (AND'ed together types from [[target0]] + * minus types from [[remainder]]) + */ def build: Expr = { assertNoAmbiguity() + val remainderTypeFactory = remainder match { + case RemainderMethod.Provided(_) => (_: Type) => None + case RemainderMethod.Inferred => (t: Type) => Some(typeToNode(t)) + } + /** * Build the layer tree. This represents the structure of a successfully * constructed ZLayer that will build the target types. This, of course, may @@ -89,7 +113,7 @@ final case class LayerBuilder[Type, Expr]( */ val layerTreeEither: Either[::[GraphError[Type, Expr]], LayerTree[Expr]] = { val nodes: List[Node[Type, Expr]] = providedLayerNodes ++ remainderNodes ++ sideEffectNodes - val graph = Graph(nodes, typeEquals) + val graph = Graph(nodes, typeEquals, remainderTypeFactory) for { original <- graph.buildComplete(target) @@ -173,7 +197,7 @@ final case class LayerBuilder[Type, Expr]( reportWarn(message) } - if (remainder.isEmpty) { + if (remainder == RemainderMethod.Provided[Type](List.empty)) { val message = "\n" + TerminalRendering.provideSomeNothingEnvError reportWarn(message) } @@ -385,3 +409,15 @@ object ProvideMethod { case object ProvideSomeShared extends ProvideMethod case object ProvideCustom extends ProvideMethod } + +sealed trait RemainderMethod[+T] extends Product with Serializable { + final def providedTypes: List[T] = this match { + case RemainderMethod.Provided(types) => types + case RemainderMethod.Inferred => List.empty + } +} + +object RemainderMethod { + case class Provided[T](types: List[T]) extends RemainderMethod[T] + case object Inferred extends RemainderMethod[Nothing] +} diff --git a/core/shared/src/main/scala/zio/package.scala b/core/shared/src/main/scala/zio/package.scala index a01775c85c81..179c31f94dae 100644 --- a/core/shared/src/main/scala/zio/package.scala +++ b/core/shared/src/main/scala/zio/package.scala @@ -46,7 +46,7 @@ package object zio type Trace = Tracer.instance.Type with Tracer.Traced - trait Tag[A] extends EnvironmentTag[A] { + sealed trait Tag[A] extends EnvironmentTag[A] { def tag: LightTypeTag } diff --git a/core/shared/src/main/scala/zio/stm/TDequeue.scala b/core/shared/src/main/scala/zio/stm/TDequeue.scala index 4b28ffde3b1f..fa5778a93fa2 100644 --- a/core/shared/src/main/scala/zio/stm/TDequeue.scala +++ b/core/shared/src/main/scala/zio/stm/TDequeue.scala @@ -21,7 +21,7 @@ import zio._ /** * A transactional queue that can only be dequeued. */ -trait TDequeue[+A] extends Serializable { +sealed trait TDequeue[+A] extends Serializable { /** * The maximum capacity of the queue. @@ -138,3 +138,6 @@ trait TDequeue[+A] extends Serializable { final def takeN(n: Int): ZSTM[Any, Nothing, Chunk[A]] = takeBetween(n, n) } +private[zio] object TDequeue { + private[zio] trait Internal[+A] extends TDequeue[A] +} diff --git a/core/shared/src/main/scala/zio/stm/TEnqueue.scala b/core/shared/src/main/scala/zio/stm/TEnqueue.scala index cb77b503b99f..ec516551c678 100644 --- a/core/shared/src/main/scala/zio/stm/TEnqueue.scala +++ b/core/shared/src/main/scala/zio/stm/TEnqueue.scala @@ -21,7 +21,7 @@ import zio._ /** * A transactional queue that can only be enqueued. */ -trait TEnqueue[-A] extends Serializable { +sealed trait TEnqueue[-A] extends Serializable { /** * The maximum capacity of the queue. @@ -73,3 +73,6 @@ trait TEnqueue[-A] extends Serializable { def isFull: USTM[Boolean] = size.map(_ == capacity) } +private[zio] object TEnqueue { + private[zio] trait Internal[-A] extends TEnqueue[A] +} diff --git a/core/shared/src/main/scala/zio/stm/THub.scala b/core/shared/src/main/scala/zio/stm/THub.scala index 4645ac1704eb..a7676933baac 100644 --- a/core/shared/src/main/scala/zio/stm/THub.scala +++ b/core/shared/src/main/scala/zio/stm/THub.scala @@ -23,7 +23,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace * A `THub` is a transactional message hub. Publishers can publish messages to * the hub and subscribers can subscribe to take messages from the hub. */ -abstract class THub[A] extends TEnqueue[A] { +sealed abstract class THub[A] extends TEnqueue.Internal[A] { /** * Publishes a message to the hub, returning whether the message was published @@ -253,7 +253,7 @@ object THub { subscriberCount: TRef[Int], subscribers: TRef[Set[TRef[TRef[Node[A]]]]] ): TDequeue[A] = - new TDequeue[A] { + new TDequeue.Internal[A] { override def capacity: Int = requestedCapacity override def isShutdown: USTM[Boolean] = diff --git a/core/shared/src/main/scala/zio/stm/TQueue.scala b/core/shared/src/main/scala/zio/stm/TQueue.scala index 40205b2f7f5f..8598d7119ecb 100644 --- a/core/shared/src/main/scala/zio/stm/TQueue.scala +++ b/core/shared/src/main/scala/zio/stm/TQueue.scala @@ -25,7 +25,7 @@ import scala.collection.immutable.{Queue => ScalaQueue} * A `TQueue` is a transactional queue. Offerors can offer values to the queue * and takers can take values from the queue. */ -trait TQueue[A] extends TDequeue[A] with TEnqueue[A] { +sealed trait TQueue[A] extends TDequeue.Internal[A] with TEnqueue.Internal[A] { override final def awaitShutdown: USTM[Unit] = isShutdown.flatMap(b => if (b) ZSTM.unit else ZSTM.retry) diff --git a/core/shared/src/main/scala/zio/stm/TSemaphore.scala b/core/shared/src/main/scala/zio/stm/TSemaphore.scala index 5731cf404e7b..3ca0e810ad84 100644 --- a/core/shared/src/main/scala/zio/stm/TSemaphore.scala +++ b/core/shared/src/main/scala/zio/stm/TSemaphore.scala @@ -108,6 +108,43 @@ final class TSemaphore private (val permits: TRef[Long]) extends Serializable { permits.unsafeSet(journal, current + n) } + /** + * Tries to acquire a single permit in a transactional context. Returns `true` + * if the permit was acquired, otherwise `false`. + */ + def tryAcquire: USTM[Boolean] = tryAcquireN(1L) + + /** + * Tries to acquire the specified number of permits in a transactional + * context. Returns `true` if the permits were acquired, otherwise `false`. + */ + def tryAcquireN(n: Long): USTM[Boolean] = + ZSTM.Effect { (journal, _, _) => + assertNonNegative(n) + + val available: Long = permits.unsafeGet(journal) + if (available >= n) { + permits.unsafeSet(journal, available - n) + true + } else false + } + + /** + * Executes the specified effect, acquiring `1` permit if available and + * releasing them after execution. Returns `None` if no permits were + * available. + */ + def tryWithPermit[R, E, A](zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, Option[A]] = + tryWithPermits(1L)(zio) + + /** + * Executes the specified effect, acquiring `n` permits if available and + * releasing them after execution. Returns `None` if no permits were + * available. + */ + def tryWithPermits[R, E, A](n: Long)(zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, Option[A]] = + ZSTM.acquireReleaseWith(tryAcquireN(n))(releaseN(n).commit.whenDiscard(_))(zio.when(_)) + /** * Executes the specified effect, acquiring a permit immediately before the * effect begins execution and releasing it immediately after the effect diff --git a/docs/reference/concurrency/hub.md b/docs/reference/concurrency/hub.md index fd8ec075868c..9aa457da373c 100644 --- a/docs/reference/concurrency/hub.md +++ b/docs/reference/concurrency/hub.md @@ -131,6 +131,10 @@ As you can see, the operators on `Hub` are identical to the ones on `Queue` with In fact, a `Hub` can be viewed as a `Queue` that can only be written to. +```scala mdoc:invisible +trait Enqueue[-A] +``` + ```scala mdoc:nest trait Hub[A] extends Enqueue[A] ``` diff --git a/docs/reference/di/dependency-propagation.md b/docs/reference/di/dependency-propagation.md index 6896d4f1acdf..14824a78f79a 100644 --- a/docs/reference/di/dependency-propagation.md +++ b/docs/reference/di/dependency-propagation.md @@ -55,7 +55,7 @@ val loggingImpl = new EmailService { val effect = app.provideEnvironment(ZEnvironment(loggingImpl)) ``` -Most of the time, we don't use `ZIO#provideEnvironment` directly to provide our services; instead, we use `ZLayer` to construct the dependency graph of our application, then we use methods like `ZIO#provide`, `ZIO#provideSome` and `ZIO#provideCustom` to propagate dependencies into the environment of our ZIO effect. +Most of the time, we don't use `ZIO#provideEnvironment` directly to provide our services; instead, we use `ZLayer` to construct the dependency graph of our application, then we use methods like `ZIO#provide`, `ZIO#provideSome`, `ZIO#provideSomeAuto` and `ZIO#provideCustom` to propagate dependencies into the environment of our ZIO effect. ## Using `ZIO#provide` Method @@ -126,3 +126,12 @@ val mainEffectSome: ZIO[Bar, Nothing, Unit] = :::caution When using `ZIO#provideSome[R0]`, we should provide the remaining type as `R0` type parameter. This workaround helps the compiler to infer the proper types. ::: + +## Using `ZIO#provideSomeAuto` Method + +In Scala 3 enhanced version of `ZIO#provideSome` is introduced. The `ZIO#provideSomeAuto` method automatically infers the remaining type of the effect. + +```scala +val mainEffectSomeAuto = + myApp.provideSomeAuto(FooLive.layer) // No need to provide `Bar` anywhere +``` diff --git a/docs/reference/service-pattern/accessor-methods.md b/docs/reference/service-pattern/accessor-methods.md index 4efc23acd852..8c825966071a 100644 --- a/docs/reference/service-pattern/accessor-methods.md +++ b/docs/reference/service-pattern/accessor-methods.md @@ -65,38 +65,6 @@ Accessor methods have some drawbacks: The recommended [service pattern](service-pattern.md) injects service dependencies directly, and therefore has none of these problems. -## Generating Accessor Methods with Macros - -:::info -Accessor Methods macros are only available for Scala versions `2.x`. They will _not_ be made available for scala `3.x`. -::: - -Writing accessor methods is a repetitive task and is tedious for services with many methods. We can automate the -generation of accessor methods using the `zio-macro` module. - -To install the `zio-macro` add the following line in the `build.sbt` file: - -```scala -libraryDependencies += "dev.zio" %% "zio-macros" % "" -``` - -In addition, enable macro expansion with: - -- for Scala `2.13` add the compiler option: - - ```scala - scalacOptions += "-Ymacro-annotations" - ``` - -- for Scala `< 2.13` add the macro paradise compiler plugin: - - ```scala - compilerPlugin(("org.scalamacros" % "paradise" % "2.1.1") cross CrossVersion.full) - ``` - -If you are using IntelliJ, macro generated accessors will not be available in IDE hints -without [ZIO plugin](../../guides/tutorials/running-our-first-zio-project-with-intellij-idea.md). - ## Monomorphic Services We can use the `@accessible` macro to generate _service member accessors_: diff --git a/docs/reference/stream/chunk.md b/docs/reference/stream/chunk.md index b57537a31c80..5a054524a111 100644 --- a/docs/reference/stream/chunk.md +++ b/docs/reference/stream/chunk.md @@ -9,12 +9,12 @@ import zio._ ``` ## Why Chunk? -Arrays are fast and don’t box primitive values but due to `ClassTag` requirements and mutability they are painful to use and don't integrate well info functional code. ZIO chunks are backed by arrays so they also have zero boxing for primitives while providing an immutable interface and avoiding `ClassTag` requirements. +Arrays are fast and don’t box primitive values but due to `ClassTag` requirements and mutability they are painful to use and don't integrate well into functional code. ZIO chunks are backed by arrays so they also have zero boxing for primitives while providing an immutable interface and avoiding `ClassTag` requirements. -Lets to get more details behind why Chunk invented: +Let's get more details behind why Chunk was invented: ### Immutability -In Scala, there is no immutable data type that can efficiently represent primitive data types. There is Array, but Array is a mutable interface. The Array data type can efficiently represent primitives without boxing but only by exposing some unsafe mutable methods like `update`. +In Scala, there is no immutable data type that can efficiently represent primitive data types. There is Array, but Array is a mutable interface. The Array data type can efficiently represent primitives without boxing, but only by exposing some unsafe mutable methods like `update`. ### Ergonomic Design Every time, when we create an array of generic types in Scala, we need a [ClassTag](https://www.scala-lang.org/api/current/scala/reflect/ClassTag.html) to provide runtime information about that generic type, which is very inconvenient and isn't ergonomic. It leads us to a very cumbersome API. @@ -120,7 +120,7 @@ Chunk("A","B") == Chunk("A", "C") Chunk(1,2,3).toArray ``` -`toSeq`converts the chunk into `Seq`. +`toSeq` converts the chunk into `Seq`. ``` scala mdoc Chunk(1,2,3).toSeq diff --git a/docs/reference/stream/subscriptionref.md b/docs/reference/stream/subscriptionref.md index 6b7b291b4fe9..d046e6cac203 100644 --- a/docs/reference/stream/subscriptionref.md +++ b/docs/reference/stream/subscriptionref.md @@ -5,6 +5,12 @@ title: "SubscriptionRef" A `SubscriptionRef[A]` is a `Ref` that lets us subscribe to receive the current value along with all changes to that value. +```scala mdoc:invisible +object Ref { + abstract class Synchronized[A] +} +``` + ```scala mdoc import zio._ import zio.stream._ diff --git a/docs/reference/test/aspects/passing-failed-tests.md b/docs/reference/test/aspects/passing-failed-tests.md index 29407d576a17..ad95cb371e23 100644 --- a/docs/reference/test/aspects/passing-failed-tests.md +++ b/docs/reference/test/aspects/passing-failed-tests.md @@ -9,8 +9,8 @@ The `failing` aspect makes a test that failed for any reason pass. import zio._ import zio.test.{test, _} -test("failing a passing test") { - assertTrue(true) +test("passing a failing test") { + assertTrue(false) } @@ TestAspect.failing ``` @@ -20,8 +20,8 @@ If the test passes this aspect will make it fail: import zio._ import zio.test.{test, _} -test("passing a failing test") { - assertTrue(false) +test("failing a passing test") { + assertTrue(true) } @@ TestAspect.failing ``` diff --git a/docs/reference/test/sharing-layers-within-the-same-file.md b/docs/reference/test/sharing-layers-within-the-same-file.md index c9d3b6223fd7..1e2bd4fdc0ba 100644 --- a/docs/reference/test/sharing-layers-within-the-same-file.md +++ b/docs/reference/test/sharing-layers-within-the-same-file.md @@ -3,7 +3,13 @@ id: sharing-layers-within-the-same-file title: "Sharing Layers within the Same File" --- -The `Spec` data type has a very nice mechanism to share layers within all tests in a suite. So instead of acquiring and releasing dependencies for each test, we can share the layer within all tests. The test framework acquires that layer for once and shares that between all tests. When the execution of all tests is finished, that layer will be released. To share layers between multiple specs we can use one of the provide methods ending with `Shared` (`provideShared`/`provideCustomShared`/`provideSomeShared`/`provideLayerShared`/`provideCustomLayerShared`/`provideSomeLayerShared`): +The `Spec` data type has a mechanism to share layers within all tests in a suite. Instead of acquiring and releasing dependencies for each test, we can share the layer within all tests of a given suite. The suite acquires that layer for once and shares that between all tests. When the execution of all tests in that suite is finished, the layer will be released. To share layers between multiple specs we can use one of the provide methods ending with `Shared`: +- `provideShared` +- `provideCustomShared` +- `provideSomeShared` +- `provideLayerShared` +- `provideCustomLayerShared` +- `provideSomeLayerShared` ```scala { diff --git a/internal-macros/src/main/scala/zio/internal/TerminalRendering.scala b/internal-macros/src/main/scala/zio/internal/TerminalRendering.scala index 3e97ba9524d5..990d1bda24e7 100644 --- a/internal-macros/src/main/scala/zio/internal/TerminalRendering.scala +++ b/internal-macros/src/main/scala/zio/internal/TerminalRendering.scala @@ -77,7 +77,7 @@ object TerminalRendering { if (isUsingProvideSome) { s""" Alternatively, you may add them to the remainder type ascription: - + ${methodName("provideSome")}[${allMissingTypes.map(_.magenta.bold).mkString(" & ")}] """ @@ -92,7 +92,7 @@ object TerminalRendering { | ${message.bold} | |$errors - |$provideSomeSuggestion + |$provideSomeSuggestion |${line.red} | |""".stripMargin diff --git a/managed/shared/src/main/scala-3/zio/managed/ZManagedVersionSpecific.scala b/managed/shared/src/main/scala-3/zio/managed/ZManagedVersionSpecific.scala index b3890b410f48..5f7dcfcccd42 100644 --- a/managed/shared/src/main/scala-3/zio/managed/ZManagedVersionSpecific.scala +++ b/managed/shared/src/main/scala-3/zio/managed/ZManagedVersionSpecific.scala @@ -41,7 +41,7 @@ object ZManagedMacros { schedule: Expr[ZManaged[R, E, A]], layer: Expr[Seq[ZLayer[_, E, _]]] )(using Quotes): Expr[ZManaged[R0, E, A]] = { - val layerExpr = LayerMacros.constructLayer[R0, R, E](layer) + val layerExpr = LayerMacros.constructStaticProvideLayer[R0, R, E](layer) '{ $schedule.provideLayer($layerExpr.asInstanceOf[ZLayer[R0, E, R]]) } diff --git a/managed/shared/src/main/scala/zio/managed/ZManaged.scala b/managed/shared/src/main/scala/zio/managed/ZManaged.scala index 2a7a7ea49d3a..ee009d2c1ce8 100644 --- a/managed/shared/src/main/scala/zio/managed/ZManaged.scala +++ b/managed/shared/src/main/scala/zio/managed/ZManaged.scala @@ -2432,7 +2432,7 @@ object ZManaged extends ZManagedPlatformSpecific { * [[ZManaged]] to the `apply` method will create (inside an effect) a managed * resource which is already acquired and cannot fail. */ - abstract class PreallocationScope { + sealed abstract class PreallocationScope { def apply[R, E, A](managed: => ZManaged[R, E, A]): ZIO[R, E, Managed[Nothing, A]] } @@ -2507,7 +2507,7 @@ object ZManaged extends ZManagedPlatformSpecific { * managed resource to the `apply` method will return an effect that allocates * the resource and returns it with an early-release handle. */ - abstract class Scope { + sealed abstract class Scope { def apply[R, E, A](managed: => ZManaged[R, E, A]): ZIO[R, E, (ZManaged.Finalizer, A)] } diff --git a/project/BuildHelper.scala b/project/BuildHelper.scala index 4b1ad86f5703..e2673bc9aae9 100644 --- a/project/BuildHelper.scala +++ b/project/BuildHelper.scala @@ -177,7 +177,7 @@ object BuildHelper { case Some((2, 13)) => List("2.13+", "2.12-2.13") case Some((3, _)) => - List("2.13+") + List("2.13+", "3") case _ => List() } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a8bd08bdfff9..3843d0182f21 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,7 +3,7 @@ import sbt.* object Dependencies { // Runtime dependencies val JunitVersion = "4.13.2" - val JunitPlatformEngineVersion = "1.12.1" + val JunitPlatformEngineVersion = "1.12.2" val IzumiReflectVersion = "3.0.1" val MagnoliaScala2Version = "1.1.10" val MagnoliaScala3Version = "1.3.16" @@ -16,7 +16,7 @@ object Dependencies { val ScalaJsDomVersion = "2.8.0" // Documentations and example dependencies - val CatsEffectVersion = "3.6.0" + val CatsEffectVersion = "3.6.1" val DoobieVersion = "1.0.0-RC5" val Fs2Version = "3.12.0" val Http4sVersion = "0.23.29" diff --git a/project/MimaSettings.scala b/project/MimaSettings.scala index 36ed9a008fb1..69fd86d52bc3 100644 --- a/project/MimaSettings.scala +++ b/project/MimaSettings.scala @@ -34,7 +34,10 @@ object MimaSettings { exclude[NewMixinForwarderProblem]("zio.Exit.mapBoth"), exclude[NewMixinForwarderProblem]("zio.Exit.mapError"), exclude[NewMixinForwarderProblem]("zio.Exit.mapErrorCause"), - exclude[NewMixinForwarderProblem]("zio.Exit.unit") + exclude[NewMixinForwarderProblem]("zio.Exit.unit"), + exclude[Problem]("zio.Promise#internal*"), + exclude[Problem]("zio.Promise$internal*"), + exclude[Problem]("zio.Queue#Strategy*.shutdown") ), mimaFailOnProblem := failOnProblem ) diff --git a/project/plugins.sbt b/project/plugins.sbt index 0551c9d268bd..91e20015fa8f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ -addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "2.0.9") -addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.14.2") +addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "2.0.10") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.14.3") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.13.1") addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0") @@ -10,8 +10,8 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.3.2") addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2") -addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.18.2") -addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.6.5") +addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.19.0") +addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.7.1") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.6") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.4") addSbtPlugin("pl.project13.scala" % "sbt-jcstress" % "0.2.0") diff --git a/streams-tests/shared/src/test/scala/zio/stream/ZPipelineSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/ZPipelineSpec.scala index f2b32361c730..56dfb47f426f 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/ZPipelineSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/ZPipelineSpec.scala @@ -2,8 +2,9 @@ package zio.stream import zio._ import zio.stream.encoding.EncodingException -import zio.test.Assertion.{equalTo, fails} +import zio.test.Assertion.{equalTo, fails, isEmpty} import zio.test._ + import scala.io.Source object ZPipelineSpec extends ZIOBaseSpec { @@ -294,7 +295,103 @@ object ZPipelineSpec extends ZIOBaseSpec { } .runCollect .map(assert(_)(equalTo(Chunk.range(1, 21)))) - } + }, + suite("mapEitherChunk")( + test("with empty chunk") { + val chunk = Chunk.empty[Int] + for { + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapEitherChunked(i => Right(i))) + .run(ZSink.collectAll) + } yield assert(result)(isEmpty) + }, + test("with a 1 element chunk - Right") { + val chunk = Chunk(1) + for { + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapEitherChunked(i => Right(i))) + .run(ZSink.collectAll) + } yield assert(result)(equalTo(Chunk(1))) + }, + test("with a 1 element chunk - Left") { + val chunk = Chunk(1) + for { + collector <- Queue.unbounded[Int] + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapEitherChunked(i => Left(i))) + .run(ZSink.fromQueue(collector)) + .exit + collected <- collector.takeAll + } yield assert(result)(fails(equalTo(1))) && assert(collected)(isEmpty) + }, + test("Keeps order and values intact") { + val range = 1.to(10) + val chunk = Chunk.fromIterable(range) + for { + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapEitherChunked(i => Right(i))) + .run(ZSink.collectAll) + } yield assert(result)(equalTo(Chunk(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))) + }, + test("stop at the first Left") { + val range = 1.to(10) + val chunk = Chunk.fromIterable(range) + for { + collector <- Queue.unbounded[Int] + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapEitherChunked(i => if (i == 5) Left(i) else Right(i))) + .run(ZSink.fromQueue(collector)) + .exit + collected <- collector.takeAll + } yield assert(result)(fails(equalTo(5))) && assert(collected)(equalTo(Chunk(1, 2, 3, 4))) + } + ), + suite("mapChunksEither")( + test("with empty chunk - Right") { + val chunk = Chunk.empty[Int] + for { + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapChunksEither(chunk => Right(chunk))) + .run(ZSink.collectAll) + } yield assert(result)(isEmpty) + }, + test("with empty chunk - Left") { + val chunk = Chunk.empty[Int] + for { + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapChunksEither(chunk => Left(chunk))) + .run(ZSink.collectAll) + } yield assert(result)(isEmpty) + }, + test("fails with the err in the Left") { + val chunk = Chunk.range(0, 10) + for { + collector <- Queue.unbounded[Int] + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapChunksEither(_ => Left("this is an error"))) + .run(ZSink.fromQueue(collector)) + .exit + collected <- collector.takeAll + } yield assert(result)(fails(equalTo("this is an error"))) && assert(collected)(isEmpty) + }, + test("returns the chunk in the Right") { + val chunk = Chunk.range(1, 11) + for { + result <- ZStream + .fromChunk(chunk) + .via(ZPipeline.mapChunksEither(chunk => Right(chunk.map(_ * 10)))) + .run(ZSink.collectAll) + } yield assert(result)(equalTo(Chunk(10, 20, 30, 40, 50, 60, 70, 80, 90, 100))) + } + ) ) val weirdStringGenForSplitLines: Gen[Any, Chunk[String]] = Gen diff --git a/streams-tests/shared/src/test/scala/zio/stream/ZSinkSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/ZSinkSpec.scala index d8b1195112f3..1cb1244491f6 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/ZSinkSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/ZSinkSpec.scala @@ -829,7 +829,7 @@ object ZSinkSpec extends ZIOBaseSpec { suite("fromQueueWithShutdown")( test("should enqueue all elements and shutsdown queue") { - def createQueueSpy[A](q: Queue[A]) = new Queue[A] { + def createQueueSpy[A](q: Queue[A]) = new Queue.Internal[A] { @volatile private var isShutDown = false diff --git a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala index 98a722109237..cd05965a42d5 100644 --- a/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala +++ b/streams-tests/shared/src/test/scala/zio/stream/ZStreamSpec.scala @@ -5396,7 +5396,20 @@ object ZStreamSpec extends ZIOBaseSpec { .map(_.foldLeft(assertCompletes)(_ && _)) } @@ withLiveClock // Can't emulate the bug with the TestClock unfortunately - @@ exceptJS(nonFlaky(20)) + @@ exceptJS(nonFlaky(20)), + test("environment") { + for { + streamEnv <- Ref.make[String]("") + finalizerEnv <- Ref.make[String]("") + _ <- ZStream + .serviceWithZIO[String](streamEnv.set) + .ensuring(ZIO.serviceWithZIO[String](finalizerEnv.set)) + .provideEnvironment(ZEnvironment("env")) + .runDrain + streamEnv <- streamEnv.get + finalizerEnv <- finalizerEnv.get + } yield assertTrue(streamEnv == "env", finalizerEnv == "env") + } ), suite("from")( test("Chunk") { diff --git a/streams/shared/src/main/scala-3/zio.stream/ZStreamVersionSpecific.scala b/streams/shared/src/main/scala-3/zio.stream/ZStreamVersionSpecific.scala index ceb7e500f4a5..d160f93bfe91 100644 --- a/streams/shared/src/main/scala-3/zio.stream/ZStreamVersionSpecific.scala +++ b/streams/shared/src/main/scala-3/zio.stream/ZStreamVersionSpecific.scala @@ -21,7 +21,7 @@ private[stream] object ZStreamProvideMacro { zstream: Expr[ZStream[R, E, A]], layer: Expr[Seq[ZLayer[_, E, _]]] )(using Quotes): Expr[ZStream[R0, E, A]] = { - val layerExpr = LayerMacros.constructLayer[R0, R, E](layer) + val layerExpr = LayerMacros.constructStaticProvideLayer[R0, R, E](layer) '{ $zstream.provideLayer($layerExpr.asInstanceOf[ZLayer[R0, E, R]]) } } } diff --git a/streams/shared/src/main/scala/zio/stream/SubscriptionRef.scala b/streams/shared/src/main/scala/zio/stream/SubscriptionRef.scala index e971f0c5ff3b..289c29d408b4 100644 --- a/streams/shared/src/main/scala/zio/stream/SubscriptionRef.scala +++ b/streams/shared/src/main/scala/zio/stream/SubscriptionRef.scala @@ -23,7 +23,7 @@ import zio.stacktracer.TracingImplicits.disableAutoTrace * A `SubscriptionRef[A]` is a `Ref` that can be subscribed to in order to * receive the current value as well as all changes to the value. */ -sealed trait SubscriptionRef[A] extends Ref.Synchronized[A] { +sealed trait SubscriptionRef[A] extends Ref.Synchronized.Internal[A] { /** * A stream containing the current value of the `Ref` as well as all changes diff --git a/streams/shared/src/main/scala/zio/stream/ZChannel.scala b/streams/shared/src/main/scala/zio/stream/ZChannel.scala index 65f17137403b..bd154279e2cd 100644 --- a/streams/shared/src/main/scala/zio/stream/ZChannel.scala +++ b/streams/shared/src/main/scala/zio/stream/ZChannel.scala @@ -2,8 +2,8 @@ package zio.stream import zio.{ZIO, _} import zio.stacktracer.TracingImplicits.disableAutoTrace +import zio.stream.internal.ChannelExecutor.ChannelState import zio.stream.internal.{AsyncInputConsumer, AsyncInputProducer, ChannelExecutor, SingleProducerAsyncInput} -import ChannelExecutor.ChannelState import java.util.concurrent.atomic.AtomicReference @@ -1299,27 +1299,31 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon * emitted element. */ final def toPull(implicit trace: Trace): ZIO[Env with Scope, Nothing, ZIO[Env, OutErr, Either[OutDone, OutElem]]] = - ZIO.scope.flatMap(scope => self.toPullIn(scope)) + ZIO.scopeWith(self.toPullIn(_)) final def toPullIn( scope: => Scope )(implicit trace: Trace): ZIO[Env, Nothing, ZIO[Env, OutErr, Either[OutDone, OutElem]]] = - ZIO.uninterruptible { + ZIO.suspendSucceed { + val scope0 = scope + val exec = new ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]( - () => self, - null, - identity[URIO[Env, Any]] + initialChannel = () => self, + providedEnv = null, + executeCloseLastSubstream = ZIO.identityFn ) - for { - environment <- ZIO.environment[Env] - scope <- ZIO.succeed(scope) - _ <- scope.addFinalizerExit { exit => - val finalizer = exec.close(exit) - if (finalizer ne null) finalizer.provideEnvironment(environment) - else ZIO.unit - } - } yield exec - }.map { exec => + + val setFinalizer: ZIO[Env, Nothing, Unit] = + ZIO + .environmentWithZIO[Env] { environment => + scope0.addFinalizerExit { exit => + val finalizer = exec.close(exit) + if (finalizer ne null) finalizer.provideEnvironment(environment) + else Exit.unit + } + } + .uninterruptible + def interpret( channelState: ChannelExecutor.ChannelState[Env, OutErr] ): ZIO[Env, OutErr, Either[OutDone, OutElem]] = @@ -1333,34 +1337,45 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon ZIO.succeed(Right(exec.getEmit)) case ChannelState.Effect(zio) => zio *> interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]]) - case r @ ChannelState.Read(upstream, onEffect, onEmit, onDone) => + case r: ChannelState.Read[Env, OutErr] => + val continue = () => interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]]) + ChannelExecutor.readUpstream[Env, OutErr, OutErr, Either[OutDone, OutElem]]( - r.asInstanceOf[ChannelState.Read[Env, OutErr]], - () => interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]]), - Exit.failCause + r = r, + onSuccess = continue, + onFailure = Exit.failCause ) } - ZIO.suspendSucceed(interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]])) + val pull: ZIO[Env, OutErr, Either[OutDone, OutElem]] = + ZIO.suspendSucceed(interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]])) + + setFinalizer.as(pull) } final def toPullInAlt( scope: => Scope )(implicit trace: Trace): ZIO[Env, Nothing, ZIO[Env, Either[OutErr, OutDone], OutElem]] = - ZIO.uninterruptible { + ZIO.suspendSucceed { + val scope0 = scope + val exec = new ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]( - () => self, - null, - ZIO.identityFn[URIO[Env, Any]] + initialChannel = () => self, + providedEnv = null, + executeCloseLastSubstream = ZIO.identityFn[URIO[Env, Any]] ) - ZIO.environmentWithZIO[Env] { environment => - scope.addFinalizerExit { exit => - val finalizer = exec.close(exit) - if (finalizer ne null) finalizer.provideEnvironment(environment) - else ZIO.unit - } - } *> Exit.succeed(exec) - }.map { exec => + + val setFinalizer: ZIO[Env, Nothing, Unit] = + ZIO + .environmentWithZIO[Env] { environment => + scope0.addFinalizerExit { exit => + val finalizer = exec.close(exit) + if (finalizer ne null) finalizer.provideEnvironment(environment) + else Exit.unit + } + } + .uninterruptible + def interpret( channelState: ChannelExecutor.ChannelState[Env, OutErr] ): ZIO[Env, Either[OutErr, OutDone], OutElem] = @@ -1373,16 +1388,24 @@ sealed trait ZChannel[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDon case ChannelState.Emit => Exit.succeed(exec.getEmit) case ChannelState.Effect(zio) => - zio.mapError(Left(_)) *> interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]]) - case r @ ChannelState.Read(upstream, onEffect, onEmit, onDone) => + zio.foldZIO( + failure = e => Exit.fail(Left(e)), + success = _ => interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]]) + ) + case r: ChannelState.Read[Env, OutErr] => + val continue = () => interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]]) + ChannelExecutor.readUpstream[Env, OutErr, Either[OutErr, OutDone], OutElem]( - r.asInstanceOf[ChannelState.Read[Env, OutErr]], - () => interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]]), - Exit.failCause(_).mapError(Left(_)) + r = r, + onSuccess = continue, + onFailure = Exit.failCause(_).mapError(Left(_)) ) } - ZIO.suspendSucceed(interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]])) + val pull: ZIO[Env, Either[OutErr, OutDone], OutElem] = + ZIO.suspendSucceed(interpret(exec.run().asInstanceOf[ChannelState[Env, OutErr]])) + + setFinalizer.as(pull) } /** Converts this channel to a [[ZPipeline]] */ diff --git a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala index 9393d4b11cfe..099de5c07424 100644 --- a/streams/shared/src/main/scala/zio/stream/ZPipeline.scala +++ b/streams/shared/src/main/scala/zio/stream/ZPipeline.scala @@ -21,19 +21,9 @@ import zio.internal.SingleThreadedRingBuffer import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.encoding.EncodingException import zio.stream.internal.CharacterSet.{BOM, CharsetUtf32BE, CharsetUtf32LE} -import zio.stream.internal.SingleProducerAsyncInput - -import java.nio.{Buffer, ByteBuffer, CharBuffer} -import java.nio.charset.{ - CharacterCodingException, - Charset, - CharsetDecoder, - CoderResult, - MalformedInputException, - StandardCharsets, - UnmappableCharacterException -} -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} + +import java.nio.charset._ +import java.nio.{ByteBuffer, CharBuffer} /** * A `ZPipeline[Env, Err, In, Out]` is a polymorphic stream transformer. @@ -1777,6 +1767,29 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { )(implicit trace: Trace): ZPipeline[Env, Err, In, Out] = new ZPipeline(ZChannel.identity[Nothing, Chunk[In], Any].mapOutZIO(f)) + /** + * Creates a pipeline that maps chunks of elements with the specified + * function. + * + * Will stop on the first Left found + */ + def mapChunksEither[Env, Err, In, Out]( + f: Chunk[In] => Either[Err, Chunk[Out]] + )(implicit trace: Trace): ZPipeline[Env, Err, In, Out] = { + lazy val reader: ZChannel[Env, Err, Chunk[In], Any, Err, Chunk[Out], Any] = + ZChannel.readWithCause( + chunk => + f(chunk) match { + case r: Right[?, Chunk[Out]] => ZChannel.write(r.value) *> reader + case l: Left[Err, ?] => ZChannel.refailCause(Cause.fail(l.value)) + }, + err => ZChannel.refailCause(err), + done => ZChannel.succeed(done) + ) + + new ZPipeline(reader) + } + /** * Creates a pipeline that maps elements with the specified function that * returns a stream. @@ -1785,9 +1798,61 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors { f: In => ZStream[Env, Err, Out] )(implicit trace: Trace): ZPipeline[Env, Err, In, Out] = new ZPipeline( - ZChannel.identity[Nothing, Chunk[In], Any].concatMap(_.map(f).map(_.channel).fold(ZChannel.unit)(_ *> _)) + ZChannel + .identity[Nothing, Chunk[In], Any] + .concatMap( + _.foldLeft(ZChannel.unit: ZChannel[Env, Any, Any, Any, Err, Chunk[Out], Any])((acc, elem) => + acc *> f(elem).channel + ) + ) ) + /** + * Creates a pipeline that maps elements with the specified function. + * + * Will stop on the first Left found + */ + def mapEitherChunked[Env, Err, In, Out]( + f: In => Either[Err, Out] + )(implicit trace: Trace): ZPipeline[Env, Err, In, Out] = { + lazy val reader: ZChannel[Env, Err, Chunk[In], Any, Err, Chunk[Out], Any] = + ZChannel.readWithCause( + chunk => { + val size = chunk.size + + if (size == 0) reader + else if (size == 1) { + val a = chunk.head + + f(a) match { + case r: Right[?, Out] => ZChannel.write(Chunk.single(r.value)) *> reader + case l: Left[Err, ?] => ZChannel.refailCause(Cause.fail(l.value)) + } + } else { + val builder: ChunkBuilder[Out] = ChunkBuilder.make[Out](chunk.size) + val iterator: Iterator[In] = chunk.iterator + var error: Err = null.asInstanceOf[Err] + + while (iterator.hasNext && (error == null)) { + val a = iterator.next() + f(a) match { + case r: Right[?, Out] => builder.addOne(r.value) + case l: Left[Err, ?] => error = l.value + } + } + + val values = builder.result() + val next = if (error == null) reader else ZChannel.refailCause(Cause.fail(error)) + if (values.nonEmpty) ZChannel.write(values) *> next else next + } + }, + err => ZChannel.refailCause(err), + done => ZChannel.succeed(done) + ) + + new ZPipeline(reader) + } + /** * Creates a pipeline that maps elements with the specified effectful * function. diff --git a/streams/shared/src/main/scala/zio/stream/ZStream.scala b/streams/shared/src/main/scala/zio/stream/ZStream.scala index 335b77cebd78..e02aab0f1762 100644 --- a/streams/shared/src/main/scala/zio/stream/ZStream.scala +++ b/streams/shared/src/main/scala/zio/stream/ZStream.scala @@ -3523,24 +3523,10 @@ final class ZStream[-R, +E, +A] private (val channel: ZChannel[R, Any, Any, Any, * Converts this stream into a `scala.collection.Iterator` wrapped in a scoped * [[ZIO]]. The returned iterator will only be valid within the scope. */ - def toIterator(implicit trace: Trace): ZIO[R with Scope, Nothing, Iterator[Either[E, A]]] = - for { - runtime <- ZIO.runtime[R] - pull <- toPull - } yield { - def unfoldPull: Iterator[Either[E, A]] = - runtime.unsafe.run(pull)(trace, Unsafe.unsafe) match { - case Exit.Success(chunk) => chunk.iterator.map(Right(_)) ++ unfoldPull - case Exit.Failure(cause) => - cause.failureOrCause match { - case Left(None) => Iterator.empty - case Left(Some(e)) => Iterator.single(Left(e)) - case Right(c) => throw FiberFailure(c) - } - } - - unfoldPull - } + def toIterator(implicit trace: Trace): ZIO[R with Scope, Nothing, Iterator[Either[E, A]]] = for { + runtime <- ZIO.runtime[R] + pull <- either.toPull + } yield unfoldPull(runtime, pull)(trace, Unsafe.unsafe).flatten /** * Returns in a scope a ZIO effect that can be used to repeatedly pull chunks @@ -6205,7 +6191,7 @@ object ZStream extends ZStreamPlatformSpecificConstructors { } private def mapDequeue[A, B](dequeue: Dequeue[A])(f: A => B): Dequeue[B] = - new Dequeue[B] { + new Dequeue.Internal[B] { def awaitShutdown(implicit trace: Trace): UIO[Unit] = dequeue.awaitShutdown def capacity: Int = diff --git a/streams/shared/src/main/scala/zio/stream/internal/ChannelExecutor.scala b/streams/shared/src/main/scala/zio/stream/internal/ChannelExecutor.scala index 33f218b2889d..63b9cef51d51 100644 --- a/streams/shared/src/main/scala/zio/stream/internal/ChannelExecutor.scala +++ b/streams/shared/src/main/scala/zio/stream/internal/ChannelExecutor.scala @@ -8,7 +8,7 @@ import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.collection.mutable.Stack -private[zio] class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]( +private[zio] final class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]( initialChannel: () => ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone], @volatile private var providedEnv: ZEnvironment[Any], executeCloseLastSubstream: URIO[Env, Any] => URIO[Env, Any] @@ -21,7 +21,7 @@ private[zio] class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, val currInput = input input = prev - if (currInput ne null) currInput.close(exit) else ZIO.unit + if (currInput ne null) currInput.close(exit) else Exit.unit } private[this] final def popAllFinalizers( @@ -176,12 +176,7 @@ private[zio] class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, leftExec.input = previousInput input = leftExec - addFinalizer { exit => - val effect = restorePipe(exit, previousInput) - - if (effect ne null) effect - else ZIO.unit - } + addFinalizer(exit => restorePipe(exit, previousInput)) currentChannel = right().asInstanceOf[Channel[Env]] @@ -260,12 +255,7 @@ private[zio] class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, val previousInput = input input = null - addFinalizer { exit => - val effect = restorePipe(exit, previousInput) - - if (effect ne null) effect - else ZIO.unit - } + addFinalizer(exit => restorePipe(exit, previousInput)) currentChannel = nextChannel case ZChannel.Bridge(bridgeInput, channel) => @@ -311,16 +301,8 @@ private[zio] class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, } result = ChannelState.Effect( - drainer.forkDaemon.flatMap { fiber => - ZIO.succeed(addFinalizer { exit => - fiber.interrupt *> - ZIO.suspendSucceed { - val effect = restorePipe(exit, inputExecutor) - - if (effect ne null) effect - else ZIO.unit - } - }) + drainer.forkDaemon.map { fiber => + addFinalizer(exit => fiber.interrupt *> restorePipe(exit, inputExecutor)) } ) } @@ -468,8 +450,7 @@ private[zio] class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem, ): URIO[Env, Any] = if (finalizers.isEmpty) null else - ZIO - .foreach(finalizers)(_.apply(ex).exit) + provide(ZIO.foreach(finalizers)(_.apply(ex).exit)) .map(results => Exit.collectAll(results) getOrElse Exit.unit) .unexit diff --git a/streams/shared/src/main/scala/zio/stream/internal/ZInputStream.scala b/streams/shared/src/main/scala/zio/stream/internal/ZInputStream.scala index b0a882b05fc0..373fb7f7ba96 100644 --- a/streams/shared/src/main/scala/zio/stream/internal/ZInputStream.scala +++ b/streams/shared/src/main/scala/zio/stream/internal/ZInputStream.scala @@ -16,7 +16,7 @@ package zio.stream.internal -import zio.{Chunk, Exit, FiberFailure, Runtime, Trace, Unsafe, ZIO} +import zio._ import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.annotation.tailrec @@ -108,20 +108,9 @@ private[zio] class ZInputStream(private var chunks: Iterator[Chunk[Byte]]) exten } private[zio] object ZInputStream { - def fromPull[R](runtime: Runtime[R], pull: ZIO[R, Option[Throwable], Chunk[Byte]])(implicit - trace: Trace - ): ZInputStream = { - def unfoldPull: Iterator[Chunk[Byte]] = - runtime.unsafe.run(pull)(trace, Unsafe.unsafe) match { - case Exit.Success(chunk) => Iterator.single(chunk) ++ unfoldPull - case Exit.Failure(cause) => - cause.failureOrCause match { - case Left(None) => Iterator.empty - case Left(Some(e)) => throw e - case Right(c) => throw FiberFailure(c) - } - } - - new ZInputStream(Iterator.empty ++ unfoldPull) - } + def fromPull[R]( + runtime: Runtime[R], + pull: ZIO[R, Option[Throwable], Chunk[Byte]] + )(implicit trace: Trace): ZInputStream = + new ZInputStream(Iterator.empty ++ stream.unfoldPull(runtime, pull)(trace, Unsafe.unsafe)) } diff --git a/streams/shared/src/main/scala/zio/stream/internal/ZReader.scala b/streams/shared/src/main/scala/zio/stream/internal/ZReader.scala index 041af35452b5..850f80031d29 100644 --- a/streams/shared/src/main/scala/zio/stream/internal/ZReader.scala +++ b/streams/shared/src/main/scala/zio/stream/internal/ZReader.scala @@ -16,7 +16,7 @@ package zio.stream.internal -import zio.{Chunk, Exit, FiberFailure, Runtime, Trace, Unsafe, ZIO} +import zio._ import zio.stacktracer.TracingImplicits.disableAutoTrace import scala.annotation.tailrec @@ -105,20 +105,9 @@ private[zio] class ZReader(private var chunks: Iterator[Chunk[Char]]) extends ja } private[zio] object ZReader { - def fromPull[R](runtime: Runtime[R], pull: ZIO[R, Option[Throwable], Chunk[Char]])(implicit - trace: Trace - ): ZReader = { - def unfoldPull: Iterator[Chunk[Char]] = - runtime.unsafe.run(pull)(trace, Unsafe.unsafe) match { - case Exit.Success(chunk) => Iterator.single(chunk) ++ unfoldPull - case Exit.Failure(cause) => - cause.failureOrCause match { - case Left(None) => Iterator.empty - case Left(Some(e)) => throw e - case Right(c) => throw FiberFailure(c) - } - } - - new ZReader(Iterator.empty ++ unfoldPull) - } + def fromPull[R]( + runtime: Runtime[R], + pull: ZIO[R, Option[Throwable], Chunk[Char]] + )(implicit trace: Trace): ZReader = + new ZReader(Iterator.empty ++ stream.unfoldPull(runtime, pull)(trace, Unsafe.unsafe)) } diff --git a/streams/shared/src/main/scala/zio/stream/package.scala b/streams/shared/src/main/scala/zio/stream/package.scala index 95deb354e529..d8daa6338184 100644 --- a/streams/shared/src/main/scala/zio/stream/package.scala +++ b/streams/shared/src/main/scala/zio/stream/package.scala @@ -24,4 +24,19 @@ package object stream { type UStream[+A] = ZStream[Any, Nothing, A] type Sink[+OutErr, -In, +L, +Z] = ZSink[Any, OutErr, In, L, Z] + + private[stream] def unfoldPull[R, A]( + runtime: Runtime[R], + pull: ZIO[R, Option[Throwable], A] + )(implicit trace: Trace, unsafe: Unsafe): Iterator[A] = + runtime.unsafe.run(pull) match { + case Exit.Success(value) => + Iterator.single(value) ++ unfoldPull(runtime, pull) + case Exit.Failure(cause) => + cause.failureOrCause match { + case Left(None) => Iterator.empty + case Left(Some(e)) => Exit.fail(e).getOrThrow() + case Right(c) => Exit.failCause(c).getOrThrowFiberFailure() + } + } } diff --git a/test-junit-engine-tests/maven/src/test/scala/zio/test/junit/maven/TaggedSpec.scala b/test-junit-engine-tests/maven/src/test/scala/zio/test/junit/maven/TaggedSpec.scala new file mode 100644 index 000000000000..2a73649b27c6 --- /dev/null +++ b/test-junit-engine-tests/maven/src/test/scala/zio/test/junit/maven/TaggedSpec.scala @@ -0,0 +1,22 @@ +package zio.test.junit.maven + +import zio.test.junit._ +import zio.test._ +import zio.test.Assertion._ + +object TaggedSpec extends ZIOSpecDefault { + override def spec = suite("TaggedSpec")( + test("should run for tag tagged") { + assert(12)(equalTo(12)) + }, + test("should run for tag a and tagged") { + assert(12)(equalTo(12)) + } @@ TestAspect.tag("a"), + test("should run for tag b and tagged") { + assert(12)(equalTo(12)) + } @@ TestAspect.tag("b"), + test("should run for tag a b tagged") { + assert(12)(equalTo(12)) + } @@ TestAspect.tag("a") @@ TestAspect.tag("b"), + ) @@ TestAspect.tag("tagged") +} diff --git a/test-junit-engine-tests/src/test/scala/zio/test/junit/MavenJunitSpec.scala b/test-junit-engine-tests/src/test/scala/zio/test/junit/MavenJunitSpec.scala index f6daf2da0239..3440622d5a34 100644 --- a/test-junit-engine-tests/src/test/scala/zio/test/junit/MavenJunitSpec.scala +++ b/test-junit-engine-tests/src/test/scala/zio/test/junit/MavenJunitSpec.scala @@ -13,7 +13,7 @@ import scala.xml.XML * when running from IDE run `sbt publishM2`, copy the snapshot version the * artifacts were published under (something like: * `1.0.2+0-37ee0765+20201006-1859-SNAPSHOT`) and put this into `VM Parameters`: - * `-Dproject.dir=\$PROJECT_DIR\$/test-junit-tests/jvm + * `-Dproject.dir=\$PROJECT_DIR\$/test-junit-engine-tests * -Dproject.version=\$snapshotVersion` */ object MavenJunitSpec extends ZIOSpecDefault { @@ -40,6 +40,21 @@ object MavenJunitSpec extends ZIOSpecDefault { ) && assertTrue(reportDefect.length == 1) // spec with defect is reported } + }, + test("Spec tags are exposed to junit") { + ZIO + .foreach(Seq(Set("a") -> 2, Set("b") -> 2, Set("tagged") -> 4, Set("a", "b") -> 3)) { + case (tags, expectedTests) => + for { + mvn <- makeMaven + mvnResult <- mvn.clean() *> mvn.test(tags) + report <- mvn.parseSurefireReport("zio.test.junit.maven.TaggedSpec") + } yield { + assert(mvnResult)(equalTo(0)) && + assert(report.length)(equalTo(expectedTests)) + } + } + .map(TestResult.allSuccesses) } ) @@ TestAspect.sequential /*@@ // flaky: sometimes maven fails to download dependencies in CI @@ -52,7 +67,7 @@ object MavenJunitSpec extends ZIOSpecDefault { .orElseFail( new AssertionError( "Missing project.dir system property\n" + - "when running from IDE put this into `VM Parameters`: `-Dproject.dir=$PROJECT_DIR$/test-junit-tests/jvm`" + "when running from IDE put this into `VM Parameters`: `-Dproject.dir=$PROJECT_DIR$/test-junit-engine-tests`" ) ) projectVer <- @@ -75,8 +90,14 @@ object MavenJunitSpec extends ZIOSpecDefault { def clean(): Task[Int] = run("clean") - def test(): Task[Int] = run( + def test(tags: Set[String] = Set.empty): Task[Int] = run( "test", + if (tags.isEmpty) { + // only run tests without tags + "-Dgroups=none()" + } else { + s"-Dgroups=${tags.mkString(",")}" + }, s"-Dzio.version=$projectVersion", s"-Dscala.version=$scalaVersion", s"-Dscala.compat.version=$scalaCompatVersion", diff --git a/test-junit-engine/src/main/scala/zio/test/junit/ZIOTestDescriptor.scala b/test-junit-engine/src/main/scala/zio/test/junit/ZIOTestDescriptor.scala index 4fd06ab334a4..f58b6fb19960 100644 --- a/test-junit-engine/src/main/scala/zio/test/junit/ZIOTestDescriptor.scala +++ b/test-junit-engine/src/main/scala/zio/test/junit/ZIOTestDescriptor.scala @@ -16,9 +16,12 @@ package zio.test.junit -import org.junit.platform.engine.{TestDescriptor, UniqueId} import org.junit.platform.engine.support.descriptor.AbstractTestDescriptor -import zio.test.TestAnnotationMap +import org.junit.platform.engine.{TestDescriptor, TestTag, UniqueId} +import zio.test.{TestAnnotation, TestAnnotationMap} + +import java.util +import scala.jdk.CollectionConverters._ /** * Describes a JUnit 5 test descriptor for a single ZIO tests. @@ -45,6 +48,9 @@ class ZIOTestDescriptor( ) extends AbstractTestDescriptor(uniqueId, label, ZIOTestSource(testClass, annotations)) { setParent(parent) override def getType: TestDescriptor.Type = TestDescriptor.Type.TEST + + override def getTags: util.Set[TestTag] = + annotations.get(TestAnnotation.tagged).map(TestTag.create).asJava } object ZIOTestDescriptor { diff --git a/test-sbt/jvm/src/test/scala/zio/test/sbt/ZTestEventSpec.scala b/test-sbt/jvm/src/test/scala/zio/test/sbt/ZTestEventSpec.scala index 850454b3c5fb..ed255776ae5e 100644 --- a/test-sbt/jvm/src/test/scala/zio/test/sbt/ZTestEventSpec.scala +++ b/test-sbt/jvm/src/test/scala/zio/test/sbt/ZTestEventSpec.scala @@ -63,7 +63,7 @@ object ZTestEventSpec extends ZIOSpecDefault { Some( new Exception( s"""| ${ConsoleUtils.bold(red("- test"))} - | Exception in thread "zio-fiber-" java.lang.String: boom""".stripMargin + | java.lang.String: boom""".stripMargin ) ), 0L, diff --git a/test-tests/jvm-native/src/test/scala/zio/test/results/ResultFileOpsJsonSpec.scala b/test-tests/jvm-native/src/test/scala/zio/test/results/ResultFileOpsJsonSpec.scala index a97bc8032082..0626fef937d1 100644 --- a/test-tests/jvm-native/src/test/scala/zio/test/results/ResultFileOpsJsonSpec.scala +++ b/test-tests/jvm-native/src/test/scala/zio/test/results/ResultFileOpsJsonSpec.scala @@ -7,66 +7,56 @@ import java.nio.file.Path object ResultFileOpsJsonSpec extends ZIOBaseSpec { def spec = suite("ResultFileOpsJsonSpec")( - test("simple write")( + test("trailing comma from last entry is removed")( for { - _ <- writeToTestFile("a") - results <- readFile - } yield assertTrue(results == List("a")) - ).provide(test, Scope.default), - test("clobbered concurrent writes") { - val linesToWrite = - List( - "a", - "b", - "c", - "d", - "e" - ).map(_ * 100) + path <- writeToTestFile(parallel = false)("\naaa", "\nbbb", "\nccc") + results <- readFile(path) + } yield assertTrue( + results(0) == "{", + results(1) == " \"results\": [", + results(2) == "aaa", + results(3) == "bbb", + results(4) == "ccc", + results(5) == " ]", + results(6) == "}" + ) + ), + test("trailing comma from last entry is removed")( for { - _ <- - ZIO.foreachPar( - linesToWrite - )(writeToTestFile) - results <- readFile - } yield assertTrue(linesToWrite.forall(results.contains(_))) - } - .provide(test, Scope.default) - @@ TestAspect.nonFlaky, - test("generated concurrent writes clean") { - checkN(10)(Gen.listOfN(3)(Gen.alphaNumericStringBounded(0, 700))) { linesToWrite => + path <- writeToTestFile(parallel = false)("\naaa,", "\nbbb,", "\nccc,") + results <- readFile(path) + } yield assertTrue( + results(0) == "{", + results(1) == " \"results\": [", + results(2) == "aaa,", + results(3) == "bbb,", + results(4) == "ccc", + results(5) == " ]", + results(6) == "}" + ) + ), + test("is thread safe") { + checkN(10)(Gen.setOfN(100)(Gen.alphaNumericStringBounded(1, 10))) { linesToWrite => for { - _ <- - ZIO.foreachPar( - linesToWrite - )(writeToTestFile) - results <- readFile - } yield assertTrue(linesToWrite.forall(results.contains(_))) + path <- writeToTestFile(parallel = true)(linesToWrite.toList.map("\n" + _): _*) + results <- readFile(path).map(_.toSet) + union = linesToWrite.intersect(results) + } yield assertTrue(union == linesToWrite) } - }.provide(test, Scope.default) + } ) - private def writeToTestFile(content: String) = - ZIO.serviceWithZIO[ResultFileOps](_.write(content + "\n", append = true)) - - val readFile: ZIO[Path with Scope, Nothing, List[String]] = - for { - tmpFilePath <- ZIO.service[Path] - source = scala.io.Source.fromFile(tmpFilePath.toString) - _ <- ZIO.addFinalizer(ZIO.succeed(source.close())) - lines <- ZIO.attempt { - source.getLines().toList - }.orDie - } yield lines + private def writeToTestFile(parallel: Boolean)(content: String*): Task[Path] = + ZIO.scoped(for { + path <- ZIO.attemptBlocking(java.nio.file.Files.createTempFile("zio-test", ".json")) + writer <- ResultFileOps.Json(path.toString) + _ <- + if (parallel) ZIO.foreachParDiscard(content)(writer.write(_)) + else ZIO.foreachDiscard(content)(writer.write(_)) + } yield path) - val test: ZLayer[Any, Throwable, Path with ResultFileOps.Json] = - ZLayer.fromZIO { - for { - fileLock <- Ref.Synchronized.make[Unit](()) - result <- ZIO - .attempt( - java.nio.file.Files.createTempFile("zio-test", ".json") - ) - .map(path => (path, ResultFileOps.Json(path.toString, fileLock))) - } yield result - }.flatMap(tup => ZLayer.succeed(tup.get._1) ++ ZLayer.succeed(tup.get._2)) + def readFile(path: Path): Task[Vector[String]] = + ZIO.acquireReleaseWith(ZIO.succeed(scala.io.Source.fromFile(path.toString)))(s => ZIO.succeed(s.close)) { s => + ZIO.succeed(s.getLines().toVector) + } } diff --git a/test-tests/shared/src/test/scala-3/zio/test/SmartAssertionScala3Spec.scala b/test-tests/shared/src/test/scala-3/zio/test/SmartAssertionScala3Spec.scala new file mode 100644 index 000000000000..609eaedb1402 --- /dev/null +++ b/test-tests/shared/src/test/scala-3/zio/test/SmartAssertionScala3Spec.scala @@ -0,0 +1,95 @@ +package zio.test + +object SmartAssertionScala3Spec extends ZIOBaseSpec { + + override def spec = + suite("SmartAssertionScala3Spec")( + suite("new instance creation")( + test("anonymous class (trait) with overload and type args - new instance") { + trait ClassWithOverload[X] { + def overloaded: Int = 1 + def overloaded(x: Int): Int = 1 + } + assertTrue(new ClassWithOverload[Int]() {}.overloaded == 1) + }, + test("anonymous class with overload - new instance") { + class ClassWithOverload { + def overloaded: Int = 1 + def overloaded(x: Int): Int = 1 + } + assertTrue(new ClassWithOverload() {}.overloaded == 1) + }, + test("anonymous class (trait) with overload - new instance") { + trait ClassWithOverload { + def overloaded: Int = 1 + def overloaded(x: Int): Int = 1 + } + assertTrue(new ClassWithOverload() {}.overloaded == 1) + }, + test("trait with parameter and overloaded methods") { + trait TraitOverloadedWithParameter(x: Int) { + def overloaded: Int = 1 + + def overloaded(x: Int) = 1 + } + + assertTrue(new TraitOverloadedWithParameter(1) {}.overloaded == 1) + }, + test("trait with overloaded methods with type args") { + trait TraitOverloadedAndTypeArgs[A] { + def overloaded: Int = 1 + + def overloaded(x: Int) = 1 + } + + assertTrue(new TraitOverloadedAndTypeArgs[Int] {}.overloaded == 1) + }, + test("trait with parameter and overloaded methods with type args") { + trait TraitOverloadedWithParameterAndTypeArgs[A](x: A) { + def overloaded: Int = 1 + + def overloaded(x: Int) = 1 + } + + assertTrue(new TraitOverloadedWithParameterAndTypeArgs[Int](1) {}.overloaded == 1) + }, + test("inlined trait with overloaded methods and parameter and type arg") { + trait TraitOverloadedWithParameterAndTypeArgs[A](x: A) { + def overloaded: Int = 1 + + def overloaded(x: Int) = 1 + } + @scala.annotation.nowarn + inline def create = new TraitOverloadedWithParameterAndTypeArgs[Int](1) {} + + assertTrue(create.overloaded == 1) + }, + test("inlined trait with overloaded methods and parameter") { + trait TraitOverloadedWithParameter(x: Int) { + def overloaded: Int = 1 + + def overloaded(x: Int) = 1 + } + @scala.annotation.nowarn + inline def create = + new TraitOverloadedWithParameter(1) {} + + assertTrue(create.overloaded == 1) + }, + test("inlined class with overloaded methods and parameter") { + class ClassOverloadedWithParameter(x: Int) { + def overloaded: Int = 1 + + def overloaded(x: Int) = 1 + } + + @scala.annotation.nowarn + inline def create = + new ClassOverloadedWithParameter(1) + + assertTrue(create.overloaded == 1) + } + ) + ) + +} diff --git a/test-tests/shared/src/test/scala-3/zio/test/TestProvideAutoSpec.scala b/test-tests/shared/src/test/scala-3/zio/test/TestProvideAutoSpec.scala new file mode 100644 index 000000000000..4e6334e4b54f --- /dev/null +++ b/test-tests/shared/src/test/scala-3/zio/test/TestProvideAutoSpec.scala @@ -0,0 +1,70 @@ +package zio.test + +import zio.* +import zio.test.Assertion.* + +object TestProvideAutoSpec extends ZIOBaseSpec { + def spec = + suite("TestProvideAutoSpec")( + suite(".provideSomeAuto") { + val stringLayer = ZLayer.succeed("10") + + val myTest = test("provides some") { + ZIO.environment[Int with String].map { env => + assertTrue(env.get[String].toInt == env.get[Int]) + } + }.provideSomeAuto(stringLayer) + + myTest.provide(ZLayer.succeed(10)) + } + ) + + object TestLayer { + trait OldLady { + def willDie: UIO[Boolean] + } + + object OldLady { + def live: URLayer[Fly, OldLady] = ZLayer.succeed(new OldLady { + override def willDie: UIO[Boolean] = ZIO.succeed(false) + }) + } + + trait Fly {} + object Fly { + def live: URLayer[Spider, Fly] = ZLayer.succeed(new Fly {}) + def manEatingFly: URLayer[OldLady, Fly] = ZLayer.succeed(new Fly {}) + } + + trait Spider {} + object Spider { + def live: ULayer[Spider] = ZLayer.succeed(new Spider {}) + } + + trait Repository + + case class MongoRepository() extends Repository + object MongoRepository { + val live: ULayer[MongoRepository] = ZLayer.fromFunction(MongoRepository.apply _) + } + + case class GreenplumRepository() extends Repository + object GreenplumRepository { + val live: ULayer[GreenplumRepository] = ZLayer.fromFunction(GreenplumRepository.apply _) + } + + case class RepositoryLive(mongo: MongoRepository, gp: GreenplumRepository) extends Repository + object RepositoryLive { + val live: URLayer[MongoRepository with GreenplumRepository, RepositoryLive] = + ZLayer.fromFunction(RepositoryLive.apply _) + } + + trait Service + case class ServiceLive(repo: Repository) extends Service + object ServiceLive { + val live: URLayer[Repository, ServiceLive] = ZLayer.fromFunction(ServiceLive.apply _) + } + + } + +} diff --git a/test-tests/shared/src/test/scala/zio/test/ConsoleTestOutputSpec.scala b/test-tests/shared/src/test/scala/zio/test/ConsoleTestOutputSpec.scala index fb4f74a28b1d..5cad4e91ce8f 100644 --- a/test-tests/shared/src/test/scala/zio/test/ConsoleTestOutputSpec.scala +++ b/test-tests/shared/src/test/scala/zio/test/ConsoleTestOutputSpec.scala @@ -41,6 +41,9 @@ object ConsoleTestOutputSpec extends ZIOBaseSpec { test("empty test suite") { runLog(suite4).map(res => containsUnstyled(res, suite4Expected)) }, + test("nested suite with sequential tests") { + runLog(suite5).map(res => containsUnstyled(res, suite5Expected)) + }, test("failure of simple assertion") { runLog(test5).map(res => containsUnstyled(res, test5Expected)) }, diff --git a/test-tests/shared/src/test/scala/zio/test/IntellijRendererSpec.scala b/test-tests/shared/src/test/scala/zio/test/IntellijRendererSpec.scala index cd01f0e2894b..f06e7267ab66 100644 --- a/test-tests/shared/src/test/scala/zio/test/IntellijRendererSpec.scala +++ b/test-tests/shared/src/test/scala/zio/test/IntellijRendererSpec.scala @@ -29,6 +29,9 @@ object IntellijRendererSpec extends ZIOBaseSpec { test("correctly reports empty test suite") { runLog(suite4).map(res => suite4Expected.map(expected => containsUnstyled(res, expected)).reduce(_ && _)) }, + test("nested suite with sequential tests") { + runLog(suite5).map(res => suite5Expected.map(expected => containsUnstyled(res, expected)).reduce(_ && _)) + }, test("correctly reports failure of simple assertion") { runLog(test5).map(res => test5Expected.map(expected => containsUnstyled(res, expected)).reduce(_ && _)) }, @@ -90,6 +93,15 @@ object IntellijRendererSpec extends ZIOBaseSpec { ) ++ suite1Expected ++ Vector(suiteStarted("Empty"), suiteFinished("Empty")) ++ test3Expected ++ Vector(suiteFinished("Suite4")) + def suite5Expected(implicit sourceLocation: SourceLocation): Vector[String] = Vector( + suiteStarted("Suite1"), + suiteStarted("Suite2"), + testStarted("Test1"), + testFinished("Test1"), + suiteFinished("Suite2"), + suiteFinished("Suite1") + ) + def test5Expected(implicit sourceLocation: SourceLocation): Vector[String] = Vector( testStarted("Addition works fine"), testFailed( diff --git a/test-tests/shared/src/test/scala/zio/test/ReportingTestUtils.scala b/test-tests/shared/src/test/scala/zio/test/ReportingTestUtils.scala index 10a6b6e07038..d6c6b03e7707 100644 --- a/test-tests/shared/src/test/scala/zio/test/ReportingTestUtils.scala +++ b/test-tests/shared/src/test/scala/zio/test/ReportingTestUtils.scala @@ -234,6 +234,20 @@ object ReportingTestUtils { test3Expected() } + def suite5(implicit sourceLocation: SourceLocation): Spec[Any, Nothing] = + suite("Suite1")( + suite("Suite2")( + test("Test1")(assertCompletes) + ) @@ TestAspect.sequential + ) + + def suite5Expected(implicit sourceLocation: SourceLocation): Vector[String] = + Vector( + expectedSuccess("Suite1"), + expectedSuccess("Suite2"), + expectedSuccess("Test1") + ) + def assertSourceLocation()(implicit sourceLocation: SourceLocation): String = cyan(s"at ${sourceLocation.path}:${sourceLocation.line} ") diff --git a/test-tests/shared/src/test/scala/zio/test/SmartAssertionSpec.scala b/test-tests/shared/src/test/scala/zio/test/SmartAssertionSpec.scala index 044f712da768..cb7ad4f875ad 100644 --- a/test-tests/shared/src/test/scala/zio/test/SmartAssertionSpec.scala +++ b/test-tests/shared/src/test/scala/zio/test/SmartAssertionSpec.scala @@ -87,6 +87,19 @@ object SmartAssertionSpec extends ZIOBaseSpec { assertTrue(zio.Duration.fromNanos(1000) == zio.Duration.Zero) } ) @@ failing, + suite("class constructors")( + test("string constructor") { + val bytes: Array[Byte] = "test".getBytes + assertTrue(new String(bytes) == "test") + }, + test("class constructor") { + assertTrue(new Service("serviceName").name == "serviceName") + }, + test("generic class contructor") { + assertTrue(new GenericService[Int](52).value == 52) && + assertTrue(new GenericService(52).value == 52) + } + ), suite("contains")( test("Option") { assertTrue(company.users.head.posts.head.publishDate.contains(LocalDateTime.MAX)) @@ -252,6 +265,13 @@ object SmartAssertionSpec extends ZIOBaseSpec { assertTrue(b > aL) && assertTrue(bL > a) && assertTrue(b >= aL) && assertTrue(bL >= a) }, + test("comparison compiles when comparing types with ops implicit class containing compare operation") { + import java.time.temporal.ChronoUnit._ + + val duration = Duration(500, MILLIS) + assertTrue(duration < Duration(1, SECONDS)) && + assertTrue("testing" > "test") + }, test("exists must succeed when at least one element of iterable satisfy specified assertion") { assertTrue(Seq(1, 42, 5).exists(_ == 42)) }, @@ -608,6 +628,34 @@ object SmartAssertionSpec extends ZIOBaseSpec { val exit: Exit[String, Int] = Exit.succeed(1) assertTrue(exit.isSuccess) }, + test("class with overload - new instance") { + class ClassWithOverload { + def overloaded: Int = 1 + def overloaded(x: Int): Int = x + } + assertTrue(new ClassWithOverload().overloaded == 1) + }, + test("class with overload with type args - new instance") { + class ClassWithOverload[A] { + def overloaded: Int = 1 + def overloaded(x: Int): Int = x + } + assertTrue(new ClassWithOverload[Int]().overloaded == 1) + }, + test("class with overload with args - new instance") { + class ClassWithOverload(x: Int) { + def overloaded: Int = x + def overloaded(x: Int): Int = x + } + assertTrue(new ClassWithOverload(1).overloaded == 1) + }, + test("class with overload with args and type args - new instance") { + class ClassWithOverload[A](x: Int) { + def overloaded: Int = x + def overloaded(x: Int): Int = x + } + assertTrue(new ClassWithOverload[Int](1).overloaded == 1) + }, test("equalTo on java.lang.Boolean works") { val jBool = java.lang.Boolean.FALSE assertTrue(jBool == false) diff --git a/test-tests/shared/src/test/scala/zio/test/SmartTestTypes.scala b/test-tests/shared/src/test/scala/zio/test/SmartTestTypes.scala index 30059a826eb7..ac86474c4e1a 100644 --- a/test-tests/shared/src/test/scala/zio/test/SmartTestTypes.scala +++ b/test-tests/shared/src/test/scala/zio/test/SmartTestTypes.scala @@ -15,4 +15,8 @@ object SmartTestTypes { case class Company(name: String, users: List[User]) + class Service(val name: String) + + class GenericService[A](val value: A) + } diff --git a/test-tests/shared/src/test/scala/zio/test/TestProvideSpec.scala b/test-tests/shared/src/test/scala/zio/test/TestProvideSpec.scala index 27accb27320b..28d04367b6f6 100644 --- a/test-tests/shared/src/test/scala/zio/test/TestProvideSpec.scala +++ b/test-tests/shared/src/test/scala/zio/test/TestProvideSpec.scala @@ -153,39 +153,78 @@ object TestProvideSpec extends ZIOBaseSpec { ) ).provideShared(refLayer) @@ TestAspect.sequential }, - suite(".provideSomeShared") { + suite(".provideSomeShared")( + suite("providing layer") { + val addOne: ZIO[IntService, Nothing, Int] = + ZIO.serviceWithZIO[IntService](_.add(1)) - val addOne: ZIO[IntService, Nothing, Int] = - ZIO.serviceWithZIO[IntService](_.add(1)) + val appendBang: ZIO[StringService, Nothing, String] = + ZIO.serviceWithZIO[StringService](_.append("!")) - val appendBang: ZIO[StringService, Nothing, String] = - ZIO.serviceWithZIO[StringService](_.append("!")) + val intService: ULayer[IntService] = ZLayer(Ref.make(0).map(IntService(_))) + val stringService: ULayer[StringService] = + ZLayer(Ref.make("Hello").map(StringService(_)).debug("MAKING")) - val intService: ULayer[IntService] = ZLayer(Ref.make(0).map(IntService(_))) - val stringService: ULayer[StringService] = - ZLayer(Ref.make("Hello").map(StringService(_)).debug("MAKING")) + def customTest(int: Int) = + test(s"test $int") { + for { + x <- addOne + str <- appendBang + } yield assertTrue(x == int && str == s"Hello!") + } - def customTest(int: Int) = - test(s"test $int") { - for { - x <- addOne - str <- appendBang - } yield assertTrue(x == int && str == s"Hello!") - } + suite("layers are shared between tests and suites")( + suite("suite 1")( + customTest(1), + customTest(2) + ), + suite("suite 4")( + customTest(3), + customTest(4) + ) + ) + .provideSomeShared[StringService](intService) + .provide(stringService) @@ TestAspect.sequential + }, + suite("providing 3rd layer as input") { + val initialValue: ULayer[Int] = ZLayer.succeed(0) - suite("layers are shared between tests and suites")( - suite("suite 1")( - customTest(1), - customTest(2) - ), - suite("suite 4")( - customTest(3), - customTest(4) + val addOne: ZIO[IntService, Nothing, Int] = + ZIO.serviceWithZIO[IntService](_.add(1)) + + val appendBang: ZIO[StringService, Nothing, String] = + ZIO.serviceWithZIO[StringService](_.append("!")) + + val intService = ZLayer.fromZIO(for { + initial <- ZIO.service[Int] + ref <- Ref.make(initial) + } yield IntService(ref)) + + val stringService: ULayer[StringService] = + ZLayer(Ref.make("Hello").map(StringService(_)).debug("MAKING")) + + def customTest(int: Int) = + test(s"test $int") { + for { + x <- addOne + str <- appendBang + } yield assertTrue(x == int && str == s"Hello!") + } + + suite("layers are shared between tests and suites")( + suite("suite 1")( + customTest(1), + customTest(2) + ), + suite("suite 4")( + customTest(3), + customTest(4) + ) ) - ) - .provideSomeShared[StringService](intService) - .provide(stringService) @@ TestAspect.sequential - } @@ TestAspect.exceptScala3 + .provideSomeShared[StringService & Int](intService) + .provide(stringService ++ initialValue) @@ TestAspect.sequential + } + ) ) object TestLayer { diff --git a/test/js/src/main/scala/zio/test/TestClockPlatformSpecific.scala b/test/js/src/main/scala/zio/test/TestClockPlatformSpecific.scala index ba45aebe90ac..10f6b76f9549 100644 --- a/test/js/src/main/scala/zio/test/TestClockPlatformSpecific.scala +++ b/test/js/src/main/scala/zio/test/TestClockPlatformSpecific.scala @@ -23,7 +23,7 @@ private[test] trait TestClockPlatformSpecific { self: TestClock.Test => def scheduler(implicit trace: Trace): UIO[Scheduler] = ZIO.runtime[Any].map { runtime => - new Scheduler { + new Scheduler.Internal { def schedule(runnable: Runnable, duration: Duration)(implicit unsafe: Unsafe): Scheduler.CancelToken = { val fiber = runtime.unsafe.fork(sleep(duration) *> ZIO.succeed(runnable.run())) diff --git a/test/jvm-native/src/main/scala/zio/test/TestClockPlatformSpecific.scala b/test/jvm-native/src/main/scala/zio/test/TestClockPlatformSpecific.scala index 72c4957e96ab..fedd72395fd7 100644 --- a/test/jvm-native/src/main/scala/zio/test/TestClockPlatformSpecific.scala +++ b/test/jvm-native/src/main/scala/zio/test/TestClockPlatformSpecific.scala @@ -29,7 +29,7 @@ trait TestClockPlatformSpecific { self: TestClock.Test => def scheduler(implicit trace: Trace): UIO[Scheduler] = (ZIO.executor <*> ZIO.runtime[Any]).map { case (executor, runtime) => - new Scheduler { + new Scheduler.Internal { def schedule(runnable: Runnable, duration: Duration)(implicit unsafe: Unsafe): Scheduler.CancelToken = { val fiber = runtime.unsafe.fork((sleep(duration) *> ZIO.succeed(runnable.run()))) diff --git a/test/jvm-native/src/main/scala/zio/test/results/ResultFileOps.scala b/test/jvm-native/src/main/scala/zio/test/results/ResultFileOps.scala index c63ea2023535..701bdfe81a00 100644 --- a/test/jvm-native/src/main/scala/zio/test/results/ResultFileOps.scala +++ b/test/jvm-native/src/main/scala/zio/test/results/ResultFileOps.scala @@ -2,83 +2,67 @@ package zio.test.results import zio._ -import java.io.IOException -import scala.io.Source +import java.util.concurrent.ConcurrentLinkedQueue private[test] trait ResultFileOps { - def write(content: => String, append: Boolean): ZIO[Any, IOException, Unit] + def write(content: => String): UIO[Unit] } private[test] object ResultFileOps { val live: ZLayer[Any, Nothing, ResultFileOps] = - ZLayer.scoped( - Json.apply - ) + ZLayer.scoped(Json.apply) - private[test] case class Json(resultPath: String, lock: Ref.Synchronized[Unit]) extends ResultFileOps { - def write(content: => String, append: Boolean): ZIO[Any, IOException, Unit] = - lock.updateZIO(_ => - ZIO - .acquireReleaseWith(ZIO.attemptBlockingIO(new java.io.FileWriter(resultPath, append)))(f => - ZIO.attemptBlocking(f.close()).orDie - ) { f => - ZIO.attemptBlockingIO(f.append(content)) - } - .ignore - ) + private[test] final class Json private (resultPath: String) extends ResultFileOps { + private val queue = new ConcurrentLinkedQueue[String]() + + def write(content: => String): UIO[Unit] = + ZIO.succeed { + queue.offer(content) + () + } + + private def close: UIO[Unit] = + ZIO.attemptBlocking { + makeOutputDirectory() + flushUnsafe() + }.orDie - private val makeOutputDirectory = ZIO.attempt { + private def makeOutputDirectory(): Unit = { import java.nio.file.{Files, Paths} val fp = Paths.get(resultPath) Files.createDirectories(fp.getParent) - }.unit + () + } - private def closeJson: ZIO[Scope, Throwable, Unit] = - removeLastComma *> - write("\n ]\n}", append = true).orDie + private def flushUnsafe(): Unit = { + val file = new java.io.FileWriter(resultPath, false) + try { + file.write("""|{ + | "results": [""".stripMargin) - private def writeJsonPreamble: URIO[Any, Unit] = - write( - """|{ - | "results": [""".stripMargin, - append = false - ).orDie - - private val removeLastComma = - for { - source <- ZIO.succeed(Source.fromFile(resultPath)) - updatedLines = { - val lines = source.getLines().toList - if (lines.nonEmpty && lines.last.endsWith(",")) { - val lastLine = lines.last - val newLastLine = lastLine.dropRight(1) - lines.init :+ newLastLine + var next = queue.poll() + while (next ne null) { + val current = next + next = queue.poll() + if ((next eq null) && current.endsWith(",")) { + file.write(current.dropRight(1)) } else { - lines + file.write(current) } } - _ <- ZIO.when(updatedLines.nonEmpty) { - val firstLine :: rest = updatedLines - for { - _ <- write(firstLine + "\n", append = false) - _ <- ZIO.foreach(rest)(line => write(line + "\n", append = true)) - _ <- ZIO.addFinalizer(ZIO.attempt(source.close()).orDie) - } yield () - } - } yield () - + file.write("\n ]\n}") + } finally { + file.close() + } + } } object Json { + def apply(filename: String): ZIO[Scope, Nothing, Json] = + ZIO.acquireRelease(ZIO.succeed(new Json(filename)))(_.close) + def apply: ZIO[Scope, Nothing, Json] = - ZIO.acquireRelease( - for { - fileLock <- Ref.Synchronized.make[Unit](()) - instance = Json("target/test-reports-zio/output.json", fileLock) - _ <- instance.makeOutputDirectory.orDie - _ <- instance.writeJsonPreamble - } yield instance - )(instance => instance.closeJson.orDie) + apply("target/test-reports-zio/output.json") } } diff --git a/test/jvm-native/src/main/scala/zio/test/results/ResultPrinterJson.scala b/test/jvm-native/src/main/scala/zio/test/results/ResultPrinterJson.scala index e1c595b4ebca..1b6f60acaed5 100644 --- a/test/jvm-native/src/main/scala/zio/test/results/ResultPrinterJson.scala +++ b/test/jvm-native/src/main/scala/zio/test/results/ResultPrinterJson.scala @@ -8,13 +8,11 @@ private[test] object ResultPrinterJson { ZLayer.make[ResultPrinter]( ResultSerializer.live, ResultFileOps.live, - ZLayer.fromFunction( - LiveImpl(_, _) - ) + ZLayer.fromFunction(LiveImpl.apply _) ) private case class LiveImpl(serializer: ResultSerializer, resultFileOps: ResultFileOps) extends ResultPrinter { override def print[E](event: ExecutionEvent.Test[E]): ZIO[Any, Nothing, Unit] = - resultFileOps.write(serializer.render(event), append = true).orDie + resultFileOps.write(serializer.render(event)) } } diff --git a/test/shared/src/main/scala-3/zio/test/Macros.scala b/test/shared/src/main/scala-3/zio/test/Macros.scala index 68d3bc48b8a0..8b2d8c1be85b 100644 --- a/test/shared/src/main/scala-3/zio/test/Macros.scala +++ b/test/shared/src/main/scala-3/zio/test/Macros.scala @@ -46,11 +46,13 @@ object SmartAssertMacros { ): Option[(quotes.reflect.Term, String, List[quotes.reflect.TypeRepr], Option[List[quotes.reflect.Term]])] = { import quotes.reflect._ tree match { - case Select(lhs, name) => Some((lhs, name, List.empty, None)) - case TypeApply(Select(lhs, name), tpes) => Some((lhs, name, tpes.map(_.tpe), None)) - case Apply(Select(lhs, name), args) => Some((lhs, name, List.empty, Some(args))) - case Apply(TypeApply(Select(lhs, name), tpes), args) => Some((lhs, name, tpes.map(_.tpe), Some(args))) - case _ => None + case Select(lhs, name) => Some((lhs, name, List.empty, None)) + case TypeApply(Select(lhs, name), tpes) => Some((lhs, name, tpes.map(_.tpe), None)) + case Apply(select @ Select(lhs, name), args) if !select.symbol.isClassConstructor => + Some((lhs, name, List.empty, Some(args))) + case Apply(TypeApply(select @ Select(lhs, name), tpes), args) if !select.symbol.isClassConstructor => + Some((lhs, name, tpes.map(_.tpe), Some(args))) + case _ => None } } } @@ -61,6 +63,12 @@ object SmartAssertMacros { def apply(using Quotes)(term: quotes.reflect.Term) = new PositionContext(term.pos.start) } + def unsupportedOperationErrorExpr(using Quotes) = '{ + scala.compiletime.error( + "Unsupported operation in 'assertTrue'\nPlease open an issue: https://github.com/zio/zio/issues/new" + ) + } + def transformAs[Start: Type, End: Type]( expr: Expr[TestLens[End]] )(start: Expr[TestArrow[Any, Start]])(using PositionContext, Quotes): Expr[TestArrow[Any, End]] = { @@ -165,7 +173,7 @@ object SmartAssertMacros { val arrow = Inlined(a, b, transform(expr.asExprOf[A]).asTerm).asExprOf[zio.test.TestArrow[Any, A]] '{ $arrow.span($preMacroExpansionSpan) } - case Unseal(Apply(Select(lhs, op @ (">" | ">=" | "<" | "<=")), List(rhs))) => + case Unseal(tree @ Apply(Select(lhs, op @ (">" | ">=" | "<" | "<=")), List(rhs))) => def tpesPriority(tpe: TypeRepr): Int = tpe.toString match { case "Byte" => 0 @@ -229,7 +237,7 @@ object SmartAssertMacros { .span($span) }.asExprOf[TestArrow[Any, A]] } - case _ => throw new Error("NO") + case _ => unsupportedOperationErrorExpr } } case Some(false) => @@ -263,7 +271,10 @@ object SmartAssertMacros { .span($span) }.asExprOf[TestArrow[Any, A]] } - case _ => throw new Error("NO") + case (None, _) => + val span = getSpan(tree) + '{ TestArrow.succeed($expr).span($span) } + case _ => unsupportedOperationErrorExpr } } case None => @@ -297,7 +308,7 @@ object SmartAssertMacros { .span($span) }.asExprOf[TestArrow[Any, A]] } - case _ => throw new Error("NO") + case _ => unsupportedOperationErrorExpr } } } @@ -334,29 +345,42 @@ object SmartAssertMacros { } case Unseal(method @ MethodCall(lhs, name, tpeArgs, args)) => - def body(param: Term) = + def body(param: Term): Term = (tpeArgs, args) match { case (Nil, None) => try Select.unique(param, name) catch { case _: AssertionError => - def getFieldOrMethod(s: Symbol) = - s.fieldMembers + def getFieldOrMethod(tpe: TypeRepr, owner: Tree): Select = { + val s = tpe.typeSymbol + val member = s.fieldMembers .find(f => f.name == name) .orElse(s.methodMember(name).filter(_.declarations.nonEmpty).headOption) + .getOrElse( + report.errorAndAbort(s"Could not resolve $name on ${owner.show(using Printer.TreeStructure)}") + ) + Select(param, member) + } - // Tries to find directly the referenced method on lhs's type (or if lhs is method, on lhs's returned type) - lhs.symbol.tree match { - case DefDef(_, _, tpt, _) => - getFieldOrMethod(tpt.symbol) match { - case Some(fieldOrMethod) => Select(param, fieldOrMethod) - case None => throw new Error(s"Could not resolve $name on $tpt") - } - case _ => - getFieldOrMethod(lhs.symbol) match { - case Some(fieldOrMethod) => Select(param, fieldOrMethod) - case None => throw new Error(s"Could not resolve $name on $lhs") - } + lhs.underlyingArgument match { + case Block(List(cls: ClassDef), term) => + // if this is new instance of anonymous class - take symbol from it instead of block + getFieldOrMethod(term.tpe, term) + + case Typed(Block(List(cls: ClassDef), term), _) => + getFieldOrMethod(term.tpe, term) + + // Tries to find directly the referenced method on lhs's type (or if lhs is method, on lhs's returned type) + case lhs => + if lhs.symbol == Symbol.noSymbol then + report.errorAndAbort(s"Can't get symbol of ${lhs.show(using Printer.TreeStructure)}") + else + lhs.symbol.tree match { + case DefDef(_, _, tpt, _) => + getFieldOrMethod(tpt.tpe, tpt) + case _ => + getFieldOrMethod(lhs.tpe, lhs) + } } } case (tpeArgs, Some(args)) => Select.overloaded(param, name, tpeArgs, args) diff --git a/test/shared/src/main/scala-3/zio/test/SpecLayerMacros.scala b/test/shared/src/main/scala-3/zio/test/SpecLayerMacros.scala index 8694d64d7083..02a6a9215ba0 100644 --- a/test/shared/src/main/scala-3/zio/test/SpecLayerMacros.scala +++ b/test/shared/src/main/scala-3/zio/test/SpecLayerMacros.scala @@ -9,14 +9,66 @@ object SpecLayerMacros { def provideImpl[R0: Type, R: Type, E: Type](spec: Expr[Spec[R, E]], layer: Expr[Seq[ZLayer[_, E, _]]])(using Quotes ): Expr[Spec[R0, E]] = { - val expr = LayerMacros.constructLayer[R0, R, E](layer) + val expr = LayerMacros.constructStaticProvideLayer[R0, R, E](layer) '{ $spec.provideLayer($expr) } } + def provideSomeImpl[R0: Type, R: Type, E: Type](spec: Expr[Spec[R, E]], layer: Expr[Seq[ZLayer[_, E, _]]])(using + Quotes + ): Expr[Spec[R0, E]] = { + val expr = LayerMacros.constructStaticProvideSomeLayer[R0, R, E](layer) + '{ $spec.provideLayer($expr) } + } + + def provideAutoImpl[R: Type, E: Type](spec: Expr[Spec[R, E]], layer: Expr[Seq[ZLayer[_, E, _]]])(using + Quotes + ): Expr[Spec[_, E]] = { + val layerExpr = LayerMacros.constructDynamicLayer[R, E](layer) + // https://github.com/scala/scala3/issues/22886 + layerExpr match { + case '{ $layer: ZLayer[in, e, out] } => + '{ $spec.provideLayer($layer) } + } + } + def provideSharedImpl[R0: Type, R: Type, E: Type](spec: Expr[Spec[R, E]], layer: Expr[Seq[ZLayer[_, E, _]]])(using Quotes ): Expr[Spec[R0, E]] = { - val expr = LayerMacros.constructLayer[R0, R, E](layer) + import quotes.reflect._ + val expr = LayerMacros.constructStaticProvideLayer[R0, R, E](layer) '{ $spec.provideLayerShared($expr) } } + + def provideSomeSharedImpl[R0: Type, R: Type, E: Type](spec: Expr[Spec[R, E]], layer: Expr[Seq[ZLayer[_, E, _]]])(using + Quotes + ): Expr[Spec[R0, E]] = { + import quotes.reflect._ + val layerExpr: Expr[ZLayer[R0, E, ?]] = LayerMacros.constructStaticProvideSomeSharedLayer[R0, R, E](layer) + layerExpr match { + case '{ $layer: ZLayer[in, e, out] } => + /** + * Contract of [[zio.internal.macros.LayerBuilder.build]] ensures this + */ + val proof = Expr.summon[(R0 & out) <:< R] match { + case Some(e) => + e + case None => + report.errorAndAbort( + s"Cannot proof that R0 (${Type.show[R0]}) & out (${Type.show[out]}) <:< R (${Type.show[R]})" + ) + } + '{ $spec.provideSomeLayerShared[R0]($layer)(using $proof) } + } + } + + def provideSharedAutoImpl[R: Type, E: Type](spec: Expr[Spec[R, E]], layer: Expr[Seq[ZLayer[_, E, _]]])(using + Quotes + ): Expr[Spec[_, E]] = { + val layerExpr = LayerMacros.constructDynamicLayer[R, E](layer) + // https://github.com/scala/scala3/issues/22886 + layerExpr match { + case '{ $layer: ZLayer[in, e, out] } => + '{ $spec.provideLayerShared($layer) } + } + } } diff --git a/test/shared/src/main/scala-3/zio/test/SpecVersionSpecific.scala b/test/shared/src/main/scala-3/zio/test/SpecVersionSpecific.scala index 9f82947d7cae..cad41b73c3ae 100644 --- a/test/shared/src/main/scala-3/zio/test/SpecVersionSpecific.scala +++ b/test/shared/src/main/scala-3/zio/test/SpecVersionSpecific.scala @@ -10,9 +10,46 @@ private[test] trait SpecVersionSpecific[-R, +E] { self: Spec[R, E] => inline def provide[E1 >: E](inline layer: ZLayer[_, E1, _]*): Spec[Any, E1] = ${ SpecLayerMacros.provideImpl[Any, R, E1]('self, 'layer) } + /** + * Splits the environment into two parts, providing each test with one part + * using the specified layer and leaving the remainder `R0`. + * + * {{{ + * val spec: ZSpec[Clock with Random, Nothing] = ??? + * val clockLayer: ZLayer[Any, Nothing, Clock] = ??? + * + * val spec2: ZSpec[Random, Nothing] = spec.provideSome[Random](clockLayer) + * }}} + */ def provideSome[R0] = new ProvideSomePartiallyApplied[R0, R, E](self) + /** + * Equivalent to [[provideSome]], but does not require providing the remainder + * type + * + * {{{ + * val spec: ZSpec[Clock with Random, Nothing] = ??? + * val clockLayer: ZLayer[Any, Nothing, Clock] = ??? + * + * val spec2 = spec.provideSome(clockLayer) // Inferred type is ZSpec[Random, Nothing] + * }}} + */ + inline transparent def provideSomeAuto[E1 >: E](inline layer: ZLayer[_, E1, _]*): Spec[_, E1] = + ${ SpecLayerMacros.provideAutoImpl[R, E1]('self, 'layer) } + + /** + * Splits the environment into two parts, providing all tests with a shared + * version of one part using the specified layer and leaving the remainder + * `R0`. + * + * {{{ + * val spec: ZSpec[Int with Random, Nothing] = ??? + * val intLayer: ZLayer[Any, Nothing, Int] = ??? + * + * val spec2 = spec.provideSomeShared[Random](intLayer) + * }}} + */ def provideSomeShared[R0] = new ProvideSomeSharedPartiallyApplied[R0, R, E](self) @@ -71,10 +108,10 @@ private[test] trait SpecVersionSpecific[-R, +E] { self: Spec[R, E] => final class ProvideSomePartiallyApplied[R0, -R, +E](val self: Spec[R, E]) extends AnyVal { inline def apply[E1 >: E](inline layer: ZLayer[_, E1, _]*): Spec[R0, E1] = - ${ SpecLayerMacros.provideImpl[R0, R, E1]('self, 'layer) } + ${ SpecLayerMacros.provideSomeImpl[R0, R, E1]('self, 'layer) } // TODO: } final class ProvideSomeSharedPartiallyApplied[R0, -R, +E](val self: Spec[R, E]) extends AnyVal { inline def apply[E1 >: E](inline layer: ZLayer[_, E1, _]*): Spec[R0, E1] = - ${ SpecLayerMacros.provideSharedImpl[R0, R, E1]('self, 'layer) } + ${ SpecLayerMacros.provideSomeSharedImpl[R0, R, E1]('self, 'layer) } } diff --git a/test/shared/src/main/scala/zio/test/Gen.scala b/test/shared/src/main/scala/zio/test/Gen.scala index 50c6aa5b2404..dbf53fc88906 100644 --- a/test/shared/src/main/scala/zio/test/Gen.scala +++ b/test/shared/src/main/scala/zio/test/Gen.scala @@ -18,13 +18,13 @@ package zio.test import zio.Random._ import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.stream.{Stream, ZStream} +import zio.stream.ZStream import zio.{Chunk, NonEmptyChunk, Random, Trace, UIO, URIO, ZIO, Zippable} import java.nio.charset.StandardCharsets import java.util.UUID -import scala.collection.JavaConverters._ import scala.collection.immutable.SortedMap +import scala.jdk.CollectionConverters._ import scala.math.Numeric.DoubleIsFractional /** diff --git a/test/shared/src/main/scala/zio/test/TestRunner.scala b/test/shared/src/main/scala/zio/test/TestRunner.scala index 01d60b41a286..bba89e8a6541 100644 --- a/test/shared/src/main/scala/zio/test/TestRunner.scala +++ b/test/shared/src/main/scala/zio/test/TestRunner.scala @@ -66,10 +66,7 @@ final case class TestRunner[R, E](executor: TestExecutor[R, E]) { self => def runAsync(spec: Spec[R, E])(k: => Unit)(implicit trace: Trace, unsafe: Unsafe): Unit = { val fiber = runtime.unsafe.fork(self.run("Test Task name unavailable in this context.", spec)) - fiber.unsafe.addObserver { - case Exit.Success(_) => k - case Exit.Failure(c) => throw FiberFailure(c) - } + fiber.unsafe.addObserver { exit => exit.getOrThrowFiberFailure(); k } } /** diff --git a/test/shared/src/main/scala/zio/test/TimeVariants.scala b/test/shared/src/main/scala/zio/test/TimeVariants.scala index 762e16871b25..097f3dd01846 100644 --- a/test/shared/src/main/scala/zio/test/TimeVariants.scala +++ b/test/shared/src/main/scala/zio/test/TimeVariants.scala @@ -19,8 +19,7 @@ package zio.test import zio.{Duration, Trace} import java.time._ -import scala.annotation.nowarn -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ trait TimeVariants { @@ -287,7 +286,6 @@ trait TimeVariants { /** * A generator of `java.time.ZoneId` values. Doesn't have any shrinking. */ - @nowarn("msg=JavaConverters") final def zoneId(implicit trace: Trace): Gen[Any, ZoneId] = Gen.elements(ZoneId.getAvailableZoneIds.asScala.map(ZoneId.of).toList: _*).noShrink diff --git a/test/shared/src/main/scala/zio/test/package.scala b/test/shared/src/main/scala/zio/test/package.scala index 708803161ae9..b039b9371f44 100644 --- a/test/shared/src/main/scala/zio/test/package.scala +++ b/test/shared/src/main/scala/zio/test/package.scala @@ -18,9 +18,9 @@ package zio import zio.internal.stacktracer.{SourceLocation, Tracer} import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.stream.{ZChannel, ZSink, ZStream} +import zio.stream.{ZSink, ZStream} import zio.test.ReporterEventRenderer.ConsoleEventRenderer -import zio.test.Spec.LabeledCase +import zio.test.Spec.{ExecCase, LabeledCase} import scala.language.implicitConversions @@ -974,18 +974,18 @@ package object test extends CompileVariants { Spec.labeled( label, if (specs.isEmpty) Spec.empty - else if (specs.length == 1) { - wrapIfLabelledCase(specs.head) - } else Spec.multiple(Chunk.fromIterable(specs).map(spec => suiteConstructor(spec))) + else if (specs.length == 1) wrapIfLabelledCase(specs.head) + else Spec.multiple(Chunk.fromIterable(specs).map(spec => suiteConstructor(spec))) ) - // Ensures we render suite label when we have an individual Labeled test case - private def wrapIfLabelledCase[In](spec: In)(implicit suiteConstructor: SuiteConstructor[In], trace: Trace) = - spec match { - case Spec(LabeledCase(_, _)) => - Spec.multiple(Chunk(suiteConstructor(spec))) - case _ => suiteConstructor(spec) + // Ensures we render suite label when we have an individual Labeled / Exec test case + private def wrapIfLabelledCase[In](spec: In)(implicit suiteConstructor: SuiteConstructor[In], trace: Trace) = { + val suite = suiteConstructor(spec) + suite.caseValue match { + case _: LabeledCase[?] | _: ExecCase[?] => Spec.multiple(Chunk.single(suite)) + case _ => suite } + } /** * Builds a spec with a single test. diff --git a/test/shared/src/main/scala/zio/test/results/ResultSerializer.scala b/test/shared/src/main/scala/zio/test/results/ResultSerializer.scala index d14164859c60..083a5233a546 100644 --- a/test/shared/src/main/scala/zio/test/results/ResultSerializer.scala +++ b/test/shared/src/main/scala/zio/test/results/ResultSerializer.scala @@ -13,7 +13,7 @@ object ResultSerializer { object Json extends ResultSerializer { def render[E](executionEvent: ExecutionEvent.Test[E]): String = executionEvent match { - case ExecutionEvent.Test(labelsReversed, test, annotations, ancestors, duration, id, fullyQualifiedName) => + case ExecutionEvent.Test(labelsReversed, test, annotations, _, duration, _, fullyQualifiedName) => s""" | { | "name" : "$fullyQualifiedName/${labelsReversed.reverse @@ -28,12 +28,7 @@ object ResultSerializer { } private def render[E](test: Either[TestFailure[E], TestSuccess]): String = - test match { - case Left(value) => - "Failure" - case Right(value) => - "Success" - } + if (test.isRight) "Success" else "Failure" private[results] def render(testAnnotationMap: TestAnnotationMap): String = TestAnnotationRenderer.default diff --git a/website/package.json b/website/package.json index a8665a94f2b3..456f3c92eaab 100644 --- a/website/package.json +++ b/website/package.json @@ -22,7 +22,7 @@ "@docusaurus/theme-mermaid": "^3.5.2", "@docusaurus/theme-search-algolia": "^3.5.2", "@mdx-js/react": "^3.0.1", - "@tailwindcss/postcss": "4.1.3", + "@tailwindcss/postcss": "4.1.7", "@zio.dev/izumi-reflect": "2024.11.11-da5b828d4d6e", "@zio.dev/zio-amqp": "1.0.0-alpha.3", "@zio.dev/zio-aws": "2025.2.9-b3c92258585e", @@ -31,17 +31,17 @@ "@zio.dev/zio-cli": "0.7.0", "@zio.dev/zio-config": "4.0.4", "@zio.dev/zio-direct": "1.0.0-RC7", - "@zio.dev/zio-dynamodb": "1.0.0-RC18", + "@zio.dev/zio-dynamodb": "1.0.0-RC19", "@zio.dev/zio-ftp": "0.4.3", "@zio.dev/zio-http": "3.2.0", - "@zio.dev/zio-json": "0.7.39", + "@zio.dev/zio-json": "0.7.43", "@zio.dev/zio-kafka": "2.12.0", "@zio.dev/zio-lambda": "1.0.5", "@zio.dev/zio-logging": "2.5.0", "@zio.dev/zio-meta": "0.0.0--21-54bc2e8b-SNAPSHOT", "@zio.dev/zio-metrics-connectors": "2.3.1", "@zio.dev/zio-parser": "0.1.9", - "@zio.dev/zio-prelude": "1.0.0-RC39", + "@zio.dev/zio-prelude": "1.0.0-RC40", "@zio.dev/zio-process": "0.7.2", "@zio.dev/zio-profiling": "0.3.2", "@zio.dev/zio-query": "0.7.7", @@ -50,10 +50,10 @@ "@zio.dev/zio-rocksdb": "0.4.2", "@zio.dev/zio-s3": "2022.11.21-367e7009c0f5", "@zio.dev/zio-sbt": "0.4.0-alpha.31", - "@zio.dev/zio-schema": "1.6.6", + "@zio.dev/zio-schema": "1.7.0", "@zio.dev/zio-sqs": "2022.11.23-5a814304824c", "@zio.dev/zio-streams-compress": "1.1.0", - "@zio.dev/zio-telemetry": "3.1.3", + "@zio.dev/zio-telemetry": "3.1.4", "@zio.dev/zio2-interop-cats2": "2023.1.23-571dabdd91f9", "babel": "^6.23.0", "blended-include-code-plugin": "0.1.2", @@ -69,11 +69,11 @@ "react-icons": "5.5.0", "react-markdown": "10.1.0", "remark-kroki-plugin": "0.1.1", - "tailwindcss": "4.1.3", + "tailwindcss": "4.1.7", "tslib": "^2.4.0" }, "resolutions": { - "cytoscape": "3.31.2" + "cytoscape": "3.32.0" }, "browserslist": { "production": [ @@ -89,11 +89,11 @@ }, "devDependencies": { "@tsconfig/docusaurus": "2.0.3", - "@types/react": "19.1.0", + "@types/react": "19.1.4", "@types/react-helmet": "6.1.11", "@types/react-router-dom": "5.3.3", "prettier": "3.5.3", "prettier-plugin-tailwindcss": "0.6.11", - "typescript": "5.8.2" + "typescript": "5.8.3" } }