From 8ef420833a2de5ad48b83af5b83b9ad095389172 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 2 Oct 2020 00:27:05 +0000 Subject: [PATCH 1/2] outbound: Return a default endpoint on reject When the resolver rejects resolution, we currently propagate that error so that it can be handled via fallback. And due to recent HTTP router changes, these resolution errors can propagate up across splits, etc. This change simplifies this behavior by instead synthesizing a resolution with a default endpoint. The `not_http` reason has been removed, as it's no longer useful. --- linkerd/app/core/src/lib.rs | 23 -------- .../app/integration/src/tests/telemetry.rs | 16 ++--- linkerd/app/outbound/src/endpoint.rs | 12 +++- linkerd/app/outbound/src/lib.rs | 15 +---- linkerd/app/src/dst/default_resolve.rs | 59 +++++++++++++++++++ linkerd/app/src/dst/mod.rs | 40 ++++++++++++- linkerd/app/src/dst/permit.rs | 9 ++- linkerd/app/src/dst/resolve.rs | 15 +++-- linkerd/proxy/api-resolve/src/metadata.rs | 6 +- linkerd/proxy/transport/src/tls/mod.rs | 5 -- 10 files changed, 133 insertions(+), 67 deletions(-) create mode 100644 linkerd/app/src/dst/default_resolve.rs diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index ce64d69c62..2709cc1699 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -99,29 +99,6 @@ pub struct ProxyMetrics { pub transport: transport::Metrics, } -#[derive(Clone, Debug)] -pub struct DiscoveryRejected(()); - -impl DiscoveryRejected { - pub fn new() -> Self { - DiscoveryRejected(()) - } -} - -impl std::fmt::Display for DiscoveryRejected { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "discovery rejected") - } -} - -impl std::error::Error for DiscoveryRejected {} - -impl From for DiscoveryRejected { - fn from(_: Addr) -> Self { - Self::new() - } -} - #[derive(Clone, Debug, Default)] pub struct SkipByPort(std::sync::Arc>); diff --git a/linkerd/app/integration/src/tests/telemetry.rs 
b/linkerd/app/integration/src/tests/telemetry.rs index 4ddd335b4f..7f27c98f7a 100644 --- a/linkerd/app/integration/src/tests/telemetry.rs +++ b/linkerd/app/integration/src/tests/telemetry.rs @@ -932,7 +932,7 @@ mod transport { // Connection to the server should be a failure with the EXFULL error // code. assert_eventually_contains!(metrics.get("/metrics").await, - "tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1"); + "tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1"); // Connection from the client should have closed cleanly. assert_eventually_contains!( metrics.get("/metrics").await, @@ -963,7 +963,7 @@ mod transport { // Connection to the server should be a failure with the EXFULL error // code. assert_eventually_contains!(metrics.get("/metrics").await, - "tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1"); + "tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1"); // Connection from the client should have closed cleanly. 
assert_eventually_contains!(metrics.get("/metrics").await, "tcp_close_total{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1"); @@ -1121,7 +1121,7 @@ mod transport { tcp_client.write(TcpFixture::HELLO_MSG).await; assert_eq!(tcp_client.read().await, TcpFixture::BYE_MSG.as_bytes()); let expected = format!( - "tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} 1", + "tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} 1", proxy.outbound_server.as_ref().unwrap().addr, ); assert_eventually_contains!(metrics.get("/metrics").await, &expected); @@ -1184,7 +1184,7 @@ mod transport { assert_eventually_contains!(out, "tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1"); assert_eventually_contains!(out, - "tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1"); + "tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1"); let tcp_client = client.connect().await; @@ -1194,14 +1194,14 @@ mod transport { assert_eventually_contains!(out, "tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1"); assert_eventually_contains!(out, - "tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1"); + "tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1"); tcp_client.shutdown().await; let out = metrics.get("/metrics").await; assert_eventually_contains!(out, 
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 2"); assert_eventually_contains!(out, - "tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 2"); + "tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 2"); } #[tokio::test] @@ -1217,7 +1217,7 @@ mod transport { TcpFixture::BYE_MSG.len() ); let dst_expected = format!( - "tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}", + "tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}", proxy.outbound_server.as_ref().unwrap().addr, TcpFixture::HELLO_MSG.len() ); @@ -1246,7 +1246,7 @@ mod transport { TcpFixture::HELLO_MSG.len() ); let dst_expected = format!( - "tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}", + "tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}", proxy.outbound_server.as_ref().unwrap().addr, TcpFixture::BYE_MSG.len() ); diff --git a/linkerd/app/outbound/src/endpoint.rs b/linkerd/app/outbound/src/endpoint.rs index 87beffc6a4..07e84a722a 100644 --- a/linkerd/app/outbound/src/endpoint.rs +++ b/linkerd/app/outbound/src/endpoint.rs @@ -136,6 +136,14 @@ impl Into for &'_ HttpConcrete { } } +impl Into for &'_ HttpConcrete { + fn into(self) -> SocketAddr { + self.dst + .socket_addr() + .unwrap_or_else(|| self.logical.orig_dst) + } +} + impl std::fmt::Display for HttpConcrete { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.dst.fmt(f) @@ -201,7 +209,7 @@ impl From for HttpEndpoint { 
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(), ), concrete: logical.into(), - metadata: Metadata::empty(), + metadata: Metadata::default(), } } } @@ -331,7 +339,7 @@ impl From for TcpEndpoint { Self { addr, dst: addr.into(), - identity: Conditional::None(tls::ReasonForNoPeerName::NotHttp.into()), + identity: Conditional::None(tls::ReasonForNoPeerName::PortSkipped.into()), labels: None, } } diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 2d466f22ea..e047dc1752 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -20,8 +20,8 @@ use linkerd2_app_core::{ spans::SpanConverter, svc::{self}, transport::{self, listen, tls}, - Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer, - CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID, + Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER, + DST_OVERRIDE_HEADER, L5D_REQUIRE_ID, }; use std::{collections::HashMap, net, time::Duration}; use tokio::sync::mpsc; @@ -500,13 +500,6 @@ impl Config { // Load balances TCP streams that cannot be decoded as HTTP. 
let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve)) - .push_fallback_with_predicate( - tcp_forward - .clone() - .push_map_target(TcpEndpoint::from) - .into_inner(), - is_discovery_rejected, - ) .push_on_response( svc::layers() .push_failfast(dispatch_timeout) @@ -591,9 +584,7 @@ pub fn trace_labels() -> HashMap { fn is_discovery_rejected(err: &Error) -> bool { fn is_rejected(err: &(dyn std::error::Error + 'static)) -> bool { - err.is::() - || err.is::() - || err.source().map(is_rejected).unwrap_or(false) + err.is::() || err.source().map(is_rejected).unwrap_or(false) } let rejected = is_rejected(&**err); diff --git a/linkerd/app/src/dst/default_resolve.rs b/linkerd/app/src/dst/default_resolve.rs new file mode 100644 index 0000000000..61ac3a5acd --- /dev/null +++ b/linkerd/app/src/dst/default_resolve.rs @@ -0,0 +1,59 @@ +use super::Rejected; +use futures::{future, prelude::*, stream}; +use linkerd2_app_core::{ + proxy::core::{Resolve, Update}, + svc, Error, +}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +pub fn layer() -> impl svc::Layer> + Clone { + svc::layer::mk(RecoverDefaultResolve) +} + +#[derive(Clone, Debug)] +pub struct RecoverDefaultResolve(S); + +impl tower::Service for RecoverDefaultResolve +where + for<'t> &'t T: Into, + S: Resolve, + S::Endpoint: Default + Send + 'static, + S::Resolution: Send + 'static, + S::Future: Send + 'static, + stream::Once, S::Error>>>: + stream::TryStream, Error = S::Error>, +{ + type Response = future::Either< + S::Resolution, + stream::Once, Error>>>, + >; + type Error = Error; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, t: T) -> Self::Future { + let addr = (&t).into(); + Box::pin( + self.0 + .resolve(t) + .map_ok(future::Either::Left) + .or_else(move |error| { + if Rejected::matches(&*error) { + tracing::debug!(%error, %addr, "Synthesizing endpoint"); + let 
endpoint = (addr, S::Endpoint::default()); + let res = stream::once(future::ok(Update::Reset(vec![endpoint]))); + future::ok(future::Either::Right(res)) + } else { + future::err(error) + } + }), + ) + } +} diff --git a/linkerd/app/src/dst/mod.rs b/linkerd/app/src/dst/mod.rs index 5b1802c53d..92b449b36b 100644 --- a/linkerd/app/src/dst/mod.rs +++ b/linkerd/app/src/dst/mod.rs @@ -1,10 +1,12 @@ +mod default_resolve; mod permit; mod resolve; +use self::default_resolve::RecoverDefaultResolve; use indexmap::IndexSet; use linkerd2_app_core::{ control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls, - ControlHttpMetrics, Error, + Addr, ControlHttpMetrics, Error, }; use permit::PermitConfiguredDsts; use std::time::Duration; @@ -21,6 +23,9 @@ pub struct Config { pub initial_profile_timeout: Duration, } +#[derive(Clone, Debug)] +pub struct Rejected(()); + /// Handles to destination service clients. /// /// The addr is preserved for logging. @@ -30,7 +35,9 @@ pub struct Dst { PermitConfiguredDsts, profiles::Client, resolve::BackoffUnlessInvalidArgument>, >, - pub resolve: RequestFilter>>, + pub resolve: RecoverDefaultResolve< + RequestFilter>>, + >, } impl Config { @@ -48,6 +55,7 @@ impl Config { self.get_suffixes, self.get_networks, ))) + .push(default_resolve::layer()) .into_inner(); let profiles = svc::stack(profiles::Client::new( @@ -69,3 +77,31 @@ impl Config { }) } } + +impl std::fmt::Display for Rejected { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "rejected discovery") + } +} + +impl std::error::Error for Rejected {} + +impl From for Rejected { + fn from(_: Addr) -> Self { + Rejected(()) + } +} + +impl Rejected { + fn matches(err: &(dyn std::error::Error + 'static)) -> bool { + if err.is::() { + return true; + } + + if let Some(status) = err.downcast_ref::() { + return status.code() == tonic::Code::InvalidArgument; + } + + err.source().map(Self::matches).unwrap_or(false) + } +} diff --git 
a/linkerd/app/src/dst/permit.rs b/linkerd/app/src/dst/permit.rs index cc33caad33..bf05c17249 100644 --- a/linkerd/app/src/dst/permit.rs +++ b/linkerd/app/src/dst/permit.rs @@ -1,12 +1,11 @@ +use super::Rejected; use ipnet::{Contains, IpNet}; -use linkerd2_app_core::{ - dns::Suffix, request_filter::FilterRequest, Addr, DiscoveryRejected, Error, -}; +use linkerd2_app_core::{dns::Suffix, request_filter::FilterRequest, Addr, Error}; use std::marker::PhantomData; use std::net::IpAddr; use std::sync::Arc; -pub struct PermitConfiguredDsts { +pub struct PermitConfiguredDsts { name_suffixes: Arc>, networks: Arc>, _error: PhantomData, @@ -74,7 +73,7 @@ where if permitted { Ok(t) } else { - Err(E::from(addr.clone()).into()) + Err(E::from(addr).into()) } } } diff --git a/linkerd/app/src/dst/resolve.rs b/linkerd/app/src/dst/resolve.rs index 3688e6d537..2155dde771 100644 --- a/linkerd/app/src/dst/resolve.rs +++ b/linkerd/app/src/dst/resolve.rs @@ -6,7 +6,7 @@ use linkerd2_app_core::{ api_resolve as api, resolve::{self, recover}, }, - DiscoveryRejected, Error, Recover, + Error, Recover, }; use tonic::{ body::{Body, BoxBody}, @@ -46,16 +46,15 @@ impl From for BackoffUnlessInvalidArgument { impl Recover for BackoffUnlessInvalidArgument { type Backoff = ExponentialBackoffStream; - fn recover(&self, err: Error) -> Result { - match err.downcast::() { - Ok(ref status) if status.code() == Code::InvalidArgument => { - tracing::debug!(message = "cannot recover", %status); - return Err(DiscoveryRejected::new().into()); + fn recover(&self, error: Error) -> Result { + if let Some(status) = error.downcast_ref::() { + if status.code() == Code::InvalidArgument { + tracing::debug!(%status, "Cannot recover"); + return Err(error); } - Ok(status) => tracing::trace!(message = "recovering", %status), - Err(error) => tracing::trace!(message = "recovering", %error), } + tracing::trace!(%error, "Recovering"); Ok(self.0.stream()) } } diff --git a/linkerd/proxy/api-resolve/src/metadata.rs 
b/linkerd/proxy/api-resolve/src/metadata.rs index 7e6e37a6a4..b183ca2000 100644 --- a/linkerd/proxy/api-resolve/src/metadata.rs +++ b/linkerd/proxy/api-resolve/src/metadata.rs @@ -41,8 +41,8 @@ pub enum ProtocolHint { // === impl Metadata === -impl Metadata { - pub fn empty() -> Self { +impl Default for Metadata { + fn default() -> Self { Self { labels: IndexMap::default(), protocol_hint: ProtocolHint::Unknown, @@ -51,7 +51,9 @@ impl Metadata { authority_override: None, } } +} +impl Metadata { pub fn new( labels: IndexMap, protocol_hint: ProtocolHint, diff --git a/linkerd/proxy/transport/src/tls/mod.rs b/linkerd/proxy/transport/src/tls/mod.rs index 72c0f3de82..1e752cbdb4 100755 --- a/linkerd/proxy/transport/src/tls/mod.rs +++ b/linkerd/proxy/transport/src/tls/mod.rs @@ -21,10 +21,6 @@ pub trait HasPeerIdentity { #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub enum ReasonForNoPeerName { - /// The connection is a non-HTTP connection so we don't know anything - /// about the destination besides its address. - NotHttp, - /// The destination service didn't give us the identity, which is its way /// of telling us that we shouldn't do TLS for this endpoint. 
NotProvidedByServiceDiscovery, @@ -51,7 +47,6 @@ impl fmt::Display for ReasonForNoPeerName { match self { ReasonForNoPeerName::LocalIdentityDisabled => write!(f, "disabled"), ReasonForNoPeerName::Loopback => write!(f, "loopback"), - ReasonForNoPeerName::NotHttp => write!(f, "not_http"), ReasonForNoPeerName::PortSkipped => write!(f, "port_skipped"), ReasonForNoPeerName::NoTlsFromRemote => write!(f, "no_tls_from_remote"), ReasonForNoPeerName::NoPeerIdFromRemote => write!(f, "no_peer_id_from_remote"), From 6461d9ed38b55057a81a9a5071973ade645a2796 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 2 Oct 2020 02:09:36 +0000 Subject: [PATCH 2/2] fixup test --- linkerd/app/outbound/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/app/outbound/src/tests.rs b/linkerd/app/outbound/src/tests.rs index 5ae4bce307..d7a67c4a18 100644 --- a/linkerd/app/outbound/src/tests.rs +++ b/linkerd/app/outbound/src/tests.rs @@ -67,7 +67,7 @@ async fn plaintext_tcp() { let resolver = test_support::resolver().endpoint_exists( logical.clone(), target_addr, - test_support::resolver::Metadata::empty(), + test_support::resolver::Metadata::default(), ); // Build the outbound TCP balancer stack.