junction_api/xds/
backend.rs

1use std::str::FromStr;
2use xds_api::pb::envoy::config::{
3    cluster::v3::{self as xds_cluster, cluster::ring_hash_lb_config::HashFunction},
4    core::v3 as xds_core,
5    endpoint::v3 as xds_endpoint,
6    route::v3 as xds_route,
7};
8
9use crate::{
10    backend::{Backend, LbPolicy, RequestHashPolicy, RequestHasher, RingHashParams},
11    error::{Error, ErrorContext},
12    value_or_default,
13    xds::ads_config_source,
14    BackendId, Service,
15};
16
17impl Backend {
18    pub fn from_xds(
19        cluster: &xds_cluster::Cluster,
20        route_action: Option<&xds_route::RouteAction>,
21    ) -> Result<Self, Error> {
22        use xds_cluster::cluster::DiscoveryType;
23
24        let lb = LbPolicy::from_xds(cluster, route_action)?;
25        let id = BackendId::from_str(&cluster.name)?;
26
27        let discovery_type = cluster_discovery_type(cluster);
28
29        match &id.service {
30            // if this is supposed to be a DNS cluster, validate that the xDS
31            // actually says it's a DNS cluster and that the discovery data
32            // matches the name.
33            Service::Dns(dns) => {
34                if discovery_type != Some(DiscoveryType::LogicalDns) {
35                    return Err(Error::new_static("mismatched discovery type"))
36                        .with_field("cluster_discovery_type");
37                }
38
39                let addr_matches = logical_dns_address(cluster).is_some_and(|sa| {
40                    sa.address == dns.hostname.as_ref()
41                        && xds_port(sa.port_specifier.as_ref()) == Some(id.port)
42                });
43                if !addr_matches {
44                    // NOTE: this error doesn't point at the actual proximate
45                    // field because nobody has time for that. if this starts
46                    // happening often, refine the error message here.
47                    return Err(Error::new_static("cluster is not a valid DNS cluster"))
48                        .with_fields("load_assignment", "endpoints");
49                }
50            }
51            // kube clusters should use EDS over ADS, with a cluster_name matching cluster.name
52            Service::Kube(_) => {
53                if discovery_type != Some(DiscoveryType::Eds) {
54                    return Err(Error::new_static("mismatched discovery type"))
55                        .with_field("cluster_discovery_type");
56                }
57
58                let Some(eds_config) = &cluster.eds_cluster_config else {
59                    return Err(Error::new_static("missing EDS config"))
60                        .with_field("eds_cluster_conig");
61                };
62
63                if eds_config.service_name != id.name() {
64                    return Err(Error::new_static("cluster_name must match EDS name"))
65                        .with_fields("eds_cluster_config", "service_name");
66                }
67
68                if eds_config.eds_config != Some(ads_config_source()) {
69                    return Err(Error::new_static("EDS cluster is not configured for ADS"))
70                        .with_fields("eds_cluster_config", "eds_config");
71                }
72            }
73        }
74
75        Ok(Backend { id, lb })
76    }
77
78    pub fn to_xds(&self) -> xds_cluster::Cluster {
79        use xds_cluster::cluster::ClusterDiscoveryType;
80        use xds_cluster::cluster::DiscoveryType;
81        use xds_cluster::cluster::EdsClusterConfig;
82
83        let (lb_policy, lb_config) = match self.lb.to_xds() {
84            Some((policy, config)) => (policy, Some(config)),
85            None => (xds_cluster::cluster::LbPolicy::default(), None),
86        };
87
88        let (cluster_discovery_type, eds_cluster_config, load_assignment) = match &self.id.service {
89            Service::Dns(dns) => {
90                let dtype = ClusterDiscoveryType::Type(DiscoveryType::LogicalDns.into());
91                let host_identifier = Some(xds_endpoint::lb_endpoint::HostIdentifier::Endpoint(
92                    xds_endpoint::Endpoint {
93                        address: Some(to_xds_address(&dns.hostname, self.id.port)),
94                        ..Default::default()
95                    },
96                ));
97                let endpoints = vec![xds_endpoint::LocalityLbEndpoints {
98                    lb_endpoints: vec![xds_endpoint::LbEndpoint {
99                        host_identifier,
100                        ..Default::default()
101                    }],
102                    ..Default::default()
103                }];
104                let load_assignment = Some(xds_endpoint::ClusterLoadAssignment {
105                    endpoints,
106                    ..Default::default()
107                });
108                (dtype, None, load_assignment)
109            }
110            Service::Kube(_) => {
111                let cluster_discovery_type = ClusterDiscoveryType::Type(DiscoveryType::Eds.into());
112                let eds_cluster_config = Some(EdsClusterConfig {
113                    eds_config: Some(crate::xds::ads_config_source()),
114                    service_name: self.id.name(),
115                });
116                (cluster_discovery_type, eds_cluster_config, None)
117            }
118        };
119
120        let cluster_discovery_type = Some(cluster_discovery_type);
121        xds_cluster::Cluster {
122            name: self.id.name(),
123            lb_policy: lb_policy.into(),
124            lb_config,
125            cluster_discovery_type,
126            load_assignment,
127            eds_cluster_config,
128            ..Default::default()
129        }
130    }
131
132    /// Generate a RouteConfiguration that routes to this Cluster using its
133    /// hash_policy as RouteAction. This exists so we can guarantee that there's
134    /// at least one unique RouteConfiguration pointing at every cluster so the
135    /// client can deduce its hash policies that way.
136    #[doc(hidden)]
137    pub fn to_xds_lb_route_config(&self) -> xds_route::RouteConfiguration {
138        use xds_route::route::Action;
139        use xds_route::route_action::ClusterSpecifier;
140        use xds_route::route_match::PathSpecifier;
141
142        let default_action = Action::Route(xds_route::RouteAction {
143            cluster_specifier: Some(ClusterSpecifier::Cluster(self.id.name())),
144            hash_policy: self.to_xds_hash_policies(),
145            ..Default::default()
146        });
147
148        let default_route = xds_route::Route {
149            r#match: Some(xds_route::RouteMatch {
150                path_specifier: Some(PathSpecifier::Prefix("".to_string())),
151                ..Default::default()
152            }),
153            action: Some(default_action),
154            ..Default::default()
155        };
156
157        let vhost = xds_route::VirtualHost {
158            domains: vec!["*".to_string()],
159            routes: vec![default_route],
160            ..Default::default()
161        };
162
163        xds_route::RouteConfiguration {
164            name: self.id.lb_config_route_name(),
165            virtual_hosts: vec![vhost],
166            ..Default::default()
167        }
168    }
169
170    fn to_xds_hash_policies(&self) -> Vec<xds_route::route_action::HashPolicy> {
171        match &self.lb {
172            LbPolicy::RingHash(ring_hash) => {
173                ring_hash.hash_params.iter().map(|p| p.to_xds()).collect()
174            }
175            _ => Vec::new(),
176        }
177    }
178}
179
180fn to_xds_address(hostname: &crate::Hostname, port: u16) -> xds_core::Address {
181    let socket_address = xds_core::SocketAddress {
182        address: hostname.to_string(),
183        port_specifier: Some(xds_core::socket_address::PortSpecifier::PortValue(
184            port as u32,
185        )),
186        ..Default::default()
187    };
188
189    xds_core::Address {
190        address: Some(xds_core::address::Address::SocketAddress(socket_address)),
191    }
192}
193
194fn cluster_discovery_type(
195    cluster: &xds_cluster::Cluster,
196) -> Option<xds_cluster::cluster::DiscoveryType> {
197    match cluster.cluster_discovery_type {
198        Some(xds_cluster::cluster::ClusterDiscoveryType::Type(cdt)) => {
199            xds_cluster::cluster::DiscoveryType::try_from(cdt).ok()
200        }
201        _ => None,
202    }
203}
204
205fn logical_dns_address(cluster: &xds_cluster::Cluster) -> Option<&xds_core::SocketAddress> {
206    let cla = cluster.load_assignment.as_ref()?;
207    let endpoint = cla.endpoints.first()?;
208    let lb_endpoint = endpoint.lb_endpoints.first()?;
209
210    let endpoint_addr = match lb_endpoint.host_identifier.as_ref()? {
211        xds_endpoint::lb_endpoint::HostIdentifier::Endpoint(endpoint) => {
212            endpoint.address.as_ref()?
213        }
214        _ => return None,
215    };
216
217    match endpoint_addr.address.as_ref()? {
218        xds_core::address::Address::SocketAddress(socket_address) => Some(socket_address),
219        _ => None,
220    }
221}
222
223#[inline]
224fn xds_port(port_specifier: Option<&xds_core::socket_address::PortSpecifier>) -> Option<u16> {
225    match port_specifier {
226        Some(xds_core::socket_address::PortSpecifier::PortValue(v)) => (*v).try_into().ok(),
227        _ => None,
228    }
229}
230
231impl LbPolicy {
232    pub(crate) fn from_xds(
233        cluster: &xds_cluster::Cluster,
234        route_action: Option<&xds_route::RouteAction>,
235    ) -> Result<Self, Error> {
236        match cluster.lb_policy() {
237            // for ROUND_ROBIN, ignore the slow_start_config entirely and return a brand new
238            // RoundRobin policy each time. validate that the config matches the enum field even
239            // though it's ignored.
240            xds_cluster::cluster::LbPolicy::RoundRobin => match cluster.lb_config.as_ref() {
241                Some(xds_cluster::cluster::LbConfig::RoundRobinLbConfig(_)) => {
242                    Ok(LbPolicy::RoundRobin)
243                }
244                None => Ok(LbPolicy::Unspecified),
245                _ => Err(
246                    Error::new_static("RoundRobin lb_policy has a mismatched lb_config")
247                        .with_field("lb_config"),
248                ),
249            },
250            // for RING_HASH pull the config out if set or use default values to populate our
251            // config.
252            xds_cluster::cluster::LbPolicy::RingHash => {
253                let lb_config = match cluster.lb_config.as_ref() {
254                    Some(xds_cluster::cluster::LbConfig::RingHashLbConfig(config)) => config,
255                    None => &xds_cluster::cluster::RingHashLbConfig::default(),
256                    _ => {
257                        return Err(Error::new_static(
258                            "RingHash lb_policy has a mismatched lb_config",
259                        )
260                        .with_field("lb_config"))
261                    }
262                };
263
264                // hash function must be XX_HASH to match gRPC
265                if lb_config.hash_function() != HashFunction::XxHash {
266                    return Err(Error::new(format!(
267                        "unsupported hash function: {:?}",
268                        lb_config.hash_function(),
269                    )))
270                    .with_fields("lb_config", "hash_function");
271                }
272
273                let min_ring_size = value_or_default!(
274                    lb_config.minimum_ring_size,
275                    crate::backend::default_min_ring_size() as u64
276                );
277                let min_ring_size = min_ring_size
278                    .try_into()
279                    .map_err(|_| Error::new_static("int overflow"))
280                    .with_fields("lb_config", ",minimum_ring_size")?;
281
282                let hash_params = route_action
283                    .map(hash_policies)
284                    .transpose()
285                    .with_field("route_action")?;
286
287                Ok(LbPolicy::RingHash(RingHashParams {
288                    min_ring_size,
289                    hash_params: hash_params.unwrap_or_default(),
290                }))
291            }
292            _ => Err(Error::new_static("unsupported lb policy")).with_field("lb_policy"),
293        }
294    }
295
296    pub(crate) fn to_xds(
297        &self,
298    ) -> Option<(
299        xds_cluster::cluster::LbPolicy,
300        xds_cluster::cluster::LbConfig,
301    )> {
302        match self {
303            LbPolicy::RoundRobin => Some((
304                xds_cluster::cluster::LbPolicy::RoundRobin,
305                xds_cluster::cluster::LbConfig::RoundRobinLbConfig(Default::default()),
306            )),
307            LbPolicy::RingHash(params) => Some((
308                xds_cluster::cluster::LbPolicy::RingHash,
309                xds_cluster::cluster::LbConfig::RingHashLbConfig(
310                    xds_cluster::cluster::RingHashLbConfig {
311                        minimum_ring_size: Some((params.min_ring_size as u64).into()),
312                        hash_function:
313                            xds_cluster::cluster::ring_hash_lb_config::HashFunction::XxHash as i32,
314                        maximum_ring_size: None,
315                    },
316                ),
317            )),
318            // an unspecified LB policy just sets LbConfig to none
319            LbPolicy::Unspecified => None,
320        }
321    }
322}
323
324#[inline]
325fn hash_policies(action: &xds_route::RouteAction) -> Result<Vec<RequestHashPolicy>, Error> {
326    let res: Result<Vec<_>, Error> = action
327        .hash_policy
328        .iter()
329        .enumerate()
330        .map(|(i, policy)| RequestHashPolicy::from_xds(policy).with_index(i))
331        .collect();
332
333    res.with_field("hash_policy")
334}
335
336impl RequestHashPolicy {
337    pub(crate) fn to_xds(&self) -> xds_route::route_action::HashPolicy {
338        use xds_route::route_action::hash_policy::{Header, PolicySpecifier, QueryParameter};
339
340        let policy_specifier = match &self.hasher {
341            RequestHasher::Header { name } => PolicySpecifier::Header(Header {
342                header_name: name.clone(),
343                regex_rewrite: None,
344            }),
345            RequestHasher::QueryParam { name } => {
346                PolicySpecifier::QueryParameter(QueryParameter { name: name.clone() })
347            }
348        };
349
350        xds_route::route_action::HashPolicy {
351            terminal: self.terminal,
352            policy_specifier: Some(policy_specifier),
353        }
354    }
355
356    pub(crate) fn from_xds(xds: &xds_route::route_action::HashPolicy) -> Result<Self, Error> {
357        use xds_route::route_action::hash_policy::PolicySpecifier;
358
359        match &xds.policy_specifier {
360            Some(PolicySpecifier::Header(header)) => Ok(Self {
361                terminal: xds.terminal,
362                hasher: RequestHasher::Header {
363                    name: header.header_name.clone(),
364                },
365            }),
366            Some(PolicySpecifier::QueryParameter(query)) => Ok(Self {
367                terminal: xds.terminal,
368                hasher: RequestHasher::QueryParam {
369                    name: query.name.clone(),
370                },
371            }),
372            Some(_) => {
373                Err(Error::new_static("unsupported hash policy").with_field("policy_specifier"))
374            }
375            None => Err(Error::new_static("no policy specified").with_field("policy_specifier")),
376        }
377    }
378}
379
380#[cfg(test)]
381mod test {
382    use super::*;
383
384    #[test]
385    fn test_unspecified_lb_roundtrips() {
386        let web = Service::kube("prod", "web").unwrap();
387
388        let backend = Backend {
389            id: web.as_backend_id(8891),
390            lb: LbPolicy::Unspecified,
391        };
392        assert_eq!(backend, Backend::from_xds(&backend.to_xds(), None).unwrap(),);
393        assert_eq!(backend.to_xds_hash_policies(), vec![]);
394    }
395
396    #[test]
397    fn test_round_robin_lb_roundtrips() {
398        let web = Service::kube("prod", "web").unwrap();
399
400        let backend = Backend {
401            id: web.as_backend_id(7891),
402            lb: LbPolicy::RoundRobin,
403        };
404        assert_eq!(backend, Backend::from_xds(&backend.to_xds(), None).unwrap(),);
405        assert_eq!(backend.to_xds_hash_policies(), vec![]);
406    }
407
408    #[test]
409    fn test_ringhash_roundtrip() {
410        let web = Service::kube("prod", "web").unwrap();
411
412        let backend = Backend {
413            id: web.as_backend_id(6666),
414            lb: LbPolicy::RingHash(RingHashParams {
415                min_ring_size: 1024,
416                hash_params: vec![
417                    RequestHashPolicy {
418                        terminal: true,
419                        hasher: RequestHasher::QueryParam {
420                            name: "q".to_string(),
421                        },
422                    },
423                    RequestHashPolicy {
424                        terminal: false,
425                        hasher: RequestHasher::Header {
426                            name: "x-user".to_string(),
427                        },
428                    },
429                    RequestHashPolicy {
430                        terminal: true,
431                        hasher: RequestHasher::Header {
432                            name: "x-env".to_string(),
433                        },
434                    },
435                ],
436            }),
437        };
438
439        let cluster = backend.to_xds();
440        let hash_policy = backend.to_xds_hash_policies();
441
442        let parsed = Backend::from_xds(
443            &cluster,
444            Some(&xds_route::RouteAction {
445                hash_policy,
446                ..Default::default()
447            }),
448        )
449        .unwrap();
450        assert_eq!(parsed, backend);
451    }
452
453    #[test]
454    fn test_lb_route_config_roundtrip() {
455        let web = Service::kube("prod", "web").unwrap();
456
457        let backend = Backend {
458            id: web.as_backend_id(12321),
459            lb: LbPolicy::RingHash(RingHashParams {
460                min_ring_size: 1024,
461                hash_params: vec![
462                    RequestHashPolicy {
463                        terminal: false,
464                        hasher: RequestHasher::Header {
465                            name: "x-user".to_string(),
466                        },
467                    },
468                    RequestHashPolicy {
469                        terminal: false,
470                        hasher: RequestHasher::Header {
471                            name: "x-env".to_string(),
472                        },
473                    },
474                ],
475            }),
476        };
477
478        let cluster = backend.to_xds();
479        let lb_config_route = backend.to_xds_lb_route_config();
480
481        let parsed = Backend::from_xds(&cluster, {
482            let vhost = lb_config_route.virtual_hosts.first().unwrap();
483            let route = vhost.routes.first().unwrap();
484            route.action.as_ref().map(|action| match action {
485                xds_route::route::Action::Route(action) => action,
486                _ => panic!("invalid route"),
487            })
488        })
489        .unwrap();
490        assert_eq!(backend, parsed);
491    }
492}