use crossbeam_skiplist::SkipMap;
use enum_map::EnumMap;
use junction_api::VirtualHost;
use junction_api::{http::Route, BackendId};
use petgraph::{
graph::{DiGraph, NodeIndex},
visit::{self, Visitable},
Direction,
};
use prost::Name;
use std::collections::BTreeSet;
use std::str::FromStr;
use std::sync::Arc;
use xds_api::pb::envoy::config::{
cluster::v3::{self as xds_cluster},
endpoint::v3::{self as xds_endpoint},
listener::v3::{self as xds_listener},
route::v3::{self as xds_route},
};
use xds_api::pb::google::protobuf;
use crate::{BackendLb, ConfigCache, EndpointGroup};
use super::resources::{
ApiListener, ApiListenerRouteConfig, Cluster, ClusterEndpointData, LoadAssignment,
ResourceError, ResourceName, ResourceType, ResourceTypeSet, ResourceVec, RouteConfig,
};
use super::ResourceVersion;
/// A single named slot in a [`ResourceMap`]: the stored resource plus its bookkeeping.
#[derive(Debug, Clone)]
struct CacheEntry<T> {
/// Version stored alongside `data`. Entries created only to record an error carry the default
/// version.
pub version: ResourceVersion,
/// The most recent error recorded for this entry, paired with the version that produced it.
pub last_error: Option<(ResourceVersion, ResourceError)>,
/// The stored resource, or `None` if only an error has been recorded so far.
pub data: Option<T>,
}
// No inherent methods are currently defined for entries whose data implements `CacheEntryData`.
impl<T: CacheEntryData> CacheEntry<T> {}
/// Implemented by cached resource types that can hand back the xDS message they carry.
trait CacheEntryData {
/// The xDS message type associated with the cached value.
type Xds;
/// Borrows the xDS message held by this value.
fn xds(&self) -> &Self::Xds;
}
/// Implements `CacheEntryData` for a wrapper type that keeps its xDS message in a field named
/// `xds`.
///
/// The first argument is the wrapper type, the second is the type of its `xds` field.
macro_rules! impl_cache_entry {
    ($wrapper:ty, $message:ty) => {
        impl CacheEntryData for $wrapper {
            type Xds = $message;

            fn xds(&self) -> &Self::Xds {
                &self.xds
            }
        }
    };
}
// Pair each cached resource wrapper with the xDS message type it exposes through
// `CacheEntryData::xds`.
impl_cache_entry!(ApiListener, xds_listener::Listener);
impl_cache_entry!(RouteConfig, xds_route::RouteConfiguration);
impl_cache_entry!(Cluster, xds_cluster::Cluster);
impl_cache_entry!(LoadAssignment, xds_endpoint::ClusterLoadAssignment);
/// A map from resource name to [`CacheEntry`], backed by a `SkipMap`.
#[derive(Debug)]
struct ResourceMap<T>(SkipMap<String, CacheEntry<T>>);
impl<T> Default for ResourceMap<T> {
fn default() -> Self {
Self(Default::default())
}
}
impl<T: Send + 'static> ResourceMap<T> {
#[cfg(test)]
fn is_empty(&self) -> bool {
self.0.is_empty()
}
fn get<'a>(&'a self, name: &str) -> Option<ResourceEntry<'a, T>> {
self.0.get(name).map(ResourceEntry)
}
fn iter(&self) -> impl Iterator<Item = ResourceEntry<T>> + '_ {
self.0.iter().map(ResourceEntry)
}
fn names(&self) -> impl Iterator<Item = String> + '_ {
self.0.iter().map(|e| e.key().clone())
}
fn remove(&self, name: &str) {
self.0.remove(name);
}
fn remove_all<I>(&self, names: I)
where
I: IntoIterator<Item: AsRef<str>>,
{
for name in names {
self.0.remove(name.as_ref());
}
}
}
impl<X, T> ResourceMap<T>
where
T: CacheEntryData<Xds = X> + Clone + Send + 'static,
X: PartialEq + prost::Name,
{
fn insert_ok(&self, name: String, version: ResourceVersion, t: T) {
self.0.insert(
name,
CacheEntry {
version,
last_error: None,
data: Some(t),
},
);
}
fn insert_error<E: Into<ResourceError>>(
&self,
name: String,
version: ResourceVersion,
error: E,
) {
match self.0.get(&name) {
Some(entry) => {
let mut updated_entry = entry.value().clone();
updated_entry.last_error = Some((version, error.into()));
self.0.insert(name, updated_entry);
}
None => {
self.0.insert(
name,
CacheEntry {
version: ResourceVersion::default(),
last_error: Some((version, error.into())),
data: None,
},
);
}
}
}
fn is_changed(&self, name: &str, t: &X) -> bool {
let Some(entry) = self.0.get(name) else {
return true;
};
let Some(entry_data) = &entry.value().data else {
return true;
};
entry_data.xds() != t
}
}
/// A borrowed view of one entry in a [`ResourceMap`], wrapping the underlying skiplist entry.
struct ResourceEntry<'a, T>(crossbeam_skiplist::map::Entry<'a, String, CacheEntry<T>>);
impl<'a, T> ResourceEntry<'a, T> {
fn name(&self) -> &str {
self.0.key()
}
fn version(&self) -> &ResourceVersion {
&self.0.value().version
}
fn last_error(&self) -> Option<&(ResourceVersion, ResourceError)> {
self.0.value().last_error.as_ref()
}
fn data(&self) -> Option<&T> {
self.0.value().data.as_ref()
}
}
/// A cloneable handle for reading the resources held by a [`Cache`].
#[derive(Default, Clone)]
pub(crate) struct CacheReader {
/// The resource maps, shared with the `Cache` that produced this reader.
data: Arc<CacheData>,
}
/// A snapshot of one cached xDS resource, as produced by `CacheReader::iter_xds`.
#[derive(Debug, Default, Clone)]
pub struct XdsConfig {
/// The name the resource is cached under.
pub name: String,
/// The protobuf type URL of the resource's xDS message type.
pub type_url: String,
/// The version stored with the cache entry.
pub version: ResourceVersion,
/// The resource's xDS message packed into a protobuf `Any`, if the entry has data.
pub xds: Option<protobuf::Any>,
/// The last recorded error, rendered as a string, with the version that produced it.
pub last_error: Option<(ResourceVersion, String)>,
}
impl CacheReader {
pub(crate) fn iter_routes(&self) -> impl Iterator<Item = Arc<Route>> + '_ {
let listener_routes = self.data.listeners.iter().filter_map(|entry| {
entry
.data()
.and_then(|api_listener| match &api_listener.route_config {
ApiListenerRouteConfig::Inlined { route, .. } => Some(route.clone()),
_ => None,
})
});
let route_config_routes = self
.data
.route_configs
.iter()
.filter_map(|entry| entry.data().map(|route_config| route_config.route.clone()));
listener_routes.chain(route_config_routes)
}
pub(crate) fn iter_backends(&self) -> impl Iterator<Item = Arc<BackendLb>> + '_ {
self.data
.clusters
.iter()
.filter_map(|entry| entry.data().map(|cluster| cluster.backend_lb.clone()))
}
pub(crate) fn iter_xds(&self) -> impl Iterator<Item = XdsConfig> + '_ {
macro_rules! any_iter {
($field:ident, $xds_type:ty) => {
self.data.$field.iter().map(|entry| {
let name = entry.name().to_string();
let type_url = <$xds_type>::type_url();
let version = entry.version().clone();
let xds = entry.data().map(|data| {
protobuf::Any::from_msg(data.xds()).expect("generated invalid protobuf")
});
let last_error = entry.last_error().map(|(v, e)| (v.clone(), e.to_string()));
XdsConfig {
name,
type_url,
version,
xds,
last_error,
}
})
};
}
any_iter!(listeners, xds_listener::Listener)
.chain(any_iter!(route_configs, xds_route::RouteConfiguration))
.chain(any_iter!(clusters, xds_cluster::Cluster))
.chain(any_iter!(
load_assignments,
xds_endpoint::ClusterLoadAssignment
))
}
}
/// Point lookups against the cache.
impl ConfigCache for CacheReader {
    /// Resolves the route for `target` by looking up the Listener named `target.name()`.
    ///
    /// An inlined route is returned directly; otherwise the referenced RouteConfiguration is
    /// looked up. Returns `None` if any step is missing or has no data.
    fn get_route(&self, target: &VirtualHost) -> Option<Arc<Route>> {
        let listener = self.data.listeners.get(&target.name())?;
        let api_listener = listener.data()?;

        match &api_listener.route_config {
            ApiListenerRouteConfig::Inlined { route, .. } => Some(route.clone()),
            ApiListenerRouteConfig::RouteConfig { name } => {
                let entry = self.data.route_configs.get(name.as_str())?;
                let route_config = entry.data()?;
                Some(route_config.route.clone())
            }
        }
    }

