junction_core/xds/
resources.rs

1use std::borrow::Cow;
2use std::collections::BTreeMap;
3use std::net::SocketAddr;
4use std::ops::Deref;
5use std::{collections::BTreeSet, marker::PhantomData, sync::Arc};
6
7use junction_api::backend::Backend;
8use junction_api::backend::BackendId;
9use junction_api::http::Route;
10use junction_api::Hostname;
11use smol_str::SmolStr;
12use xds_api::{
13    pb::envoy::{
14        config::{
15            cluster::v3 as xds_cluster, core::v3 as xds_core, endpoint::v3 as xds_endpoint,
16            listener::v3 as xds_listener, route::v3 as xds_route,
17        },
18        extensions::filters::network::http_connection_manager::v3 as xds_http,
19        service::discovery::v3 as xds_discovery,
20    },
21    WellKnownTypes,
22};
23
24use crate::endpoints::{EndpointGroup, Locality, LocalityInfo};
25use crate::load_balancer::{BackendLb, LoadBalancer};
26
27// FIXME: validate that the all the EDS config sources use ADS instead of just assuming it everywhere.
28
29#[derive(Clone, Debug, thiserror::Error)]
30pub(crate) enum ResourceError {
31    #[error("{0}")]
32    InvalidResource(#[from] junction_api::Error),
33
34    #[error("invalid xDS: {resource_name}: {message}")]
35    InvalidXds {
36        resource_name: String,
37        message: Cow<'static, str>,
38    },
39}
40
41impl ResourceError {
42    fn for_xds(resource_name: String, message: String) -> Self {
43        Self::InvalidXds {
44            resource_name,
45            message: message.into(),
46        }
47    }
48
49    fn for_xds_static(resource_name: String, message: &'static str) -> Self {
50        Self::InvalidXds {
51            resource_name,
52            message: message.into(),
53        }
54    }
55}
56
57/// An opaque string used to version an xDS resource.
58///
59/// `ResourceVersion`s are immutable and cheap to `clone` and share.
60#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
61pub struct ResourceVersion(SmolStr);
62
63impl Deref for ResourceVersion {
64    type Target = str;
65
66    fn deref(&self) -> &Self::Target {
67        &self.0
68    }
69}
70
71impl serde::Serialize for ResourceVersion {
72    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
73    where
74        S: serde::Serializer,
75    {
76        serializer.serialize_str(&self.0)
77    }
78}
79
80impl AsRef<str> for ResourceVersion {
81    fn as_ref(&self) -> &str {
82        &self.0
83    }
84}
85
86macro_rules! impl_resource_version_from {
87    ($from_ty:ty) => {
88        impl From<$from_ty> for ResourceVersion {
89            fn from(s: $from_ty) -> ResourceVersion {
90                ResourceVersion(s.into())
91            }
92        }
93    };
94}
95
96impl_resource_version_from!(&str);
97impl_resource_version_from!(&mut str);
98impl_resource_version_from!(String);
99impl_resource_version_from!(&String);
100impl_resource_version_from!(Arc<str>);
101impl_resource_version_from!(Box<str>);
102
103/// The type of an xDS resource we store in cache.
104///
105/// The order these are declared in is the xDS make-before-break order, so that
106/// the [enum_map] crate keeps enum maps in this order. This means any time we
107/// need to iterate an EnumMap's values, we're probably doing it in an order
108/// that keeps state updates sane.
109#[derive(Debug, Copy, Clone, PartialEq, Eq, enum_map::Enum, Hash, PartialOrd, Ord)]
110pub(crate) enum ResourceType {
111    Cluster,
112    ClusterLoadAssignment,
113    Listener,
114    RouteConfiguration,
115}
116
117impl ResourceType {
118    fn as_well_known(&self) -> WellKnownTypes {
119        match self {
120            ResourceType::Cluster => WellKnownTypes::Cluster,
121            ResourceType::ClusterLoadAssignment => WellKnownTypes::ClusterLoadAssignment,
122            ResourceType::Listener => WellKnownTypes::Listener,
123            ResourceType::RouteConfiguration => WellKnownTypes::RouteConfiguration,
124        }
125    }
126
127    fn from_well_known(wkt: WellKnownTypes) -> Option<Self> {
128        match wkt {
129            WellKnownTypes::Cluster => Some(Self::Cluster),
130            WellKnownTypes::ClusterLoadAssignment => Some(Self::ClusterLoadAssignment),
131            WellKnownTypes::Listener => Some(Self::Listener),
132            WellKnownTypes::RouteConfiguration => Some(Self::RouteConfiguration),
133            _ => None,
134        }
135    }
136
137    pub(crate) const fn supports_wildcard(&self) -> bool {
138        matches!(self, ResourceType::Cluster | ResourceType::Listener)
139    }
140
141    /// Return all of the known enum variants in xDS's make-before-break order.
142    pub(crate) fn all() -> &'static [Self] {
143        &[
144            Self::Cluster,
145            Self::ClusterLoadAssignment,
146            Self::Listener,
147            Self::RouteConfiguration,
148        ]
149    }
150
151    pub(crate) fn type_url(&self) -> &'static str {
152        self.as_well_known().type_url()
153    }
154
155    pub(crate) fn from_type_url(type_url: &str) -> Option<Self> {
156        Self::from_well_known(WellKnownTypes::from_type_url(type_url)?)
157    }
158}
159
160#[derive(Clone, Debug)]
161pub(crate) enum ResourceVec {
162    Listener(VersionedVec<xds_listener::Listener>),
163    RouteConfiguration(VersionedVec<xds_route::RouteConfiguration>),
164    Cluster(VersionedVec<xds_cluster::Cluster>),
165    ClusterLoadAssignment(VersionedVec<xds_endpoint::ClusterLoadAssignment>),
166}
167
168type VersionedVec<T> = Vec<(ResourceVersion, T)>;
169
170impl ResourceVec {
171    pub(crate) fn from_resources(
172        rtype: ResourceType,
173        resources: Vec<xds_discovery::Resource>,
174    ) -> Result<Self, prost::DecodeError> {
175        match rtype {
176            ResourceType::Cluster => from_resource_vec(resources).map(Self::Cluster),
177            ResourceType::ClusterLoadAssignment => {
178                from_resource_vec(resources).map(Self::ClusterLoadAssignment)
179            }
180            ResourceType::Listener => from_resource_vec(resources).map(Self::Listener),
181            ResourceType::RouteConfiguration => {
182                from_resource_vec(resources).map(Self::RouteConfiguration)
183            }
184        }
185    }
186
187    #[cfg(test)]
188    pub(crate) fn resource_type(&self) -> ResourceType {
189        match self {
190            ResourceVec::Listener(_) => ResourceType::Listener,
191            ResourceVec::RouteConfiguration(_) => ResourceType::RouteConfiguration,
192            ResourceVec::Cluster(_) => ResourceType::Cluster,
193            ResourceVec::ClusterLoadAssignment(_) => ResourceType::ClusterLoadAssignment,
194        }
195    }
196
197    #[cfg(test)]
198    pub(crate) fn to_resources(&self) -> Result<Vec<xds_discovery::Resource>, prost::EncodeError> {
199        match self {
200            ResourceVec::Listener(vec) => to_resource_vec(vec),
201            ResourceVec::RouteConfiguration(vec) => to_resource_vec(vec),
202            ResourceVec::Cluster(vec) => to_resource_vec(vec),
203            ResourceVec::ClusterLoadAssignment(vec) => to_resource_vec(vec),
204        }
205    }
206}
207
208macro_rules! test_constructor {
209    ($name:ident, $variant:ident, $xds_type:ty) => {
210        impl ResourceVec {
211            #[cfg(test)]
212            pub(crate) fn $name<I: IntoIterator<Item = $xds_type>>(
213                version: ResourceVersion,
214                xs: I,
215            ) -> Self {
216                let data = xs.into_iter().map(|l| (version.clone(), l)).collect();
217                Self::$variant(data)
218            }
219        }
220    };
221}
222
223test_constructor!(from_listeners, Listener, xds_listener::Listener);
224test_constructor!(
225    from_route_configs,
226    RouteConfiguration,
227    xds_route::RouteConfiguration
228);
229test_constructor!(from_clusters, Cluster, xds_cluster::Cluster);
230test_constructor!(
231    from_load_assignments,
232    ClusterLoadAssignment,
233    xds_endpoint::ClusterLoadAssignment
234);
235
236fn from_resource_vec<M: Default + prost::Name>(
237    resources: Vec<xds_discovery::Resource>,
238) -> Result<VersionedVec<M>, prost::DecodeError> {
239    let mut ms = Vec::with_capacity(resources.len());
240    for r in resources {
241        let Some(any) = r.resource else {
242            continue;
243        };
244        ms.push((ResourceVersion::from(r.version), any.to_msg()?));
245    }
246
247    Ok(ms)
248}
249
250#[cfg(test)]
251fn to_resource_vec<M: prost::Name>(
252    xs: &VersionedVec<M>,
253) -> Result<Vec<xds_discovery::Resource>, prost::EncodeError> {
254    use xds_api::pb::google::protobuf;
255
256    let mut resources = Vec::with_capacity(xs.len());
257
258    for (v, msg) in xs {
259        let as_any = protobuf::Any::from_msg(msg)?;
260        resources.push(xds_discovery::Resource {
261            resource: Some(as_any),
262            version: v.to_string(),
263            ..Default::default()
264        })
265    }
266
267    Ok(resources)
268}
269
270/// A typed reference to another resource.
271///
272/// This is functionally a `String` and implements `Clone`, `Ord`, and `Eq`` as
273/// if it was just a string.
274#[derive(Debug)]
275pub(crate) struct ResourceName<T> {
276    _type: PhantomData<T>,
277    name: String,
278}
279
280impl<T> ResourceName<T> {
281    pub fn as_str(&self) -> &str {
282        &self.name
283    }
284}
285
286impl<T> Clone for ResourceName<T> {
287    fn clone(&self) -> Self {
288        Self {
289            _type: PhantomData,
290            name: self.name.clone(),
291        }
292    }
293}
294
295impl<T> From<String> for ResourceName<T> {
296    fn from(name: String) -> Self {
297        Self {
298            _type: PhantomData,
299            name,
300        }
301    }
302}
303
304impl<T> PartialEq for ResourceName<T> {
305    fn eq(&self, other: &Self) -> bool {
306        self.name == other.name
307    }
308}
309
310impl<T> Eq for ResourceName<T> {}
311
312#[allow(clippy::non_canonical_partial_ord_impl)]
313impl<T> PartialOrd for ResourceName<T> {
314    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
315        self.name.partial_cmp(&other.name)
316    }
317}
318
319impl<T> Ord for ResourceName<T> {
320    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
321        self.name.cmp(&other.name)
322    }
323}
324
325#[derive(Clone, Debug)]
326pub(crate) struct ApiListener {
327    pub xds: xds_listener::Listener,
328    pub route_config: ApiListenerData,
329}
330
331#[derive(Clone, Debug)]
332pub(crate) enum ApiListenerData {
333    Rds(ResourceName<RouteConfig>),
334    Inlined(RouteConfigData),
335}
336
337fn http_connection_manager(
338    listener: &xds_listener::Listener,
339) -> Result<xds_http::HttpConnectionManager, ResourceError> {
340    let api_listener = listener
341        .api_listener
342        .as_ref()
343        .and_then(|l| l.api_listener.as_ref())
344        .ok_or_else(|| {
345            ResourceError::for_xds_static(listener.name.clone(), "Listener has no api_listener")
346        })?;
347
348    api_listener.to_msg().map_err(|e| {
349        ResourceError::for_xds(listener.name.clone(), format!("invalid api_listener: {e}"))
350    })
351}
352
353impl ApiListener {
354    pub(crate) fn from_xds(name: &str, xds: xds_listener::Listener) -> Result<Self, ResourceError> {
355        use xds_http::http_connection_manager::RouteSpecifier;
356
357        let conn_manager = http_connection_manager(&xds)?;
358        let data = match &conn_manager.route_specifier {
359            Some(RouteSpecifier::Rds(rds)) => {
360                let name = rds.route_config_name.clone();
361                ApiListenerData::Rds(name.into())
362            }
363            Some(RouteSpecifier::RouteConfig(route_config)) => {
364                let data = RouteConfigData::from_xds(route_config)?;
365                ApiListenerData::Inlined(data)
366            }
367            _ => {
368                return Err(ResourceError::for_xds_static(
369                    name.to_string(),
370                    "api_listener has no routes configured",
371                ))
372            }
373        };
374
375        Ok(Self {
376            xds,
377            route_config: data,
378        })
379    }
380}
381
382#[derive(Clone, Debug)]
383pub(crate) struct RouteConfig {
384    pub xds: xds_route::RouteConfiguration,
385    pub data: RouteConfigData,
386}
387
388#[derive(Clone, Debug)]
389pub(super) enum RouteConfigData {
390    Route {
391        route: Arc<Route>,
392        clusters: Vec<ResourceName<Cluster>>,
393    },
394    LbPolicy {
395        action: Arc<xds_route::RouteAction>,
396        cluster: ResourceName<Cluster>,
397    },
398}
399
400impl RouteConfigData {
401    fn from_xds(xds: &xds_route::RouteConfiguration) -> Result<Self, ResourceError> {
402        match BackendId::from_lb_config_route_name(&xds.name) {
403            // it's a normal route
404            Err(_) => {
405                let clusters = RouteConfig::cluster_names(xds);
406                let route = Arc::new(Route::from_xds(xds)?);
407                Ok(RouteConfigData::Route { route, clusters })
408            }
409            // it's an lb config route
410            Ok(_) => RouteConfig::lb_policy_action(xds).ok_or(ResourceError::for_xds_static(
411                xds.name.clone(),
412                "failed to parse LB config route",
413            )),
414        }
415    }
416}
417
418impl RouteConfig {
419    pub(crate) fn from_xds(xds: xds_route::RouteConfiguration) -> Result<Self, ResourceError> {
420        let data = RouteConfigData::from_xds(&xds)?;
421        Ok(Self { xds, data })
422    }
423
424    fn cluster_names(xds: &xds_route::RouteConfiguration) -> Vec<ResourceName<Cluster>> {
425        let mut clusters = BTreeSet::new();
426        for vhost in &xds.virtual_hosts {
427            for route in &vhost.routes {
428                let Some(xds_route::route::Action::Route(route_action)) = &route.action else {
429                    continue;
430                };
431
432                match &route_action.cluster_specifier {
433                    Some(xds_route::route_action::ClusterSpecifier::Cluster(cluster)) => {
434                        clusters.insert(cluster.clone());
435                    }
436                    Some(xds_route::route_action::ClusterSpecifier::WeightedClusters(
437                        weighted_clusters,
438                    )) => {
439                        for w in &weighted_clusters.clusters {
440                            clusters.insert(w.name.clone());
441                        }
442                    }
443                    _ => continue,
444                }
445            }
446        }
447        clusters.into_iter().map(|n| n.into()).collect()
448    }
449
450    fn lb_policy_action(xds: &xds_route::RouteConfiguration) -> Option<RouteConfigData> {
451        let vhost = match &xds.virtual_hosts.as_slice() {
452            &[vhost] => vhost,
453            _ => return None,
454        };
455
456        let route = match &vhost.routes.as_slice() {
457            &[route] => route,
458            _ => return None,
459        };
460
461        let Some(xds_route::route::Action::Route(action)) = &route.action else {
462            return None;
463        };
464        match &action.cluster_specifier {
465            Some(xds_route::route_action::ClusterSpecifier::Cluster(cluster)) => {
466                Some(RouteConfigData::LbPolicy {
467                    action: Arc::new(action.clone()),
468                    cluster: cluster.clone().into(),
469                })
470            }
471            _ => None,
472        }
473    }
474}
475
476#[derive(Clone, Debug)]
477pub(crate) struct Cluster {
478    pub(crate) xds: xds_cluster::Cluster,
479    pub(crate) backend_lb: Arc<BackendLb>,
480}
481
482impl Cluster {
483    pub(crate) fn dns_name(&self) -> Option<(Hostname, u16)> {
484        let id = &self.backend_lb.config.id;
485        match &id.service {
486            junction_api::Service::Dns(dns) => Some((dns.hostname.clone(), id.port)),
487            _ => None,
488        }
489    }
490
491    pub(crate) fn from_xds(
492        xds: xds_cluster::Cluster,
493        default_action: Option<&xds_route::RouteAction>,
494    ) -> Result<Self, ResourceError> {
495        let backend = Backend::from_xds(&xds, default_action)?;
496        let load_balancer = LoadBalancer::from_config(&backend.lb);
497
498        let backend_lb = Arc::new(BackendLb {
499            config: backend,
500            load_balancer,
501        });
502
503        Ok(Self { xds, backend_lb })
504    }
505}
506
507#[derive(Clone, Debug)]
508pub(crate) struct LoadAssignment {
509    pub xds: xds_endpoint::ClusterLoadAssignment,
510    pub endpoint_group: Arc<EndpointGroup>,
511}
512
513impl LoadAssignment {
514    pub(crate) fn from_xds(
515        xds: xds_endpoint::ClusterLoadAssignment,
516    ) -> Result<Self, ResourceError> {
517        let endpoint_group = Arc::new(EndpointGroup::from_xds(&xds)?);
518        Ok(Self {
519            xds,
520            endpoint_group,
521        })
522    }
523}
524
525impl Locality {
526    pub(crate) fn from_xds(locality: &Option<xds_core::Locality>) -> Self {
527        let Some(locality) = locality.as_ref() else {
528            return Self::Unknown;
529        };
530
531        if locality.region.is_empty() && locality.zone.is_empty() {
532            return Self::Unknown;
533        }
534
535        Self::Known(LocalityInfo {
536            region: locality.region.clone(),
537            zone: locality.zone.clone(),
538        })
539    }
540}
541
542impl EndpointGroup {
543    pub(crate) fn from_xds(
544        cla: &xds_endpoint::ClusterLoadAssignment,
545    ) -> Result<Self, ResourceError> {
546        let mut endpoints = BTreeMap::new();
547        for (locality_idx, locality_endpoints) in cla.endpoints.iter().enumerate() {
548            let locality = Locality::from_xds(&locality_endpoints.locality);
549            let locality_endpoints: Result<Vec<_>, _> = locality_endpoints
550                .lb_endpoints
551                .iter()
552                .enumerate()
553                .map(|(endpoint_idx, e)| {
554                    xds_lb_endpoint_socket_addr(&cla.cluster_name, locality_idx, endpoint_idx, e)
555                })
556                .collect();
557
558            endpoints.insert(locality, locality_endpoints?);
559        }
560
561        Ok(EndpointGroup::new(endpoints))
562    }
563}
564
565fn xds_lb_endpoint_socket_addr(
566    eds_name: &str,
567    locality_idx: usize,
568    endpoint_idx: usize,
569    endpoint: &xds_endpoint::LbEndpoint,
570) -> Result<SocketAddr, ResourceError> {
571    macro_rules! make_error {
572        ($msg:expr) => {
573            ResourceError::for_xds_static(
574                format!(
575                    "{}: endpoints[{}].lb_endpoints[{}]",
576                    eds_name, locality_idx, endpoint_idx
577                ),
578                $msg,
579            )
580        };
581    }
582
583    let endpoint = match &endpoint.host_identifier {
584        Some(xds_endpoint::lb_endpoint::HostIdentifier::Endpoint(ep)) => ep,
585        _ => return Err(make_error!("endpoint is missing endpoint data")),
586    };
587
588    let address = endpoint.address.as_ref().and_then(|a| a.address.as_ref());
589    match address {
590        Some(xds_core::address::Address::SocketAddress(addr)) => {
591            let ip = addr
592                .address
593                .parse()
594                .map_err(|_| make_error!("invalid socket address"))?;
595            let port = match &addr.port_specifier {
596                Some(xds_core::socket_address::PortSpecifier::PortValue(p)) => {
597                    (*p).try_into().map_err(|_| make_error!("invalid port"))?
598                }
599                _ => return Err(make_error!("missing port specifier")),
600            };
601
602            Ok(SocketAddr::new(ip, port))
603        }
604        _ => Err(make_error!("endpoint has no socket address")),
605    }
606}