1use std::str::FromStr;
2use xds_api::pb::envoy::config::{
3 cluster::v3::{self as xds_cluster, cluster::ring_hash_lb_config::HashFunction},
4 core::v3 as xds_core,
5 endpoint::v3 as xds_endpoint,
6 route::v3 as xds_route,
7};
8
9use crate::{
10 backend::{Backend, LbPolicy, RequestHashPolicy, RequestHasher, RingHashParams},
11 error::{Error, ErrorContext},
12 value_or_default,
13 xds::ads_config_source,
14 BackendId, Service,
15};
16
17impl Backend {
18 pub fn from_xds(
19 cluster: &xds_cluster::Cluster,
20 route_action: Option<&xds_route::RouteAction>,
21 ) -> Result<Self, Error> {
22 use xds_cluster::cluster::DiscoveryType;
23
24 let lb = LbPolicy::from_xds(cluster, route_action)?;
25 let id = BackendId::from_str(&cluster.name)?;
26
27 let discovery_type = cluster_discovery_type(cluster);
28
29 match &id.service {
30 Service::Dns(dns) => {
34 if discovery_type != Some(DiscoveryType::LogicalDns) {
35 return Err(Error::new_static("mismatched discovery type"))
36 .with_field("cluster_discovery_type");
37 }
38
39 let addr_matches = logical_dns_address(cluster).is_some_and(|sa| {
40 sa.address == dns.hostname.as_ref()
41 && xds_port(sa.port_specifier.as_ref()) == Some(id.port)
42 });
43 if !addr_matches {
44 return Err(Error::new_static("cluster is not a valid DNS cluster"))
48 .with_fields("load_assignment", "endpoints");
49 }
50 }
51 Service::Kube(_) => {
53 if discovery_type != Some(DiscoveryType::Eds) {
54 return Err(Error::new_static("mismatched discovery type"))
55 .with_field("cluster_discovery_type");
56 }
57
58 let Some(eds_config) = &cluster.eds_cluster_config else {
59 return Err(Error::new_static("missing EDS config"))
60 .with_field("eds_cluster_conig");
61 };
62
63 if eds_config.service_name != id.name() {
64 return Err(Error::new_static("cluster_name must match EDS name"))
65 .with_fields("eds_cluster_config", "service_name");
66 }
67
68 if eds_config.eds_config != Some(ads_config_source()) {
69 return Err(Error::new_static("EDS cluster is not configured for ADS"))
70 .with_fields("eds_cluster_config", "eds_config");
71 }
72 }
73 }
74
75 Ok(Backend { id, lb })
76 }
77
78 pub fn to_xds(&self) -> xds_cluster::Cluster {
79 use xds_cluster::cluster::ClusterDiscoveryType;
80 use xds_cluster::cluster::DiscoveryType;
81 use xds_cluster::cluster::EdsClusterConfig;
82
83 let (lb_policy, lb_config) = match self.lb.to_xds() {
84 Some((policy, config)) => (policy, Some(config)),
85 None => (xds_cluster::cluster::LbPolicy::default(), None),
86 };
87
88 let (cluster_discovery_type, eds_cluster_config, load_assignment) = match &self.id.service {
89 Service::Dns(dns) => {
90 let dtype = ClusterDiscoveryType::Type(DiscoveryType::LogicalDns.into());
91 let host_identifier = Some(xds_endpoint::lb_endpoint::HostIdentifier::Endpoint(
92 xds_endpoint::Endpoint {
93 address: Some(to_xds_address(&dns.hostname, self.id.port)),
94 ..Default::default()
95 },
96 ));
97 let endpoints = vec![xds_endpoint::LocalityLbEndpoints {
98 lb_endpoints: vec![xds_endpoint::LbEndpoint {
99 host_identifier,
100 ..Default::default()
101 }],
102 ..Default::default()
103 }];
104 let load_assignment = Some(xds_endpoint::ClusterLoadAssignment {
105 endpoints,
106 ..Default::default()
107 });
108 (dtype, None, load_assignment)
109 }
110 Service::Kube(_) => {
111 let cluster_discovery_type = ClusterDiscoveryType::Type(DiscoveryType::Eds.into());
112 let eds_cluster_config = Some(EdsClusterConfig {
113 eds_config: Some(crate::xds::ads_config_source()),
114 service_name: self.id.name(),
115 });
116 (cluster_discovery_type, eds_cluster_config, None)
117 }
118 };
119
120 let cluster_discovery_type = Some(cluster_discovery_type);
121 xds_cluster::Cluster {
122 name: self.id.name(),
123 lb_policy: lb_policy.into(),
124 lb_config,
125 cluster_discovery_type,
126 load_assignment,
127 eds_cluster_config,
128 ..Default::default()
129 }
130 }
131
132 #[doc(hidden)]
137 pub fn to_xds_lb_route_config(&self) -> xds_route::RouteConfiguration {
138 use xds_route::route::Action;
139 use xds_route::route_action::ClusterSpecifier;
140 use xds_route::route_match::PathSpecifier;
141
142 let default_action = Action::Route(xds_route::RouteAction {
143 cluster_specifier: Some(ClusterSpecifier::Cluster(self.id.name())),
144 hash_policy: self.to_xds_hash_policies(),
145 ..Default::default()
146 });
147
148 let default_route = xds_route::Route {
149 r#match: Some(xds_route::RouteMatch {
150 path_specifier: Some(PathSpecifier::Prefix("".to_string())),
151 ..Default::default()
152 }),
153 action: Some(default_action),
154 ..Default::default()
155 };
156
157 let vhost = xds_route::VirtualHost {
158 domains: vec!["*".to_string()],
159 routes: vec![default_route],
160 ..Default::default()
161 };
162
163 xds_route::RouteConfiguration {
164 name: self.id.lb_config_route_name(),
165 virtual_hosts: vec![vhost],
166 ..Default::default()
167 }
168 }
169
170 fn to_xds_hash_policies(&self) -> Vec<xds_route::route_action::HashPolicy> {
171 match &self.lb {
172 LbPolicy::RingHash(ring_hash) => {
173 ring_hash.hash_params.iter().map(|p| p.to_xds()).collect()
174 }
175 _ => Vec::new(),
176 }
177 }
178}
179
180fn to_xds_address(hostname: &crate::Hostname, port: u16) -> xds_core::Address {
181 let socket_address = xds_core::SocketAddress {
182 address: hostname.to_string(),
183 port_specifier: Some(xds_core::socket_address::PortSpecifier::PortValue(
184 port as u32,
185 )),
186 ..Default::default()
187 };
188
189 xds_core::Address {
190 address: Some(xds_core::address::Address::SocketAddress(socket_address)),
191 }
192}
193
194fn cluster_discovery_type(
195 cluster: &xds_cluster::Cluster,
196) -> Option<xds_cluster::cluster::DiscoveryType> {
197 match cluster.cluster_discovery_type {
198 Some(xds_cluster::cluster::ClusterDiscoveryType::Type(cdt)) => {
199 xds_cluster::cluster::DiscoveryType::try_from(cdt).ok()
200 }
201 _ => None,
202 }
203}
204
205fn logical_dns_address(cluster: &xds_cluster::Cluster) -> Option<&xds_core::SocketAddress> {
206 let cla = cluster.load_assignment.as_ref()?;
207 let endpoint = cla.endpoints.first()?;
208 let lb_endpoint = endpoint.lb_endpoints.first()?;
209
210 let endpoint_addr = match lb_endpoint.host_identifier.as_ref()? {
211 xds_endpoint::lb_endpoint::HostIdentifier::Endpoint(endpoint) => {
212 endpoint.address.as_ref()?
213 }
214 _ => return None,
215 };
216
217 match endpoint_addr.address.as_ref()? {
218 xds_core::address::Address::SocketAddress(socket_address) => Some(socket_address),
219 _ => None,
220 }
221}
222
223#[inline]
224fn xds_port(port_specifier: Option<&xds_core::socket_address::PortSpecifier>) -> Option<u16> {
225 match port_specifier {
226 Some(xds_core::socket_address::PortSpecifier::PortValue(v)) => (*v).try_into().ok(),
227 _ => None,
228 }
229}
230
231impl LbPolicy {
232 pub(crate) fn from_xds(
233 cluster: &xds_cluster::Cluster,
234 route_action: Option<&xds_route::RouteAction>,
235 ) -> Result<Self, Error> {
236 match cluster.lb_policy() {
237 xds_cluster::cluster::LbPolicy::RoundRobin => match cluster.lb_config.as_ref() {
241 Some(xds_cluster::cluster::LbConfig::RoundRobinLbConfig(_)) => {
242 Ok(LbPolicy::RoundRobin)
243 }
244 None => Ok(LbPolicy::Unspecified),
245 _ => Err(
246 Error::new_static("RoundRobin lb_policy has a mismatched lb_config")
247 .with_field("lb_config"),
248 ),
249 },
250 xds_cluster::cluster::LbPolicy::RingHash => {
253 let lb_config = match cluster.lb_config.as_ref() {
254 Some(xds_cluster::cluster::LbConfig::RingHashLbConfig(config)) => config,
255 None => &xds_cluster::cluster::RingHashLbConfig::default(),
256 _ => {
257 return Err(Error::new_static(
258 "RingHash lb_policy has a mismatched lb_config",
259 )
260 .with_field("lb_config"))
261 }
262 };
263
264 if lb_config.hash_function() != HashFunction::XxHash {
266 return Err(Error::new(format!(
267 "unsupported hash function: {:?}",
268 lb_config.hash_function(),
269 )))
270 .with_fields("lb_config", "hash_function");
271 }
272
273 let min_ring_size = value_or_default!(
274 lb_config.minimum_ring_size,
275 crate::backend::default_min_ring_size() as u64
276 );
277 let min_ring_size = min_ring_size
278 .try_into()
279 .map_err(|_| Error::new_static("int overflow"))
280 .with_fields("lb_config", ",minimum_ring_size")?;
281
282 let hash_params = route_action
283 .map(hash_policies)
284 .transpose()
285 .with_field("route_action")?;
286
287 Ok(LbPolicy::RingHash(RingHashParams {
288 min_ring_size,
289 hash_params: hash_params.unwrap_or_default(),
290 }))
291 }
292 _ => Err(Error::new_static("unsupported lb policy")).with_field("lb_policy"),
293 }
294 }
295
296 pub(crate) fn to_xds(
297 &self,
298 ) -> Option<(
299 xds_cluster::cluster::LbPolicy,
300 xds_cluster::cluster::LbConfig,
301 )> {
302 match self {
303 LbPolicy::RoundRobin => Some((
304 xds_cluster::cluster::LbPolicy::RoundRobin,
305 xds_cluster::cluster::LbConfig::RoundRobinLbConfig(Default::default()),
306 )),
307 LbPolicy::RingHash(params) => Some((
308 xds_cluster::cluster::LbPolicy::RingHash,
309 xds_cluster::cluster::LbConfig::RingHashLbConfig(
310 xds_cluster::cluster::RingHashLbConfig {
311 minimum_ring_size: Some((params.min_ring_size as u64).into()),
312 hash_function:
313 xds_cluster::cluster::ring_hash_lb_config::HashFunction::XxHash as i32,
314 maximum_ring_size: None,
315 },
316 ),
317 )),
318 LbPolicy::Unspecified => None,
320 }
321 }
322}
323
324#[inline]
325fn hash_policies(action: &xds_route::RouteAction) -> Result<Vec<RequestHashPolicy>, Error> {
326 let res: Result<Vec<_>, Error> = action
327 .hash_policy
328 .iter()
329 .enumerate()
330 .map(|(i, policy)| RequestHashPolicy::from_xds(policy).with_index(i))
331 .collect();
332
333 res.with_field("hash_policy")
334}
335
336impl RequestHashPolicy {
337 pub(crate) fn to_xds(&self) -> xds_route::route_action::HashPolicy {
338 use xds_route::route_action::hash_policy::{Header, PolicySpecifier, QueryParameter};
339
340 let policy_specifier = match &self.hasher {
341 RequestHasher::Header { name } => PolicySpecifier::Header(Header {
342 header_name: name.clone(),
343 regex_rewrite: None,
344 }),
345 RequestHasher::QueryParam { name } => {
346 PolicySpecifier::QueryParameter(QueryParameter { name: name.clone() })
347 }
348 };
349
350 xds_route::route_action::HashPolicy {
351 terminal: self.terminal,
352 policy_specifier: Some(policy_specifier),
353 }
354 }
355
356 pub(crate) fn from_xds(xds: &xds_route::route_action::HashPolicy) -> Result<Self, Error> {
357 use xds_route::route_action::hash_policy::PolicySpecifier;
358
359 match &xds.policy_specifier {
360 Some(PolicySpecifier::Header(header)) => Ok(Self {
361 terminal: xds.terminal,
362 hasher: RequestHasher::Header {
363 name: header.header_name.clone(),
364 },
365 }),
366 Some(PolicySpecifier::QueryParameter(query)) => Ok(Self {
367 terminal: xds.terminal,
368 hasher: RequestHasher::QueryParam {
369 name: query.name.clone(),
370 },
371 }),
372 Some(_) => {
373 Err(Error::new_static("unsupported hash policy").with_field("policy_specifier"))
374 }
375 None => Err(Error::new_static("no policy specified").with_field("policy_specifier")),
376 }
377 }
378}
379
380#[cfg(test)]
381mod test {
382 use super::*;
383
384 #[test]
385 fn test_unspecified_lb_roundtrips() {
386 let web = Service::kube("prod", "web").unwrap();
387
388 let backend = Backend {
389 id: web.as_backend_id(8891),
390 lb: LbPolicy::Unspecified,
391 };
392 assert_eq!(backend, Backend::from_xds(&backend.to_xds(), None).unwrap(),);
393 assert_eq!(backend.to_xds_hash_policies(), vec![]);
394 }
395
396 #[test]
397 fn test_round_robin_lb_roundtrips() {
398 let web = Service::kube("prod", "web").unwrap();
399
400 let backend = Backend {
401 id: web.as_backend_id(7891),
402 lb: LbPolicy::RoundRobin,
403 };
404 assert_eq!(backend, Backend::from_xds(&backend.to_xds(), None).unwrap(),);
405 assert_eq!(backend.to_xds_hash_policies(), vec![]);
406 }
407
408 #[test]
409 fn test_ringhash_roundtrip() {
410 let web = Service::kube("prod", "web").unwrap();
411
412 let backend = Backend {
413 id: web.as_backend_id(6666),
414 lb: LbPolicy::RingHash(RingHashParams {
415 min_ring_size: 1024,
416 hash_params: vec![
417 RequestHashPolicy {
418 terminal: true,
419 hasher: RequestHasher::QueryParam {
420 name: "q".to_string(),
421 },
422 },
423 RequestHashPolicy {
424 terminal: false,
425 hasher: RequestHasher::Header {
426 name: "x-user".to_string(),
427 },
428 },
429 RequestHashPolicy {
430 terminal: true,
431 hasher: RequestHasher::Header {
432 name: "x-env".to_string(),
433 },
434 },
435 ],
436 }),
437 };
438
439 let cluster = backend.to_xds();
440 let hash_policy = backend.to_xds_hash_policies();
441
442 let parsed = Backend::from_xds(
443 &cluster,
444 Some(&xds_route::RouteAction {
445 hash_policy,
446 ..Default::default()
447 }),
448 )
449 .unwrap();
450 assert_eq!(parsed, backend);
451 }
452
453 #[test]
454 fn test_lb_route_config_roundtrip() {
455 let web = Service::kube("prod", "web").unwrap();
456
457 let backend = Backend {
458 id: web.as_backend_id(12321),
459 lb: LbPolicy::RingHash(RingHashParams {
460 min_ring_size: 1024,
461 hash_params: vec![
462 RequestHashPolicy {
463 terminal: false,
464 hasher: RequestHasher::Header {
465 name: "x-user".to_string(),
466 },
467 },
468 RequestHashPolicy {
469 terminal: false,
470 hasher: RequestHasher::Header {
471 name: "x-env".to_string(),
472 },
473 },
474 ],
475 }),
476 };
477
478 let cluster = backend.to_xds();
479 let lb_config_route = backend.to_xds_lb_route_config();
480
481 let parsed = Backend::from_xds(&cluster, {
482 let vhost = lb_config_route.virtual_hosts.first().unwrap();
483 let route = vhost.routes.first().unwrap();
484 route.action.as_ref().map(|action| match action {
485 xds_route::route::Action::Route(action) => action,
486 _ => panic!("invalid route"),
487 })
488 })
489 .unwrap();
490 assert_eq!(backend, parsed);
491 }
492}