Fix channel drain close race by steinybot · Pull Request #1193 · getkyo/kyo · GitHub

Fix channel drain close race #1193


Merged: 6 commits, merged on May 21, 2025


steinybot · Collaborator

Problem

Given:

  • A channel.
  • A producer that puts items into the channel.
  • The producer closes the channel either:
    1. when it is empty via close or,
    2. using closeAwaitEmpty.
  • A consumer that reads from the channel optimistically using drain and,
  • if empty, the consumer uses take to block until there are new items or the channel is closed.

Then:

  • There is a race condition where the consumer is in the middle of draining,
  • the channel is not empty,
  • the producer closes the channel,
  • drain returns Closed to the consumer rather than the items drained so far,
  • and so items are effectively lost from the end of the channel.
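The race can be reproduced deterministically in a toy model. This is a hypothetical, single-threaded sketch, not Kyo's Channel: ToyChannel, drainOnce, drainBuggy, betweenRounds and RaceDemo are illustrative names, and Either[Closed.type, _] stands in for Kyo's Result[Closed, _]. It assumes drain runs in rounds against an underlying queue, and that close hands the pending backlog back to its caller (the producer), as the maintainer describes in this PR. The producer's close is injected between two drain rounds.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical toy model of the race; NOT Kyo's Channel implementation.
// Single-threaded: the "concurrent" close is injected via `betweenRounds`.
case object Closed

final class ToyChannel[A]:
  private val queue = mutable.Queue.empty[A]
  private var closed = false

  def put(a: A): Unit = if !closed then queue.enqueue(a)

  // Assumption: close returns the remaining backlog to the caller.
  def close(): Seq[A] =
    closed = true
    queue.dequeueAll(_ => true)

  // One round of the underlying queue drain; fails once the channel is closed.
  private def drainOnce(): Either[Closed.type, Seq[A]] =
    if closed then Left(Closed) else Right(queue.dequeueAll(_ => true))

  // Pre-fix behaviour: a Closed failure in a later round discards `acc`.
  def drainBuggy(betweenRounds: () => Unit): Either[Closed.type, Seq[A]] =
    @tailrec
    def loop(acc: Vector[A]): Either[Closed.type, Seq[A]] =
      drainOnce() match
        case Left(Closed)                  => Left(Closed) // items in `acc` are lost
        case Right(batch) if batch.isEmpty => Right(acc)
        case Right(batch) =>
          betweenRounds() // the producer may close the channel here
          loop(acc ++ batch)
    loop(Vector.empty)

object RaceDemo:
  // Returns (what the consumer's drain saw, what the producer's close returned).
  def run(): (Either[Closed.type, Seq[Int]], Seq[Int]) =
    val ch = ToyChannel[Int]()
    (1 to 3).foreach(ch.put)
    var backlog: Seq[Int] = Nil
    val result = ch.drainBuggy(() => backlog = ch.close())
    (result, backlog)
```

In this model, RaceDemo.run() yields Left(Closed) together with an empty backlog: the three items were already removed from the queue by the first round, so they reach neither the consumer nor the producer.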

Solution

If draining encounters a Failure[Closed] after some items have already been drained, drain returns those items instead of the failure.
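The rule can be sketched as a small pure function. This is a hypothetical helper, not Kyo's code: drainOutcome is an illustrative name, Closed is a stand-in case object, and Either[Closed.type, _] stands in for Kyo's Result[Closed, _].

```scala
// Hypothetical sketch of the fix's decision rule; not Kyo's API.
case object Closed

/** Decides what drain returns after the latest underlying queue drain.
  *
  * @param drained    items already removed from the queue in earlier rounds
  * @param underlying outcome of the latest underlying drain
  */
def drainOutcome[A](
    drained: Vector[A],
    underlying: Either[Closed.type, Seq[A]]
): Either[Closed.type, Vector[A]] =
  underlying match
    case Right(more) => Right(drained ++ more)
    // The edge case: the channel closed mid-drain, but items were already
    // taken off the queue, so return them instead of dropping them.
    case Left(Closed) if drained.nonEmpty => Right(drained)
    case Left(Closed)                     => Left(Closed)
```

Only an empty accumulator lets the Closed failure through, so items that have left the queue are always handed to the consumer.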

Notes

This is a little tricky to reproduce as it relies on timing. For whatever reason, it reproduced fairly reliably when running all the gRPC tests.

I had asked about this and @fwbrasil had said:

I'd say the behavior for the draining operations are expected since they just obtain the values but don't guarantee that new items won't arrive. The trouble is the closing since the pending backlog in this case is returned to the producer given it's the one to call close. The returned backlog to the producer should contain the missing message.

#1191 was an attempt to resolve this; unfortunately it does not, although it does make other parts of the above pattern much easier to implement.

This change still doesn't guarantee that new items won't arrive. What it does guarantee is that if the channel wasn't closed when drain was first called (more precisely, when the underlying queue's drain is first called), drain will always return a success.
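Under the same kind of toy assumptions (hypothetical names ToyChannel, drainOnce, betweenRounds and GuaranteeDemo; single-threaded; Either standing in for Result), the guarantee can be sketched as a drain loop that reports Closed only when the very first underlying drain fails.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical single-threaded toy, NOT Kyo's Channel: the "concurrent"
// close is injected deterministically through `betweenRounds`.
case object Closed

final class ToyChannel[A]:
  private val queue = mutable.Queue.empty[A]
  private var closed = false

  def put(a: A): Unit = if !closed then queue.enqueue(a)
  def close(): Unit = closed = true

  // One round of the underlying queue drain; fails once closed.
  private def drainOnce(): Either[Closed.type, Seq[A]] =
    if closed then Left(Closed) else Right(queue.dequeueAll(_ => true))

  def drain(betweenRounds: () => Unit = () => ()): Either[Closed.type, Seq[A]] =
    @tailrec
    def loop(acc: Vector[A]): Either[Closed.type, Seq[A]] =
      drainOnce() match
        // `acc` is non-empty only if an earlier round succeeded, i.e. the
        // channel was open when drain started: keep the drained items.
        case Left(Closed) if acc.nonEmpty  => Right(acc)
        case Left(Closed)                  => Left(Closed) // closed before drain began
        case Right(batch) if batch.isEmpty => Right(acc)
        case Right(batch) =>
          betweenRounds() // the producer may close the channel here
          loop(acc ++ batch)
    loop(Vector.empty)

object GuaranteeDemo:
  def closedMidDrain(): Either[Closed.type, Seq[Int]] =
    val ch = ToyChannel[Int]()
    (1 to 3).foreach(ch.put)
    ch.drain(() => ch.close())

  def closedBeforeDrain(): Either[Closed.type, Seq[Int]] =
    val ch = ToyChannel[Int]()
    ch.close()
    ch.drain()
```

If the channel is open but empty on the first round, the loop returns an empty success immediately, so a later close cannot turn that call into Closed either.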

steinybot · Collaborator Author

Only one test failed here, but it was the one with closeAwaitEmpty: https://github.com/getkyo/kyo/actions/runs/15068698744/job/42359374252#step:6:6695

steinybot · Collaborator Author

This run was "better". 5 tests failed: https://github.com/getkyo/kyo/actions/runs/15068698744/job/42359899400

steinybot marked this pull request as ready for review on May 16, 2025 13:04
fwbrasil · Collaborator

@steinybot not sure I understood, are the tests still flaky?

```scala
private def asResult[A](value: A): Result[Closed, A] =
    if isClosed.get() then closedResult else Result.succeed(value)

private def asOptimisticResult[A](value: Chunk[A]): Result[Closed, Chunk[A]] =
```
Collaborator

The name doesn't seem very informative; how about asDrainResult? Could you also add a comment explaining the edge case the method handles?

Collaborator Author

Sure thing. I went for a slightly different naming. Let me know what you think.

```diff
@@ -633,19 +639,20 @@ object Channel:
 def drain()(using AllowUnsafe) =
     @tailrec
-    def loop(current: Chunk[A]): Result[Closed, Chunk[A]] =
+    def loop(current: Chunk[A], i: Int): Result[Closed, Chunk[A]] =
```
Collaborator

Is i unused?

Collaborator Author

Whoops, this was left in from verifying the tests. Removed.

```diff
@@ -1102,4 +1120,52 @@ class ChannelTest extends Test:
         }
     }

+    private def verifyRaceDrainWithClose(
```
Collaborator

Good catch! I imagine it was a tough one to debug 🙏

Collaborator Author

Ha yeah, the closer I got to minimising the problem, the more it started disappearing. Good ol' Heisenbug. It seems obvious in hindsight, but I had been looking at that code for so long that I failed to notice.

```scala
        }
    }
    _ <- producer.interrupt
yield assert(result.isSuccess)
```
Collaborator Author

This could be better: it could actually check that the right number of elements were drained. I'd have to swap things around a bit, but I should be able to simplify it as long as that doesn't mess up the timings.

Collaborator Author

I have a different version of these tests that is easier to read and checks the actual counts, but to get similar failure rates it runs about 5 times slower (~500ms on my M1): https://github.com/steinybot/kyo/pull/2/files

Not sure if it is worth it.

Collaborator

I think having better coverage of an edge case is worth it! If we see it's impacting build times too much, we can follow up later.

Collaborator

I'm merging the PR to stabilize the build, but feel free to post follow-ups.

steinybot · Collaborator Author

> @steinybot not sure I understood, are the tests still flaky?

Ah sorry, no, they shouldn't be flaky. I first committed the tests to prove that they failed, then committed the fix. For some reason I thought my comment would appear in the activity before the later commit.

steinybot requested a review from fwbrasil on May 17, 2025 09:31
steinybot mentioned this pull request on May 20, 2025
fwbrasil merged commit f0702ef into getkyo:main on May 21, 2025
3 checks passed