8000 outbound: Return a default endpoint on reject by olix0r · Pull Request #690 · linkerd/linkerd2-proxy · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

outbound: Return a default endpoint on reject #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,6 @@ pub struct ProxyMetrics {
pub transport: transport::Metrics,
}

/// Marker error reported when discovery is rejected.
///
/// The type carries no data. Its single private unit field keeps the tuple
/// constructor private, so values are created through [`DiscoveryRejected::new`]
/// or [`Default::default`].
#[derive(Clone, Debug, Default)]
pub struct DiscoveryRejected(());

impl DiscoveryRejected {
    /// Creates a new `DiscoveryRejected` error.
    ///
    /// Equivalent to [`Default::default`]; kept so existing callers of `new()`
    /// continue to compile.
    pub fn new() -> Self {
        Self::default()
    }
}

impl std::fmt::Display for DiscoveryRejected {
    /// Writes the fixed message `discovery rejected`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("discovery rejected")
    }
}

impl std::error::Error for DiscoveryRejected {}

impl From<Addr> for DiscoveryRejected {
fn from(_: Addr) -> Self {
Self::new()
}
}

/// A set of `u16` values held behind an `Arc`, so cloning shares the same set.
///
/// NOTE(review): presumably these are port numbers for which some processing
/// is skipped -- confirm against the users of this type, which are not
/// visible here.
#[derive(Clone, Debug, Default)]
pub struct SkipByPort(std::sync::Arc<indexmap::IndexSet<u16>>);

