1use std::borrow::Cow;
2use std::collections::BTreeMap;
3use std::net::SocketAddr;
4use std::ops::Deref;
5use std::{collections::BTreeSet, marker::PhantomData, sync::Arc};
6
7use junction_api::backend::Backend;
8use junction_api::backend::BackendId;
9use junction_api::http::Route;
10use junction_api::Hostname;
11use smol_str::SmolStr;
12use xds_api::{
13 pb::envoy::{
14 config::{
15 cluster::v3 as xds_cluster, core::v3 as xds_core, endpoint::v3 as xds_endpoint,
16 listener::v3 as xds_listener, route::v3 as xds_route,
17 },
18 extensions::filters::network::http_connection_manager::v3 as xds_http,
19 service::discovery::v3 as xds_discovery,
20 },
21 WellKnownTypes,
22};
23
24use crate::endpoints::{EndpointGroup, Locality, LocalityInfo};
25use crate::load_balancer::{BackendLb, LoadBalancer};
26
27#[derive(Clone, Debug, thiserror::Error)]
30pub(crate) enum ResourceError {
31 #[error("{0}")]
32 InvalidResource(#[from] junction_api::Error),
33
34 #[error("invalid xDS: {resource_name}: {message}")]
35 InvalidXds {
36 resource_name: String,
37 message: Cow<'static, str>,
38 },
39}
40
41impl ResourceError {
42 fn for_xds(resource_name: String, message: String) -> Self {
43 Self::InvalidXds {
44 resource_name,
45 message: message.into(),
46 }
47 }
48
49 fn for_xds_static(resource_name: String, message: &'static str) -> Self {
50 Self::InvalidXds {
51 resource_name,
52 message: message.into(),
53 }
54 }
55}
56
57#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)]
61pub struct ResourceVersion(SmolStr);
62
63impl Deref for ResourceVersion {
64 type Target = str;
65
66 fn deref(&self) -> &Self::Target {
67 &self.0
68 }
69}
70
71impl serde::Serialize for ResourceVersion {
72 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
73 where
74 S: serde::Serializer,
75 {
76 serializer.serialize_str(&self.0)
77 }
78}
79
80impl AsRef<str> for ResourceVersion {
81 fn as_ref(&self) -> &str {
82 &self.0
83 }
84}
85
86macro_rules! impl_resource_version_from {
87 ($from_ty:ty) => {
88 impl From<$from_ty> for ResourceVersion {
89 fn from(s: $from_ty) -> ResourceVersion {
90 ResourceVersion(s.into())
91 }
92 }
93 };
94}
95
96impl_resource_version_from!(&str);
97impl_resource_version_from!(&mut str);
98impl_resource_version_from!(String);
99impl_resource_version_from!(&String);
100impl_resource_version_from!(Arc<str>);
101impl_resource_version_from!(Box<str>);
102
103#[derive(Debug, Copy, Clone, PartialEq, Eq, enum_map::Enum, Hash, PartialOrd, Ord)]
110pub(crate) enum ResourceType {
111 Cluster,
112 ClusterLoadAssignment,
113 Listener,
114 RouteConfiguration,
115}
116
117impl ResourceType {
118 fn as_well_known(&self) -> WellKnownTypes {
119 match self {
120 ResourceType::Cluster => WellKnownTypes::Cluster,
121 ResourceType::ClusterLoadAssignment => WellKnownTypes::ClusterLoadAssignment,
122 ResourceType::Listener => WellKnownTypes::Listener,
123 ResourceType::RouteConfiguration => WellKnownTypes::RouteConfiguration,
124 }
125 }
126
127 fn from_well_known(wkt: WellKnownTypes) -> Option<Self> {
128 match wkt {
129 WellKnownTypes::Cluster => Some(Self::Cluster),
130 WellKnownTypes::ClusterLoadAssignment => Some(Self::ClusterLoadAssignment),
131 WellKnownTypes::Listener => Some(Self::Listener),
132 WellKnownTypes::RouteConfiguration => Some(Self::RouteConfiguration),
133 _ => None,
134 }
135 }
136
137 pub(crate) const fn supports_wildcard(&self) -> bool {
138 matches!(self, ResourceType::Cluster | ResourceType::Listener)
139 }
140
141 pub(crate) fn all() -> &'static [Self] {
143 &[
144 Self::Cluster,
145 Self::ClusterLoadAssignment,
146 Self::Listener,
147 Self::RouteConfiguration,
148 ]
149 }
150
151 pub(crate) fn type_url(&self) -> &'static str {
152 self.as_well_known().type_url()
153 }
154
155 pub(crate) fn from_type_url(type_url: &str) -> Option<Self> {
156 Self::from_well_known(WellKnownTypes::from_type_url(type_url)?)
157 }
158}
159
160#[derive(Clone, Debug)]
161pub(crate) enum ResourceVec {
162 Listener(VersionedVec<xds_listener::Listener>),
163 RouteConfiguration(VersionedVec<xds_route::RouteConfiguration>),
164 Cluster(VersionedVec<xds_cluster::Cluster>),
165 ClusterLoadAssignment(VersionedVec<xds_endpoint::ClusterLoadAssignment>),
166}
167
168type VersionedVec<T> = Vec<(ResourceVersion, T)>;
169
170impl ResourceVec {
171 pub(crate) fn from_resources(
172 rtype: ResourceType,
173 resources: Vec<xds_discovery::Resource>,
174 ) -> Result<Self, prost::DecodeError> {
175 match rtype {
176 ResourceType::Cluster => from_resource_vec(resources).map(Self::Cluster),
177 ResourceType::ClusterLoadAssignment => {
178 from_resource_vec(resources).map(Self::ClusterLoadAssignment)
179 }
180 ResourceType::Listener => from_resource_vec(resources).map(Self::Listener),
181 ResourceType::RouteConfiguration => {
182 from_resource_vec(resources).map(Self::RouteConfiguration)
183 }
184 }
185 }
186
187 #[cfg(test)]
188 pub(crate) fn resource_type(&self) -> ResourceType {
189 match self {
190 ResourceVec::Listener(_) => ResourceType::Listener,
191 ResourceVec::RouteConfiguration(_) => ResourceType::RouteConfiguration,
192 ResourceVec::Cluster(_) => ResourceType::Cluster,
193 ResourceVec::ClusterLoadAssignment(_) => ResourceType::ClusterLoadAssignment,
194 }
195 }
196
197 #[cfg(test)]
198 pub(crate) fn to_resources(&self) -> Result<Vec<xds_discovery::Resource>, prost::EncodeError> {
199 match self {
200 ResourceVec::Listener(vec) => to_resource_vec(vec),
201 ResourceVec::RouteConfiguration(vec) => to_resource_vec(vec),
202 ResourceVec::Cluster(vec) => to_resource_vec(vec),
203 ResourceVec::ClusterLoadAssignment(vec) => to_resource_vec(vec),
204 }
205 }
206}
207
208macro_rules! test_constructor {
209 ($name:ident, $variant:ident, $xds_type:ty) => {
210 impl ResourceVec {
211 #[cfg(test)]
212 pub(crate) fn $name<I: IntoIterator<Item = $xds_type>>(
213 version: ResourceVersion,
214 xs: I,
215 ) -> Self {
216 let data = xs.into_iter().map(|l| (version.clone(), l)).collect();
217 Self::$variant(data)
218 }
219 }
220 };
221}
222
223test_constructor!(from_listeners, Listener, xds_listener::Listener);
224test_constructor!(
225 from_route_configs,
226 RouteConfiguration,
227 xds_route::RouteConfiguration
228);
229test_constructor!(from_clusters, Cluster, xds_cluster::Cluster);
230test_constructor!(
231 from_load_assignments,
232 ClusterLoadAssignment,
233 xds_endpoint::ClusterLoadAssignment
234);
235
236fn from_resource_vec<M: Default + prost::Name>(
237 resources: Vec<xds_discovery::Resource>,
238) -> Result<VersionedVec<M>, prost::DecodeError> {
239 let mut ms = Vec::with_capacity(resources.len());
240 for r in resources {
241 let Some(any) = r.resource else {
242 continue;
243 };
244 ms.push((ResourceVersion::from(r.version), any.to_msg()?));
245 }
246
247 Ok(ms)
248}
249
/// Packs each versioned message into a discovery `Resource` (test builds
/// only). Only `resource` and `version` are set; all other fields keep their
/// defaults. The first encoding failure is returned.
#[cfg(test)]
fn to_resource_vec<M: prost::Name>(
    xs: &VersionedVec<M>,
) -> Result<Vec<xds_discovery::Resource>, prost::EncodeError> {
    use xds_api::pb::google::protobuf;

    xs.iter()
        .map(|(version, message)| {
            protobuf::Any::from_msg(message).map(|packed| xds_discovery::Resource {
                resource: Some(packed),
                version: version.to_string(),
                ..Default::default()
            })
        })
        .collect()
}
269
/// A resource name tagged at the type level with the kind of resource (`T`)
/// it refers to.
///
/// `T` is only a marker: no value of `T` is stored, and the trait impls below
/// are written by hand so that they do not require anything of `T`. Equality
/// and ordering look at the name alone.
#[derive(Debug)]
pub(crate) struct ResourceName<T> {
    _type: PhantomData<T>,
    name: String,
}