    /// Resolves the backend and its endpoints for `target` via the Cluster named
    /// `target.name()`.
    ///
    /// Returns `(None, None)` when the Cluster is missing or has no data. The endpoint group is
    /// `None` when the Cluster points at a ClusterLoadAssignment that is missing or has no data.
    fn get_backend(
        &self,
        target: &BackendId,
    ) -> (Option<Arc<BackendLb>>, Option<Arc<EndpointGroup>>) {
        let Some(cluster) = self.data.clusters.get(&target.name()) else {
            return (None, None);
        };
        let Some(cluster_data) = cluster.data() else {
            return (None, None);
        };

        let backend_lb = Some(cluster_data.backend_lb.clone());
        let endpoints = match &cluster_data.endpoints {
            ClusterEndpointData::Inlined { endpoint_group, .. } => Some(endpoint_group.clone()),
            ClusterEndpointData::LoadAssignment { name } => self
                .data
                .load_assignments
                .get(name.as_str())
                .and_then(|entry| entry.data().map(|cla| cla.endpoint_group.clone())),
        };

        (backend_lb, endpoints)
    }
}
/// The writable side of the xDS cache: the resource maps plus a graph of references between
/// resources that is used to garbage-collect unreachable entries.
#[derive(Default, Debug)]
pub(super) struct Cache {
/// Reference graph; each node names one resource and an edge points from a resource to one it
/// refers to.
refs: DiGraph<GCData, ()>,
/// The resource maps, shared with every `CacheReader` handed out by `reader()`.
data: Arc<CacheData>,
}
/// Node weight of the reference graph: identifies one resource and carries its GC flags.
#[derive(Debug)]
struct GCData {
/// Name of the resource this node stands for.
name: String,
/// Type of the resource this node stands for.
resource_type: ResourceType,
/// Set by `Cache::subscribe`; pinned nodes are kept in the graph by garbage collection.
pinned: bool,
/// Set when a pinned node is deleted without force; cleared when the ref is looked up again
/// through `find_or_create_ref`.
deleted: bool,
}
impl GCData {
    /// A node is a garbage-collection root only while it is pinned and not marked deleted.
    fn is_gc_root(&self) -> bool {
        matches!((self.pinned, self.deleted), (true, false))
    }
}
/// The four resource maps that make up the cache, one per xDS resource type.
#[derive(Debug, Default)]
struct CacheData {
/// Cached Listeners, keyed by name.
listeners: ResourceMap<ApiListener>,
/// Cached RouteConfigurations, keyed by name.
route_configs: ResourceMap<RouteConfig>,
/// Cached Clusters, keyed by name.
clusters: ResourceMap<Cluster>,
/// Cached ClusterLoadAssignments, keyed by cluster name.
load_assignments: ResourceMap<LoadAssignment>,
}
impl Cache {
pub fn reader(&self) -> CacheReader {
CacheReader {
data: self.data.clone(),
}
}
pub fn subscriptions(&self, resource_type: ResourceType) -> Vec<String> {
let weights = self
.refs
.node_weights()
.filter(|n| n.resource_type == resource_type);
weights.map(|n| n.name.clone()).collect()
}
pub fn insert(
&mut self,
version: crate::xds::ResourceVersion,
resources: ResourceVec,
) -> (ResourceTypeSet, Vec<ResourceError>) {
let (changed, errs) = match resources {
ResourceVec::Listener(ls) => self.insert_listeners(version, ls),
ResourceVec::RouteConfiguration(rcs) => self.insert_route_configs(version, rcs),
ResourceVec::Cluster(cs) => self.insert_clusters(version, cs),
ResourceVec::ClusterLoadAssignment(clas) => self.insert_load_assignments(version, clas),
};
if !changed.is_empty() {
self.collect();
}
(changed, errs)
}
pub fn delete(&mut self, resource_type: ResourceType, name: &str) -> bool {
if !self.delete_ref(resource_type, name, true) {
return false;
}
match resource_type {
ResourceType::Cluster => {
self.data.clusters.remove(name);
}
ResourceType::ClusterLoadAssignment => {
self.data.load_assignments.remove(name);
}
ResourceType::Listener => {
self.data.listeners.remove(name);
}
ResourceType::RouteConfiguration => {
self.data.route_configs.remove(name);
}
}
true
}
pub fn subscribe(&mut self, resource_type: ResourceType, name: &str) -> bool {
let (node, created) = self.find_or_create_ref(resource_type, name);
self.pin_ref(node);
created
}
}
impl Cache {
/// Garbage-collects the cache.
///
/// Everything reachable from a GC root is kept. Cached data for every other node is removed,
/// and the node itself is dropped from the graph unless it is pinned.
fn collect(&mut self) {
    use visit::{Control, DfsEvent};

    // Mark phase: walk the graph from the roots and record every node that is discovered.
    let mut reachable = self.refs.visit_map();
    let roots = self.gc_roots();
    visit::depth_first_search(&self.refs, roots, |event| -> Control<()> {
        match event {
            DfsEvent::Discover(node, _) if reachable.contains(node.index()) => Control::Prune,
            DfsEvent::Discover(node, _) => {
                reachable.insert(node.index());
                Control::Continue
            }
            _ => Control::Continue,
        }
    });

    // Group the names of unreachable nodes by resource type.
    let mut unreachable_names: EnumMap<ResourceType, Vec<String>> = EnumMap::default();
    for idx in self.refs.node_indices() {
        if !reachable.contains(idx.index()) {
            let gc = &self.refs[idx];
            unreachable_names[gc.resource_type].push(gc.name.clone());
        }
    }

    // Sweep phase, part one: drop the cached data of everything unreachable.
    let data = &self.data;
    for (resource_type, names) in unreachable_names.into_iter() {
        match resource_type {
            ResourceType::Listener => data.listeners.remove_all(&names),
            ResourceType::RouteConfiguration => data.route_configs.remove_all(&names),
            ResourceType::Cluster => data.clusters.remove_all(&names),
            ResourceType::ClusterLoadAssignment => data.load_assignments.remove_all(&names),
        }
    }

    // Sweep phase, part two: drop unreachable nodes, but keep pinned ones so that the
    // subscription is remembered.
    self.refs
        .retain_nodes(|graph, idx| reachable.contains(idx.index()) || graph[idx].pinned);
}
/// Applies a set of Listeners to the cache.
///
/// Every Listener currently cached whose name does not appear in `listeners` is removed.
/// Listeners whose xDS is unchanged are skipped. Returns the resource types that changed and
/// the errors hit while converting or rebuilding.
fn insert_listeners(
&mut self,
version: crate::xds::ResourceVersion,
listeners: Vec<xds_listener::Listener>,
) -> (ResourceTypeSet, Vec<ResourceError>) {
let mut changed = ResourceTypeSet::default();
let mut errors = Vec::new();
// Start by assuming every cached Listener is stale; names seen in this batch are struck off.
let mut to_remove: BTreeSet<_> = self.data.listeners.names().collect();
for listener in listeners {
to_remove.remove(&listener.name);
if self.data.listeners.is_changed(&listener.name, &listener) {
let listener_name = listener.name.clone();
let api_listener = match ApiListener::from_xds(&listener_name, listener) {
Ok(l) => l,
Err(e) => {
// Record the failure on the cache entry and move on without marking anything changed.
self.data
.listeners
.insert_error(listener_name, version.clone(), e.clone());
errors.push(e);
continue;
}
};
let (node, _) = self.find_or_create_ref(ResourceType::Listener, &listener_name);
// Drop the Listener's previous outgoing references before adding the current ones.
self.reset_ref(node);
match &api_listener.route_config {
ApiListenerRouteConfig::RouteConfig { name } => {
let (rc_node, created) = self
.find_or_create_ref(ResourceType::RouteConfiguration, name.as_str());
self.refs.update_edge(node, rc_node, ());
// Only a newly created RouteConfiguration ref counts as a change to that type.
if created {
changed.insert(ResourceType::RouteConfiguration);
}
}
ApiListenerRouteConfig::Inlined {
clusters,
default_action,
..
} => {
let mut clusters_changed = false;
for cluster in clusters {
let (cluster_node, created) =
self.find_or_create_ref(ResourceType::Cluster, cluster.as_str());
self.refs.update_edge(node, cluster_node, ());
clusters_changed |= created;
}
if clusters_changed {
changed.insert(ResourceType::Cluster);
}
// Rebuild the cluster named by the default action (a no-op if it is not cached with data).
if let Some((cluster, route_action)) = default_action {
if let Err(e) =
self.rebuild_cluster(&mut changed, cluster, route_action)
{
self.data.listeners.insert_error(
listener_name,
version.clone(),
e.clone(),
);
errors.push(e);
continue;
}
}
}
}
self.data
.listeners
.insert_ok(listener_name, version.clone(), api_listener);
changed.insert(ResourceType::Listener);
}
}
// Anything left was absent from this batch. The ref delete is not forced, so pinned refs are
// only flagged as deleted; the cached data is removed either way.
for name in to_remove {
changed.insert(ResourceType::Listener);
self.delete_ref(ResourceType::Listener, &name, false);
self.data.listeners.remove(&name);
}
(changed, errors)
}
/// Applies a set of Clusters to the cache.
///
/// Every Cluster currently cached whose name does not appear in `clusters` is removed.
/// Clusters whose xDS is unchanged are skipped. Returns the resource types that changed and
/// any conversion errors.
fn insert_clusters(
    &mut self,
    version: crate::xds::ResourceVersion,
    clusters: Vec<xds_cluster::Cluster>,
) -> (ResourceTypeSet, Vec<ResourceError>) {
    let mut changed = ResourceTypeSet::default();
    let mut errors = Vec::new();

    // Every cached name is presumed stale until it shows up in this batch.
    let mut stale: BTreeSet<String> = self.data.clusters.names().collect();

    for cluster in clusters {
        stale.remove(&cluster.name);
        if !self.data.clusters.is_changed(&cluster.name, &cluster) {
            continue;
        }

        // A passthrough route that is already cached may contribute a default action.
        let action = self.find_passthrough_action(&cluster.name);
        let inserted = self.insert_cluster(&mut changed, &version, cluster, action.as_ref());
        if let Err(e) = inserted {
            errors.push(e);
        }
    }

    // The ref delete is not forced, so pinned refs are only flagged as deleted; the cached
    // data is removed either way.
    for name in stale {
        changed.insert(ResourceType::Cluster);
        self.delete_ref(ResourceType::Cluster, &name, false);
        self.data.clusters.remove(&name);
    }

    (changed, errors)
}
/// Applies a set of RouteConfigurations to the cache.
///
/// Unchanged route configs are skipped, as are route configs that have no node in the
/// reference graph. Unlike Listeners and Clusters, route configs missing from the batch are not
/// removed here. Returns the resource types that changed and the errors encountered.
fn insert_route_configs(
&mut self,
version: crate::xds::ResourceVersion,
route_configs: Vec<xds_route::RouteConfiguration>,
) -> (ResourceTypeSet, Vec<ResourceError>) {
let mut errors = Vec::new();
let mut changed = ResourceTypeSet::default();
for route_config in route_configs {
if self
.data
.route_configs
.is_changed(&route_config.name, &route_config)
{
// NOTE(review): `changed` is marked before the ref lookup and the conversion below, so it is
// set even when the route config ends up skipped or rejected - confirm this is intended.
changed.insert(ResourceType::RouteConfiguration);
// Route configs nothing refers to are not cached.
let Some(node) =
self.find_ref(ResourceType::RouteConfiguration, &route_config.name)
else {
continue;
};
let route_config_name = route_config.name.clone();
let route_config = match RouteConfig::from_xds(route_config) {
Ok(rc) => rc,
Err(e) => {
self.data.route_configs.insert_error(
route_config_name,
version.clone(),
e.clone(),
);
errors.push(e.into());
continue;
}
};
// A rebuild failure is reported but does not stop the route config from being stored.
if let Some((cluster, route_action)) = &route_config.passthrough_action {
if let Err(e) = self.rebuild_cluster(&mut changed, cluster, route_action) {
errors.push(e);
}
}
// Replace the route config's outgoing references with the clusters it now names.
self.reset_ref(node);
for cluster in &route_config.clusters {
let (cluster, _) =
self.find_or_create_ref(ResourceType::Cluster, cluster.as_str());
self.refs.update_edge(node, cluster, ());
}
self.data
.route_configs
.insert_ok(route_config_name, version.clone(), route_config);
}
}
(changed, errors)
}
/// Applies a set of ClusterLoadAssignments to the cache.
///
/// Unchanged load assignments are skipped, as are load assignments that have no node in the
/// reference graph. The returned error list is always empty.
///
/// # Panics
///
/// Panics if a load assignment's node has no incoming edge, or if the node at the other end of
/// its first incoming edge has no cached Cluster entry with data.
fn insert_load_assignments(
&mut self,
version: crate::xds::ResourceVersion,
load_assignments: Vec<xds_endpoint::ClusterLoadAssignment>,
) -> (ResourceTypeSet, Vec<ResourceError>) {
let mut changed = ResourceTypeSet::default();
for load_assignment in load_assignments {
if self
.data
.load_assignments
.is_changed(&load_assignment.cluster_name, &load_assignment)
{
// Load assignments nothing refers to are not cached.
let Some(cla_node) = self.find_ref(
ResourceType::ClusterLoadAssignment,
&load_assignment.cluster_name,
) else {
continue;
};
// The backend id is taken from the cached data of the first parent node.
let target = {
let cluster_node = self
.parent_refs(cla_node)
.next()
.expect("GC leak: ClusterLoadAssignment must have a parent cluster");
let cluster = self
.data
.clusters
.get(&self.refs[cluster_node].name)
.expect("GC leak: parent Cluster was removed from cache");
cluster
.data()
.expect("GC leak: parent Cluster has no data")
.backend_lb
.config
.id
.clone()
};
let load_assignment_name = load_assignment.cluster_name.clone();
let load_assignment = LoadAssignment::from_xds(target, load_assignment);
self.data.load_assignments.insert_ok(
load_assignment_name,
version.clone(),
load_assignment,
);
changed.insert(ResourceType::ClusterLoadAssignment);
}
}
(changed, Vec::new())
}
/// Re-inserts an already cached Cluster using `route_action` as its default action.
///
/// The cached xDS and version are reused as-is. Does nothing and returns `Ok(())` when the
/// Cluster is not cached or its entry has no data.
fn rebuild_cluster(
    &mut self,
    changed: &mut ResourceTypeSet,
    cluster: &ResourceName<Cluster>,
    route_action: &xds_route::RouteAction,
) -> Result<(), ResourceError> {
    // Copy what is needed out of the entry so that no borrow of the map is held across the
    // insert below.
    let cached = self
        .data
        .clusters
        .get(cluster.as_str())
        .and_then(|entry| {
            entry
                .data()
                .map(|data| (entry.version().clone(), data.xds().clone()))
        });

    let Some((version, xds)) = cached else {
        return Ok(());
    };
    self.insert_cluster(changed, &version, xds, Some(route_action))
}
/// Converts and stores a single Cluster, updating its references in the graph.
///
/// A Cluster that has no node in the reference graph is ignored and reported as `Ok(())`. On a
/// conversion error the error is recorded on the cache entry and returned. On success the
/// Cluster is stored at `version` and `ResourceType::Cluster` is added to `changed`.
fn insert_cluster(
&mut self,
changed: &mut ResourceTypeSet,
version: &crate::xds::ResourceVersion,
cluster: xds_cluster::Cluster,
default_action: Option<&xds_route::RouteAction>,
) -> Result<(), ResourceError> {
let Some(node) = self.find_ref(ResourceType::Cluster, &cluster.name) else {
return Ok(());
};
let cluster_name = cluster.name.clone();
let cluster = match Cluster::from_xds(cluster, default_action) {
Ok(c) => c,
Err(e) => {
self.data
.clusters
.insert_error(cluster_name, version.clone(), e.clone());
return Err(e);
}
};
// Drop the Cluster's previous outgoing references before adding the current ones.
self.reset_ref(node);
// A Cluster whose endpoints come from a load assignment references that load assignment.
if let ClusterEndpointData::LoadAssignment { name } = &cluster.endpoints {
changed.insert(ResourceType::ClusterLoadAssignment);
let (cla_node, _) =
self.find_or_create_ref(ResourceType::ClusterLoadAssignment, name.as_str());
self.refs.update_edge(node, cla_node, ());
}
// Every Cluster also references the Listener named by its backend's passthrough route name.
let passthrough_listener_name = cluster.backend_lb.config.id.passthrough_route_name();
let (default_listener_node, created) =
self.find_or_create_ref(ResourceType::Listener, &passthrough_listener_name);
if created {
changed.insert(ResourceType::Listener);
}
self.refs.update_edge(node, default_listener_node, ());
self.data
.clusters
.insert_ok(cluster_name, version.clone(), cluster);
changed.insert(ResourceType::Cluster);
Ok(())
}
/// Collects the index of every node that currently counts as a garbage-collection root.
fn gc_roots(&self) -> Vec<NodeIndex> {
    let mut roots = Vec::new();
    for idx in self.refs.node_indices() {
        if self.refs[idx].is_gc_root() {
            roots.push(idx);
        }
    }
    roots
}
/// Deletes the graph node for the named resource.
///
/// The node is removed when `force` is set or the node is not pinned. Otherwise it stays in
/// the graph and is only flagged as deleted. Returns `true` only if a node was actually
/// removed; a missing node yields `false`.
fn delete_ref(&mut self, resource_type: ResourceType, name: &str, force: bool) -> bool {
    let Some(node) = self.find_ref(resource_type, name) else {
        return false;
    };

    let removable = force || !self.refs[node].pinned;
    if removable {
        self.refs.remove_node(node);
    } else {
        self.refs[node].deleted = true;
    }
    removable
}
/// Iterates over the nodes that have an edge pointing at `node`.
fn parent_refs(&self, node: NodeIndex) -> impl Iterator<Item = NodeIndex> + '_ {
    let graph = &self.refs;
    graph.neighbors_directed(node, Direction::Incoming)
}
/// Looks up the default route action of the passthrough Listener that belongs to a Cluster.
///
/// `cluster_name` is parsed as a `BackendId` and the Listener named by its passthrough route
/// name is consulted. The action comes from the Listener itself when its route is inlined, and
/// from the referenced RouteConfiguration otherwise. Returns `None` if the name does not parse
/// or any piece along the way is missing.
fn find_passthrough_action(&self, cluster_name: &str) -> Option<xds_route::RouteAction> {
    let backend = BackendId::from_str(cluster_name).ok()?;
    let listener_name = backend.passthrough_route_name();
    let listener = self.data.listeners.get(&listener_name)?;
    let listener_data = listener.data()?;

    match &listener_data.route_config {
        ApiListenerRouteConfig::Inlined { default_action, .. } => {
            let (_, action) = default_action.as_ref()?;
            Some(action.clone())
        }
        ApiListenerRouteConfig::RouteConfig { name } => {
            let route_config = self.data.route_configs.get(name.as_str())?;
            let (_, action) = route_config.data()?.passthrough_action.as_ref()?;
            Some(action.clone())
        }
    }
}
/// Returns the graph node for the named resource, creating it if it does not exist.
///
/// The boolean is `true` when a new node was created. Finding an existing node clears its
/// `deleted` flag; new nodes start out unpinned and not deleted.
fn find_or_create_ref(&mut self, resource_type: ResourceType, name: &str) -> (NodeIndex, bool) {
    match self.find_ref(resource_type, name) {
        Some(existing) => {
            self.refs[existing].deleted = false;
            (existing, false)
        }
        None => {
            let fresh = GCData {
                name: name.to_string(),
                resource_type,
                pinned: false,
                deleted: false,
            };
            (self.refs.add_node(fresh), true)
        }
    }
}
/// Finds the graph node with the given resource type and name by scanning every node.
fn find_ref(&self, resource_type: ResourceType, name: &str) -> Option<NodeIndex> {
    self.refs.node_indices().find(|&idx| {
        let gc = &self.refs[idx];
        gc.name == name && gc.resource_type == resource_type
    })
}
/// Marks `node` as pinned.
fn pin_ref(&mut self, node: NodeIndex) {
    let gc = &mut self.refs[node];
    gc.pinned = true;
}
/// Removes every outgoing edge of `node`, leaving its incoming edges in place.
///
/// The insert paths call this before re-adding the references that a freshly converted
/// resource declares.
fn reset_ref(&mut self, node: NodeIndex) {
    // Snapshot the targets first: removing an edge mutates the graph, so the neighbor
    // iterator cannot be held across the removals.
    let targets: Vec<NodeIndex> = self
        .refs
        .neighbors_directed(node, Direction::Outgoing)
        .collect();

    for target in targets {
        // Look the edge up in the outgoing direction only. Two nodes can reference each other
        // (a Cluster points at its passthrough Listener, which may point back at the Cluster),
        // and an undirected lookup is allowed to match the edge in either direction. The
        // lookup is repeated for every target because removing an edge can renumber others.
        if let Some(edge) = self.refs.find_edge(node, target) {
            self.refs.remove_edge(edge);
        }
    }
}
}
#[cfg(test)]
mod test {
use junction_api::{backend::LbPolicy, Target};
use super::*;
use crate::xds::test as xds_test;
// Compiles only when `T` is `Send`; does nothing at runtime.
fn assert_send<T: Send>() {}
// Compiles only when `T` is `Sync`; does nothing at runtime.
fn assert_sync<T: Sync>() {}
// Compile-time check that `CacheReader` is both `Send` and `Sync`.
#[test]
fn assert_reader_send_sync() {
assert_send::<CacheReader>();
assert_sync::<CacheReader>();
}
// Asserts that an insert produced no errors and passes the changed set through.
#[track_caller]
fn assert_insert((changed, errors): (ResourceTypeSet, Vec<ResourceError>)) -> ResourceTypeSet {
assert!(errors.is_empty(), "first error = {}", errors[0]);
changed
}
// Subscribes to every resource named in `resources`, then inserts them and asserts that the
// insert produced no errors.
#[track_caller]
fn assert_subscribe_insert(
cache: &mut Cache,
version: ResourceVersion,
resources: ResourceVec,
) {
for name in resources.names() {
cache.subscribe(resources.resource_type(), &name);
}
assert_insert(cache.insert(version, resources));
}
// A subscribed Listener with an inlined route is cached, and no RouteConfiguration is stored
// for it.
#[test]
fn test_insert_listener_inline_route_config() {
let mut cache = Cache::default();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
"listener.example.svc.cluster.local" => [xds_test::vhost!(
"vhost1.example.svc.cluster.local",
["listener.example.svc.cluster.local"],
[xds_test::route!(default "cluster.example:80")],
)],
)]),
);
assert!(cache
.data
.listeners
.get("listener.example.svc.cluster.local")
.is_some());
assert!(cache.data.route_configs.is_empty());
}
// Inserting a default-constructed Listener yields one error and no changes, and leaves behind
// an entry with no data, the default version, and the error tagged with the insert's version.
#[test]
fn test_insert_invalid_listener() {
let mut cache = Cache::default();
cache.subscribe(ResourceType::Listener, "potato");
let (changed, errors) = cache.insert(
"123".into(),
ResourceVec::Listener(vec![xds_listener::Listener {
name: "potato".to_string(),
..Default::default()
}]),
);
assert!(changed.is_empty());
assert_eq!(errors.len(), 1);
let listener_data = cache.data.listeners.get("potato").unwrap();
assert!(listener_data.data().is_none());
assert!(listener_data.name() == "potato");
assert!(*listener_data.version() == "".into());
assert!(matches!(listener_data.last_error(), Some((v, _)) if *v == "123".into()));
}
// Two Listeners that each point at a RouteConfiguration are cached first; the route configs
// are then inserted one at a time and each shows up in the cache without disturbing the
// Listeners.
#[test]
fn test_insert_listener_rds() {
let mut cache = Cache::default();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![
xds_test::listener!(
"listener1.example.svc.cluster.local",
"rc1.example.svc.cluster.local"
),
xds_test::listener!(
"listener2.example.svc.cluster.local",
"rc2.example.svc.cluster.local"
),
]),
);
assert_eq!(
cache.data.listeners.names().collect::<Vec<_>>(),
vec![
"listener1.example.svc.cluster.local",
"listener2.example.svc.cluster.local"
],
);
assert!(cache.data.route_configs.is_empty());
// Insert the first route config.
assert_insert(cache.insert(
"123".into(),
ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
"rc1.example.svc.cluster.local",
[xds_test::vhost!(
"vhost1.example.svc.cluster.local",
["listener.example.svc.cluster.local"],
[xds_test::route!(default "cluster1.example:8913")],
)]
)]),
));
assert_eq!(
cache.data.listeners.names().collect::<Vec<_>>(),
vec![
"listener1.example.svc.cluster.local",
"listener2.example.svc.cluster.local"
],
);
assert_eq!(
cache.data.route_configs.names().collect::<Vec<_>>(),
vec!["rc1.example.svc.cluster.local"],
);
// Insert the second route config; the first one must still be cached afterwards.
assert_insert(cache.insert(
"123".into(),
ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
"rc2.example.svc.cluster.local",
[xds_test::vhost!(
"vhost1.example.svc.cluster.local",
["listener.example.svc.cluster.local"],
[xds_test::route!(default "cluster1.example:8913")],
)]
)]),
));
assert_eq!(
cache.data.listeners.names().collect::<Vec<_>>(),
vec![
"listener1.example.svc.cluster.local",
"listener2.example.svc.cluster.local"
],
);
assert_eq!(
cache.data.route_configs.names().collect::<Vec<_>>(),
vec![
"rc1.example.svc.cluster.local",
"rc2.example.svc.cluster.local"
],
);
}
// A subscribed EDS Cluster is cached on its own: no Listener, RouteConfiguration or load
// assignment data appears alongside it.
#[test]
fn test_insert_cluster_eds() {
let mut cache = Cache::default();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Cluster(vec![xds_test::cluster!(eds "cluster1.example:8913")]),
);
assert!(cache.data.listeners.is_empty());
assert!(cache.data.route_configs.is_empty());
assert!(cache.data.clusters.get("cluster1.example:8913").is_some());
assert!(cache.data.load_assignments.is_empty());
}
// With two EDS Clusters cached, load assignments inserted one at a time are each stored, and
// the earlier one is kept when the later one arrives.
#[test]
fn test_insert_load_assignment() {
let mut cache = Cache::default();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Cluster(vec![
xds_test::cluster!(eds "cluster1.example:8913"),
xds_test::cluster!(eds "cluster2.example:8913"),
]),
);
// Insert the load assignment for the first cluster only.
assert_insert(cache.insert(
"123".into(),
ResourceVec::ClusterLoadAssignment(vec![xds_test::cla!(
"cluster1.example:8913" => {
"zone1" => ["1.1.1.1"]
}
)]),
));
assert!(cache.data.listeners.is_empty());
assert!(cache.data.route_configs.is_empty());
assert_eq!(
cache.data.clusters.names().collect::<Vec<_>>(),
vec!["cluster1.example:8913", "cluster2.example:8913"],
);
assert_eq!(
cache.data.load_assignments.names().collect::<Vec<_>>(),
vec!["cluster1.example:8913"],
);
// Insert the load assignment for the second cluster.
assert_insert(cache.insert(
"123".into(),
ResourceVec::ClusterLoadAssignment(vec![xds_test::cla!(
"cluster2.example:8913" => {
"zone2" => ["2.2.2.2"]
}
)]),
));
assert_eq!(
cache.data.clusters.names().collect::<Vec<_>>(),
vec!["cluster1.example:8913", "cluster2.example:8913"],
);
assert_eq!(
cache.data.load_assignments.names().collect::<Vec<_>>(),
vec!["cluster1.example:8913", "cluster2.example:8913"],
);
}
// A load assignment whose name matches no cached Cluster is ignored: the insert reports no
// changes and no errors.
#[test]
fn test_insert_load_assignment_missing_ref() {
let mut cache = Cache::default();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Cluster(vec![
xds_test::cluster!(eds "cluster1.example:8913"),
xds_test::cluster!(eds "cluster2.example:8913"),
]),
);
assert_eq!(
cache.data.clusters.names().collect::<Vec<_>>(),
vec!["cluster1.example:8913", "cluster2.example:8913"],
);
let (changed, errors) = cache.insert(
"123".into(),
ResourceVec::ClusterLoadAssignment(vec![xds_test::cla!(
"cluster3.example.svc.cluster.local" => {
"zone2" => ["2.2.2.2"]
}
)]),
);
assert!(changed.is_empty());
assert!(errors.is_empty());
}
// Inserting an empty Listener set removes the cached Listener's data, while the graph node
// for the subscribed Listener survives.
#[test]
fn test_insert_deletes_listeners() {
let mut cache = Cache::default();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
"nginx.default.local" => [xds_test::vhost!(
"default",
["nginx.default.local"],
[xds_test::route!(default "nginx.default.local:80")],
)],
)]),
);
assert!(cache.data.listeners.get("nginx.default.local").is_some());
assert_eq!(cache.refs.node_count(), 2);
assert_insert(cache.insert("123".into(), ResourceVec::Listener(Vec::new())));
assert!(cache.data.listeners.is_empty());
assert_eq!(
cache.refs.node_count(),
1,
"should still be subscribed to the removed Listener",
);
}
// Each Cluster insert replaces the cached set: Clusters missing from the new batch are
// removed, and an empty batch removes them all.
#[test]
fn test_insert_deletes_clusters() {
let mut cache = Cache::default();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Cluster(vec![
xds_test::cluster!(eds "cluster1.example:8913"),
xds_test::cluster!(eds "cluster2.example:8913"),
]),
);
assert_eq!(
cache.data.clusters.names().collect::<Vec<_>>(),
vec!["cluster1.example:8913", "cluster2.example:8913"],
);
assert_insert(cache.insert(
"123".into(),
ResourceVec::Cluster(vec![xds_test::cluster!(eds "cluster2.example:8913")]),
));
assert_eq!(
cache.data.clusters.names().collect::<Vec<_>>(),
vec!["cluster2.example:8913"],
);
assert_insert(cache.insert("123".into(), ResourceVec::Cluster(vec![])));
assert!(cache.data.clusters.is_empty());
}
// After explicitly subscribing to a Listener and two Clusters, emptying both resource sets
// clears the cached data but `subscriptions` still reports all three names.
#[test]
fn test_deletes_keep_subscriptions() {
let mut cache = Cache::default();
cache.subscribe(ResourceType::Listener, "listener.example.svc.cluster.local");
cache.subscribe(ResourceType::Cluster, "cluster1.example:8913");
cache.subscribe(ResourceType::Cluster, "cluster2.example:8913");
assert_insert(cache.insert(
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
"listener.example.svc.cluster.local" => [xds_test::vhost!(
"default",
["*"],
[
xds_test::route!(header "x-staging" => "cluster2.example:8913"),
xds_test::route!(default "cluster1.example:8913"),
],
)],
)]),
));
assert_insert(cache.insert(
"123".into(),
ResourceVec::Cluster(vec![
xds_test::cluster!(eds "cluster1.example:8913"),
xds_test::cluster!(eds "cluster2.example:8913"),
]),
));
assert_eq!(
cache.data.clusters.names().collect::<Vec<_>>(),
vec!["cluster1.example:8913", "cluster2.example:8913"],
);
// Empty out both resource sets.
assert_insert(cache.insert("123".into(), ResourceVec::Listener(vec![])));
assert_insert(cache.insert("123".into(), ResourceVec::Cluster(vec![])));
assert!(cache.data.listeners.is_empty());
assert!(cache.data.clusters.is_empty());
assert_eq!(
cache.subscriptions(ResourceType::Listener),
vec!["listener.example.svc.cluster.local"],
);
assert_eq!(
cache.subscriptions(ResourceType::Cluster),
vec!["cluster1.example:8913", "cluster2.example:8913"],
);
}
// A Cluster inserted before anything subscribes to or references it is not cached. A Listener
// inserted afterwards is cached, and the Cluster map stays empty.
#[test]
fn test_insert_out_of_order() {
let mut cache = Cache::default();
assert_insert(cache.insert(
"123".into(),
ResourceVec::Cluster(vec![xds_test::cluster!(
inline "cluster1.example.svc.cluster.local" => {
"zone1" => ["1.1.1.1", "2.2.2.2"],
"zone2" => ["3.3.3.3"]
})]),
));
assert!(cache.data.listeners.is_empty());
assert!(cache.data.clusters.is_empty());
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
"listener.example.svc.cluster.local" => [xds_test::vhost!(
"default",
["*"],
[xds_test::route!(default "cluster1.example:8913")],
)],
)]),
);
assert!(cache
.data
.listeners
.get("listener.example.svc.cluster.local")
.is_some());
assert!(cache.data.clusters.is_empty());
}
// When the passthrough Listener (with an inlined ring-hash default route) is cached before
// the Cluster, the Cluster is built with non-empty hash params.
#[test]
fn test_cache_cluster_finds_passthrough_listener() {
let mut cache = Cache::default();
let svc = Target::kube_service("default", "something")
.unwrap()
.into_backend(8910);
let cluster_name = svc.name().leak();
let passthrough_name = svc.passthrough_route_name().leak();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
passthrough_name => [xds_test::vhost!(
"default",
["*"],
[xds_test::route!(default ring_hash = "x-user", cluster_name)],
)],
)]),
);
assert_insert(cache.insert(
"123".into(),
ResourceVec::Cluster(vec![xds_test::cluster!(ring_hash eds cluster_name)]),
));
assert!(
{
let cluster = cache
.data
.clusters
.get(cluster_name)
.expect("Cache should contain cluster");
let cluster_data = cluster.data().expect("cluster should have data");
matches!(
&cluster_data.backend_lb.config.lb,
LbPolicy::RingHash(params) if !params.hash_params.is_empty(),
)
},
"should have non-empty hash params"
);
}
// Same as the passthrough-listener case, except that the ring-hash default route lives in a
// RouteConfiguration the passthrough Listener points at.
#[test]
fn test_cache_cluster_finds_passthrough_route() {
let mut cache = Cache::default();
let svc = Target::kube_service("default", "something")
.unwrap()
.into_backend(8910);
let cluster_name = svc.name().leak();
let passthrough_name = svc.passthrough_route_name().leak();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
passthrough_name,
"example-route-config", )]),
);
assert_insert(cache.insert(
"123".into(),
ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
"example-route-config",
[xds_test::vhost!(
"example-vhost",
["listener.example.svc.cluster.local"],
[xds_test::route!(default ring_hash = "x-user", cluster_name),],
)]
)]),
));
assert_insert(cache.insert(
"123".into(),
ResourceVec::Cluster(vec![xds_test::cluster!(ring_hash eds cluster_name)]),
));
assert!(
{
let cluster = cache
.data
.clusters
.get(cluster_name)
.expect("Cache should contain cluster");
let cluster_data = cluster.data().expect("cluster should have data");
matches!(
&cluster_data.backend_lb.config.lb,
LbPolicy::RingHash(params) if !params.hash_params.is_empty(),
)
},
"should have non-empty hash params"
);
}
// A Cluster cached with empty hash params is rebuilt with non-empty hash params once a
// Listener carrying a ring-hash default route for it is inserted.
#[test]
fn test_cache_listener_rebuilds_cluster() {
let mut cache = Cache::default();
let svc = Target::kube_service("default", "something")
.unwrap()
.into_backend(8910);
// NOTE(review): the cluster is named after the passthrough route name rather than
// `svc.name()` - confirm this is what the test intends.
let cluster_name = svc.passthrough_route_name().leak();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
"listener.example.svc.cluster.local"=> [xds_test::vhost!(
"default",
["listener.example.svc.cluster.local"],
[xds_test::route!(default cluster_name)],
)],
)]),
);
assert_insert(cache.insert(
"123".into(),
ResourceVec::Cluster(vec![xds_test::cluster!(ring_hash eds cluster_name)]),
));
assert!(
{
let cluster = cache
.data
.clusters
.get(cluster_name)
.expect("Cache should contain cluster");
let cluster_data = cluster.data().expect("cluster should have data");
matches!(
&cluster_data.backend_lb.config.lb,
LbPolicy::RingHash(params) if params.hash_params.is_empty(),
)
},
"should have empty hash params before Listener insert"
);
// Re-send the original Listener together with a second one, named after the cluster, whose
// default route carries a ring-hash policy.
assert_insert(cache.insert(
"123".into(),
ResourceVec::Listener(vec![
xds_test::listener!(
"listener.example.svc.cluster.local"=> [xds_test::vhost!(
"default",
["listener.example.svc.cluster.local"],
[xds_test::route!(default cluster_name)],
)],
),
xds_test::listener!(
cluster_name => [xds_test::vhost!(
"default",
["listener.example.svc.cluster.local"],
[xds_test::route!(default ring_hash = "x-user", cluster_name)],
)],
),
]),
));
assert!(
{
let cluster = cache
.data
.clusters
.get(cluster_name)
.expect("Cache should contain cluster");
let cluster_data = cluster.data().expect("cluster should have data");
matches!(
&cluster_data.backend_lb.config.lb,
LbPolicy::RingHash(params) if !params.hash_params.is_empty(),
)
},
"should have non-empty hash params after Listener insert"
);
}
// A Cluster cached with empty hash params is rebuilt with non-empty hash params once the
// RouteConfiguration behind its Listener is updated with a ring-hash default route.
#[test]
fn test_cache_route_rebuilds_cluster() {
let mut cache = Cache::default();
let svc = Target::kube_service("default", "something")
.unwrap()
.into_backend(8910);
// NOTE(review): the cluster is named after the passthrough route name rather than
// `svc.name()` - confirm this is what the test intends.
let cluster_name = svc.passthrough_route_name().leak();
assert_subscribe_insert(
&mut cache,
"123".into(),
ResourceVec::Listener(vec![xds_test::listener!(
cluster_name,
"example-route-config",
)]),
);
assert_insert(cache.insert(
"123".into(),
ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
"example-route-config",
[xds_test::vhost!(
"example-vhost",
["listener.example.svc.cluster.local"],
[xds_test::route!(default cluster_name),],
)]
)]),
));
assert_insert(cache.insert(
"123".into(),
ResourceVec::Cluster(vec![xds_test::cluster!(ring_hash eds cluster_name)]),
));
assert!(
{
let cluster = cache
.data
.clusters
.get(cluster_name)
.expect("Cache should contain cluster");
let cluster_data = cluster.data().expect("cluster should have data");
matches!(
&cluster_data.backend_lb.config.lb,
LbPolicy::RingHash(params) if params.hash_params.is_empty(),
)
},
"should have empty hash params before the default route has a hash policy"
);
// Update the route config so that its default route carries a ring-hash policy.
assert_insert(cache.insert(
"123".into(),
ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
"example-route-config",
[xds_test::vhost!(
"example-vhost",
["listener.example.svc.cluster.local"],
[xds_test::route!(default ring_hash = "x-user", cluster_name),],
)]
)]),
));
assert!(
{
let cluster = cache
.data
.clusters
.get(cluster_name)
.expect("Cache should contain cluster");
let cluster_data = cluster.data().expect("cluster should have data");
matches!(
&cluster_data.backend_lb.config.lb,
LbPolicy::RingHash(params) if !params.hash_params.is_empty(),
)
},
"should have non-empty hash params when the default route is updated with a hash policy",
);
}
#[test]
fn test_cache_gc_simple() {
    // Builds a Listener -> RouteConfiguration -> Cluster chain, then sends an
    // empty Listener update and checks that every resource map is emptied.
    let mut cache = Cache::default();

    let listeners = ResourceVec::Listener(vec![xds_test::listener!(
        "listener.example.svc.cluster.local",
        "rc.example.svc.cluster.local"
    )]);
    assert_subscribe_insert(&mut cache, "123".into(), listeners);

    let route_configs = ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
        "rc.example.svc.cluster.local",
        [xds_test::vhost!(
            "vhost1.example.svc.cluster.local",
            ["listener.example.svc.cluster.local"],
            [
                xds_test::route!(header "x-staging" => "cluster2.example:8913"),
                xds_test::route!(default "cluster1.example:8913"),
            ],
        )]
    )]);
    assert_insert(cache.insert("123".into(), route_configs));

    let clusters = ResourceVec::Cluster(vec![
        xds_test::cluster!(eds "cluster1.example:8913"),
        xds_test::cluster!(eds "cluster2.example:8913"),
    ]);
    assert_insert(cache.insert("123".into(), clusters));

    // Everything that was sent should be present; no load assignments were sent.
    let listener_names: Vec<_> = cache.data.listeners.names().collect();
    assert_eq!(listener_names, vec!["listener.example.svc.cluster.local"]);

    let route_config_names: Vec<_> = cache.data.route_configs.names().collect();
    assert_eq!(route_config_names, vec!["rc.example.svc.cluster.local"]);

    let cluster_names: Vec<_> = cache.data.clusters.names().collect();
    assert_eq!(
        cluster_names,
        vec!["cluster1.example:8913", "cluster2.example:8913"],
    );

    assert!(cache.data.load_assignments.is_empty());

    let nodes_before_gc = cache.refs.node_count();
    assert_eq!(nodes_before_gc, 8);

    // An empty Listener update should leave nothing but a single node in the
    // reference graph.
    let no_listeners = ResourceVec::Listener(vec![]);
    assert_insert(cache.insert("123".into(), no_listeners));

    assert!(cache.data.listeners.is_empty());
    assert!(cache.data.route_configs.is_empty());
    assert!(cache.data.clusters.is_empty());
    assert!(cache.data.load_assignments.is_empty());

    let nodes_after_gc = cache.refs.node_count();
    assert_eq!(nodes_after_gc, 1);
}
#[test]
fn test_cache_gc_update_rds() {
    // Replaces a RouteConfiguration with a version that no longer routes to
    // cluster2 and checks that only cluster1 remains in the cache.
    let mut cache = Cache::default();

    let listeners = ResourceVec::Listener(vec![xds_test::listener!(
        "listener.example.svc.cluster.local",
        "rc1.example.svc.cluster.local"
    )]);
    assert_subscribe_insert(&mut cache, "123".into(), listeners);

    // Initial route config: a header-matched route to cluster2 plus a default
    // route to cluster1.
    let two_cluster_routes = ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
        "rc1.example.svc.cluster.local",
        [xds_test::vhost!(
            "vhost1.example.svc.cluster.local",
            ["listener.example.svc.cluster.local"],
            [
                xds_test::route!(header "x-staging" => "cluster2.example:8913"),
                xds_test::route!(default "cluster1.example:8913"),
            ],
        )]
    )]);
    assert_insert(cache.insert("123".into(), two_cluster_routes));

    let clusters = ResourceVec::Cluster(vec![
        xds_test::cluster!(eds "cluster1.example:8913"),
        xds_test::cluster!(eds "cluster2.example:8913"),
    ]);
    assert_insert(cache.insert("123".into(), clusters));

    let listener_names: Vec<_> = cache.data.listeners.names().collect();
    assert_eq!(listener_names, vec!["listener.example.svc.cluster.local"]);

    let route_config_names: Vec<_> = cache.data.route_configs.names().collect();
    assert_eq!(route_config_names, vec!["rc1.example.svc.cluster.local"]);

    let cluster_names: Vec<_> = cache.data.clusters.names().collect();
    assert_eq!(
        cluster_names,
        vec!["cluster1.example:8913", "cluster2.example:8913"],
    );

    assert!(cache.data.load_assignments.is_empty());

    // Updated route config: only the default route to cluster1 is left.
    let one_cluster_routes = ResourceVec::RouteConfiguration(vec![xds_test::route_config!(
        "rc1.example.svc.cluster.local",
        [xds_test::vhost!(
            "vhost1.example.svc.cluster.local",
            ["listener.example.svc.cluster.local"],
            [xds_test::route!(default "cluster1.example:8913")],
        )]
    )]);
    assert_insert(cache.insert("123".into(), one_cluster_routes));

    // Listener and route config are unchanged; cluster2 should be gone.
    let listener_names: Vec<_> = cache.data.listeners.names().collect();
    assert_eq!(listener_names, vec!["listener.example.svc.cluster.local"]);

    let route_config_names: Vec<_> = cache.data.route_configs.names().collect();
    assert_eq!(route_config_names, vec!["rc1.example.svc.cluster.local"]);

    let cluster_names: Vec<_> = cache.data.clusters.names().collect();
    assert_eq!(cluster_names, vec!["cluster1.example:8913"]);

    assert!(cache.data.load_assignments.is_empty());
}
#[test]
fn test_cache_gc_pinned() {
    // Checks that an explicitly subscribed cluster stays in the cache after
    // the listener that also references it is removed, while a cluster that
    // was only reachable through that listener does not.
    let mut cache = Cache::default();

    // The same pair of clusters is sent twice below, so build it on demand.
    let both_clusters = || {
        ResourceVec::Cluster(vec![
            xds_test::cluster!(eds "cluster1.example:8888"),
            xds_test::cluster!(eds "cluster2.example:8888"),
        ])
    };

    // Only cluster1 is subscribed to, so only cluster1 should be stored.
    cache.subscribe(ResourceType::Cluster, "cluster1.example:8888");
    assert_insert(cache.insert("123".into(), both_clusters()));

    assert!(cache.data.listeners.is_empty());
    let cluster_names: Vec<_> = cache.data.clusters.names().collect();
    assert_eq!(cluster_names, vec!["cluster1.example:8888"]);

    // Add a listener with inline routes to both clusters.
    cache.subscribe(ResourceType::Listener, "listener.example.svc.cluster.local");
    let listener = ResourceVec::Listener(vec![xds_test::listener!(
        "listener.example.svc.cluster.local" => [xds_test::vhost!(
            "default",
            ["*"],
            [
                xds_test::route!(header "x-staging" => "cluster2.example:8888"),
                xds_test::route!(default "cluster1.example:8888"),
            ],
        )],
    )]);
    assert_insert(cache.insert("123".into(), listener));

    // cluster2 is referenced now, but its data has not been sent again yet.
    let listener_names: Vec<_> = cache.data.listeners.names().collect();
    assert_eq!(listener_names, vec!["listener.example.svc.cluster.local"]);
    let cluster_names: Vec<_> = cache.data.clusters.names().collect();
    assert_eq!(cluster_names, vec!["cluster1.example:8888"]);

    // Sending both clusters again should now store cluster2 as well.
    assert_insert(cache.insert("123".into(), both_clusters()));

    let listener_names: Vec<_> = cache.data.listeners.names().collect();
    assert_eq!(listener_names, vec!["listener.example.svc.cluster.local"]);
    let cluster_names: Vec<_> = cache.data.clusters.names().collect();
    assert_eq!(
        cluster_names,
        vec!["cluster1.example:8888", "cluster2.example:8888"],
    );

    // Dropping every listener should leave only the subscribed cluster.
    let no_listeners = ResourceVec::Listener(vec![]);
    assert_insert(cache.insert("123".into(), no_listeners));

    assert!(cache.data.listeners.is_empty());
    let cluster_names: Vec<_> = cache.data.clusters.names().collect();
    assert_eq!(cluster_names, vec!["cluster1.example:8888"]);
}
}