Fix channel drain close race #1193
Only one test failed here, but it was the one with
This run was "better". 5 tests failed: https://github.com/getkyo/kyo/actions/runs/15068698744/job/42359899400
@steinybot not sure I understood: are the tests still flaky?
private def asResult[A](value: A): Result[Closed, A] =
    if isClosed.get() then closedResult else Result.succeed(value)

private def asOptimisticResult[A](value: Chunk[A]): Result[Closed, Chunk[A]] =
The name doesn't seem very informative; how about asDrainResult? Could you also add a comment explaining the edge case the method handles?
Sure thing. I went for a slightly different name. Let me know what you think.
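To illustrate the edge case being discussed, here is a minimal sketch in plain Scala, assuming Either in place of kyo's Result. Closed is a hypothetical stand-in for kyo's failure type, asResult only mirrors the shape of the method in the diff, and asDrainResult is the name suggested in review, not necessarily what the PR ended up using. The assumption behind it is that items drain has already removed from the queue cannot be put back, so a non-empty batch is returned even if the channel has since been closed.

```scala
// Hypothetical stand-in for kyo's Closed failure; Either replaces Result.
final case class Closed(message: String)

// Same shape as asResult in the diff: a value obtained from the queue is
// reported as Closed if the channel is closed at the time of the check.
def asResult[A](value: A, isClosed: Boolean): Either[Closed, A] =
  if isClosed then Left(Closed("channel closed")) else Right(value)

// Hypothetical drain variant (name taken from the review suggestion).
// Edge case: drain has already removed these items from the queue, so if the
// channel was closed concurrently they are returned rather than dropped.
// Only an empty drain falls back to the asResult check and may report Closed.
def asDrainResult[A](drained: Vector[A], isClosed: Boolean): Either[Closed, Vector[A]] =
  if drained.nonEmpty then Right(drained) else asResult(drained, isClosed)
```

With this split, asDrainResult(Vector(1, 2), isClosed = true) yields Right(Vector(1, 2)), while an empty drain on a closed channel still yields Left(Closed(...)).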
@@ -633,19 +639,20 @@ object Channel:
def drain()(using AllowUnsafe) =
    @tailrec
    def loop(current: Chunk[A]): Result[Closed, Chunk[A]] =
    def loop(current: Chunk[A], i: Int): Result[Closed, Chunk[A]] =
Is i unused?
Whoops, this was left over from verifying the tests. Removed.
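The drain loop under review can be sketched in isolation as follows. This is not kyo's implementation: poll is a hypothetical function standing in for the underlying queue, Closed is a stand-in for kyo's failure type, and Either replaces Result. It shows that the tail-recursive loop only needs the accumulator, and one way a Closed observed part-way through can be handled by returning the items drained so far.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for kyo's Closed failure; Either replaces Result.
final case class Closed(message: String)

// Drains items via a hypothetical `poll` function that returns:
//   Right(Some(a)) : one item removed from the queue
//   Right(None)    : the queue is currently empty
//   Left(closed)   : the channel has been closed
// Only the accumulator is threaded through the loop; no counter is needed.
def drainWith[A](poll: () => Either[Closed, Option[A]]): Either[Closed, Vector[A]] =
  @tailrec
  def loop(current: Vector[A]): Either[Closed, Vector[A]] =
    poll() match
      case Right(Some(a)) => loop(current :+ a)
      case Right(None)    => Right(current)
      case Left(closed)   =>
        // Closed arrived mid-drain: return the items already removed instead
        // of dropping them; fail only if nothing was drained.
        if current.isEmpty then Left(closed) else Right(current)
  loop(Vector.empty)

// Simulates a close landing after two items have been drained.
val script = Iterator[Either[Closed, Option[Int]]](
  Right(Some(1)), Right(Some(2)), Left(Closed("channel closed"))
)
val drainedBeforeClose = drainWith(() => script.next()) // Right(Vector(1, 2))
```

Scripting poll as an iterator makes the interleaving deterministic, which sidesteps the timing sensitivity of the real race.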
@@ -1102,4 +1120,52 @@ class ChannelTest extends Test:
}
}

private def verifyRaceDrainWithClose(
Good catch! I imagine it was a tough one to debug 🙏
Ha, yeah: the closer I got to minimising the problem, the more it started disappearing. Good ol' Heisenbug. It seems obvious in hindsight, but I had been looking at that code for so long that I failed to notice.
}
}
_ <- producer.interrupt
yield assert(result.isSuccess)
This could be better: it could actually check that the right number of elements were drained. I'd have to rearrange things a bit, but I should be able to simplify it as long as that doesn't mess up the timings.
I have a different version of these tests that is easier to read and checks the actual counts, but to get similar failure rates it runs about 5 times slower (~500 ms on my M1): https://github.com/steinybot/kyo/pull/2/files
Not sure if it is worth it.
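A count-checking race test along these lines might look like the sketch below. It does not use kyo: SketchChannel is a hypothetical stand-in built on java.util.concurrent, and raceDrainWithClose is a hypothetical helper that races one drain against one close on plain threads and checks that every offered item comes back exactly once, from either drain or close. As with the real tests, whether the threads actually interleave depends on timing, so it needs to be repeated many times.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec

// Hypothetical stand-in for kyo's Closed failure.
final case class Closed(message: String)

// Hypothetical simplified channel (not kyo's Channel): a lock-free queue plus
// a closed flag, just enough to reproduce a drain/close race.
final class SketchChannel[A]:
  private val queue = new ConcurrentLinkedQueue[A]()
  private val closed = new AtomicBoolean(false)

  // Not safe against a concurrent close; the race below only offers before closing.
  def offer(a: A): Boolean = !closed.get() && queue.offer(a)

  // Closes the channel and returns the items still queued (None if already closed).
  def close(): Option[Vector[A]] =
    if closed.compareAndSet(false, true) then Some(pollAll(Vector.empty)) else None

  @tailrec
  private def pollAll(acc: Vector[A]): Vector[A] =
    val next = queue.poll()
    if next == null then acc else pollAll(acc :+ next)

  def drain(): Either[Closed, Vector[A]] =
    @tailrec
    def loop(current: Vector[A]): Either[Closed, Vector[A]] =
      if closed.get() then
        // Closed part-way through: keep the items already removed from the queue.
        if current.isEmpty then Left(Closed("channel closed")) else Right(current)
      else
        val next = queue.poll()
        if next == null then Right(current) else loop(current :+ next)
    loop(Vector.empty)

// Races one drain against one close and checks that every offered item is
// returned exactly once, either by drain or by close.
def raceDrainWithClose(items: Int): Boolean =
  val channel = new SketchChannel[Int]
  (1 to items).foreach(channel.offer)
  // Plain vars are enough: join() makes each thread's write visible here.
  var drained = Vector.empty[Int]
  var remaining = Vector.empty[Int]
  val consumer = new Thread(() => { drained = channel.drain().getOrElse(Vector.empty[Int]) })
  val closer = new Thread(() => { remaining = channel.close().getOrElse(Vector.empty[Int]) })
  consumer.start(); closer.start()
  consumer.join(); closer.join()
  (drained ++ remaining).sorted == (1 to items).toVector
```

Checking the combined count, rather than only result.isSuccess, is what makes a lost batch visible: with the old behaviour (returning Closed even when current is non-empty), the items drained before the close would be dropped on the runs where the interleaving happens.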
I think having better coverage of an edge case is worth it! If we see it's impacting build times too much, we can follow up later.
I'm merging the PR to stabilize the build, but feel free to post follow-ups.
Ah sorry, no, they shouldn't be flaky. I first committed the tests to prove that they failed, then committed the fix. For some reason I thought my comment would appear in the activity before the later commit.
Problem
Given:
- one fiber calling close or closeAwaitEmpty, and
- another fiber calling drain, and then take to block until there are new items or the channel is closed,
Then:
- drain can fail with Closed rather than returning the items drained so far.
Solution
If draining encounters a Failure[Closed] after items have already been drained, those items are returned instead of the failure.
Notes
This is a little tricky to reproduce as it relies on timing. For whatever reason, it reproduced fairly reliably when running all the gRPC tests.
I had asked about this, and @fwbrasil said:
#1191 was an attempt to resolve this, but unfortunately it does not (although it does make other parts of the above pattern much easier to implement).
This change still doesn't guarantee that new items won't arrive. The guarantee is that if the channel wasn't closed when drain was first called (strictly, when the underlying queue's drain is first called), then drain will always return a success.