Expand Down
16 changes: 8 additions & 8 deletions linkerd/app/integration/src/tests/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ mod transport {
// Connection to the server should be a failure with the EXFULL error
// code.
assert_eventually_contains!(metrics.get("/metrics").await,
"tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1");
"tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1");
// Connection from the client should have closed cleanly.
assert_eventually_contains!(
metrics.get("/metrics").await,
Expand Down Expand Up @@ -963,7 +963,7 @@ mod transport {
// Connection to the server should be a failure with the EXFULL error
// code.
assert_eventually_contains!(metrics.get("/metrics").await,
"tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1");
"tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1");
// Connection from the client should have closed cleanly.
assert_eventually_contains!(metrics.get("/metrics").await,
"tcp_close_total{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
Expand Down Expand Up @@ -1121,7 +1121,7 @@ mod transport {
tcp_client.write(TcpFixture::HELLO_MSG).await;
assert_eq!(tcp_client.read().await, TcpFixture::BYE_MSG.as_bytes());
let expected = format!(
"tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} 1",
"tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} 1",
proxy.outbound_server.as_ref().unwrap().addr,
);
assert_eventually_contains!(metrics.get("/metrics").await, &expected);
Expand Down Expand Up @@ -1184,7 +1184,7 @@ mod transport {
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1");
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1");

let tcp_client = client.connect().await;

Expand All @@ -1194,14 +1194,14 @@ mod transport {
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1");
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1");

tcp_client.shutdown().await;
let out = metrics.get("/metrics").await;
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 2");
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 2");
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 2");
}

#[tokio::test]
Expand All @@ -1217,7 +1217,7 @@ mod transport {
TcpFixture::BYE_MSG.len()
);
let dst_expected = format!(
"tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}",
"tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}",
proxy.outbound_server.as_ref().unwrap().addr,
TcpFixture::HELLO_MSG.len()
);
Expand Down Expand Up @@ -1246,7 +1246,7 @@ mod transport {
TcpFixture::HELLO_MSG.len()
);
let dst_expected = format!(
"tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}",
"tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}",
proxy.outbound_server.as_ref().unwrap().addr,
TcpFixture::BYE_MSG.len()
);
Expand Down
12 changes: 10 additions & 2 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ impl Into<Addr> for &'_ HttpConcrete {
}
}

impl Into<SocketAddr> for &'_ HttpConcrete {
    /// Returns the socket address of `dst` when it has one; otherwise falls
    /// back to the logical target's `orig_dst`.
    fn into(self) -> SocketAddr {
        match self.dst.socket_addr() {
            Some(addr) => addr,
            None => self.logical.orig_dst,
        }
    }
}

impl std::fmt::Display for HttpConcrete {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.dst.fmt(f)
Expand Down Expand Up @@ -201,7 +209,7 @@ impl From<HttpLogical> for HttpEndpoint {
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(),
),
concrete: logical.into(),
metadata: Metadata::empty(),
metadata: Metadata::default(),
}
}
}
Expand Down Expand Up @@ -331,7 +339,7 @@ impl From<SocketAddr> for TcpEndpoint {
Self {
addr,
dst: addr.into(),
identity: Conditional::None(tls::ReasonForNoPeerName::NotHttp.into()),
identity: Conditional::None(tls::ReasonForNoPeerName::PortSkipped.into()),
labels: None,
}
}
Expand Down
15 changes: 3 additions & 12 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use linkerd2_app_core::{
spans::SpanConverter,
svc::{self},
transport::{self, listen, tls},
Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER,
DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
};
use std::{collections::HashMap, net, time::Duration};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -500,13 +500,6 @@ impl Config {

// Load balances TCP streams that cannot be decoded as HTTP.
let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve))
.push_fallback_with_predicate(
tcp_forward
.clone()
.push_map_target(TcpEndpoint::from)
.into_inner(),
is_discovery_rejected,
)
.push_on_response(
svc::layers()
.push_failfast(dispatch_timeout)
Expand Down Expand Up @@ -591,9 +584,7 @@ pub fn trace_labels() -> HashMap<String, String> {

fn is_discovery_rejected(err: &Error) -> bool {
fn is_rejected(err: &(dyn std::error::Error + 'static)) -> bool {
err.is::<DiscoveryRejected>()
|| err.is::<profiles::InvalidProfileAddr>()
|| err.source().map(is_rejected).unwrap_or(false)
err.is::<profiles::InvalidProfileAddr>() || err.source().map(is_rejected).unwrap_or(false)
}

let rejected = is_rejected(&**err);
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn plaintext_tcp() {
let resolver = test_support::resolver().endpoint_exists(
logical.clone(),
target_addr,
test_support::resolver::Metadata::empty(),
test_support::resolver::Metadata::default(),
);

// Build the outbound TCP balancer stack.
Expand Down
59 changes: 59 additions & 0 deletions linkerd/app/src/dst/default_resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use super::Rejected;
use futures::{future, prelude::*, stream};
use linkerd2_app_core::{
proxy::core::{Resolve, Update},
svc, Error,
};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

/// Builds a layer that wraps an inner service `S` in [`RecoverDefaultResolve`].
pub fn layer<S>() -> impl svc::Layer<S, Service = RecoverDefaultResolve<S>> + Clone {
svc::layer::mk(RecoverDefaultResolve)
}

/// Wraps a resolver `S` so that a rejected resolution yields a default
/// endpoint instead of an error.
///
/// When the inner resolve fails with an error accepted by `Rejected::matches`,
/// the failure is replaced by a one-item stream holding a single
/// `Update::Reset` with one endpoint: the target's socket address paired with
/// `S::Endpoint::default()`. Any other error is returned unchanged, and
/// successful resolutions are passed through as-is.
#[derive(Clone, Debug)]
pub struct RecoverDefaultResolve<S>(S);
Comment on lines +17 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it'd be nice to have a comment explaining what this does --- the name is about as descriptive as i can think of, but i can see the purpose of this thing not being immediately obvious to new contributors?


impl<T, S> tower::Service<T> for RecoverDefaultResolve<S>
where
for<'t> &'t T: Into<std::net::SocketAddr>,
S: Resolve<T, Error = Error>,
S::Endpoint: Default + Send + 'static,
S::Resolution: Send + 'static,
S::Future: Send + 'static,
stream::Once<future::Ready<Result<Update<S::Endpoint>, S::Error>>>:
stream::TryStream<Ok = Update<S::Endpoint>, Error = S::Error>,
{
type Response = future::Either<
S::Resolution,
stream::Once<future::Ready<Result<Update<S::Endpoint>, Error>>>,
>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}

fn call(&mut self, t: T) -> Self::Future {
let addr = (&t).into();
Box::pin(
self.0
.resolve(t)
.map_ok(future::Either::Left)
.or_else(move |error| {
if Rejected::matches(&*error) {
tracing::debug!(%error, %addr, "Synthesizing endpoint");
Comment on lines +47 to +49
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/TIOLI: if we added addr to this message by instrumenting the future instead of moving it in, we wouldn't need this to be a move closure and could therefore remove the box around the future (since it's just combinators). not a blocker though; and the concrete type is probably pretty gross...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addr is used on the next line (as part of the returned stream), so the move is needed either way. And, really, the addr is already on the context, so we could omit it here, but 🤷‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops, i overlooked that --- carry on!

let endpoint = (addr, S::Endpoint::default());
let res = stream::once(future::ok(Update::Reset(vec![endpoint])));
future::ok(future::Either::Right(res))
} else {
future::err(error)
}
}),
)
}
}
40 changes: 38 additions & 2 deletions linkerd/app/src/dst/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod default_resolve;
mod permit;
mod resolve;

use self::default_resolve::RecoverDefaultResolve;
use indexmap::IndexSet;
use linkerd2_app_core::{
control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls,
ControlHttpMetrics, Error,
Addr, ControlHttpMetrics, Error,
};
use permit::PermitConfiguredDsts;
use std::time::Duration;
Expand All @@ -21,6 +23,9 @@ pub struct Config {
pub initial_profile_timeout: Duration,
}

/// Error indicating that discovery was rejected for a target.
///
/// It carries no data. It is built from an `Addr` via `From<Addr>` and is the
/// default error type parameter of `PermitConfiguredDsts`. `Rejected::matches`
/// recognises this type anywhere in an error's source chain, and also treats
/// a `tonic::Status` with code `InvalidArgument` as a rejection.
#[derive(Clone, Debug)]
pub struct Rejected(());
Comment on lines +26 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe worth a comment on the semantics of what this error means?


/// Handles to destination service clients.
///
/// The addr is preserved for logging.
Expand All @@ -30,7 +35,9 @@ pub struct Dst {
PermitConfiguredDsts<profiles::InvalidProfileAddr>,
profiles::Client<control::Client<BoxBody>, resolve::BackoffUnlessInvalidArgument>,
>,
pub resolve: RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
pub resolve: RecoverDefaultResolve<
RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
>,
}

impl Config {
Expand All @@ -48,6 +55,7 @@ impl Config {
self.get_suffixes,
self.get_networks,
)))
.push(default_resolve::layer())
.into_inner();

let profiles = svc::stack(profiles::Client::new(
Expand All @@ -69,3 +77,31 @@ impl Config {
})
}
}

impl std::fmt::Display for Rejected {
    /// Writes the fixed message `rejected discovery`.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str("rejected discovery")
    }
}

