rewrite buffer to use bounded MPSC #635
T::Future: Send + 'static,
Request: Send + 'static,
these are the unfortunate new bounds I mentioned in the PR description
// fill the channel so all subsequent requests will wait for capacity
let service1 = assert_ready_ok!(task::spawn(service.ready()).poll());
assert_pending!(worker.poll());
let mut response2 = task::spawn(service1.call("world"));
this test required adding an additional message to the channel for the service to be "full"; I described this in the PR description
assert!(worker.is_woken(), "worker task should be woken by request");
assert_pending!(worker.poll());

// fill the channel so all subsequent requests will wait for capacity
let service1 = assert_ready_ok!(task::spawn(service.ready()).poll());
assert_pending!(worker.poll());
let mut response2 = task::spawn(service1.call("world"));
this test required adding an additional message to the channel for the buffer to be "full"; I described this in the PR description
This makes sense to me!
## Motivation

Currently, `tower::buffer` uses `tokio::sync::mpsc`'s _unbounded_ channel plus a `tokio::sync::Semaphore` in order to re-implement a bounded channel. This was necessary because, when this code was updated to the latest version of `tokio`, there was no way to reserve a non-borrowed send permit from a `Sender`. Thus, it was necessary to use the `Semaphore` for the future that is polled in `poll_ready` to acquire send capacity, since a `Permit` from the `Sender` could not be stored in the struct until it's consumed in `call`.

This code is Not Ideal. Reimplementing the bounded channel makes the implementation more complicated, and means that we have to do a bunch of extra work to, e.g., propagate cancellations and service errors to tasks waiting on `poll_ready`. The bounded MPSC would solve most of this for us. It might also be a bit more efficient, since we would only have a single reference-counted heap allocation (the `Sender`), rather than two (the `Sender` _and_ the `Arc<Semaphore>`).

Now that `tokio::sync::mpsc::Sender` has a `reserve_owned` method, we no longer need to use the semaphore, and can simplify the buffer implementation a bit.

## Solution

This branch changes the buffer to use only a single bounded MPSC, rather than an unbounded MPSC and a semaphore. The bounded MPSC internally manages its semaphore, so we can now remove a lot of complexity in the current implementation.

I had to change some of the integration tests slightly as part of this change. This is because the buffer implementation using semaphore permits is _very subtly_ different from one using a bounded channel. In the `Semaphore`-based implementation, a semaphore permit is stored in the `Message` struct sent over the channel. This is so that the capacity is used as long as the message is in flight. However, when the worker task is processing a message that's been received from the channel, the permit is still not dropped.
Essentially, the one message actively held by the worker task _also_ occupies one "slot" of capacity, so the actual channel capacity is one less than the value passed to the constructor, _once the first request has been sent to the worker_. The bounded MPSC changed this behavior so that capacity is only occupied while a request is actually in the channel, which broke some tests that relied on the old (and technically wrong) behavior.

## Notes

There is one fairly significant issue with this change, which is why it's currently a draft. The issue is that this unfortunately requires adding `Send` and `'static` bounds to the `T::Future` and `Request` types. This is because they occur within the `Message` type that's sent over the channel, and an MPSC's `OwnedPermit<T>` for a message of type `T` is only `Send` or `'static` when `T` is. The `ReusableBoxFuture` that the `reserve_owned` future is stored in requires the future to be `Send + 'static`, which it is not when `OwnedPermit` isn't.

I don't believe that it's actually _necessary_ for the `OwnedPermit<T>` type to require `T: Send` in order to be `Send`, or `T: 'static` in order to be valid for the `'static` lifetime. An `OwnedPermit` never actually contains an instance of type `T`; it just represents the _capacity_ to send that type to the channel. The channel itself will actually contain the values of type `T`. Therefore, it's possible this could be changed upstream in Tokio, although I haven't looked into it yet.

In practice, these bounds shouldn't really be a problem, but adding them is a breaking change. Alternatively, we could just wait until `tower` 0.5 to merge this PR.
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
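The capacity difference described under "Solution" can be illustrated with a small, std-only model. This is a hypothetical sketch, not tower's code: `ToyBuffer`, `Design`, and `extra_capacity` are illustrative names, and the model only counts occupied slots instead of using real channels or semaphores. Under `PermitInMessage`, the request the worker is processing still holds a slot; under `BoundedChannel`, receiving it frees the slot, so one more request fits.

```rust
use std::collections::VecDeque;

/// Which capacity-accounting design the toy buffer models.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Design {
    /// Old design (unbounded channel + semaphore): the permit travels inside
    /// the message and is only released when the worker is done with it.
    PermitInMessage,
    /// New design (bounded channel): a slot is freed as soon as the worker
    /// receives the message from the channel.
    BoundedChannel,
}

/// Hypothetical toy model, not tower's code: it only tracks how many of the
/// `bound` slots are occupied.
struct ToyBuffer {
    design: Design,
    bound: usize,
    queued: VecDeque<&'static str>,
    /// The request the worker has received and is still processing.
    in_worker: Option<&'static str>,
}

impl ToyBuffer {
    fn new(design: Design, bound: usize) -> Self {
        ToyBuffer { design, bound, queued: VecDeque::new(), in_worker: None }
    }

    /// Slots currently occupied under the chosen design.
    fn occupied(&self) -> usize {
        let held_by_worker = match self.design {
            Design::PermitInMessage if self.in_worker.is_some() => 1,
            _ => 0,
        };
        self.queued.len() + held_by_worker
    }

    /// Stand-in for `poll_ready` followed by `call`; returns `false` when full.
    fn try_send(&mut self, req: &'static str) -> bool {
        if self.occupied() >= self.bound {
            return false;
        }
        self.queued.push_back(req);
        true
    }

    /// The worker finishes its current request (if any) and takes the next one.
    fn worker_recv(&mut self) {
        self.in_worker = self.queued.pop_front();
    }
}

/// Sends one request, lets the worker pick it up, then counts how many more
/// requests fit before the buffer reports that it is full.
fn extra_capacity(design: Design, bound: usize) -> usize {
    let mut buf = ToyBuffer::new(design, bound);
    if !buf.try_send("hello") {
        return 0;
    }
    buf.worker_recv(); // the worker is now busy with "hello"
    let mut accepted = 0;
    while buf.try_send("world") {
        accepted += 1;
    }
    accepted
}
```

With a bound of 2, this model accepts one more request after the worker picks up the first under the old accounting, and two under the new one, which mirrors why the tests needed one additional message before the buffer was "full".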
In `tokio-util` v0.7, the semantics of `PollSender` were changed so that it has a `poll_reserve` method rather than `poll_send_done`, which is the behavior `Buffer` requires. The implementation of `PollSender` now looks more or less identical to the code I had written using `ReusableBoxFuture` and `reserve_owned`. Therefore, we can now just use the `tokio_util::sync::PollSender` type, and the buffer implementation is much simpler.
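The reserve-then-send shape that an owned permit allows can be sketched without Tokio. This is a simplified, single-threaded toy under stated assumptions: `channel`, `Sender`, `Receiver`, `OwnedPermit`, `try_reserve_owned`, and `BufferHandle` are hypothetical stand-ins, not Tokio's or tower's types, and readiness is a non-blocking check instead of a `Future`. It shows `poll_ready` storing an owned permit in the handle and `call` consuming it, with no separate semaphore.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical toy bounded channel; not tokio's implementation.
struct Shared<T> {
    bound: usize,
    reserved: usize, // slots held by permits that have not sent yet
    queue: VecDeque<T>,
}

struct Sender<T>(Arc<Mutex<Shared<T>>>);
struct Receiver<T>(Arc<Mutex<Shared<T>>>);

// Manual impl: `#[derive(Clone)]` would needlessly require `T: Clone`.
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Sender(Arc::clone(&self.0))
    }
}

fn channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { bound, reserved: 0, queue: VecDeque::new() }));
    (Sender(Arc::clone(&shared)), Receiver(shared))
}

impl<T> Receiver<T> {
    // Taking a value out of the queue frees its slot.
    fn try_recv(&self) -> Option<T> {
        self.0.lock().unwrap().queue.pop_front()
    }
}

// Owns a sender plus one reserved slot; it never holds a `T` itself.
struct OwnedPermit<T>(Option<Sender<T>>);

impl<T> Sender<T> {
    // Non-blocking owned reservation: consumes the sender and returns a
    // permit if a slot is free, otherwise hands the sender back.
    fn try_reserve_owned(self) -> Result<OwnedPermit<T>, Sender<T>> {
        let ok = {
            let mut s = self.0.lock().unwrap();
            let free = s.reserved + s.queue.len() < s.bound;
            if free {
                s.reserved += 1;
            }
            free
        };
        if ok { Ok(OwnedPermit(Some(self))) } else { Err(self) }
    }
}

impl<T> OwnedPermit<T> {
    // Sends into the reserved slot and returns the sender for reuse.
    fn send(mut self, value: T) -> Sender<T> {
        let sender = self.0.take().expect("a permit is used at most once");
        let mut s = sender.0.lock().unwrap();
        s.reserved -= 1; // the queued value now occupies the slot
        s.queue.push_back(value);
        drop(s);
        sender
    }
}

impl<T> Drop for OwnedPermit<T> {
    // An unused permit returns its slot to the channel.
    fn drop(&mut self) {
        if let Some(sender) = self.0.take() {
            sender.0.lock().unwrap().reserved -= 1;
        }
    }
}

// Hypothetical handle mirroring `poll_ready`/`call`: the permit reserved in
// `poll_ready` is stored in the struct until `call` consumes it.
struct BufferHandle<T> {
    sender: Option<Sender<T>>,
    permit: Option<OwnedPermit<T>>,
}

impl<T> BufferHandle<T> {
    fn new(sender: Sender<T>) -> Self {
        BufferHandle { sender: Some(sender), permit: None }
    }

    // Like `poll_ready`: returns `true` once a slot is reserved.
    fn poll_ready(&mut self) -> bool {
        if self.permit.is_none() {
            let sender = self.sender.take().expect("a handle holds a sender or a permit");
            match sender.try_reserve_owned() {
                Ok(permit) => self.permit = Some(permit),
                Err(sender) => self.sender = Some(sender),
            }
        }
        self.permit.is_some()
    }

    // Like `call`: only valid after `poll_ready` returned `true`.
    fn call(&mut self, request: T) {
        let permit = self.permit.take().expect("`call` before `poll_ready`");
        self.sender = Some(permit.send(request));
    }
}
```

In this toy, a slot counts as used from reservation until the receiver takes the value out, and dropping an unused permit gives its slot back; both behaviors are modeled on the bounded-channel semantics described in the PR rather than copied from Tokio's source.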
Now that

@hawkw is this still blocked on the breaking change?

yes, this introduces trait bounds that were previously not required. It's possible an upstream change in

I vote that we go ahead with a breaking change for this?

cc @davidpdrsn

Yeah, I'm hoping to prepare an 0.5 in the next couple days. we should ship another 0.4 release first though, which is why it's a draft. I do also want to look at
I'm cool with moving ahead with a 0.5
this commit updates a test that was affected by breaking changes in tower's `Buffer` middleware. see this excerpt from the description of that change:

> I had to change some of the integration tests slightly as part of this
> change. This is because the buffer implementation using semaphore
> permits is _very subtly_ different from one using a bounded channel. In
> the `Semaphore`-based implementation, a semaphore permit is stored in
> the `Message` struct sent over the channel. This is so that the capacity
> is used as long as the message is in flight. However, when the worker
> task is processing a message that's been received from the channel,
> the permit is still not dropped. Essentially, the one message actively
> held by the worker task _also_ occupies one "slot" of capacity, so the
> actual channel capacity is one less than the value passed to the
> constructor, _once the first request has been sent to the worker_. The
> bounded MPSC changed this behavior so that capacity is only occupied
> while a request is actually in the channel, which broke some tests
> that relied on the old (and technically wrong) behavior.

pay particular attention to this:

> The bounded MPSC changed this behavior so that capacity is only
> occupied while a request is actually in the channel, which broke some
> tests that relied on the old (and technically wrong) behavior.

that PR adds an additional message to the channel in tests exercising the load-shedding behavior, on account of the previous (incorrect) behavior.

https://github.com/tower-rs/tower/pull/635/files#r797108274

this commit performs the same change for our corresponding test, adding an additional `ready()` call before we hit the buffer's limit.

Signed-off-by: katelyn martin <kate@buoyant.io>
in tower-rs#635, some subtle breaking changes were made to how `Buffer` works. this is documented in the description of that PR, here:

> I had to change some of the integration tests slightly as part of this
> change. This is because the buffer implementation using semaphore
> permits is _very subtly_ different from one using a bounded channel. In
> the `Semaphore`-based implementation, a semaphore permit is stored in
> the `Message` struct sent over the channel. This is so that the capacity
> is used as long as the message is in flight. However, when the worker
> task is processing a message that's been received from the channel,
> the permit is still not dropped. Essentially, the one message actively
> held by the worker task _also_ occupies one "slot" of capacity, so the
> actual channel capacity is one less than the value passed to the
> constructor, _once the first request has been sent to the worker_. The
> bounded MPSC changed this behavior so that capacity is only occupied
> while a request is actually in the channel, which broke some tests
> that relied on the old (and technically wrong) behavior.

pay particular attention to this:

> The bounded MPSC changed this behavior so that capacity is only
> occupied while a request is actually in the channel, which broke some
> tests that relied on the old (and technically wrong) behavior.

this is a change in behavior that might affect downstream callers. this commit adds mention of these changes to the changelog, to help consumers navigate the upgrade from tower 0.4 to 0.5.

Signed-off-by: katelyn martin <me+cratelyn@katelyn.world>
* chore(deps)!: upgrade to tower 0.5

this commit updates our tower dependency from 0.4 to 0.5.

note that this commit does not affect the `tower-service` and `tower-layer` crates, reëxported by `tower` itself. the `Service<T>` trait and the closely related `Layer<S>` trait have not been changed.

the `tower` crate's utilities have changed in various ways, some of particular note for the linkerd2 proxy. see these items, excerpted from the tower changelog:

- **retry**: **Breaking Change** `retry::Policy::retry` now accepts `&mut Req` and `&mut Res` instead of the previous immutable versions. This increases the flexibility of the retry policy. To update, update your method signature to include `mut` for both parameters. ([tower-rs/tower#584])
- **retry**: **Breaking Change** Change Policy to accept `&mut self` ([tower-rs/tower#681])
- **retry**: **Breaking Change** `Budget` is now a trait. This allows end-users to implement their own budget and bucket implementations. ([tower-rs/tower#703])
- **util**: **Breaking Change** `Either::A` and `Either::B` have been renamed `Either::Left` and `Either::Right`, respectively. ([tower-rs/tower#637])
- **util**: **Breaking Change** `Either` now requires its two services to have the same error type. ([tower-rs/tower#637])
- **util**: **Breaking Change** `Either` no longer implements `Future`. ([tower-rs/tower#637])
- **buffer**: **Breaking Change** `Buffer<S, Request>` is now generic over `Buffer<Request, S::Future>`. ([tower-rs/tower#654])

see:

* <tower-rs/tower#584>
* <tower-rs/tower#681>
* <tower-rs/tower#703>
* <tower-rs/tower#637>
* <tower-rs/tower#654>

the `Either` trait bounds are particularly impactful for us. because this runs counter to how we treat errors (skewing towards boxed errors, in general), we temporarily vendor a version of `Either` from the 0.4 release, whose variants have been renamed to match the 0.5 interface.
updating to box the inner `A` and `B` services' errors, so we satisfy the new `A::Error = B::Error` bounds, can be addressed as a follow-on. that's intentionally left as a separate change, due to the net size of our patchset between this branch and #3504.

* <tower-rs/tower@v0.4.x...master>
* <https://github.com/tower-rs/tower/blob/master/tower/CHANGELOG.md>

this work is based upon #3504. for more information, see:

* linkerd/linkerd2#8733
* #3504

Signed-off-by: katelyn martin <kate@buoyant.io>
X-Ref: tower-rs/tower#815
X-Ref: tower-rs/tower#817
X-Ref: tower-rs/tower#818
X-Ref: tower-rs/tower#819

* fix(stack/loadshed): update test affected by tower-rs/tower#635

Signed-off-by: katelyn martin <kate@buoyant.io>

* review: use vendored `Either` for consistency

#3744 (comment)

Signed-off-by: katelyn martin <kate@buoyant.io>

---------

Signed-off-by: katelyn martin <kate@buoyant.io>
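The new `A::Error = B::Error` requirement on `Either`, and the boxing follow-on mentioned in the upgrade commit, can be illustrated with a synchronous toy. Everything here is hypothetical: `Call` is a stripped-down stand-in for `tower::Service` (no `poll_ready`, no futures), and `BoxErrors`, `ParseNum`, `Refuse`, and `pick` are illustrative names. Two services with different error types can't share an `Either` directly, but wrapping each so its error becomes a boxed `dyn Error` gives both sides the same error type.

```rust
use std::error::Error;
use std::io;
use std::num::ParseIntError;

// Boxed, type-erased error; the alias name is an assumption of this sketch.
type BoxError = Box<dyn Error + Send + Sync>;

// Hypothetical synchronous stand-in for `tower::Service`; not tower's trait.
trait Call {
    type Error;
    fn call(&mut self, req: &str) -> Result<u32, Self::Error>;
}

// Variant names follow the 0.5 `Left`/`Right` renaming.
enum Either<A, B> {
    Left(A),
    Right(B),
}

// As described for tower 0.5's `Either`, both sides must share one error type.
impl<A, B> Call for Either<A, B>
where
    A: Call,
    B: Call<Error = A::Error>,
{
    type Error = A::Error;

    fn call(&mut self, req: &str) -> Result<u32, Self::Error> {
        match self {
            Either::Left(a) => a.call(req),
            Either::Right(b) => b.call(req),
        }
    }
}

// Hypothetical adapter that erases a service's concrete error into `BoxError`.
struct BoxErrors<S>(S);

impl<S> Call for BoxErrors<S>
where
    S: Call,
    S::Error: Error + Send + Sync + 'static,
{
    type Error = BoxError;

    fn call(&mut self, req: &str) -> Result<u32, BoxError> {
        self.0.call(req).map_err(|e| Box::new(e) as BoxError)
    }
}

// Two toy services with *different* error types.
struct ParseNum;

impl Call for ParseNum {
    type Error = ParseIntError;

    fn call(&mut self, req: &str) -> Result<u32, ParseIntError> {
        req.parse()
    }
}

struct Refuse;

impl Call for Refuse {
    type Error = io::Error;

    fn call(&mut self, _req: &str) -> Result<u32, io::Error> {
        Err(io::Error::new(io::ErrorKind::ConnectionRefused, "refused"))
    }
}

// `Either<ParseNum, Refuse>` would fail the `B: Call<Error = A::Error>` bound;
// boxing both sides' errors satisfies it.
fn pick(left: bool) -> Either<BoxErrors<ParseNum>, BoxErrors<Refuse>> {
    if left {
        Either::Left(BoxErrors(ParseNum))
    } else {
        Either::Right(BoxErrors(Refuse))
    }
}
```

Boxing trades an allocation on the error path for a single uniform error type, which fits the stated preference for boxed errors; the vendored 0.4-style `Either` sidesteps the bound instead.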