8000 Control plane load balancing by olix0r · Pull Request #636 · linkerd/linkerd2-proxy · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Control plane load balancing #636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -998,9 +998,9 @@ version = "0.1.0"
name = "linkerd2-dns"
version = "0.1.0"
dependencies = [
"async-stream",
"futures 0.3.5",
"linkerd2-dns-name",
"linkerd2-error",
"linkerd2-stack",
"pin-project",
"tokio",
Expand Down Expand Up @@ -1265,7 +1265,6 @@ dependencies = [
name = "linkerd2-proxy-dns-resolve"
version = "0.1.0"
dependencies = [
"async-stream",
"futures 0.3.5",
"linkerd2-addr",
"linkerd2-dns",
Expand All @@ -1275,6 +1274,7 @@ dependencies = [
"tokio-test",
"tower",
"tracing",
"tracing-futures",
]

[[package]]
Expand Down
141 changes: 27 additions & 114 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ pub struct ControlAddr {
pub identity: crate::transport::tls::PeerIdentity,
}

/// Extracts the underlying `Addr` from a `ControlAddr`.
///
/// Implemented as `From` rather than a direct `Into` impl (clippy's
/// `from_over_into`): the standard library's blanket
/// `impl<T, U: From<T>> Into<U> for T` provides `Into<Addr>` for free,
/// so existing `.into()` call sites keep working unchanged.
impl From<ControlAddr> for Addr {
    fn from(c: ControlAddr) -> Addr {
        c.addr
    }
}

impl fmt::Display for ControlAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.addr, f)
Expand Down Expand Up @@ -111,7 +117,7 @@ pub mod add_origin {
}
}

// === impl Resolve ===
// === impl MakeFuture ===

impl<F, B> Future for MakeFuture<F, B>
where
Expand Down Expand Up @@ -145,120 +151,25 @@ pub mod add_origin {
}
}

pub mod dns_resolve {
use super::{client::Target, ControlAddr};
use async_stream::try_stream;
use futures::future::{self, Either};
use futures::prelude::*;
use linkerd2_addr::{Addr, NameAddr};
use linkerd2_error::Error;
use linkerd2_proxy_core::resolve::Update;
use linkerd2_proxy_transport::tls::PeerIdentity;
use pin_project::pin_project;
pub mod resolve {
use crate::{
dns,
proxy::{dns_resolve::DnsResolve, resolve::map_endpoint},
};
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

type UpdatesStream = Pin<Box<dyn Stream<Item = Result<Update<Target>, Error>> + Send + Sync>>;

#[derive(Clone)]
pub struct Resolve<S> {
inner: S,
}

#[pin_project]
pub struct MakeFuture<S: tower::Service<NameAddr>> {
#[pin]
inner: S::Future,
identity: PeerIdentity,
}

// === impl Resolve ===

impl<S> Resolve<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}

// should yield the socket address once
// and keep on yielding Pending forever.
pub fn resolve_once_stream(&self, sa: SocketAddr, identity: PeerIdentity) -> UpdatesStream {
Box::pin(try_stream! {
let update = Update::Add(vec![(sa, Target::new(sa, identity))]);
tracing::debug!(?update, "resolved once");
yield update;
future::pending::<Update<Target>>().await;
})
}
}

impl<S> tower::Service<ControlAddr> for Resolve<S>
where
S: tower::Service<NameAddr>,
S::Response: Stream<Item = Result<Update<SocketAddr>, Error>> + Send + Sync + 'static,
S::Error: Into<Error> + Send + Sync,
S::Future: Send + Sync,
{
type Response = UpdatesStream;
type Error = S::Error;
type Future = Either<MakeFuture<S>, future::Ready<Result<Self::Response, S::Error>>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, target: ControlAddr) -> Self::Future {
match target.addr {
Addr::Name(na) if na.is_localhost() => {
let local_sa = format!("127.0.0.1:{}", na.port()).parse().unwrap();
Either::Right(future::ok(
self.resolve_once_stream(local_sa, target.identity),
))
}
Addr::Name(na) => Either::Left(MakeFuture {
inner: self.inner.call(na),
identity: target.identity,
}),
Addr::Socket(sa) => {
Either::Right(future::ok(self.resolve_once_stream(sa, target.identity)))
}
}
}
/// Builds the control-plane resolver stack.
///
/// Wraps a DNS-backed resolver (`DnsResolve`) in `map_endpoint::Resolve`
/// so that each resolved `SocketAddr` is mapped into a control-plane
/// client `Target` via `IntoTarget` (see `MapEndpoint` impl below,
/// which pairs the address with the control plane's peer identity).
pub fn new(dns: dns::Resolver) -> map_endpoint::Resolve<IntoTarget, DnsResolve> {
map_endpoint::Resolve::new(IntoTarget(()), DnsResolve::new(dns))
}