impl std::error::Error for Rejected {}

impl From<Addr> for Rejected {
fn from(_: Addr) -> Self {
Rejected(())
}
}

impl Rejected {
    /// Reports whether `err`, or an error in its `source()` chain, represents
    /// a rejected discovery.
    ///
    /// The chain is walked from `err` outwards. The walk stops at the first
    /// error that is either a `Rejected` (result: `true`) or a `tonic::Status`
    /// (result: whether its code is `InvalidArgument`). Sources beyond a
    /// `tonic::Status` are not inspected. If the chain ends without either,
    /// the result is `false`.
    fn matches(err: &(dyn std::error::Error + 'static)) -> bool {
        let mut cursor = Some(err);

        while let Some(current) = cursor {
            if current.is::<Self>() {
                return true;
            }

            if let Some(status) = current.downcast_ref::<tonic::Status>() {
                return status.code() == tonic::Code::InvalidArgument;
            }

            cursor = current.source();
        }

        false
    }
}
9 changes: 4 additions & 5 deletions linkerd/app/src/dst/permit.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::Rejected;
use ipnet::{Contains, IpNet};
use linkerd2_app_core::{
dns::Suffix, request_filter::FilterRequest, Addr, DiscoveryRejected, Error,
};
use linkerd2_app_core::{dns::Suffix, request_filter::FilterRequest, Addr, Error};
use std::marker::PhantomData;
use std::net::IpAddr;
use std::sync::Arc;

pub struct PermitConfiguredDsts<E = DiscoveryRejected> {
pub struct PermitConfiguredDsts<E = Rejected> {
name_suffixes: Arc<Vec<Suffix>>,
networks: Arc<Vec<IpNet>>,
_error: PhantomData<fn(E)>,
Expand Down Expand Up @@ -74,7 +73,7 @@ where
if permitted {
Ok(t)
} else {
Err(E::from(addr.clone()).into())
Err(E::from(addr).into())
}
}
}
15 changes: 7 additions & 8 deletions linkerd/app/src/dst/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use linkerd2_app_core::{
api_resolve as api,
resolve::{self, recover},
},
DiscoveryRejected, Error, Recover,
Error, Recover,
};
use tonic::{
body::{Body, BoxBody},
Expand Down Expand Up @@ -46,16 +46,15 @@ impl From<ExponentialBackoff> for BackoffUnlessInvalidArgument {
impl Recover<Error> for BackoffUnlessInvalidArgument {
type Backoff = ExponentialBackoffStream;

fn recover(&self, err: Error) -> Result<Self::Backoff, Error> {
match err.downcast::<Status>() {
Ok(ref status) if status.code() == Code::InvalidArgument => {
tracing::debug!(message = "cannot recover", %status);
return Err(DiscoveryRejected::new().into());
fn recover(&self, error: Error) -> Result<Self::Backoff, Error> {
if let Some(status) = error.downcast_ref::<Status>() {
if status.code() == Code::InvalidArgument {
tracing::debug!(%status, "Cannot recover");
return Err(error);
}
Ok(status) => tracing::trace!(message = "recovering", %status),
Err(error) => tracing::trace!(message = "recovering", %error),
}

tracing::trace!(%error, "Recovering");
Ok(self.0.stream())
}
}
6 changes: 4 additions & 2 deletions linkerd/proxy/api-resolve/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ pub enum ProtocolHint {

// === impl Metadata ===

impl Metadata {
pub fn empty() -> Self {
impl Default for Metadata {
fn default() -> Self {
Self {
labels: IndexMap::default(),
protocol_hint: ProtocolHint::Unknown,
Expand All @@ -51,7 +51,9 @@ impl Metadata {
authority_override: None,
}
}
}

impl Metadata {
pub fn new(
labels: IndexMap<String, String>,
protocol_hint: ProtocolHint,
Expand Down
Loading
0