impl<T> ResourceName<T> {
    /// Returns the name as a string slice.
    pub fn as_str(&self) -> &str {
        &self.name
    }
}

impl<T> Clone for ResourceName<T> {
    fn clone(&self) -> Self {
        Self {
            _type: PhantomData,
            name: self.name.clone(),
        }
    }
}

impl<T> From<String> for ResourceName<T> {
    fn from(name: String) -> Self {
        Self {
            _type: PhantomData,
            name,
        }
    }
}

impl<T> PartialEq for ResourceName<T> {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}

impl<T> Eq for ResourceName<T> {}

impl<T> PartialOrd for ResourceName<T> {
    // Canonical form: delegate to `Ord` so the two orderings can never drift
    // apart.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for ResourceName<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.name.cmp(&other.name)
    }
}
324
325#[derive(Clone, Debug)]
326pub(crate) struct ApiListener {
327 pub xds: xds_listener::Listener,
328 pub route_config: ApiListenerData,
329}
330
331#[derive(Clone, Debug)]
332pub(crate) enum ApiListenerData {
333 Rds(ResourceName<RouteConfig>),
334 Inlined(RouteConfigData),
335}
336
337fn http_connection_manager(
338 listener: &xds_listener::Listener,
339) -> Result<xds_http::HttpConnectionManager, ResourceError> {
340 let api_listener = listener
341 .api_listener
342 .as_ref()
343 .and_then(|l| l.api_listener.as_ref())
344 .ok_or_else(|| {
345 ResourceError::for_xds_static(listener.name.clone(), "Listener has no api_listener")
346 })?;
347
348 api_listener.to_msg().map_err(|e| {
349 ResourceError::for_xds(listener.name.clone(), format!("invalid api_listener: {e}"))
350 })
351}
352
353impl ApiListener {
354 pub(crate) fn from_xds(name: &str, xds: xds_listener::Listener) -> Result<Self, ResourceError> {
355 use xds_http::http_connection_manager::RouteSpecifier;
356
357 let conn_manager = http_connection_manager(&xds)?;
358 let data = match &conn_manager.route_specifier {
359 Some(RouteSpecifier::Rds(rds)) => {
360 let name = rds.route_config_name.clone();
361 ApiListenerData::Rds(name.into())
362 }
363 Some(RouteSpecifier::RouteConfig(route_config)) => {
364 let data = RouteConfigData::from_xds(route_config)?;
365 ApiListenerData::Inlined(data)
366 }
367 _ => {
368 return Err(ResourceError::for_xds_static(
369 name.to_string(),
370 "api_listener has no routes configured",
371 ))
372 }
373 };
374
375 Ok(Self {
376 xds,
377 route_config: data,
378 })
379 }
380}
381
382#[derive(Clone, Debug)]
383pub(crate) struct RouteConfig {
384 pub xds: xds_route::RouteConfiguration,
385 pub data: RouteConfigData,
386}
387
388#[derive(Clone, Debug)]
389pub(super) enum RouteConfigData {
390 Route {
391 route: Arc<Route>,
392 clusters: Vec<ResourceName<Cluster>>,
393 },
394 LbPolicy {
395 action: Arc<xds_route::RouteAction>,
396 cluster: ResourceName<Cluster>,
397 },
398}
399
400impl RouteConfigData {
401 fn from_xds(xds: &xds_route::RouteConfiguration) -> Result<Self, ResourceError> {
402 match BackendId::from_lb_config_route_name(&xds.name) {
403 Err(_) => {
405 let clusters = RouteConfig::cluster_names(xds);
406 let route = Arc::new(Route::from_xds(xds)?);
407 Ok(RouteConfigData::Route { route, clusters })
408 }
409 Ok(_) => RouteConfig::lb_policy_action(xds).ok_or(ResourceError::for_xds_static(
411 xds.name.clone(),
412 "failed to parse LB config route",
413 )),
414 }
415 }
416}
417
418impl RouteConfig {
419 pub(crate) fn from_xds(xds: xds_route::RouteConfiguration) -> Result<Self, ResourceError> {
420 let data = RouteConfigData::from_xds(&xds)?;
421 Ok(Self { xds, data })
422 }
423
424 fn cluster_names(xds: &xds_route::RouteConfiguration) -> Vec<ResourceName<Cluster>> {
425 let mut clusters = BTreeSet::new();
426 for vhost in &xds.virtual_hosts {
427 for route in &vhost.routes {
428 let Some(xds_route::route::Action::Route(route_action)) = &route.action else {
429 continue;
430 };
431
432 match &route_action.cluster_specifier {
433 Some(xds_route::route_action::ClusterSpecifier::Cluster(cluster)) => {
434 clusters.insert(cluster.clone());
435 }
436 Some(xds_route::route_action::ClusterSpecifier::WeightedClusters(
437 weighted_clusters,
438 )) => {
439 for w in &weighted_clusters.clusters {
440 clusters.insert(w.name.clone());
441 }
442 }
443 _ => continue,
444 }
445 }
446 }
447 clusters.into_iter().map(|n| n.into()).collect()
448 }
449
450 fn lb_policy_action(xds: &xds_route::RouteConfiguration) -> Option<RouteConfigData> {
451 let vhost = match &xds.virtual_hosts.as_slice() {
452 &[vhost] => vhost,
453 _ => return None,
454 };
455
456 let route = match &vhost.routes.as_slice() {
457 &[route] => route,
458 _ => return None,
459 };
460
461 let Some(xds_route::route::Action::Route(action)) = &route.action else {
462 return None;
463 };
464 match &action.cluster_specifier {
465 Some(xds_route::route_action::ClusterSpecifier::Cluster(cluster)) => {
466 Some(RouteConfigData::LbPolicy {
467 action: Arc::new(action.clone()),
468 cluster: cluster.clone().into(),
469 })
470 }
471 _ => None,
472 }
473 }
474}
475
476#[derive(Clone, Debug)]
477pub(crate) struct Cluster {
478 pub(crate) xds: xds_cluster::Cluster,
479 pub(crate) backend_lb: Arc<BackendLb>,
480}
481
482impl Cluster {
483 pub(crate) fn dns_name(&self) -> Option<(Hostname, u16)> {
484 let id = &self.backend_lb.config.id;
485 match &id.service {
486 junction_api::Service::Dns(dns) => Some((dns.hostname.clone(), id.port)),
487 _ => None,
488 }
489 }
490
491 pub(crate) fn from_xds(
492 xds: xds_cluster::Cluster,
493 default_action: Option<&xds_route::RouteAction>,
494 ) -> Result<Self, ResourceError> {
495 let backend = Backend::from_xds(&xds, default_action)?;
496 let load_balancer = LoadBalancer::from_config(&backend.lb);
497
498 let backend_lb = Arc::new(BackendLb {
499 config: backend,
500 load_balancer,
501 });
502
503 Ok(Self { xds, backend_lb })
504 }
505}
506
507#[derive(Clone, Debug)]
508pub(crate) struct LoadAssignment {
509 pub xds: xds_endpoint::ClusterLoadAssignment,
510 pub endpoint_group: Arc<EndpointGroup>,
511}
512
513impl LoadAssignment {
514 pub(crate) fn from_xds(
515 xds: xds_endpoint::ClusterLoadAssignment,
516 ) -> Result<Self, ResourceError> {
517 let endpoint_group = Arc::new(EndpointGroup::from_xds(&xds)?);
518 Ok(Self {
519 xds,
520 endpoint_group,
521 })
522 }
523}
524
525impl Locality {
526 pub(crate) fn from_xds(locality: &Option<xds_core::Locality>) -> Self {
527 let Some(locality) = locality.as_ref() else {
528 return Self::Unknown;
529 };
530
531 if locality.region.is_empty() && locality.zone.is_empty() {
532 return Self::Unknown;
533 }
534
535 Self::Known(LocalityInfo {
536 region: locality.region.clone(),
537 zone: locality.zone.clone(),
538 })
539 }
540}
541
542impl EndpointGroup {
543 pub(crate) fn from_xds(
544 cla: &xds_endpoint::ClusterLoadAssignment,
545 ) -> Result<Self, ResourceError> {
546 let mut endpoints = BTreeMap::new();
547 for (locality_idx, locality_endpoints) in cla.endpoints.iter().enumerate() {
548 let locality = Locality::from_xds(&locality_endpoints.locality);
549 let locality_endpoints: Result<Vec<_>, _> = locality_endpoints
550 .lb_endpoints
551 .iter()
552 .enumerate()
553 .map(|(endpoint_idx, e)| {
554 xds_lb_endpoint_socket_addr(&cla.cluster_name, locality_idx, endpoint_idx, e)
555 })
556 .collect();
557
558 endpoints.insert(locality, locality_endpoints?);
559 }
560
561 Ok(EndpointGroup::new(endpoints))
562 }
563}
564
565fn xds_lb_endpoint_socket_addr(
566 eds_name: &str,
567 locality_idx: usize,
568 endpoint_idx: usize,
569 endpoint: &xds_endpoint::LbEndpoint,
570) -> Result<SocketAddr, ResourceError> {
571 macro_rules! make_error {
572 ($msg:expr) => {
573 ResourceError::for_xds_static(
574 format!(
575 "{}: endpoints[{}].lb_endpoints[{}]",
576 eds_name, locality_idx, endpoint_idx
577 ),
578 $msg,
579 )
580 };
581 }
582
583 let endpoint = match &endpoint.host_identifier {
584 Some(xds_endpoint::lb_endpoint::HostIdentifier::Endpoint(ep)) => ep,
585 _ => return Err(make_error!("endpoint is missing endpoint data")),
586 };
587
588 let address = endpoint.address.as_ref().and_then(|a| a.address.as_ref());
589 match address {
590 Some(xds_core::address::Address::SocketAddress(addr)) => {
591 let ip = addr
592 .address
593 .parse()
594 .map_err(|_| make_error!("invalid socket address"))?;
595 let port = match &addr.port_specifier {
596 Some(xds_core::socket_address::PortSpecifier::PortValue(p)) => {
597 (*p).try_into().map_err(|_| make_error!("invalid port"))?
598 }
599 _ => return Err(make_error!("missing port specifier")),
600 };
601
602 Ok(SocketAddr::new(ip, port))
603 }
604 _ => Err(make_error!("endpoint has no socket address")),
605 }
606}