[core] Channel/Queue closeAndWaitEmpty #1191
private def opClosed: Maybe[Result.Error[Closed]] =
    state.get() match
        case State.FullyClosed(r) => Present(r)
        case _ => Absent

protected inline def op[A](inline f: => A): Result[Closed, A] =
The half-open state allows all operations (the `op` method) and fails only for offers or adds (`offerOp`).
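The rule described in this comment can be sketched as a small three-state model. This is a simplified, single-threaded illustration and not the code from the PR: `SketchQueue`, `beginClose`, `current`, and the `Either[String, _]` results are hypothetical stand-ins for the real `Queue`, `closeAwaitEmpty`, and `Result[Closed, _]`.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

// Hypothetical simplified model; not the Kyo implementation.
enum State:
  case Open, HalfOpen, FullyClosed

final class SketchQueue[A]:
  private val state = new AtomicReference[State](State.Open)
  // Not thread-safe: the sketch is meant to be driven from a single thread.
  private val items = mutable.Queue.empty[A]

  // Offers are rejected as soon as the queue leaves the Open state.
  def offer(a: A): Either[String, Boolean] =
    state.get() match
      case State.Open =>
        items.enqueue(a)
        Right(true)
      case _ => Left("closed")

  // Reads stay allowed while HalfOpen and fail only once FullyClosed.
  def poll(): Either[String, Option[A]] =
    state.get() match
      case State.FullyClosed => Left("closed")
      case _ =>
        val r = if items.isEmpty then None else Some(items.dequeue())
        // Removing the last element completes a pending graceful close.
        if items.isEmpty then state.compareAndSet(State.HalfOpen, State.FullyClosed)
        Right(r)

  // Starts a graceful close: new offers fail, pending items can still be taken.
  def beginClose(): Boolean =
    val next = if items.isEmpty then State.FullyClosed else State.HalfOpen
    state.compareAndSet(State.Open, next)

  def current: State = state.get()
```

In this model, an offer after `beginClose()` fails immediately, while `poll()` keeps succeeding until the last pending element has been removed.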
Very neat. Thanks for doing this.
Approving since it looks good, only nits.
    state.get() match
        case State.Open => Absent
        case State.HalfOpen(_, r) => Present(r)
        case State.FullyClosed(r) => Present(r)

protected inline def offerOp[A](inline f: => Boolean, inline raceRepair: => Boolean): Result[Closed, Boolean] =
Unrelated to this change, but it looks like `A` is unused here.
  * closed.
  *
  * @return
  *   true if the queue was successfully closed and emptied, false if it was already closed
Is it worth elaborating that it could be `false` if half open?
If two producers attempt to call `closeAwaitEmpty` on a non-empty queue, one will block and the other will return `false`. If the other then calls `closed`, it would be `false`, which might be confusing.
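The scenario in this comment can be made concrete with a small single-threaded model. It is only a sketch under assumptions: `HalfOpenModel` is hypothetical, a `Future[Boolean]` stands in for the blocking call, and `closed` reports `true` only once the queue is fully closed, as the comment describes.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Hypothetical single-threaded model of the scenario; not Kyo code.
final class HalfOpenModel[A]:
  private val items = mutable.Queue.empty[A]
  // Set while a closeAwaitEmpty call is waiting for the queue to drain (half open).
  private var pending: Option[Promise[Boolean]] = None
  private var fullyClosed = false

  def offer(a: A): Boolean =
    if fullyClosed || pending.isDefined then false
    else
      items.enqueue(a)
      true

  // True only once fully closed, so it is still false while half open.
  def closed: Boolean = fullyClosed

  def closeAwaitEmpty(): Future[Boolean] =
    if fullyClosed || pending.isDefined then Future.successful(false)
    else if items.isEmpty then
      fullyClosed = true
      Future.successful(true)
    else
      val p = Promise[Boolean]()
      pending = Some(p)
      p.future

  def poll(): Option[A] =
    val r = if items.isEmpty then None else Some(items.dequeue())
    // Draining the last element completes the waiting closeAwaitEmpty call.
    if items.isEmpty then
      pending.foreach { p =>
        fullyClosed = true
        pending = None
        p.success(true)
      }
    r
```

With one pending element, the first `closeAwaitEmpty()` stays incomplete, the second completes with `false` right away, and `closed` remains `false` until the element is polled.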
_closed.get().getOrElse(Result(f))
opClosed.getOrElse {
    val r = Result(f)
    handleHalfOpen()
Nit picking but is it worth skipping this for operations that don't remove elements?
good catch!
@@ -455,6 +512,7 @@ object Queue:
    loop(0)
    b.result()
end _drain
protected def _isEmpty() = q.isEmpty()
Nit: Should the others be protected too? Same thing with the `_drain`s.
Ah, it's actually unnecessary since anonymous classes are final. I've removed `protected`.
_ <- c.put(1)
_ <- c.put(2)
fiber <- Async.run(c.closeAwaitEmpty)
_ <- c.take
Should we also check that `closed` is `false`?
How about checking:
- 2 producers calling `closeAwaitEmpty`
- 2 producers, one calling `closeAwaitEmpty` and another calling `close`
Same here too if you agree.
Fixes #721
Problem
`Queue` and `Channel` don't offer a way to close them only after all pending items are consumed. This is an important limitation in producer/consumer scenarios where the producer has finished processing but needs to wait for consumers to drain the queue or channel before closing it.
Solution
Introduce `Channel.closeAndWaitEmpty` and `Queue.closeAndWaitEmpty`.
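
To illustrate the producer/consumer scenario from the problem statement, here is a minimal blocking sketch built on plain JVM primitives. It is not the Kyo API: `DrainableQueue` and its methods are hypothetical, the caller blocks a thread instead of suspending a fiber, and the race between a concurrent `offer` and the close is deliberately ignored.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical blocking model on plain JVM primitives; not the Kyo API.
final class DrainableQueue[A]:
  private val items     = new ConcurrentLinkedQueue[A]()
  private val accepting = new AtomicBoolean(true)
  private val drained   = new CountDownLatch(1)

  // Producers can add elements only while the queue is still accepting.
  // Assumption: an offer racing with closeAndWaitEmpty is not repaired here.
  def offer(a: A): Boolean = accepting.get() && items.offer(a)

  // Consumers can keep taking after the close has started.
  def poll(): Option[A] =
    val r = Option(items.poll())
    // Once closing has started and nothing is left, release the waiting closer.
    if !accepting.get() && items.isEmpty then drained.countDown()
    r

  // Stops new offers, then blocks until consumers have drained the queue.
  // Returns false if another caller already started closing.
  def closeAndWaitEmpty(): Boolean =
    if !accepting.compareAndSet(true, false) then false
    else
      if items.isEmpty then drained.countDown()
      drained.await()
      true
```

A producer that has finished its work would call `closeAndWaitEmpty()` and resume only after consumers have polled every remaining element; any later `offer` returns `false`.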