outbound: Cache balancers within profile stack by olix0r · Pull Request #641 · linkerd/linkerd2-proxy · GitHub

outbound: Cache balancers within profile stack #641

Merged on Aug 31, 2020 (66 commits)
Commits
198e90b
fixup Cargo.toml
olix0r Aug 26, 2020
678e90d
wip
olix0r Aug 27, 2020
68da5ae
wip
olix0r Aug 27, 2020
e0556b6
dynamic traffic split middleware
olix0r Aug 27, 2020
c36faac
split layer
olix0r Aug 27, 2020
2412c65
wip
olix0r Aug 27, 2020
aec1af3
dynamic request routes
olix0r Aug 27, 2020
e4384b8
Handle empty overrides
olix0r Aug 27, 2020
ed91beb
touchup: derive clone
olix0r Aug 27, 2020
6962e58
touchup
olix0r Aug 27, 2020
673d1e8
rename http::service to discover
olix0r Aug 27, 2020
83c4bf3
move types around
olix0r Aug 28, 2020
a157e03
Rename profiles modules
olix0r Aug 28, 2020
2ef104d
wip
olix0r Aug 28, 2020
2036545
trace => debug
olix0r Aug 28, 2020
24d88a1
Use ReadyCache in traffic split
olix0r Aug 28, 2020
b952440
outbound compiles
olix0r Aug 28, 2020
69fa690
compiles without route metrics
olix0r Aug 28, 2020
194be9e
compiles; fallback tests fail
olix0r Aug 28, 2020
bd4a5b6
debug
olix0r Aug 29, 2020
0ef01ea
Update tower::ReadyCache to propagate error types
olix0r Aug 29, 2020
2774a01
fixup port
olix0r Aug 29, 2020
e73531c
restore test
olix0r Aug 29, 2020
a411188
bump dep
olix0r Aug 29, 2020
46ffa48
Merge branch 'main' into ver/profile-split
olix0r Aug 29, 2020
ea39d34
-unused Into<Addr> for Target
olix0r Aug 29, 2020
eca3920
undo rename
olix0r Aug 29, 2020
54b43b3
svc: Update stack diagnostic checks
olix0r Aug 29, 2020
4624c60
service-profiles: Eliminate the HasDestination trait
olix0r Aug 29, 2020
8fb3014
dst=>logical
olix0r Aug 29, 2020
82bc5a2
fixup test
olix0r Aug 29, 2020
ab6466f
outbound: Make discovery error detection generic
olix0r Aug 30, 2020
197588c
fixup test
olix0r Aug 30, 2020
6c616d3
Restore disabled portion of profile override test
olix0r Aug 30, 2020
5d9adca
Merge branch 'ver/inbound-logical' into ver/exile-on-main
olix0r Aug 30, 2020
6d3b695
Merge branch 'ver/generic-disco-reject' into ver/exile-on-main
olix0r Aug 30, 2020
6ee71b0
Merge branch 'ver/restore-fallback-test' into ver/exile-on-main
olix0r Aug 30, 2020
9c263a3
Merge branch 'ver/exile-on-main' into ver/profile-split
olix0r Aug 30, 2020
99376bc
fixup reject
olix0r Aug 30, 2020
492f986
service-profiles: Cleanup crate organization
olix0r Aug 30, 2020
ca2fd59
Merge branch 'ver/profile-exports' into ver/exile-on-main
olix0r Aug 30, 2020
69cf514
Merge branch 'ver/exile-on-main' into ver/profile-split
olix0r Aug 30, 2020
7c8e1ef
undo needless change
olix0r Aug 30, 2020
74235fe
Merge branch 'ver/inbound-logical' into ver/profile-split
olix0r Aug 30, 2020
06ebc3e
Merge branch 'ver/inbound-logical' into ver/exile-on-main
olix0r Aug 30, 2020
138951d
Merge branch 'ver/exile-on-main' into ver/profile-split
olix0r Aug 30, 2020
dc06a82
Merge branch 'main' into ver/profile-split
olix0r Aug 31, 2020
8b5d9da
update tower to primary repo
olix0r Aug 31, 2020
58517b3
Update tower to tower-rs/tower@ad348d8
olix0r Aug 31, 2020
e08bf22
Merge branch 'ver/tower' into ver/profile-split
olix0r Aug 31, 2020
ebc12cb
fixup
olix0r Aug 31, 2020
c3e11c7
remove unused modules
olix0r Aug 31, 2020
0012b53
Remove needless http_ variable name prefixes
olix0r Aug 31, 2020
cd68def
fixup
olix0r Aug 31, 2020
7c017e3
split: there must always be at least one target
olix0r Aug 31, 2020
679600f
split: Ensure there is at least one target in all cases
olix0r Aug 31, 2020
a10dc4e
Merge branch 'main' into ver/profile-split
olix0r Aug 31, 2020
0e23979
comment
olix0r Aug 31, 2020
37502fe
outbound: fewer stack checks
olix0r Aug 31, 2020
917ca75
balance => concrete
olix0r Aug 31, 2020
1f3cd56
commentary
olix0r Aug 31, 2020
ac6056a
commentary/clarity
olix0r Aug 31, 2020
ea783c8
fixup assertions
olix0r Aug 31, 2020
3fbebc3
Commentary
olix0r Aug 31, 2020
09df470
outbound: Cache balancers within profile stack
olix0r Aug 31, 2020
15b5de6
fixup comment
olix0r Aug 31, 2020
41 changes: 9 additions & 32 deletions linkerd/app/inbound/src/endpoint.rs
@@ -20,9 +20,6 @@ pub struct Target {
pub tls_client_id: tls::PeerIdentity,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Profile(Addr);

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct HttpEndpoint {
pub port: u16,
@@ -103,32 +100,22 @@ impl tls::HasPeerIdentity for TcpEndpoint {

// === impl Profile ===

impl From<Target> for Profile {
fn from(t: Target) -> Self {
Profile(t.logical)
pub(super) fn route(route: profiles::http::Route, target: Target) -> dst::Route {
dst::Route {
route,
target: target.logical,
direction: metric_labels::Direction::In,
}
}

impl AsRef<Addr> for Profile {
fn as_ref(&self) -> &Addr {
&self.0
}
}

impl profiles::WithRoute for Profile {
type Route = dst::Route;
// === impl Target ===

fn with_route(self, route: profiles::http::Route) -> Self::Route {
dst::Route {
route,
target: self.0.clone(),
direction: metric_labels::Direction::In,
}
impl AsRef<Addr> for Target {
fn as_ref(&self) -> &Addr {
&self.logical
}
}

// === impl Target ===

impl http::normalize_uri::ShouldNormalizeUri for Target {
fn should_normalize_uri(&self) -> Option<http::uri::Authority> {
if let http::Settings::Http1 {
@@ -306,13 +293,3 @@ impl<A> router::Recognize<http::Request<A>> for RequestTarget {
}
}
}

// === impl ProfileTarget ===

impl router::Recognize<Target> for ProfileTarget {
type Key = Profile;

fn recognize(&self, t: &Target) -> Self::Key {
Profile(t.logical.clone())
}
}
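
The hunks above drop the `Profile` newtype and its `profiles::WithRoute` impl in favor of a free `route` function and an `AsRef<Addr>` impl on `Target`. The following is a minimal, self-contained sketch of that shape. The types `Addr`, `HttpRoute`, `DstRoute`, and `Direction` are simplified stand-ins invented for illustration, not the proxy's real definitions.

```rust
// Hypothetical stand-ins for the proxy's types; the real ones carry more data.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Addr(pub String);

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HttpRoute {
    pub name: String,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Direction {
    In,
    Out,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DstRoute {
    pub route: HttpRoute,
    pub target: Addr,
    pub direction: Direction,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Target {
    pub logical: Addr,
    pub port: u16,
}

// The target exposes its logical address directly, so no wrapper key type
// is needed to look up a profile by address.
impl AsRef<Addr> for Target {
    fn as_ref(&self) -> &Addr {
        &self.logical
    }
}

// A plain function builds the per-route value from a (route, target) pair,
// replacing the trait method that previously lived on the wrapper type.
pub fn route(route: HttpRoute, target: Target) -> DstRoute {
    DstRoute {
        route,
        target: target.logical,
        direction: Direction::In,
    }
}
```

With this shape, a caller holding a `(route, target)` tuple can map it straight to a route value, which is how the `push_map_target` closure in `lib.rs` uses `endpoint::route`.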
130 changes: 60 additions & 70 deletions linkerd/app/inbound/src/lib.rs
@@ -5,9 +5,7 @@

#![deny(warnings, rust_2018_idioms)]

pub use self::endpoint::{
HttpEndpoint, Profile, ProfileTarget, RequestTarget, Target, TcpEndpoint,
};
pub use self::endpoint::{HttpEndpoint, ProfileTarget, RequestTarget, Target, TcpEndpoint};
use self::prevent_loop::PreventLoop;
use self::require_identity_for_ports::RequireIdentityForPorts;
use futures::{future, prelude::*};
@@ -66,8 +64,9 @@ impl Config {
+ 'static,
S::Error: Into<Error>,
S::Future: Unpin + Send,
P: profiles::GetProfile<Profile> + Unpin + Clone + Send + 'static,
P: profiles::GetProfile<Target> + Unpin + Clone + Send + 'static,
P::Future: Unpin + Send,
P::Error: Send,
{
let tcp_connect = self.build_tcp_connect(&metrics);
let prevent_loop = PreventLoop::from(listen_addr.port());
@@ -124,7 +123,7 @@ impl Config {
&self,
tcp_connect: C,
prevent_loop: impl Into<PreventLoop>,
http_loopback: L,
loopback: L,
profiles_client: P,
tap_layer: tap::Layer,
metrics: ProxyMetrics,
@@ -148,8 +147,9 @@ impl Config {
C::Error: Into<Error>,
C::Response: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
C::Future: Unpin + Send,
P: profiles::GetProfile<Profile> + Unpin + Clone + Send + 'static,
P: profiles::GetProfile<Target> + Unpin + Clone + Send + 'static,
P::Future: Unpin + Send,
P::Error: Send,
// The loopback router processes requests sent to the inbound port.
L: tower::Service<Target, Response = S> + Unpin + Send + Clone + 'static,
L::Error: Into<Error>,
@@ -178,15 +178,15 @@ impl Config {
let prevent_loop = prevent_loop.into();

// Creates HTTP clients for each inbound port & HTTP settings.
let http_endpoint = svc::stack(tcp_connect)
let endpoint = svc::stack(tcp_connect)
.push(http::MakeClientLayer::new(connect.h2_settings))
.push(reconnect::layer({
let backoff = connect.backoff.clone();
move |_| Ok(backoff.stream())
}))
.check_service::<HttpEndpoint>();
.check_make_service::<HttpEndpoint, http::Request<_>>();

let http_target_observability = svc::layers()
let observe = svc::layers()
// Registers the stack to be tapped.
.push(tap_layer)
// Records metrics for each `Target`.
@@ -197,91 +197,85 @@ impl Config {
.map(|span_sink| SpanConverter::client(span_sink, trace_labels())),
));

let http_profile_route_proxy = svc::proxies()
// Sets the route as a request extension so that it can be used
// by tap.
.push_http_insert_target()
// Records per-route metrics.
.push(metrics.http_route.into_layer::<classify::Response>())
// Sets the per-route response classifier as a request
// extension.
.push(classify::Layer::new())
.check_new_clone::<dst::Route>();

// An HTTP client is created for each target via the endpoint stack.
let http_target_cache = http_endpoint
let target = endpoint
.push_map_target(HttpEndpoint::from)
// Normalizes the URI, i.e. if it was originally in
// absolute-form on the outbound side.
.push(normalize_uri::layer())
.push(http_target_observability)
.check_service::<Target>()
.push(observe)
.into_new_service()
.check_new_service::<Target, http::Request<_>>();

// Attempts to discover a service profile for each logical target (as
// informed by the request's headers). The stack is cached until a
// request has not been received for `cache_max_idle_age`.
let profile = target
.clone()
.check_new_service::<Target, http::Request<http::boxed::Payload>>()
.push_on_response(svc::layers().box_http_request())
// The target stack doesn't use the profile resolution, so drop it.
.push_map_target(|(_, target): (profiles::Receiver, Target)| target)
.push(profiles::http::route_request::layer(
svc::proxies()
// Sets the route as a request extension so that it can be used
// by tap.
.push_http_insert_target()
// Records per-route metrics.
.push(metrics.http_route.into_layer::<classify::Response>())
// Sets the per-route response classifier as a request
// extension.
.push(classify::Layer::new())
.check_new_clone::<dst::Route>()
.push_map_target(|(r, t): (profiles::http::Route, Target)| {
endpoint::route(r, t)
})
.into_inner(),
))
.push(profiles::discover::layer(profiles_client))
.into_new_service()
.cache(
svc::layers().push_on_response(
svc::layers()
// If the service has been unavailable for an extended time, eagerly
// fail requests.
.push_failfast(dispatch_timeout)
// Shares the service, ensuring discovery errors are propagated.
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
.push(metrics.stack.layer(stack_labels("target"))),
.push(metrics.stack.layer(stack_labels("profile")))
.box_http_response(),
),
)
.spawn_buffer(buffer_capacity)
.instrument(|_: &Target| info_span!("target"))
.check_service::<Target>();
.instrument(|_: &Target| info_span!("profile"))
.check_make_service::<Target, http::Request<_>>();

// Routes targets to a Profile stack, i.e. so that profile
// resolutions are shared even as the type of request may vary.
let http_profile_cache = http_target_cache
.clone()
.push_on_response(svc::layers().box_http_request())
.check_service::<Target>()
// Provides route configuration without destination overrides.
.push(profiles::Layer::without_overrides(
profiles_client,
http_profile_route_proxy.into_inner(),
))
.into_new_service()
// Caches profile stacks.
.check_new_service::<Profile, Target>()
let forward = target
.cache(
svc::layers().push_on_response(
svc::layers()
// If the service has been unavailable for an extended time, eagerly
// fail requests.
.push_failfast(dispatch_timeout)
// Shares the service, ensuring discovery errors are propagated.
.push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age)
.push(metrics.stack.layer(stack_labels("profile"))),
.push(metrics.stack.layer(stack_labels("forward")))
.box_http_response(),
),
)
.spawn_buffer(buffer_capacity)
.instrument(|p: &Profile| info_span!("profile", addr = %p.as_ref()))
.check_make_service::<Profile, Target>()
.push(router::Layer::new(|()| ProfileTarget))
.check_new_service::<(), Target>()
.new_service(());

svc::stack(http_profile_cache)
.push_on_response(svc::layers().box_http_response())
.instrument(|_: &Target| info_span!("forward"))
.check_make_service::<Target, http::Request<http::boxed::Payload>>();

// Attempts to resolve the target as a service profile or, if that
// fails, skips that stack to forward to the local endpoint.
profile
.push_make_ready()
.push_fallback(
http_target_cache
.push_on_response(svc::layers().box_http_response().box_http_request()),
)
.push_fallback(forward)
// If the traffic is targeted at the inbound port, send it through
// the loopback service (i.e. as a gateway). This is done before
// caching so that the loopback stack can determine whether it
// should cache or not.
.push(admit::AdmitLayer::new(prevent_loop))
.push_fallback_on_error::<prevent_loop::LoopPrevented, _>(
svc::stack(http_loopback)
.push_on_response(svc::layers().box_http_request())
svc::stack(loopback)
.check_make_service::<Target, http::Request<_>>()
.into_inner(),
)
.check_service::<Target>()
.check_make_service::<Target, http::Request<http::boxed::Payload>>()
.into_inner()
}
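
The end of this function composes two stacks: the `profile` stack is tried first and, if it fails, the `forward` stack handles the target. The sketch below shows only that control flow with plain closures. It is not the proxy's `push_fallback` layer, and `DiscoveryRejected` and `make_with_fallback` are hypothetical names introduced for illustration.

```rust
// Hypothetical error standing in for a profile lookup that was rejected.
#[derive(Debug)]
pub struct DiscoveryRejected;

/// Builds a service for `target` with `primary`. If `primary` rejects the
/// target, the same target is handed to `fallback` instead.
pub fn make_with_fallback<T, S, P, F>(target: T, primary: P, fallback: F) -> S
where
    T: Clone,
    P: FnOnce(T) -> Result<S, DiscoveryRejected>,
    F: FnOnce(T) -> S,
{
    // The target is cloned so that it is still available for the fallback.
    match primary(target.clone()) {
        Ok(svc) => svc,
        Err(DiscoveryRejected) => fallback(target),
    }
}
```

The target is cloned up front because the primary builder consumes it; this mirrors, in a simplified way, why the stacks in the diff require cloneable targets.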

@@ -335,13 +329,9 @@ impl Config {
// Synthesizes responses for proxy errors.
.push(errors::layer());

let http_server_observability = svc::layers()
.push(TraceContextLayer::new(span_sink.map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
// // Tracks proxy handletime.
// .push(metrics.http_handle_time.layer())
;
let http_server_observability = svc::layers().push(TraceContextLayer::new(
span_sink.map(|span_sink| SpanConverter::server(span_sink, trace_labels())),
));

let http_server = svc::stack(http_router)
// Ensures that the built service is ready before it is returned
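
The comments in this file say that a profile stack "is cached until a request has not been received for `cache_max_idle_age`". As a rough illustration of that idle-eviction policy, here is a synchronous sketch built on `std::collections::HashMap`. It assumes a single-threaded caller and an explicitly supplied clock; the proxy itself does this with buffered, spawned services, and `IdleCache` is a hypothetical name.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::time::{Duration, Instant};

/// A hypothetical cache that builds one value per key and drops entries
/// that have not been used for `max_idle_age`.
pub struct IdleCache<K, V> {
    max_idle_age: Duration,
    // Each entry stores the value and the time it was last used.
    entries: HashMap<K, (V, Instant)>,
}

impl<K: Eq + Hash + Clone, V: Clone> IdleCache<K, V> {
    pub fn new(max_idle_age: Duration) -> Self {
        Self {
            max_idle_age,
            entries: HashMap::new(),
        }
    }

    /// Returns the cached value for `key`, building it with `make` on a miss.
    /// `now` is passed in so that the behavior is deterministic.
    pub fn get_or_make(&mut self, key: &K, now: Instant, make: impl FnOnce(&K) -> V) -> V {
        self.evict_idle(now);
        let entry = self
            .entries
            .entry(key.clone())
            .or_insert_with(|| (make(key), now));
        // Every access refreshes the last-used time.
        entry.1 = now;
        entry.0.clone()
    }

    /// Drops every entry whose last use is at least `max_idle_age` old.
    pub fn evict_idle(&mut self, now: Instant) {
        let max = self.max_idle_age;
        self.entries
            .retain(|_, (_, last_used)| now.duration_since(*last_used) < max);
    }

    /// Number of entries currently held.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Keying the cache by target means repeated requests for the same target reuse one built value, and a target that goes quiet is rebuilt on its next request.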
44 changes: 5 additions & 39 deletions linkerd/app/outbound/src/endpoint.rs
@@ -26,15 +26,9 @@ pub type Logical<T> = Target<T>;

pub type Concrete<T> = Target<Logical<T>>;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Profile(Addr);

#[derive(Clone, Debug)]
pub struct LogicalPerRequest(listen::Addrs);

#[derive(Clone, Debug)]
pub struct ProfilePerTarget;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Target<T> {
pub addr: Addr,
@@ -102,12 +96,6 @@ impl<T: http::settings::HasSettings> http::normalize_uri::ShouldNormalizeUri for
}
}

impl<T> profiles::OverrideDestination for Target<T> {
fn dst_mut(&mut self) -> &mut Addr {
&mut self.addr
}
}

impl<T: http::settings::HasSettings> http::settings::HasSettings for Target<T> {
fn http_settings(&self) -> http::Settings {
self.inner.http_settings()
@@ -418,32 +406,10 @@ impl<B> router::Recognize<http::Request<B>> for LogicalPerRequest {
}
}

// === impl ProfilePerTarget ===

impl<T> router::Recognize<Target<T>> for ProfilePerTarget {
type Key = Profile;

fn recognize(&self, t: &Target<T>) -> Self::Key {
Profile(t.addr.clone())
}
}

// === impl Profile ===

impl AsRef<Addr> for Profile {
fn as_ref(&self) -> &Addr {
&self.0
}
}

impl profiles::WithRoute for Profile {
type Route = dst::Route;

fn with_route(self, route: profiles::http::Route) -> Self::Route {
dst::Route {
route,
target: self.0.clone(),
direction: metric_labels::Direction::Out,
}
pub fn route<T>(route: profiles::http::Route, target: Logical<T>) -> dst::Route {
dst::Route {
route,
target: target.addr,
direction: metric_labels::Direction::Out,
}
}
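
On the outbound side, `route` is generic over the inner target type, and the `Logical<T>` and `Concrete<T>` aliases nest `Target` inside itself. The sketch below shows how those aliases compose. `Addr`, `DstRoute`, and `Direction` are simplified stand-ins invented here, and only the `addr` and `inner` fields of `Target<T>` are taken from the diff.

```rust
// Hypothetical stand-ins; only the shape of `Target<T>` follows the diff.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Addr(pub String);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Direction {
    In,
    Out,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Target<T> {
    pub addr: Addr,
    pub inner: T,
}

// A logical target is the address the application asked for.
pub type Logical<T> = Target<T>;
// A concrete target wraps the logical one with the address actually used.
pub type Concrete<T> = Target<Logical<T>>;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DstRoute {
    pub route_name: String,
    pub target: Addr,
    pub direction: Direction,
}

// Generic over `T`: the route only needs the logical address, so the inner
// target type does not matter.
pub fn route<T>(route_name: String, target: Logical<T>) -> DstRoute {
    DstRoute {
        route_name,
        target: target.addr,
        direction: Direction::Out,
    }
}
```

Because the function reads only `target.addr`, it works for any inner type, which is why no per-type trait impl is needed.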