-
Notifications
You must be signed in to change notification settings - Fork 273
outbound: Return a default endpoint on reject #690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
use super::Rejected; | ||
use futures::{future, prelude::*, stream}; | ||
use linkerd2_app_core::{ | ||
proxy::core::{Resolve, Update}, | ||
svc, Error, | ||
}; | ||
use std::{ | ||
future::Future, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
pub fn layer<S>() -> impl svc::Layer<S, Service = RecoverDefaultResolve<S>> + Clone { | ||
svc::layer::mk(RecoverDefaultResolve) | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct RecoverDefaultResolve<S>(S); | ||
Comment on lines
+17
to
+18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: it'd be nice to have a comment explaining what this does --- the name is about as descriptive as i can think of, but i can see the purpose of this thing not being immediately obvious to new contributors? |
||
|
||
impl<T, S> tower::Service<T> for RecoverDefaultResolve<S> | ||
where | ||
for<'t> &'t T: Into<std::net::SocketAddr>, | ||
S: Resolve<T, Error = Error>, | ||
S::Endpoint: Default + Send + 'static, | ||
S::Resolution: Send + 'static, | ||
S::Future: Send + 'static, | ||
stream::Once<future::Ready<Result<Update<S::Endpoint>, S::Error>>>: | ||
stream::TryStream<Ok = Update<S::Endpoint>, Error = S::Error>, | ||
{ | ||
type Response = future::Either< | ||
S::Resolution, | ||
stream::Once<future::Ready<Result<Update<S::Endpoint>, Error>>>, | ||
>; | ||
type Error = Error; | ||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Error>> + Send + 'static>>; | ||
|
||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
self.0.poll_ready(cx) | ||
} | ||
|
||
fn call(&mut self, t: T) -> Self::Future { | ||
let addr = (&t).into(); | ||
Box::pin( | ||
self.0 | ||
.resolve(t) | ||
.map_ok(future::Either::Left) | ||
.or_else(move |error| { | ||
if Rejected::matches(&*error) { | ||
tracing::debug!(%error, %addr, "Synthesizing endpoint"); | ||
Comment on lines
+47
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit/TIOLI: if we added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whoops, i overlooked that --- carry on! |
||
let endpoint = (addr, S::Endpoint::default()); | ||
let res = stream::once(future::ok(Update::Reset(vec![endpoint]))); | ||
future::ok(future::Either::Right(res)) | ||
} else { | ||
future::err(error) | ||
} | ||
}), | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,12 @@ | ||
mod default_resolve; | ||
mod permit; | ||
mod resolve; | ||
|
||
use self::default_resolve::RecoverDefaultResolve; | ||
use indexmap::IndexSet; | ||
use linkerd2_app_core::{ | ||
control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls, | ||
ControlHttpMetrics, Error, | ||
Addr, ControlHttpMetrics, Error, | ||
}; | ||
use permit::PermitConfiguredDsts; | ||
use std::time::Duration; | ||
|
@@ -21,6 +23,9 @@ pub struct Config { | |
pub initial_profile_timeout: Duration, | ||
} | ||
|
||
#[derive(Clone, Debug)] | ||
pub struct Rejected(()); | ||
Comment on lines
+26
to
+27
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: maybe worth a comment on the semantics of what this error means? |
||
|
||
/// Handles to destination service clients. | ||
/// | ||
/// The addr is preserved for logging. | ||
|
@@ -30,7 +35,9 @@ pub struct Dst { | |
PermitConfiguredDsts<profiles::InvalidProfileAddr>, | ||
profiles::Client<control::Client<BoxBody>, resolve::BackoffUnlessInvalidArgument>, | ||
>, | ||
pub resolve: RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>, | ||
pub resolve: RecoverDefaultResolve< | ||
RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>, | ||
>, | ||
} | ||
|
||
impl Config { | ||
|
@@ -48,6 +55,7 @@ impl Config { | |
self.get_suffixes, | ||
self.get_networks, | ||
))) | ||
.push(default_resolve::layer()) | ||
.into_inner(); | ||
|
||
let profiles = svc::stack(profiles::Client::new( | ||
|
@@ -69,3 +77,31 @@ impl Config { | |
}) | ||
} | ||
} | ||
|
||
impl std::fmt::Display for Rejected { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "rejected discovery") | ||
} | ||
} | ||
|
||
impl std::error::Error for Rejected {} | ||
|
||
impl From<Addr> for Rejected { | ||
fn from(_: Addr) -> Self { | ||
Rejected(()) | ||
} | ||
} | ||
|
||
impl Rejected { | ||
fn matches(err: &(dyn std::error::Error + 'static)) -> bool { | ||
if err.is::<Self>() { | ||
return true; | ||
} | ||
|
||
if let Some(status) = err.downcast_ref::<tonic::Status>() { | ||
return status.code() == tonic::Code::InvalidArgument; | ||
} | ||
|
||
err.source().map(Self::matches).unwrap_or(false) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.