8000 profiles: Do not rely on tuples as stack targets by olix0r · Pull Request #650 · linkerd/linkerd2-proxy · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

profiles: Do not rely on tuples as stack targets #650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linkerd/app/gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ mod test {
let socket_addr = SocketAddr::from(([127, 0, 0, 1], 4143));
let target = inbound::Target {
socket_addr,
logical: dst_name
dst: dst_name
.map(|n| NameAddr::from_str(n).unwrap().into())
.unwrap_or_else(|| socket_addr.into()),
http_settings: http::Settings::Http2,
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/gateway/src/make.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where

fn call(&mut self, target: inbound::Target) -> Self::Future {
let inbound::Target {
logical,
dst,
http_settings,
tls_client_id,
..
Expand All @@ -85,7 +85,7 @@ where
}
};

let orig_dst = match logical.into_name_addr() {
let orig_dst = match dst.into_name_addr() {
Some(n) => n,
None => {
return Box::pin(future::ok(Gateway::NoAuthority));
Expand Down
48 changes: 37 additions & 11 deletions linkerd/app/inbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ use tracing::debug;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Target {
pub logical: Addr,
pub dst: Addr,
pub socket_addr: SocketAddr,
pub http_settings: http::Settings,
pub tls_client_id: tls::PeerIdentity,
}

#[derive(Clone, Debug)]
pub struct Logical {
target: Target,
profiles: profiles::Receiver,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct HttpEndpoint {
pub port: u16,
Expand Down Expand Up @@ -100,10 +106,10 @@ impl tls::HasPeerIdentity for TcpEndpoint {

// === impl Profile ===

pub(super) fn route(route: profiles::http::Route, target: Target) -> dst::Route {
pub(super) fn route((route, logical): (profiles::http::Route, Logical)) -> dst::Route {
dst::Route {
route,
target: target.logical,
target: logical.target.dst,
direction: metric_labels::Direction::In,
}
}
Expand All @@ -112,7 +118,7 @@ pub(super) fn route(route: profiles::http::Route, target: Target) -> dst::Route

impl AsRef<Addr> for Target {
fn as_ref(&self) -> &Addr {
&self.logical
&self.dst
}
}

Expand All @@ -123,7 +129,7 @@ impl http::normalize_uri::ShouldNormalizeUri for Target {
..
} = self.http_settings
{
return Some(self.logical.to_http_authority());
return Some(self.dst.to_http_authority());
}
None
}
Expand All @@ -144,7 +150,7 @@ impl tls::HasPeerIdentity for Target {
impl Into<metric_labels::EndpointLabels> for Target {
fn into(self) -> metric_labels::EndpointLabels {
metric_labels::EndpointLabels {
authority: self.logical.name_addr().map(|d| d.as_http_authority()),
authority: self.dst.name_addr().map(|d| d.as_http_authority()),
direction: metric_labels::Direction::In,
tls_id: self.tls_client_id.map(metric_labels::TlsId::ClientId),
labels: None,
Expand Down Expand Up @@ -205,7 +211,7 @@ impl tap::Inspect for Target {

impl fmt::Display for Target {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.logical.fmt(f)
self.dst.fmt(f)
}
}

Expand All @@ -214,7 +220,7 @@ impl stack_tracing::GetSpan<()> for Target {
use tracing::info_span;

match self.http_settings {
http::Settings::Http2 => match self.logical.name_addr() {
http::Settings::Http2 => match self.dst.name_addr() {
None => info_span!(
"http2",
port = %self.socket_addr.port(),
Expand All @@ -229,7 +235,7 @@ impl stack_tracing::GetSpan<()> for Target {
keep_alive,
wants_h1_upgrade,
was_absolute_form,
} => match self.logical.name_addr() {
} => match self.dst.name_addr() {
None => info_span!(
"http1",
port = %self.socket_addr.port(),
Expand Down Expand Up @@ -262,7 +268,7 @@ impl<A> router::Recognize<http::Request<A>> for RequestTarget {
type Key = Target;

fn recognize(&self, req: &http::Request<A>) -> Self::Key {
let logical = req
let dst = req
.headers()
.get(CANONICAL_DST_HEADER)
.and_then(|dst| {
Expand All @@ -286,10 +292,30 @@ impl<A> router::Recognize<http::Request<A>> for RequestTarget {
.unwrap_or_else(|| self.accept.addrs.target_addr().into());

Target {
logical,
dst,
socket_addr: self.accept.addrs.target_addr(),
tls_client_id: self.accept.peer_identity.clone(),
http_settings: http::Settings::from_request(req),
}
}
}

impl From<Logical> for Target {
fn from(Logical { target, .. }: Logical) -> Self {
target
}
}

// === impl Logical ===

impl From<(profiles::Receiver, Target)> for Logical {
fn from((profiles, target): (profiles::Receiver, Target)) -> Self {
Self { profiles, target }
}
}

impl AsRef<profiles::Receiver> for Logical {
fn as_ref(&self) -> &profiles::Receiver {
&self.profiles
}
}
7 changes: 3 additions & 4 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Config {
.check_new_service::<Target, http::Request<http::boxed::Payload>>()
.push_on_response(svc::layers().box_http_request())
// The target stack doesn't use the profile resolution, so drop it.
.push_map_target(|(_, target): (profiles::Receiver, Target)| target)
.push_map_target(endpoint::Target::from)
.push(profiles::http::route_request::layer(
svc::proxies()
// Sets the route as a request extension so that it can be used
Expand All @@ -226,11 +226,10 @@ impl Config {
// extension.
.push(classify::Layer::new())
.check_new_clone::<dst::Route>()
.push_map_target(|(r, t): (profiles::http::Route, Target)| {
endpoint::route(r, t)
})
.push_map_target(endpoint::route)
.into_inner(),
))
.push_map_target(endpoint::Logical::from)
.push(profiles::discover::layer(profiles_client))
.into_new_service()
.cache(
Expand Down
45 changes: 43 additions & 2 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ pub struct Target<T> {
pub inner: T,
}

#[derive(Clone, Debug)]
pub struct Profile {
pub rx: profiles::Receiver,
pub target: Target<HttpEndpoint>,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct HttpEndpoint {
pub addr: SocketAddr,
Expand All @@ -49,6 +55,15 @@ pub struct TcpEndpoint {
pub identity: tls::PeerIdentity,
}

impl From<(Addr, Profile)> for Concrete<http::Settings> {
fn from((addr, Profile { target, .. }): (Addr, Profile)) -> Self {
Self {
addr,
inner: target.map(|e| e.settings),
}
}
}

// === impl Target ===

impl<T> Target<T> {
Expand Down Expand Up @@ -406,10 +421,36 @@ impl<B> router::Recognize<http::Request<B>> for LogicalPerRequest {
}
}

pub fn route<T>(route: profiles::http::Route, target: Logical<T>) -> dst::Route {
pub fn route((route, profile): (profiles::http::Route, Profile)) -> dst::Route {
dst::Route {
route,
target: target.addr,
target: profile.target.addr,
direction: metric_labels::Direction::Out,
}
}

// === impl Profile ===

impl From<(profiles::Receiver, Target<HttpEndpoint>)> for Profile {
fn from((rx, target): (profiles::Receiver, Target<HttpEndpoint>)) -> Self {
Self { rx, target }
}
}

impl AsRef<Addr> for Profile {
fn as_ref(&self) -> &Addr {
&self.target.addr
}
}

impl AsRef<profiles::Receiver> for Profile {
fn as_ref(&self) -> &profiles::Receiver {
&self.rx
}
}

impl From<Profile> for Target<HttpEndpoint> {
fn from(Profile { target, .. }: Profile) -> Self {
target
}
}
12 changes: 4 additions & 8 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use linkerd2_app_core::{
spans::SpanConverter,
svc::{self, NewService},
transport::{self, listen, tls},
Addr, Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
};
use std::{collections::HashMap, net::IpAddr, time::Duration};
Expand Down Expand Up @@ -291,10 +291,7 @@ impl Config {
// processed.
let logical = concrete
// Uses the split-provided target `Addr` to build a concrete target.
.push_map_target(|(addr, l): (Addr, Logical<HttpEndpoint>)| Concrete {
addr,
inner: l.map(|e| e.settings),
})
.push_map_target(Concrete::<http::Settings>::from)
.push(profiles::split::layer())
// Drives concrete stacks to readiness and makes the split
// cloneable, as required by the retry middleware.
Expand All @@ -311,11 +308,10 @@ impl Config {
// Sets the per-route response classifier as a request
// extension.
.push(classify::Layer::new())
.push_map_target(|(r, l): (profiles::http::Route, Logical<HttpEndpoint>)| {
endpoint::route(r, l)
})
.push_map_target(endpoint::route)
.into_inner(),
))
.push_map_target(endpoint::Profile::from)
// Discovers the service profile from the control plane and passes
// it to inner stack to build the router and traffic split.
.push(profiles::discover::layer(profiles_client))
Expand Down
11 changes: 6 additions & 5 deletions linkerd/service-profiles/src/http/route_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,17 @@ impl<M: Clone, N: Clone, R> Clone for NewRouteRequest<M, N, R> {
}
}

impl<T, M, N> NewService<(Receiver, T)> for NewRouteRequest<M, N, N::Service>
impl<T, M, N> NewService<T> for NewRouteRequest<M, N, N::Service>
where
T: Clone,
M: NewService<(Receiver, T)>,
T: AsRef<Receiver> + Clone,
M: NewService<T>,
N: NewService<(Route, T)> + Clone,
{
type Service = RouteRequest<T, M::Service, N, N::Service>;

fn new_service(&self, (rx, target): (Receiver, T)) -> Self::Service {
let inner = self.inner.new_service((rx.clone(), target.clone()));
fn new_service(&self, target: T) -> Self::Service {
let rx = target.as_ref().clone();
let inner = self.inner.new_service(target.clone());
let default = self
.new_route
.new_service((self.default.clone(), target.clone()));
Expand Down
6 changes: 4 additions & 2 deletions linkerd/service-profiles/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ impl<N: Clone, S, Req> Clone for NewSplit<N, S, Req> {
}
}

impl<T, N: Clone, S, Req> NewService<(Receiver, T)> for NewSplit<N, S, Req>
impl<T, N: Clone, S, Req> NewService<T> for NewSplit<N, S, Req>
where
T: AsRef<Receiver>,
S: tower::Service<Req>,
{
type Service = Split<T, N, S, Req>;

fn new_service(&self, (rx, target): (Receiver, T)) -> Self::Service {
fn new_service(&self, target: T) -> Self::Service {
let rx = target.as_ref().clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does add a clone + drop of the receiver that wasn't previously necessary...I don't think this is hot enough to care about an atomic ref bump, though.

Split {
rx,
target,
Expand Down
0