// === impl MakeFuture ===
/// Zero-sized `MapEndpoint` implementation that converts resolved
/// socket addresses into control-plane client `Target`s.
///
/// The private unit field keeps construction inside this module;
/// instances are created by `resolve::new` above.
#[derive(Copy, Clone, Debug)]
pub struct IntoTarget(());

impl<S> Future for MakeFuture<S>
where
S: tower::Service<NameAddr>,
S::Response: Stream<Item = Result<Update<SocketAddr>, Error>> + Send + Sync + 'static,
S::Error: Into<Error> + Send + Sync,
S::Future: Send + Sync,
{
type Output = Result<UpdatesStream, S::Error>;
impl map_endpoint::MapEndpoint<super::ControlAddr, ()> for IntoTarget {
type Out = super::client::Target;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let identity = this.identity.clone();
let stream = futures::ready!(this.inner.poll(cx))?;
Poll::Ready(Ok(Box::pin(stream.map_ok(move |update| {
match update {
Update::Add(adds) => Update::Add(
adds.clone()
.into_iter()
.map(|(sa, _)| (sa, Target::new(sa, identity.clone())))
.collect(),
),
Update::Reset(eps) => Update::Reset(
eps.clone()
.into_iter()
.map(|(sa, _)| (sa, Target::new(sa, identity.clone())))
.collect(),
),
Update::Remove(removes) => Update::Remove(removes),
Update::DoesNotExist => Update::DoesNotExist,
}
}))))
fn map_endpoint(&self, control: &super::ControlAddr, addr: SocketAddr, _: ()) -> Self::Out {
super::client::Target::new(addr, control.identity.clone())
}
}
}
Expand All @@ -268,17 +179,19 @@ pub mod client {
use crate::transport::{connect, tls};
use crate::{proxy::http, svc};
use linkerd2_proxy_http::h2::Settings as H2Settings;
use std::net::SocketAddr;
use std::task::{Context, Poll};
use std::{
net::SocketAddr,
task::{Context, Poll},
};

#[derive(Clone, Hash, Debug, Eq, PartialEq)]
pub struct Target {
pub(super) addr: SocketAddr,
pub(super) server_name: tls::PeerIdentity,
addr: SocketAddr,
server_name: tls::PeerIdentity,
}

impl Target {
pub fn new(addr: SocketAddr, server_name: tls::PeerIdentity) -> Self {
pub(super) fn new(addr: SocketAddr, server_name: tls::PeerIdentity) -> Self {
Self { addr, server_name }
}
}
Expand Down
5 changes: 2 additions & 3 deletions linkerd/app/core/src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ pub struct Config {

pub struct Dns {
pub resolver: Resolver,
pub task: Task,
}

// === impl Config ===

impl Config {
pub fn build(self) -> Dns {
let (resolver, task) =
let resolver =
Resolver::from_system_config_with(&self).expect("system DNS config must be valid");
Dns { resolver, task }
Dns { resolver }
}
}

Expand Down
3 changes: 1 addition & 2 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
//! - Admin interfaces
//! - Tap
//! - Metric labeling
#![type_length_limit = "1586225"]

#![deny(warnings, rust_2018_idioms)]
#![recursion_limit = "512"]

pub use linkerd2_addr::{self as addr, Addr, NameAddr};
pub use linkerd2_admit as admit;
Expand Down
5 changes: 2 additions & 3 deletions linkerd/app/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use linkerd2_app_core::{
config::{ControlAddr, ControlConfig},
control, dns,
exp_backoff::{ExponentialBackoff, ExponentialBackoffStream},
proxy::{discover, dns_resolve::DnsResolve, http},
proxy::{discover, http},
reconnect,
svc::{self, NewService},
transport::tls,
Expand Down Expand Up @@ -53,14 +53,13 @@ impl Config {
let (local, crt_store) = Local::new(&certify);

let addr = control.addr;
let control_dns_resolve = control::dns_resolve::Resolve::new(DnsResolve::new(dns));
let svc = svc::connect(control.connect.keepalive)
.push(tls::ConnectLayer::new(tls::Conditional::Some(
certify.trust_anchors.clone(),
)))
.push_timeout(control.connect.timeout)
.push(control::client::layer())
.push(discover::resolve(control_dns_resolve))
.push(discover::resolve(control::resolve::new(dns)))
.push_on_response(http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.push(reconnect::layer(Recover(control.connect.backoff)))
.push(metrics.into_layer::<classify::Response>())
Expand Down
24 changes: 11 additions & 13 deletions linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Configures and executes the proxy
#![recursion_limit = "256"]
//#![deny(warnings, rust_2018_idioms)]

#![deny(warnings, rust_2018_idioms)]

pub mod admin;
pub mod dst;
Expand All @@ -17,7 +17,7 @@ use linkerd2_app_core::{
classify,
config::ControlAddr,
control, dns, drain,
proxy::{discover, dns_resolve::DnsResolve, http},
proxy::{discover, http},
reconnect,
svc::{self, NewService},
transport::tls,
Expand Down Expand Up @@ -61,7 +61,6 @@ pub struct Config {
pub struct App {
admin: admin::Admin,
drain: drain::Signal,
dns: dns::Task,
dst: ControlAddr,
identity: identity::Identity,
inbound_addr: SocketAddr,
Expand Down Expand Up @@ -109,13 +108,17 @@ impl Config {

let metrics = metrics.control.clone();
let dns = dns.resolver.clone();
let control_dns_resolve = control::dns_resolve::Resolve::new(DnsResolve::new(dns));
info_span!("dst").in_scope(|| {
let dst_connect = svc::connect(dst.control.connect.keepalive)
// XXX This is unfortunate. But we don't daemonize the service into a
// task in the build, so we'd have to name it. And that's not
// happening today. Really, we should daemonize the whole client
// into a task so consumers can be ignorant. This would also
// probably enable the use of a lock.
let svc = svc::connect(dst.control.connect.keepalive)
.push(tls::ConnectLayer::new(identity.local()))
.push_timeout(dst.control.connect.timeout)
.push(control::client::layer())
.push(discover::resolve(control_dns_resolve))
.push(discover::resolve(control::resolve::new(dns)))
.push_on_response(http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.push(reconnect::layer({
let backoff = dst.control.connect.backoff;
Expand All @@ -127,7 +130,7 @@ impl Config {
.push_on_response(svc::layers().push_spawn_buffer(dst.control.buffer_capacity))
.new_service(dst.control.addr.clone());

dst.build(dst_connect)
dst.build(svc)
})
}?;

Expand Down Expand Up @@ -226,7 +229,6 @@ impl Config {
admin,
dst: dst_addr,
drain: drain_tx,
dns: dns.task,
identity,
inbound_addr,
oc_collector,
Expand Down Expand Up @@ -286,7 +288,6 @@ impl App {
let App {
admin,
drain,
dns,
identity,
oc_collector,
start_proxy,
Expand Down Expand Up @@ -343,9 +344,6 @@ impl App {
admin.latch.release()
}

// Spawn the DNS resolver background task.
tokio::spawn(dns.instrument(info_span!("dns")));

if let tap::Tap::Enabled {
registry, serve, ..
} = tap
Expand Down
5 changes: 2 additions & 3 deletions linkerd/app/src/oc_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{dns, identity::LocalIdentity};
use linkerd2_app_core::{
config::{ControlAddr, ControlConfig},
control,
proxy::{discover, dns_resolve::DnsResolve, http},
proxy::{discover, http},
reconnect, svc,
transport::tls,
Error,
Expand Down Expand Up @@ -59,7 +59,6 @@ impl Config {
const EWMA_DECAY: Duration = Duration::from_secs(10);

let addr = control.addr;
let control_dns_resolve = control::dns_resolve::Resolve::new(DnsResolve::new(dns));
let svc = svc::connect(control.connect.keepalive)
.push(tls::ConnectLayer::new(identity))
.push_timeout(control.connect.timeout)
Expand All @@ -68,7 +67,7 @@ impl Config {
// TODO: we should have metrics of some kind, but the standard
// HTTP metrics aren't useful for a client where we never read
// the response.
.push(discover::resolve(control_dns_resolve))
.push(discover::resolve(control::resolve::new(dns)))
.push_on_response(http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY))
.push(reconnect::layer({
let backoff = control.connect.backoff;
Expand Down
2 changes: 1 addition & 1 deletion linkerd/dns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ edition = "2018"
publish = false

[dependencies]
async-stream = "0.2.1"
futures = "0.3"
linkerd2-dns-name = { path = "./name" }
linkerd2-error = { path = "../error" }
linkerd2-stack = { path = "../stack" }
tower = "0.3"
tracing = "0.1.19"
Expand Down
Loading
0