[core] Channel/Queue closeAndWaitEmpty by fwbrasil · Pull Request #1191 · getkyo/kyo · GitHub

[core] Channel/Queue closeAndWaitEmpty #1191

Merged
merged 5 commits into from
May 15, 2025

Conversation

fwbrasil
Collaborator

Fixes #721

Problem

Queue and Channel don't offer a way to close them only after all pending items are consumed. This is an important limitation in producer/consumer scenarios where the producer has finished processing but needs to wait for consumers to drain the queue or channel before closing it.

Solution

Introduce Channel.closeAndWaitEmpty and Queue.closeAndWaitEmpty

private def opClosed: Maybe[Result.Error[Closed]] =
    state.get() match
        case State.FullyClosed(r) => Present(r)
        case _ => Absent

protected inline def op[A](inline f: => A): Result[Closed, A] =
Collaborator Author
@fwbrasil fwbrasil May 14, 2025

The half-open state allows all operations (the op method) but fails offers and adds (offerOp).
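
The lifecycle described in this comment can be sketched as a small state machine. This is only a toy model under stated assumptions, not the code in this PR: `DrainableQueue`, `beginClose`, and `currentState` are hypothetical names, the model does not wait for the drain to finish, and it ignores the race between a concurrent offer and close that the real `offerOp` handles through its `raceRepair` parameter.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

// Hypothetical three-state lifecycle, loosely mirroring Open / HalfOpen / FullyClosed.
enum State:
  case Open, HalfOpen, FullyClosed

// Toy model for illustration only; not Kyo's Queue.
final class DrainableQueue[A]:
  private val state = new AtomicReference[State](State.Open)
  private val items = new ConcurrentLinkedQueue[A]()

  def currentState: State = state.get()

  // Offers succeed only while the queue is still Open.
  def offer(a: A): Boolean =
    state.get() == State.Open && items.add(a)

  // Polls are allowed in every state; a poll that empties a
  // half-open queue completes the transition to FullyClosed.
  def poll(): Option[A] =
    val r = Option(items.poll())
    handleHalfOpen()
    r

  // Open -> HalfOpen. Returns false if the queue was not Open.
  // Unlike the method in this PR, this does not wait for the drain.
  def beginClose(): Boolean =
    if state.compareAndSet(State.Open, State.HalfOpen) then
      handleHalfOpen() // an already-empty queue closes immediately
      true
    else false

  // HalfOpen -> FullyClosed once no items remain.
  private def handleHalfOpen(): Unit =
    if state.get() == State.HalfOpen && items.isEmpty then
      state.compareAndSet(State.HalfOpen, State.FullyClosed)
      ()
```

In this model, offering two items, calling `beginClose()`, and then polling twice moves the queue from `Open` to `HalfOpen` and finally to `FullyClosed`; any offer made after `beginClose()` is rejected.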

Collaborator
@steinybot steinybot left a comment

Very neat. Thanks for doing this.

Approving since it looks good, only nits.

    state.get() match
        case State.Open => Absent
        case State.HalfOpen(_, r) => Present(r)
        case State.FullyClosed(r) => Present(r)

protected inline def offerOp[A](inline f: => Boolean, inline raceRepair: => Boolean): Result[Closed, Boolean] =
Collaborator

Unrelated to this change but it looks like A is unused here.

 * closed.
 *
 * @return
 *   true if the queue was successfully closed and emptied, false if it was already closed
Collaborator

Is it worth elaborating that it could be false if half open?

If two producers attempt to call closeAwaitEmpty on a non-empty queue, one will block and the other will return false. If the second producer then checks closed, it would be false, which might be confusing.
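
The scenario in this comment can be reproduced with a toy model. This is a sketch that assumes the behaviour exactly as the comment describes it, not Kyo's implementation: `ToyQueue` and `CloseState` are hypothetical names, and a `CompletableFuture` stands in for the pending result of the caller that blocks.

```scala
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

// Hypothetical states; HalfOpen carries the future of the caller that is waiting.
enum CloseState:
  case Open
  case HalfOpen(done: CompletableFuture[Boolean])
  case FullyClosed

// Toy model for illustration only; not Kyo's Queue.
final class ToyQueue[A]:
  private val state = new AtomicReference[CloseState](CloseState.Open)
  private val items = new ConcurrentLinkedQueue[A]()

  // True only once the queue is fully closed, so it is false while half open.
  def closed: Boolean = state.get() == CloseState.FullyClosed

  // Offers succeed only while the queue is Open.
  def offer(a: A): Boolean =
    state.get() == CloseState.Open && items.add(a)

  // Removing the last item of a half-open queue completes the close.
  def poll(): Option[A] =
    val r = Option(items.poll())
    handleHalfOpen()
    r

  // The first caller gets a future that completes with true after the drain;
  // any later caller gets an already-completed false.
  def closeAwaitEmpty(): CompletableFuture[Boolean] =
    val done = new CompletableFuture[Boolean]()
    if state.compareAndSet(CloseState.Open, CloseState.HalfOpen(done)) then
      handleHalfOpen()
      done
    else CompletableFuture.completedFuture(false)

  private def handleHalfOpen(): Unit =
    state.get() match
      case s @ CloseState.HalfOpen(done) if items.isEmpty =>
        if state.compareAndSet(s, CloseState.FullyClosed) then
          done.complete(true)
          ()
      case _ => ()
```

With one item queued, the first `closeAwaitEmpty()` call stays pending and the second completes with `false` while `closed` still reports `false`. That is the potentially confusing combination the comment points out, and it resolves only after the remaining item is polled.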

_closed.get().getOrElse(Result(f))
opClosed.getOrElse {
    val r = Result(f)
    handleHalfOpen()
Collaborator

Nit picking but is it worth skipping this for operations that don't remove elements?

Collaborator Author

good catch!

@@ -455,6 +512,7 @@ object Queue:
loop(0)
b.result()
end _drain
protected def _isEmpty() = q.isEmpty()
Collaborator

Nit: Should the others be protected too? Same thing with the _drains.

Collaborator Author

ah, it's actually unnecessary since anonymous classes are final. I've removed protected

_ <- c.put(1)
_ <- c.put(2)
fiber <- Async.run(c.closeAwaitEmpty)
_ <- c.take
Collaborator

Should we also check that closed is false?

Collaborator

How about checking:

  • 2 producers calling closeAwaitEmpty
  • 2 producers, one calling closeAwaitEmpty and another calling close

Collaborator

Same here too if you agree.

@fwbrasil fwbrasil merged commit 0155542 into main May 15, 2025
3 checks passed
@fwbrasil fwbrasil deleted the closeAwaitEmpty branch May 15, 2025 15:11
Successfully merging this pull request may close these issues.

Add ability to close Channel after remaining elements have been taken