use bytes::Bytes;
use cache::{Cache, CacheReader};
use enum_map::EnumMap;
use futures::{FutureExt, TryStreamExt};
use junction_api::{backend::BackendId, http::Route, Hostname, Service};
use std::{
borrow::Cow, collections::BTreeSet, future::Future, io::ErrorKind, sync::Arc, time::Duration,
};
use tokio::sync::mpsc::{self, Receiver};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Endpoint, Streaming};
use tracing::debug;
use xds_api::pb::{
envoy::{
config::core::v3 as xds_core,
service::discovery::v3::{
aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
DeltaDiscoveryRequest, DeltaDiscoveryResponse,
},
},
google::{protobuf, rpc::Status as GrpcStatus},
};
mod cache;
mod resources;
pub use resources::ResourceVersion;
pub(crate) use resources::{ResourceType, ResourceVec};
use crate::{dns::StdlibResolver, BackendLb, ConfigCache};
mod csds;
#[cfg(test)]
mod test;
/// A snapshot of one xDS resource, as yielded by `AdsClient::iter_xds`.
///
/// NOTE(review): the values are populated by the cache module, which is not
/// visible here; the per-field notes below are inferred from names and types
/// and should be confirmed against that module.
#[derive(Debug, Default, Clone)]
pub struct XdsConfig {
/// Name of the resource.
pub name: String,
/// Type URL of the resource.
pub type_url: String,
/// Version of the resource, when one is known.
pub version: Option<ResourceVersion>,
/// The resource itself as a protobuf `Any`, when present.
pub xds: Option<protobuf::Any>,
/// A version paired with an error message; presumably the most recent
/// rejected update for this resource — TODO confirm.
pub last_error: Option<(ResourceVersion, String)>,
}
/// A subscription change sent from an `AdsClient` handle to the `AdsTask`
/// over the `subs` channel and applied by
/// `AdsConnection::handle_subscription_update`.
#[derive(Debug)]
enum SubscriptionUpdate {
/// Subscribe to the Listener resources with these names.
AddHosts(Vec<String>),
/// Subscribe to the Cluster resource for each backend (and to DNS for
/// backends whose service is `Service::Dns`).
AddBackends(Vec<BackendId>),
/// Subscribe to endpoints for each backend: DNS for `Service::Dns`
/// backends, a ClusterLoadAssignment resource otherwise.
AddEndpoints(Vec<BackendId>),
/// Inverse of `AddHosts`. Never constructed in this file, hence the allow.
#[allow(unused)]
RemoveHosts(Vec<String>),
/// Inverse of `AddBackends`. Never constructed in this file, hence the allow.
#[allow(unused)]
RemoveBackends(Vec<BackendId>),
/// Inverse of `AddEndpoints`. Never constructed in this file, hence the allow.
#[allow(unused)]
RemoveEndpoints(Vec<BackendId>),
}
/// A cloneable handle used to read cached xDS data and to request new
/// subscriptions from the paired `AdsTask`.
#[derive(Clone)]
pub(super) struct AdsClient {
/// Sending half of the channel that carries subscription changes to the task.
subs: mpsc::Sender<SubscriptionUpdate>,
/// Read side of the cache that the task owns.
cache: CacheReader,
/// DNS resolver shared (by clone) with the task; used to answer endpoint
/// lookups for `Service::Dns` backends.
dns: StdlibResolver,
}
impl AdsClient {
    /// Creates a client handle together with the task that will drive the ADS
    /// connection to `address`.
    ///
    /// No connection is attempted here; the returned [`AdsTask`] connects when
    /// it is run (or when `AdsTask::connect` is called).
    ///
    /// # Errors
    ///
    /// Returns the `tonic` transport error produced when `address` cannot be
    /// turned into an endpoint.
    pub(super) fn build(
        address: impl Into<Bytes>,
        node_id: String,
        cluster: String,
    ) -> Result<(AdsClient, AdsTask), tonic::transport::Error> {
        // Validate the address first so that a bad URI fails before anything
        // else is constructed.
        let endpoint = Endpoint::from_shared(address)?
            .connect_timeout(Duration::from_secs(5))
            .tcp_nodelay(true);

        let client_features = Vec::from(
            [
                "envoy.lb.does_not_support_overprovisioning",
                "envoy.lrs.supports_send_all_clusters",
            ]
            .map(String::from),
        );
        let node_info = xds_core::Node {
            id: node_id,
            cluster,
            client_features,
            ..Default::default()
        };

        // The client keeps the sender and a cache reader; the task keeps the
        // receiver and the cache itself. Both share the same resolver.
        let (subs_tx, subs_rx) = mpsc::channel(10);
        let dns = StdlibResolver::new_with(Duration::from_secs(5), Duration::from_millis(500), 2);
        let cache = Cache::default();

        let client = AdsClient {
            subs: subs_tx,
            cache: cache.reader(),
            dns: dns.clone(),
        };
        let task = AdsTask {
            endpoint,
            initial_channel: None,
            node_info,
            cache,
            dns,
            subs: subs_rx,
        };

        Ok((client, task))
    }

