8000 Enqueue+Dequeue: add `shutdownCause` method by hearnadam · Pull Request #9928 · zio/zio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Enqueue+Dequeue: add shutdownCause method #9928

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: series/2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions core-tests/shared/src/test/scala/zio/HubSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,131 @@ object HubSpec extends ZIOBaseSpec {
}
}
),
// Checks what callers observe when a Hub is shut down with plain `shutdown`:
// every case expects the failure cause to be an interruption attributed to
// the fiber running the test (`selfId`), which is also the fiber calling
// `hub.shutdown`.
suite("shutdown")(
test("shutdown with take fiber") {
for {
selfId <- ZIO.fiberId
hub <- Hub.bounded[Int](3)
// Fork a subscriber that takes from the hub; nothing has been published.
f <- ZIO.scoped(hub.subscribe.flatMap(_.take)).fork
_ <- hub.shutdown
// `sandbox.either` turns the fiber's failure cause into a `Left` value so
// it can be inspected by the assertion.
res <- f.join.sandbox.either
// `untraced` is applied before comparing; presumably so that equality does
// not depend on captured trace information (see Cause docs).
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(Cause.interrupt(selfId))))
},
test("shutdown with publish fiber") {
for {
selfId <- ZIO.fiberId
hub <- Hub.bounded[Int](2)
// Two publishes against a capacity of 2, then a third one in a forked fiber.
// NOTE(review): presumably the third publish suspends because the hub is
// full — confirm against Hub.bounded semantics.
_ <- hub.publish(1)
_ <- hub.publish(1)
f <- hub.publish(1).fork
_ <- hub.shutdown
res <- f.join.sandbox.either
} yield assert(res)(isLeft(equalTo(Cause.interrupt(selfId))))
},
test("shutdown with publish") {
for {
selfId <- ZIO.fiberId
hub <- Hub.bounded[Int](1)
// Shut down first, then publish: the publish itself is expected to fail.
_ <- hub.shutdown
res <- hub.publish(1).sandbox.either
} yield assert(res)(isLeft(equalTo(Cause.interrupt(selfId))))
},
test("shutdown with publishAll") {
for {
selfId <- ZIO.fiberId
hub <- Hub.bounded[Int](1)
// Same as the single-publish case, but through the bulk `publishAll` call.
_ <- hub.shutdown
res <- hub.publishAll(List(1)).sandbox.either
} yield assert(res)(isLeft(equalTo(Cause.interrupt(selfId))))
},
test("shutdown with size") {
for {
selfId <- ZIO.fiberId
hub <- Hub.bounded[Int](1)
// Even the `size` query is expected to fail once the hub is shut down.
_ <- hub.shutdown
res <- hub.size.sandbox.either
} yield assert(res)(isLeft(equalTo(Cause.interrupt(selfId))))
}
),
// Checks `Hub.shutdownCause`: each case shuts the hub down with a
// `Cause.die(...)` and expects exactly that cause (after `untraced`) to be
// what suspended fibers and subsequent operations fail with.
suite("shutdownCause")(
test("shutdown with take fiber using Cause.die") {
for {
hub <- Hub.bounded[Int](3)
// Fork a subscriber that takes from the hub; nothing has been published.
f <- ZIO.scoped(hub.subscribe.flatMap(_.take)).fork
// The same `cause` value is used for the shutdown and for the assertion.
cause = Cause.die(new RuntimeException("test"))
_ <- hub.shutdownCause(cause)
// `sandbox.either` turns the fiber's failure cause into a `Left` value.
res <- f.join.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with publish fiber using Cause.die") {
for {
hub <- Hub.bounded[Int](2)
// Two publishes against a capacity of 2, then a third one in a forked fiber.
// NOTE(review): presumably the third publish suspends because the hub is
// full — confirm against Hub.bounded semantics.
_ <- hub.publish(1)
_ <- hub.publish(1)
f <- hub.publish(1).fork
cause = Cause.die(new RuntimeException("test"))
_ <- hub.shutdownCause(cause)
res <- f.join.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with publish using Cause.die") {
for {
hub <- Hub.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// Shut down first, then publish: the publish itself is expected to fail
// with the supplied cause.
_ <- hub.shutdownCause(cause)
res <- hub.publish(1).sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with publishAll using Cause.die") {
for {
hub <- Hub.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// Same as the single-publish case, but through the bulk `publishAll` call.
_ <- hub.shutdownCause(cause)
res <- hub.publishAll(List(1)).sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with size using Cause.die") {
for {
hub <- Hub.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// Even the `size` query is expected to fail with the supplied cause.
_ <- hub.shutdownCause(cause)
res <- hub.size.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
}
),
// Checks `Hub.awaitShutdown`: a promise is completed only after
// `awaitShutdown` returns, so awaiting the promise proves that the waiter was
// released by the shutdown.
suite("awaitShutdown")(
test("single") {
for {
hub <- Hub.bounded[Int](3)
p <- Promise.make[Nothing, Boolean]
// The forked fiber completes `p` only once `awaitShutdown` has finished.
_ <- (hub.awaitShutdown *> p.succeed(true)).fork
_ <- hub.shutdown
res <- p.await
} yield assert(res)(isTrue)
},
test("multiple") {
for {
hub <- Hub.bounded[Int](3)
p1 <- Promise.make[Nothing, Boolean]
p2 <- Promise.make[Nothing, Boolean]
// Two independent waiters; a single shutdown must release both.
_ <- (hub.awaitShutdown *> p1.succeed(true)).fork
_ <- (hub.awaitShutdown *> p2.succeed(true)).fork
_ <- hub.shutdown
res1 <- p1.await
res2 <- p2.await
} yield assert(res1)(isTrue) &&
assert(res2)(isTrue)
},
test("already shutdown") {
for {
hub <- Hub.bounded[Int](3)
// Shutdown happens before the waiter is forked: `awaitShutdown` on an
// already shut down hub is expected to return rather than hang.
_ <- hub.shutdown
p <- Promise.make[Nothing, Boolean]
_ <- (hub.awaitShutdown *> p.succeed(true)).fork
res <- p.await
} yield assert(res)(isTrue)
}
),
suite("concurrent publishers and subscribers")(
test("one to one") {
check(smallInt, Gen.listOf(smallInt)) { (n, as) =>
Expand Down
64 changes: 64 additions & 0 deletions core-tests/shared/src/test/scala/zio/QueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,70 @@ object QueueSpec extends ZIOBaseSpec {
_ <- f.await
} yield assertCompletes
} @@ exceptJS(nonFlaky),
// Checks `Queue.shutdownCause`: each case shuts the queue down with a
// `Cause.die(...)` and expects exactly that cause (after `untraced`) to be
// what suspended fibers and subsequent operations fail with.
suite("shutdownCause")(
test("shutdown with take fiber using Cause.die") {
for {
queue <- Queue.bounded[Int](3)
f <- queue.take.fork
// `waitForSize` is defined elsewhere in this spec. NOTE(review): presumably
// it waits until `queue.size` reports the given value; a size of -1 would
// then mean the forked `take` is suspended before the shutdown — confirm.
_ <- waitForSize(queue, -1)
// The same `cause` value is used for the shutdown and for the assertion.
cause = Cause.die(new RuntimeException("test"))
_ <- queue.shutdownCause(cause)
// `sandbox.either` turns the fiber's failure cause into a `Left` value.
res <- f.join.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with offer fiber using Cause.die") {
for {
queue <- Queue.bounded[Int](2)
// Two offers against a capacity of 2, then a third one in a forked fiber.
_ <- queue.offer(1)
_ <- queue.offer(1)
f <- queue.offer(1).fork
// NOTE(review): presumably a size of 3 on a capacity-2 queue indicates the
// third offer is suspended before the shutdown — confirm against `size`.
_ <- waitForSize(queue, 3)
cause = Cause.die(new RuntimeException("test"))
_ <- queue.shutdownCause(cause)
res <- f.join.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with offer using Cause.die") {
for {
queue <- Queue.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// Shut down first, then offer: the offer itself is expected to fail with
// the supplied cause.
_ <- queue.shutdownCause(cause)
res <- queue.offer(1).sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with take using Cause.die") {
for {
queue <- Queue.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// `take` after shutdown is expected to fail with the supplied cause.
_ <- queue.shutdownCause(cause)
res <- queue.take.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with takeAll using Cause.die") {
for {
queue <- Queue.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// `takeAll` after shutdown is expected to fail with the supplied cause.
_ <- queue.shutdownCause(cause)
res <- queue.takeAll.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with takeUpTo using Cause.die") {
for {
queue <- Queue.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// `takeUpTo` after shutdown is expected to fail with the supplied cause.
_ <- queue.shutdownCause(cause)
res <- queue.takeUpTo(1).sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
},
test("shutdown with size using Cause.die") {
for {
queue <- Queue.bounded[Int](1)
cause = Cause.die(new RuntimeException("test"))
// Even the `size` query is expected to fail with the supplied cause.
_ <- queue.shutdownCause(cause)
res <- queue.size.sandbox.either
} yield assert(res.left.map(_.untraced))(isLeft(equalTo(cause)))
}
),
suite("back-pressured bounded queue stress testing") {
val genChunk = Gen.chunkOfBounded(20, 100)(smallInt)
List(
Expand Down
2 changes: 1 addition & 1 deletion core-tests/shared/src/test/scala/zio/ZPoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ object ZPoolSpec extends ZIOBaseSpec {
scope <- Scope.make
pool <- scope.extend(ZPool.make(get, 10))
_ <- ZIO.scoped(pool.get).fork.repeatN(99)
_ <- scope.close(Exit.succeed(()))
_ <- scope.close(Exit.unit)
_ <- count.get.repeatUntil(_ == 0)
} yield assertCompletes
} @@ exceptJS(nonFlaky) +
Expand Down
6 changes: 6 additions & 0 deletions core/shared/src/main/scala/zio/Dequeue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ sealed trait Dequeue[+A] extends Serializable {
*/
def shutdown(implicit trace: Trace): UIO[Unit]

/**
 * Shuts down the Dequeue with a specific Cause, either `Die` or `Interrupt`.
 * Future calls to `take*` fail immediately.
 *
 * This default implementation does not use `cause`: it delegates to
 * `shutdown`, passing the implicit `trace` through explicitly.
 * NOTE(review): presumably concrete queue implementations override this to
 * propagate `cause` — confirm against the implementations.
 *
 * @param cause
 *   the cause the queue is meant to be shut down with
 */
def shutdownCause(cause: Cause[Nothing])(implicit trace: Trace): UIO[Unit] = shutdown(trace)
Copy link
Contributor
@mberndt123 mberndt123 Jun 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the original ticket says:

shutdownCause would also return the items currently buffered in the queue in order to dispose of them

Do you plan to implement that?


/**
* Retrieves the size of the queue. This may be negative if fibers are
* suspended waiting for elements to be added to the queue or greater than the
Expand Down
6 changes: 6 additions & 0 deletions core/shared/src/main/scala/zio/Enqueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ sealed trait Enqueue[-A] extends Serializable {
*/
def shutdown(implicit trace: Trace): UIO[Unit]

/**
 * Shuts down the queue with a specific Cause, either `Die` or `Interrupt`.
 * Future calls to `offer*` and `take*` fail immediately.
 *
 * This default implementation does not use `cause`: it delegates to
 * `shutdown`, passing the implicit `trace` through explicitly.
 * NOTE(review): presumably concrete queue implementations override this to
 * propagate `cause` — confirm against the implementations.
 *
 * @param cause
 *   the cause the queue is meant to be shut down with
 */
def shutdownCause(cause: Cause[Nothing])(implicit trace: Trace): UIO[Unit] = shutdown(trace)

/**
* Retrieves the size of the queue. This may be negative if fibers are
* suspended waiting for elements to be added to the queue or greater than the
Expand Down
Loading
Loading
0