-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Enqueue+Dequeue: add shutdownCause
method
#9928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: series/2.x
Are you sure you want to change the base?
Conversation
val capacity: Int = | ||
hub.capacity | ||
def isShutdown(implicit trace: Trace): UIO[Boolean] = | ||
ZIO.succeed(shutdownFlag.get) | ||
def publish(a: A)(implicit trace: Trace): UIO[Boolean] = | ||
ZIO.suspendSucceed { | ||
if (shutdownFlag.get) ZIO.interrupt | ||
if (shutdownFlag.get) interrupted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could actually unify on the shutdownHook
being the single atomic for this class. Checking promise.unsafe.isDone
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I actually thought about doing that in the past as well. Not sure why I didn't do it 🤔
- implement method for `Hub` and `Queue`
@@ -161,14 +167,16 @@ object Queue extends QueuePlatformSpecific { | |||
strategy: Strategy[A] | |||
) extends Queue[A] { | |||
|
|||
private def interrupted(implicit trace: Trace): UIO[Nothing] = shutdownHook.await *> ZIO.interrupt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If shutdownHook
was fulfilled with an interruption then we don't need the *> ZIO.interrupt
(AFAICT)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That won't typecheck, but yes
* Shuts down the Dequeue with a specific Cause, either `Die` or `Interrupt`. | ||
* Future calls to `take*` fail immediately. | ||
*/ | ||
def shutdownCause(cause: Cause[Nothing])(implicit trace: Trace): UIO[Unit] = shutdown(trace) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that the original ticket says:
shutdownCause
would also return the items currently buffered in the queue in order to dispose of them
Do you plan to implement that?
/claim #9844
Hub
andQueue