    /// Returns a future that serves the local CSDS server on `port`, backed by
    /// a clone of this client's cache reader.
    pub(super) fn csds_server(
        &self,
        port: u16,
    ) -> impl Future<Output = Result<(), tonic::transport::Error>> + Send + 'static {
        let reader = self.cache.clone();
        csds::local_server(reader, port)
    }

    /// Iterates over the routes currently held in the cache.
    pub(super) fn iter_routes(&self) -> impl Iterator<Item = Arc<Route>> + '_ {
        let reader = &self.cache;
        reader.iter_routes()
    }

    /// Iterates over the backends currently held in the cache.
    pub(super) fn iter_backends(&self) -> impl Iterator<Item = Arc<BackendLb>> + '_ {
        let reader = &self.cache;
        reader.iter_backends()
    }

    /// Iterates over the cache's xDS resources as [`XdsConfig`] values.
    pub(super) fn iter_xds(&self) -> impl Iterator<Item = XdsConfig> + '_ {
        let reader = &self.cache;
        reader.iter_xds()
    }
}
impl ConfigCache for AdsClient {
async fn get_route<S: AsRef<str>>(&self, host: S) -> Option<Arc<Route>> {
let hosts = vec![host.as_ref().to_string()];
let _ = self.subs.send(SubscriptionUpdate::AddHosts(hosts)).await;
self.cache.get_route(host).await
}
async fn get_backend(
&self,
backend: &junction_api::backend::BackendId,
) -> Option<std::sync::Arc<crate::BackendLb>> {
let bs = vec![backend.clone()];
let _ = self.subs.send(SubscriptionUpdate::AddBackends(bs)).await;
self.cache.get_backend(backend).await
}
async fn get_endpoints(
&self,
backend: &junction_api::backend::BackendId,
) -> Option<std::sync::Arc<crate::EndpointGroup>> {
let bs = vec![backend.clone()];
let _ = self.subs.send(SubscriptionUpdate::AddEndpoints(bs)).await;
match &backend.service {
junction_api::Service::Dns(dns) => {
self.dns
.get_endpoints_await(&dns.hostname, backend.port)
.await
}
_ => self.cache.get_endpoints(backend).await,
}
}
}
/// The background half of the ADS client: owns the cache and drives the
/// connection to the ADS server.
pub(crate) struct AdsTask {
/// Endpoint used to open (and re-open) the gRPC channel.
endpoint: tonic::transport::Endpoint,
/// A channel opened ahead of time by `connect`; consumed by the first
/// connection attempt.
initial_channel: Option<tonic::transport::Channel>,
/// Node description cloned into every new `AdsConnection`.
node_info: xds_core::Node,
/// The resource cache that incoming xDS responses are written into.
cache: Cache,
/// DNS resolver shared (by clone) with `AdsClient`.
dns: StdlibResolver,
/// Receiving half of the subscription-change channel.
subs: mpsc::Receiver<SubscriptionUpdate>,
}
#[derive(Debug, thiserror::Error)]
struct ShutdownError;
impl std::fmt::Display for ShutdownError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "AdsTask started after shutdown")
}
}
// Emits a DEBUG event describing an outgoing `DeltaDiscoveryRequest`.
//
// The event carries a `nack` field that is true when the request has an
// `error_detail`, and the message shows the response nonce, type URL,
// subscribed resource names and initial resource versions. Unsubscribed names
// are not part of the message.
macro_rules! log_request {
($request:expr) => {
tracing::debug!(
nack = $request.error_detail.is_some(),
"DeltaDiscoveryRequest(n={:?}, ty={:?}, r={:?}, init={:?})",
$request.response_nonce,
$request.type_url,
$request.resource_names_subscribe,
$request.initial_resource_versions,
);
};
}
// Emits a DEBUG event describing an incoming `DeltaDiscoveryResponse`.
//
// The (name, version) list is built by `names_and_versions`, which clones
// strings, so the whole body is guarded by `tracing::enabled!` to skip that
// work when DEBUG is off.
macro_rules! log_response {
($response:expr) => {
if tracing::enabled!(tracing::Level::DEBUG) {
let names_and_versions = names_and_versions(&$response);
tracing::debug!(
"DeltaDiscoveryResponse(n={:?}, ty={:?}, r={:?})",
$response.nonce,
$response.type_url,
names_and_versions,
);
}
};
}
/// Collects the `(name, version)` pair of every resource in `response`, in
/// order. Used only for logging.
fn names_and_versions(response: &DeltaDiscoveryResponse) -> Vec<(String, String)> {
    let mut pairs = Vec::with_capacity(response.resources.len());
    for resource in &response.resources {
        pairs.push((resource.name.clone(), resource.version.clone()));
    }
    pairs
}
impl AdsTask {
pub(super) fn is_shutdown(&self) -> bool {
self.subs.is_closed()
}
pub(super) async fn run(&mut self) -> Result<(), &(dyn std::error::Error + 'static)> {
if self.is_shutdown() {
return Err(&ShutdownError);
}
loop {
match self.run_connection().await {
Ok(()) => break,
Err(ConnectionError::AdsDisconnected) => continue,
Err(ConnectionError::Connect(e)) => {
debug!(err = %e, "failed to connect to ADS server");
tokio::time::sleep(Duration::from_secs(2)).await;
}
Err(ConnectionError::Status(status)) => {
let is_broken_pipe =
unwrap_io_error(&status).is_some_and(|e| e.kind() == ErrorKind::BrokenPipe);
if !is_broken_pipe {
debug!(err = %status, "ADS connection closed unexpectedly");
}
tokio::time::sleep(if is_broken_pipe {
Duration::from_millis(50)
} else {
Duration::from_secs(2)
})
.await;
}
};
}
Ok(())
}
async fn run_connection(&mut self) -> Result<(), ConnectionError> {
let (xds_tx, xds_rx) = tokio::sync::mpsc::channel(10);
let channel = self.new_connection().await?;
let mut client = AggregatedDiscoveryServiceClient::new(channel);
let stream_response = client
.delta_aggregated_resources(ReceiverStream::new(xds_rx))
.await?;
let mut incoming = stream_response.into_inner();
self.dns.set_names(self.cache.dns_names());
let (mut conn, initial_requests) =
AdsConnection::new(self.node_info.clone(), &mut self.cache);
for msg in initial_requests {
log_request!(msg);
if xds_tx.send(msg).await.is_err() {
return Err(ConnectionError::AdsDisconnected);
}
}
loop {
let is_eof = handle_update_batch(&mut conn, &mut self.subs, &mut incoming).await?;
if is_eof {
return Ok(());
}
let (outgoing, dns_updates) = conn.outgoing();
for msg in outgoing {
log_request!(msg);
if xds_tx.send(msg).await.is_err() {
return Err(ConnectionError::AdsDisconnected);
}
}
update_dns(&self.dns, dns_updates.add, dns_updates.remove);
}
}
pub(super) async fn connect(&mut self) -> Result<(), tonic::transport::Error> {
if self.initial_channel.is_none() {
let channel = self.endpoint.connect().await?;
self.initial_channel = Some(channel)
}
Ok(())
}
async fn new_connection(
&mut self,
) -> Result<tonic::transport::Channel, tonic::transport::Error> {
match self.initial_channel.take() {
Some(channel) => Ok(channel),
None => self.endpoint.connect().await,
}
}
}
/// Waits for one update (an ADS response or a subscription change), applies
/// it, then applies every further update that is ready without waiting.
///
/// Returns `Ok(true)` when the subscription channel has closed and the caller
/// should stop, `Ok(false)` otherwise.
///
/// # Errors
///
/// Returns `ConnectionError::Status` when the stream yields a gRPC error and
/// `ConnectionError::AdsDisconnected` when the stream ends.
async fn handle_update_batch(
    conn: &mut AdsConnection<'_>,
    subs: &mut Receiver<SubscriptionUpdate>,
    incoming: &mut Streaming<DeltaDiscoveryResponse>,
) -> Result<bool, ConnectionError> {
    /// Applies exactly one update, preferring ADS responses over subscription
    /// changes when both are ready. Returns `Ok(true)` when `subs` is closed.
    async fn next_update(
        conn: &mut AdsConnection<'_>,
        subs: &mut Receiver<SubscriptionUpdate>,
        incoming: &mut Streaming<DeltaDiscoveryResponse>,
    ) -> Result<bool, ConnectionError> {
        tokio::select! {
            biased;

            xds_msg = incoming.try_next() => {
                // A gRPC error or end-of-stream both end this connection.
                let response = xds_msg?.ok_or(ConnectionError::AdsDisconnected)?;
                log_response!(response);
                tracing::trace!("ads connection: handle_ads_message");
                conn.handle_ads_message(response);
                Ok(false)
            }
            sub_update = subs.recv() => {
                match sub_update {
                    None => Ok(true),
                    Some(sub_update) => {
                        tracing::trace!(
                            ?sub_update,
                            "ads connection: handle_subscription_update",
                        );
                        conn.handle_subscription_update(sub_update);
                        Ok(false)
                    }
                }
            }
        }
    }

    // Wait for the first update, then drain whatever else is immediately
    // available so the caller can flush one batch of outgoing requests.
    let mut shutdown = next_update(conn, subs, incoming).await?;
    while !shutdown {
        match next_update(conn, subs, incoming).now_or_never() {
            Some(outcome) => shutdown = outcome?,
            None => break,
        }
    }
    Ok(shutdown)
}
/// Applies a batch of DNS changes to the resolver: every `add` entry is
/// subscribed first, then every `remove` entry is unsubscribed.
#[inline]
fn update_dns(
    dns: &StdlibResolver,
    add: BTreeSet<(Hostname, u16)>,
    remove: BTreeSet<(Hostname, u16)>,
) {
    add.into_iter().for_each(|(hostname, port)| {
        dns.subscribe(hostname, port);
    });
    remove.into_iter().for_each(|(hostname, port)| {
        dns.unsubscribe(&hostname, port);
    });
}
/// The ways a single ADS connection can end abnormally. `AdsTask::run` picks
/// its reconnect delay based on the variant.
#[derive(Debug, thiserror::Error)]
enum ConnectionError {
/// Opening the gRPC channel failed.
#[error(transparent)]
Connect(#[from] tonic::transport::Error),
/// Starting the stream, or reading from it, returned a gRPC status.
#[error(transparent)]
Status(#[from] tonic::Status),
/// The response stream ended, or the outgoing request channel was closed.
#[error("ADS server closed the stream")]
AdsDisconnected,
}
fn unwrap_io_error(status: &tonic::Status) -> Option<&std::io::Error> {
let mut err: &(dyn std::error::Error + 'static) = status;
loop {
if let Some(e) = err.downcast_ref::<std::io::Error>() {
return Some(e);
}
if let Some(e) = err.downcast_ref::<h2::Error>().and_then(|e| e.get_io()) {
return Some(e);
}
err = err.source()?;
}
}
/// Protocol state for one ADS stream. It performs no I/O itself: callers feed
/// it responses and subscription changes, then collect the requests to send
/// with `outgoing`.
struct AdsConnection<'a> {
/// The cache that responses are inserted into and subscriptions are tracked in.
cache: &'a mut Cache,
/// The node description; taken (left as `None`) by the first request that
/// is built, so later requests carry no node.
node: Option<xds_core::Node>,
/// Per resource type, the most recent response that still needs an ACK/NACK.
acks: EnumMap<ResourceType, Option<AckState>>,
/// `(nonce, type_url)` pairs of responses with an unrecognized type URL,
/// waiting to be NACKed.
unknown_types: Vec<(String, String)>,
}
/// A pending acknowledgement for one response.
#[derive(Debug, Default)]
struct AckState {
/// Nonce of the response being acknowledged.
nonce: String,
/// `None` for an ACK; `Some(message)` for a NACK.
error: Option<Cow<'static, str>>,
}
impl AckState {
fn into_ack(self) -> (String, Option<GrpcStatus>) {
let nonce = self.nonce;
let error = self.error.map(|message| GrpcStatus {
message: message.to_string(),
code: tonic::Code::InvalidArgument.into(),
..Default::default()
});
(nonce, error)
}
}
impl<'a> AdsConnection<'a> {
fn new(node: xds_core::Node, cache: &'a mut Cache) -> (Self, Vec<DeltaDiscoveryRequest>) {
let mut requests = Vec::with_capacity(ResourceType::all().len());
let mut node = Some(node);
for &rtype in ResourceType::all() {
let initial_versions = cache.versions(rtype);
let mut subscribe = cache.initial_subscriptions(rtype);
if cache.is_wildcard(rtype) && !subscribe.is_empty() {
subscribe.push("*".to_string());
}
if !cache.is_wildcard(rtype) && subscribe.is_empty() && initial_versions.is_empty() {
continue;
}
requests.push(DeltaDiscoveryRequest {
node: node.take(),
type_url: rtype.type_url().to_string(),
resource_names_subscribe: subscribe,
initial_resource_versions: initial_versions,
..Default::default()
});
}
let conn = Self {
cache,
node,
acks: Default::default(),
unknown_types: Vec::new(),
};
(conn, requests)
}
fn outgoing(&mut self) -> (Vec<DeltaDiscoveryRequest>, DnsUpdates) {
let mut responses = Vec::with_capacity(ResourceType::all().len());
for (response_nonce, type_url) in std::mem::take(&mut self.unknown_types) {
let error_detail = Some(xds_api::pb::google::rpc::Status {
code: tonic::Code::InvalidArgument.into(),
message: "unknown type".to_string(),
..Default::default()
});
responses.push(DeltaDiscoveryRequest {
type_url,
response_nonce,
error_detail,
..Default::default()
})
}
let (resources, dns) = self.cache.collect();
for (rtype, changes) in resources {
let ack = self.get_ack(rtype);
if ack.is_none() && changes.is_empty() {
continue;
}
let node = self.node.take();
let (response_nonce, error_detail) = ack.map(|a| a.into_ack()).unwrap_or_default();
let resource_names_subscribe = changes.added.into_iter().collect();
let resource_names_unsubscribe = changes.removed.into_iter().collect();
responses.push(DeltaDiscoveryRequest {
node,
type_url: rtype.type_url().to_string(),
response_nonce,
error_detail,
resource_names_subscribe,
resource_names_unsubscribe,
..Default::default()
})
}
(responses, dns)
}
fn handle_ads_message(&mut self, resp: DeltaDiscoveryResponse) {
let Some(rtype) = ResourceType::from_type_url(&resp.type_url) else {
tracing::trace!(type_url = %resp.type_url, "unknown type url");
self.set_unknown(resp.nonce, resp.type_url);
return;
};
let resources = match ResourceVec::from_resources(rtype, resp.resources) {
Ok(r) => r,
Err(e) => {
tracing::trace!(err = %e, "invalid proto");
self.set_ack(
rtype,
resp.nonce,
Some(format!("invalid resource: {e}").into()),
);
return;
}
};
let resource_errors = self.cache.insert(resources);
let error = match &resource_errors[..] {
&[] => None,
_ => Some("invalid resources".into()),
};
self.set_ack(rtype, resp.nonce, error);
self.cache.remove(rtype, &resp.removed_resources);
}
fn handle_subscription_update(&mut self, update: SubscriptionUpdate) {
match update {
SubscriptionUpdate::AddHosts(hosts) => {
for host in hosts {
self.cache.subscribe(ResourceType::Listener, &host);
}
}
SubscriptionUpdate::RemoveHosts(hosts) => {
for host in hosts {
self.cache.unsubscribe(ResourceType::Listener, &host);
}
}
SubscriptionUpdate::AddBackends(backends) => {
for backend in backends {
if let Service::Dns(dns) = &backend.service {
self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
}
self.cache.subscribe(ResourceType::Cluster, &backend.name());
}
}
SubscriptionUpdate::RemoveBackends(backends) => {
for backend in backends {
if let Service::Dns(dns) = &backend.service {
self.cache
.unsubscribe_dns(dns.hostname.clone(), backend.port);
}
self.cache
.unsubscribe(ResourceType::Cluster, &backend.name());
}
}
SubscriptionUpdate::AddEndpoints(backends) => {
for backend in backends {
match &backend.service {
Service::Dns(dns) => {
self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
}
_ => self
.cache
.subscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
}
}
}
SubscriptionUpdate::RemoveEndpoints(backends) => {
for backend in backends {
match &backend.service {
Service::Dns(dns) => {
self.cache
.unsubscribe_dns(dns.hostname.clone(), backend.port);
}
_ => self
.cache
.unsubscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
}
}
}
}
}
fn set_unknown(&mut self, nonce: String, type_url: String) {
self.unknown_types.push((nonce, type_url))
}
fn set_ack(&mut self, rtype: ResourceType, nonce: String, error: Option<Cow<'static, str>>) {
self.acks[rtype] = Some(AckState { nonce, error })
}
fn get_ack(&mut self, rtype: ResourceType) -> Option<AckState> {
self.acks[rtype].take()
}
}
/// A batch of DNS subscription changes, returned alongside outgoing requests
/// by `AdsConnection::outgoing`.
#[derive(Debug, Default, PartialEq, Eq)]
struct DnsUpdates {
/// `(hostname, port)` pairs to start resolving.
add: BTreeSet<(Hostname, u16)>,
/// `(hostname, port)` pairs to stop resolving.
remove: BTreeSet<(Hostname, u16)>,
/// NOTE(review): within this file the flag is only read by the test-only
/// `is_noop`; `update_dns` does not receive it. Its intended meaning is set
/// by the cache module — confirm there.
sync: bool,
}
#[cfg(test)]
impl DnsUpdates {
    /// Returns `true` when this batch carries no additions, no removals and
    /// no `sync` flag.
    fn is_noop(&self) -> bool {
        let has_changes = !self.add.is_empty() || !self.remove.is_empty();
        !(has_changes || self.sync)
    }
}
// Unit tests for `AdsConnection`. They drive the connection state directly
// (no network): feed it subscription updates and responses, then compare the
// requests produced by `new`/`outgoing` with the expected ones.
#[cfg(test)]
mod test_ads_conn {
use cache::Cache;
use once_cell::sync::Lazy;
use pretty_assertions::assert_eq;
use xds_api::pb::envoy::service::discovery::v3 as xds_discovery;
use super::test as xds_test;
use super::*;
// Node used by every test connection.
static TEST_NODE: Lazy<xds_core::Node> = Lazy::new(|| xds_core::Node {
id: "unit-test".to_string(),
..Default::default()
});
// Builds a connection over `cache` and returns it with its initial requests.
// If there is a first request, asserts that it carries `TEST_NODE` and strips
// the node so the expected requests in the tests need not include it.
#[track_caller]
fn new_conn(cache: &mut Cache) -> (AdsConnection, Vec<DeltaDiscoveryRequest>) {
let (conn, mut outgoing) = AdsConnection::new(TEST_NODE.clone(), cache);
if let Some(first) = outgoing.first_mut() {
let node = first
.node
.take()
.expect("expected first outgoing request to have a node");
assert_eq!(node, *TEST_NODE);
};
(conn, outgoing)
}
// Wildcard types with nothing cached still produce an (empty) initial request.
#[test]
fn test_init_empty_wildcard() {
let mut cache = Cache::default();
cache.set_wildcard(ResourceType::Listener, true);
cache.set_wildcard(ResourceType::Cluster, true);
let (_, outgoing) = new_conn(&mut cache);
assert_eq!(
outgoing,
vec![
xds_test::req!(t = ResourceType::Cluster),
xds_test::req!(t = ResourceType::Listener),
]
)
}
// With wildcard turned off for Listener and Cluster and an empty cache, no
// initial requests are produced.
#[test]
fn test_init_empty_explicit() {
let mut cache = Cache::default();
cache.set_wildcard(ResourceType::Listener, false);
cache.set_wildcard(ResourceType::Cluster, false);
let (_, outgoing) = new_conn(&mut cache);
assert!(outgoing.is_empty());
}
// A wildcard type with an explicit subscription gets "*" appended to its
// names; a non-wildcard type lists only its explicit names.
#[test]
fn test_init_subscription_wildcard() {
let mut cache = Cache::default();
cache.set_wildcard(ResourceType::Listener, false);
cache.set_wildcard(ResourceType::Cluster, true);
cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
let (_, outgoing) = new_conn(&mut cache);
assert_eq!(
outgoing,
vec![
xds_test::req!(
t = ResourceType::Cluster,
add = vec!["cluster.example:7891", "*"],
init = vec![],
),
xds_test::req!(
t = ResourceType::ClusterLoadAssignment,
add = vec!["cluster.example:7891",],
init = vec![],
)
]
);
}
// Non-wildcard types with explicit subscriptions list exactly those names.
#[test]
fn test_init_subscription_explicit() {
let mut cache = Cache::default();
cache.set_wildcard(ResourceType::Listener, false);
cache.set_wildcard(ResourceType::Cluster, false);
cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
let (_, outgoing) = new_conn(&mut cache);
assert_eq!(
outgoing,
vec![
xds_test::req!(
t = ResourceType::Cluster,
add = vec!["cluster.example:7891",],
init = vec![],
),
xds_test::req!(
t = ResourceType::ClusterLoadAssignment,
add = vec!["cluster.example:7891",],
init = vec![],
),
]
);
}
// Resources already in the cache are reported as initial versions, and
// resources they reference but that are not cached yet are subscribed to.
#[test]
fn test_init_initial_versions() {
let mut cache = Cache::default();
assert!(cache.is_wildcard(ResourceType::Listener));
assert!(!cache.is_wildcard(ResourceType::RouteConfiguration));
cache.insert(ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!("cooler.example.org", "cool-route")],
));
cache.insert(ResourceVec::from_listeners(
"456".into(),
vec![xds_test::listener!("warmer.example.org", "warm-route")],
));
cache.insert(ResourceVec::from_route_configs(
"789".into(),
vec![xds_test::route_config!(
"cool-route",
vec![xds_test::vhost!(
"an-vhost",
["cooler.example.org"],
[xds_test::route!(default "cooler.example.internal:8008")]
)]
)],
));
let (_, outgoing) = new_conn(&mut cache);
assert_eq!(
outgoing,
vec![
xds_test::req!(
t = ResourceType::Cluster,
add = vec!["cooler.example.internal:8008", "*"],
init = vec![],
),
xds_test::req!(
t = ResourceType::Listener,
add = vec![],
init = vec![("cooler.example.org", "123"), ("warmer.example.org", "456"),]
),
xds_test::req!(
t = ResourceType::RouteConfiguration,
add = vec!["warm-route"],
init = vec![("cool-route", "789")]
),
],
);
}
// AddHosts subscribes to Listener resources only and produces no DNS changes.
#[test]
fn test_handle_subscribe_hostname() {
let mut cache = Cache::default();
let (mut conn, _) = new_conn(&mut cache);
conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![
Service::dns("website.internal").unwrap().name(),
Service::kube("default", "nginx")
.unwrap()
.as_backend_id(4443)
.name(),
]));
let (outgoing, dns) = conn.outgoing();
assert!(dns.is_noop());
assert_eq!(
outgoing,
vec![xds_test::req!(
t = ResourceType::Listener,
add = vec!["nginx.default.svc.cluster.local:4443", "website.internal"],
)]
);
}
// AddBackends subscribes to Cluster resources for every backend and adds a
// DNS name for the DNS-service backend only.
#[test]
fn test_handle_subscribe_backend() {
let mut cache = Cache::default();
let (mut conn, _) = new_conn(&mut cache);
conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![
Service::dns("website.internal").unwrap().as_backend_id(80),
Service::kube("default", "nginx")
.unwrap()
.as_backend_id(4443),
]));
let (outgoing, dns) = conn.outgoing();
assert_eq!(
dns,
DnsUpdates {
add: [(Hostname::from_static("website.internal"), 80)]
.into_iter()
.collect(),
..Default::default()
}
);
assert_eq!(
outgoing,
vec![xds_test::req!(
t = ResourceType::Cluster,
add = vec![
"nginx.default.svc.cluster.local:4443",
"website.internal:80"
],
)]
);
}
// Listener and RouteConfiguration responses are ACKed with the latest nonce
// per type, and newly referenced resources are subscribed to.
#[test]
fn test_handle_ads_message_listener_route() {
let mut cache = Cache::default();
assert!(cache.is_wildcard(ResourceType::Listener));
let (mut conn, _) = new_conn(&mut cache);
conn.handle_ads_message(xds_test::resp!(
n = "1",
add = ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!("cooler.example.org", "cool-route")],
),
remove = vec![],
));
conn.handle_ads_message(xds_test::resp!(
n = "2",
add = ResourceVec::from_listeners(
"456".into(),
vec![xds_test::listener!("warmer.example.org", "warm-route")],
),
remove = vec![],
));
conn.handle_ads_message(xds_test::resp!(
n = "3",
add = ResourceVec::from_route_configs(
"789".into(),
vec![xds_test::route_config!(
"cool-route",
vec![xds_test::vhost!(
"an-vhost",
["cooler.example.org"],
[xds_test::route!(default "cooler.example.internal:8008")]
)]
)],
),
remove = vec![],
));
let (outgoing, dns) = conn.outgoing();
assert!(dns.is_noop());
assert_eq!(
outgoing,
vec![
xds_test::req!(
t = ResourceType::Cluster,
add = vec!["cooler.example.internal:8008"]
),
xds_test::req!(t = ResourceType::Listener, n = "2"),
xds_test::req!(
t = ResourceType::RouteConfiguration,
n = "3",
add = vec!["warm-route"]
),
],
);
}
// Same setup as the previous test, then a Listener is removed by the server:
// the removal is ACKed and the route it referenced is unsubscribed.
#[test]
fn test_handle_ads_message_listener_removed() {
let mut cache = Cache::default();
assert!(cache.is_wildcard(ResourceType::Listener));
let (mut conn, _) = new_conn(&mut cache);
conn.handle_ads_message(xds_test::resp!(
n = "1",
add = ResourceVec::from_listeners(
"123".into(),
vec![xds_test::listener!("cooler.example.org", "cool-route")],
),
remove = vec![],
));
conn.handle_ads_message(xds_test::resp!(
n = "2",
add = ResourceVec::from_listeners(
"456".into(),
vec![xds_test::listener!("warmer.example.org", "warm-route")],
),
remove = vec![],
));
conn.handle_ads_message(xds_test::resp!(
n = "3",
add = ResourceVec::from_route_configs(
"789".into(),
vec![xds_test::route_config!(
"cool-route",
vec![xds_test::vhost!(
"an-vhost",
["cooler.example.org"],
[xds_test::route!(default "cooler.example.internal:8008")]
)]
)],
),
remove = vec![],
));
let (outgoing, dns) = conn.outgoing();
assert!(dns.is_noop());
assert_eq!(
outgoing,
vec![
xds_test::req!(
t = ResourceType::Cluster,
add = vec!["cooler.example.internal:8008"]
),
xds_test::req!(t = ResourceType::Listener, n = "2"),
xds_test::req!(
t = ResourceType::RouteConfiguration,
n = "3",
add = vec!["warm-route"]
),
],
);
// The server removes the "warmer" listener.
conn.handle_ads_message(xds_test::resp!(
n = "4",
add = ResourceVec::from_listeners("123".into(), vec![]),
remove = vec!["warmer.example.org"],
));
let (outgoing, dns) = conn.outgoing();
assert!(dns.is_noop());
assert_eq!(
outgoing,
vec![
xds_test::req!(t = ResourceType::Listener, n = "4"),
xds_test::req!(
t = ResourceType::RouteConfiguration,
remove = vec!["warm-route"],
),
]
);
}
// Cluster, ClusterLoadAssignment and Listener responses are each ACKed, and
// one DNS name is added as a result.
#[test]
fn test_handle_ads_message_cluster_cla() {
let mut cache = Cache::default();
assert!(cache.is_wildcard(ResourceType::Cluster));
let (mut conn, _) = new_conn(&mut cache);
conn.handle_ads_message(xds_test::resp!(
n = "1",
add = ResourceVec::from_clusters(
"123".into(),
vec![
xds_test::cluster!("cooler.example.org:2345"),
xds_test::cluster!("thing.default.svc.cluster.local:9876"),
],
),
remove = vec![],
));
conn.handle_ads_message(xds_test::resp!(
n = "2",
add = ResourceVec::from_load_assignments(
"123".into(),
vec![xds_test::cla!(
"thing.default.svc.cluster.local:9876" => {
"zone1" => ["1.1.1.1"]
}
)],
),
remove = vec![],
));
conn.handle_ads_message(xds_test::resp!(
n = "3",
add = ResourceVec::from_listeners("555".into(), vec![
xds_test::listener!("cooler.example.org.lb.jct:2345", "lb-route" => [xds_test::vhost!(
"lb-vhost",
["cooler.example.org.lb.jct:2345"],
[xds_test::route!(default ring_hash = "x-user", "cooler.example.org:2345")],
)]),
xds_test::listener!("thing.default.svc.cluster.local.lb.jct:9876", "lb-route" => [xds_test::vhost!(
"lb-vhost",
["cooler.example.org.lb.jct:2345"],
[xds_test::route!(default ring_hash = "x-user", "thing.default.svc.cluster.local:9876")],
)])
]),
remove = vec![],
));
let (outgoing, dns) = conn.outgoing();
assert_eq!(
dns,
DnsUpdates {
add: [(Hostname::from_static("cooler.example.org"), 2345)]
.into_iter()
.collect(),
..Default::default()
}
);
assert_eq!(
outgoing,
vec![
xds_test::req!(t = ResourceType::Cluster, n = "1"),
xds_test::req!(t = ResourceType::ClusterLoadAssignment, n = "2"),
xds_test::req!(t = ResourceType::Listener, n = "3"),
]
);
}
// When no initial request was sent, the node is attached to the first
// request produced later by `outgoing`.
#[test]
fn test_set_node_after_init() {
let mut cache = Cache::default();
for rtype in ResourceType::all() {
cache.set_wildcard(*rtype, false);
}
let (mut conn, outgoing) = new_conn(&mut cache);
assert!(outgoing.is_empty());
let svc = Service::dns("website.internal").unwrap().as_backend_id(80);
conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![svc]));
let (outgoing, _) = conn.outgoing();
assert_eq!(outgoing[0].node.as_ref(), Some(&*TEST_NODE));
}
// A response with an unrecognized type URL is NACKed with "unknown type".
#[test]
fn test_handle_unknown_type_url() {
let mut cache = Cache::default();
let (mut conn, _) = new_conn(&mut cache);
conn.handle_ads_message(DeltaDiscoveryResponse {
type_url: "made.up.type_url/Potato".to_string(),
..Default::default()
});
let (outgoing, dns) = conn.outgoing();
assert!(dns.is_noop());
assert_eq!(
outgoing,
vec![DeltaDiscoveryRequest {
type_url: "made.up.type_url/Potato".to_string(),
error_detail: Some(xds_api::pb::google::rpc::Status {
code: tonic::Code::InvalidArgument.into(),
message: "unknown type".to_string(),
..Default::default()
}),
..Default::default()
}]
);
}
// A Listener response whose payload is actually a `Node` message is NACKed
// with a message starting with "invalid resource".
#[test]
fn test_handle_invalid_resource() {
let mut cache = Cache::default();
let (mut conn, _) = new_conn(&mut cache);
let node = xds_core::Node {
id: "some-node".to_string(),
..Default::default()
};
conn.handle_ads_message(DeltaDiscoveryResponse {
type_url: ResourceType::Listener.type_url().to_string(),
resources: vec![xds_discovery::Resource {
resource: Some(protobuf::Any::from_msg(&node).unwrap()),
..Default::default()
}],
..Default::default()
});
let (outgoing, dns) = conn.outgoing();
assert!(dns.is_noop());
assert!(matches!(
&outgoing[..],
[DeltaDiscoveryRequest { type_url, error_detail, ..}] if
type_url == ResourceType::Listener.type_url() &&
error_detail.as_ref().is_some_and(|e| e.message.starts_with("invalid resource"))
));
}
// When the server reports a subscribed Listener as removed, the response is
// ACKed and a route lookup for that name resolves immediately to `None`.
#[test]
fn test_handle_does_not_exist() {
let mut cache = Cache::default();
let (mut conn, _) = new_conn(&mut cache);
let does_not_exist = Service::dns("website.internal").unwrap().name();
conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![does_not_exist.clone()]));
// Discard the subscription request; only the later ACK is under test.
let _ = conn.outgoing();
conn.handle_ads_message(DeltaDiscoveryResponse {
nonce: "boo".to_string(),
type_url: ResourceType::Listener.type_url().to_string(),
removed_resources: vec![does_not_exist.clone()],
..Default::default()
});
let (outgoing, dns) = conn.outgoing();
assert!(dns.is_noop());
assert_eq!(
outgoing,
vec![xds_test::req!(t = ResourceType::Listener, n = "boo")],
);
let route = cache
.reader()
.get_route("website.internal")
.now_or_never()
.unwrap();
assert_eq!(route, None);
}
}