1use bytes::Bytes;
33use cache::{Cache, CacheReader};
34use enum_map::EnumMap;
35use futures::{FutureExt, TryStreamExt};
36use junction_api::{backend::BackendId, http::Route, Hostname, Service};
37use std::{
38 borrow::Cow, collections::BTreeSet, future::Future, io::ErrorKind, sync::Arc, time::Duration,
39};
40use tokio::sync::mpsc::{self, Receiver};
41use tokio_stream::wrappers::ReceiverStream;
42use tonic::{transport::Endpoint, Streaming};
43use tracing::debug;
44use xds_api::pb::{
45 envoy::{
46 config::core::v3 as xds_core,
47 service::discovery::v3::{
48 aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
49 DeltaDiscoveryRequest, DeltaDiscoveryResponse,
50 },
51 },
52 google::{protobuf, rpc::Status as GrpcStatus},
53};
54
55mod cache;
56
57mod resources;
58pub use resources::ResourceVersion;
59pub(crate) use resources::{ResourceType, ResourceVec};
60
61use crate::{dns::StdlibResolver, BackendLb, ConfigCache};
62
63mod csds;
64
65#[cfg(test)]
66mod test;
67
/// A snapshot of one xDS resource as exposed to callers; this is the item
/// type produced by `AdsClient::iter_xds`.
#[derive(Debug, Default, Clone)]
pub struct XdsConfig {
    /// The name of the resource.
    pub name: String,
    /// The type URL of the resource.
    pub type_url: String,
    /// The version of the resource, if one is set.
    // NOTE(review): presumably the version of the last accepted update - confirm
    // against the cache implementation.
    pub version: Option<ResourceVersion>,
    /// The resource itself as a protobuf `Any`, if present.
    pub xds: Option<protobuf::Any>,
    /// A resource version paired with an error message, if present.
    // NOTE(review): presumably the most recent rejected update - confirm
    // against the cache implementation.
    pub last_error: Option<(ResourceVersion, String)>,
}
78
/// A change in subscriptions, sent from an `AdsClient` to the `AdsTask` over
/// an mpsc channel and applied by `AdsConnection::handle_subscription_update`.
#[derive(Debug)]
enum SubscriptionUpdate {
    /// Subscribe to the Listener resources with these names.
    AddHosts(Vec<String>),
    /// Subscribe to the Cluster resources for these backends. DNS backends
    /// additionally get a DNS subscription for their hostname and port.
    AddBackends(Vec<BackendId>),
    /// Subscribe to endpoint data for these backends: a DNS subscription for
    /// DNS backends, a ClusterLoadAssignment subscription otherwise.
    AddEndpoints(Vec<BackendId>),

    // The Remove* variants are handled by the connection but nothing in this
    // file constructs them, hence the `allow(unused)` on each.
    /// Inverse of [`SubscriptionUpdate::AddHosts`].
    #[allow(unused)]
    RemoveHosts(Vec<String>),
    /// Inverse of [`SubscriptionUpdate::AddBackends`].
    #[allow(unused)]
    RemoveBackends(Vec<BackendId>),
    /// Inverse of [`SubscriptionUpdate::AddEndpoints`].
    #[allow(unused)]
    RemoveEndpoints(Vec<BackendId>),
}
92
/// A cloneable handle for reading xDS-derived configuration.
///
/// The client does no network I/O itself: it sends subscription requests to
/// its paired `AdsTask` and reads results back out of the shared cache (or
/// the DNS resolver, for DNS backends).
#[derive(Clone)]
pub(super) struct AdsClient {
    /// Sending half of the subscription channel consumed by the `AdsTask`.
    subs: mpsc::Sender<SubscriptionUpdate>,
    /// Read handle onto the cache owned by the `AdsTask`.
    cache: CacheReader,
    /// Resolver used to look up endpoints for DNS backends.
    dns: StdlibResolver,
}
109
110impl AdsClient {
111 pub(super) fn build(
121 address: impl Into<Bytes>,
122 node_id: String,
123 cluster: String,
124 ) -> Result<(AdsClient, AdsTask), tonic::transport::Error> {
125 let endpoint = Endpoint::from_shared(address)?
127 .connect_timeout(Duration::from_secs(5))
128 .tcp_nodelay(true);
129
130 let node_info = xds_core::Node {
131 id: node_id,
132 cluster,
133 client_features: vec![
134 "envoy.lb.does_not_support_overprovisioning".to_string(),
135 "envoy.lrs.supports_send_all_clusters".to_string(),
136 ],
137 ..Default::default()
138 };
139
140 let (sub_tx, sub_rx) = mpsc::channel(10);
142 let cache = Cache::default();
143
144 let dns = StdlibResolver::new_with(Duration::from_secs(5), Duration::from_millis(500), 2);
146
147 let client = AdsClient {
148 subs: sub_tx,
149 cache: cache.reader(),
150 dns: dns.clone(),
151 };
152 let task = AdsTask {
153 endpoint,
154 initial_channel: None,
155 node_info,
156 cache,
157 dns,
158 subs: sub_rx,
159 };
160
161 Ok((client, task))
162 }
163
164 pub(super) fn csds_server(
165 &self,
166 port: u16,
167 ) -> impl Future<Output = Result<(), tonic::transport::Error>> + Send + 'static {
168 csds::local_server(self.cache.clone(), port)
169 }
170
171 pub(super) fn iter_routes(&self) -> impl Iterator<Item = Arc<Route>> + '_ {
172 self.cache.iter_routes()
173 }
174
175 pub(super) fn iter_backends(&self) -> impl Iterator<Item = Arc<BackendLb>> + '_ {
176 self.cache.iter_backends()
177 }
178
179 pub(super) fn iter_xds(&self) -> impl Iterator<Item = XdsConfig> + '_ {
180 self.cache.iter_xds()
181 }
182}
183
184impl ConfigCache for AdsClient {
190 async fn get_route<S: AsRef<str>>(&self, host: S) -> Option<Arc<Route>> {
191 let hosts = vec![host.as_ref().to_string()];
192 let _ = self.subs.send(SubscriptionUpdate::AddHosts(hosts)).await;
193
194 self.cache.get_route(host).await
195 }
196
197 async fn get_backend(
198 &self,
199 backend: &junction_api::backend::BackendId,
200 ) -> Option<std::sync::Arc<crate::BackendLb>> {
201 let bs = vec![backend.clone()];
202 let _ = self.subs.send(SubscriptionUpdate::AddBackends(bs)).await;
203
204 self.cache.get_backend(backend).await
205 }
206
207 async fn get_endpoints(
208 &self,
209 backend: &junction_api::backend::BackendId,
210 ) -> Option<std::sync::Arc<crate::EndpointGroup>> {
211 let bs = vec![backend.clone()];
212 let _ = self.subs.send(SubscriptionUpdate::AddEndpoints(bs)).await;
213
214 match &backend.service {
215 junction_api::Service::Dns(dns) => {
216 self.dns
217 .get_endpoints_await(&dns.hostname, backend.port)
218 .await
219 }
220 _ => self.cache.get_endpoints(backend).await,
221 }
222 }
223}
224
/// The background half of an `AdsClient`: owns the cache and drives the ADS
/// connection in `AdsTask::run`.
pub(crate) struct AdsTask {
    /// The ADS server endpoint to connect to.
    endpoint: tonic::transport::Endpoint,
    /// A channel established ahead of time by `connect`. It is taken (and so
    /// used at most once) by `new_connection`.
    initial_channel: Option<tonic::transport::Channel>,
    /// The node description cloned into every new `AdsConnection`.
    node_info: xds_core::Node,
    /// The resource cache that connections read from and write to.
    cache: Cache,
    /// DNS resolver kept in sync with the cache's DNS names.
    dns: StdlibResolver,
    /// Receiving half of the subscription channel fed by `AdsClient`.
    subs: mpsc::Receiver<SubscriptionUpdate>,
}
234
235#[derive(Debug, thiserror::Error)]
236struct ShutdownError;
237
238impl std::fmt::Display for ShutdownError {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 write!(f, "AdsTask started after shutdown")
241 }
242}
243
// Log an outgoing `DeltaDiscoveryRequest` at debug level.
//
// The event carries a `nack` field that is true when the request has an
// `error_detail`. The message lists the response nonce, type URL, subscribed
// and unsubscribed resource names, and initial resource versions.
macro_rules! log_request {
    ($request:expr) => {
        tracing::debug!(
            nack = $request.error_detail.is_some(),
            "DeltaDiscoveryRequest(n={:?}, ty={:?}, r={:?}, u={:?} init={:?})",
            $request.response_nonce,
            $request.type_url,
            $request.resource_names_subscribe,
            $request.resource_names_unsubscribe,
            $request.initial_resource_versions,
        );
    };
}
257
// Log an incoming `DeltaDiscoveryResponse` at debug level.
//
// The message lists the nonce, type URL, (name, version) pairs of the
// returned resources, and the removed resource names.
macro_rules! log_response {
    ($response:expr) => {
        // Only build the (name, version) list when debug logging is enabled,
        // since collecting it clones every name and version.
        if tracing::enabled!(tracing::Level::DEBUG) {
            let names_and_versions = names_and_versions(&$response);
            tracing::debug!(
                "DeltaDiscoveryResponse(n={:?}, ty={:?}, r={:?}, removed={:?})",
                $response.nonce,
                $response.type_url,
                names_and_versions,
                $response.removed_resources,
            );
        }
    };
}
272
273fn names_and_versions(response: &DeltaDiscoveryResponse) -> Vec<(String, String)> {
274 response
275 .resources
276 .iter()
277 .map(|r| (r.name.clone(), r.version.clone()))
278 .collect()
279}
280
281impl AdsTask {
282 pub(super) fn is_shutdown(&self) -> bool {
283 self.subs.is_closed()
284 }
285
286 pub(super) async fn run(&mut self) -> Result<(), &(dyn std::error::Error + 'static)> {
287 if self.is_shutdown() {
288 return Err(&ShutdownError);
289 }
290
291 loop {
292 match self.run_connection().await {
293 Ok(()) => break,
294 Err(ConnectionError::AdsDisconnected) => continue,
296 Err(ConnectionError::Connect(e)) => {
303 debug!(err = %e, "failed to connect to ADS server");
304 tokio::time::sleep(Duration::from_secs(2)).await;
305 }
306 Err(ConnectionError::Status(status)) => {
312 let is_broken_pipe =
314 unwrap_io_error(&status).is_some_and(|e| e.kind() == ErrorKind::BrokenPipe);
315
316 if !is_broken_pipe {
317 debug!(err = %status, "ADS connection closed unexpectedly");
318 }
319
320 tokio::time::sleep(if is_broken_pipe {
321 Duration::from_millis(50)
322 } else {
323 Duration::from_secs(2)
324 })
325 .await;
326 }
327 };
328 }
329
330 Ok(())
331 }
332
333 async fn run_connection(&mut self) -> Result<(), ConnectionError> {
344 let (xds_tx, xds_rx) = tokio::sync::mpsc::channel(10);
345
346 let channel = self.new_connection().await?;
348 let mut client = AggregatedDiscoveryServiceClient::new(channel);
349 let stream_response = client
350 .delta_aggregated_resources(ReceiverStream::new(xds_rx))
351 .await?;
352 let mut incoming = stream_response.into_inner();
353
354 self.dns.set_names(self.cache.dns_names());
356
357 let (mut conn, initial_requests) =
359 AdsConnection::new(self.node_info.clone(), &mut self.cache);
360
361 for msg in initial_requests {
362 log_request!(msg);
363 if xds_tx.send(msg).await.is_err() {
364 return Err(ConnectionError::AdsDisconnected);
365 }
366 }
367
368 loop {
369 tracing::trace!("handle_update_batch");
370 let is_eof = handle_update_batch(&mut conn, &mut self.subs, &mut incoming).await?;
371 if is_eof {
372 return Ok(());
373 }
374
375 let (outgoing, dns_updates) = conn.outgoing();
376 for msg in outgoing {
377 log_request!(msg);
378 if xds_tx.send(msg).await.is_err() {
379 return Err(ConnectionError::AdsDisconnected);
380 }
381 }
382 update_dns(&self.dns, dns_updates.add, dns_updates.remove);
383 }
384 }
385
386 pub(super) async fn connect(&mut self) -> Result<(), tonic::transport::Error> {
387 if self.initial_channel.is_none() {
388 let channel = self.endpoint.connect().await?;
389 self.initial_channel = Some(channel)
390 }
391
392 Ok(())
393 }
394
395 async fn new_connection(
396 &mut self,
397 ) -> Result<tonic::transport::Channel, tonic::transport::Error> {
398 match self.initial_channel.take() {
399 Some(channel) => Ok(channel),
400 None => self.endpoint.connect().await,
401 }
402 }
403}
404
405async fn handle_update_batch(
411 conn: &mut AdsConnection<'_>,
412 subs: &mut Receiver<SubscriptionUpdate>,
413 incoming: &mut Streaming<DeltaDiscoveryResponse>,
414) -> Result<bool, ConnectionError> {
415 async fn next_update(
426 conn: &mut AdsConnection<'_>,
427 subs: &mut Receiver<SubscriptionUpdate>,
428 incoming: &mut Streaming<DeltaDiscoveryResponse>,
429 ) -> Result<bool, ConnectionError> {
430 tokio::select! {
431 biased;
432
433 xds_msg = incoming.try_next() => {
434 let response = match xds_msg? {
438 Some(response) => response,
439 None => return Err(ConnectionError::AdsDisconnected),
440 };
441 log_response!(response);
442
443 tracing::trace!("ads connection: handle_ads_message");
444 conn.handle_ads_message(response);
445 }
446 sub_update = subs.recv() => {
447 let Some(sub_update) = sub_update else {
448 return Ok(true)
449 };
450
451 tracing::trace!(
452 ?sub_update,
453 "ads connection: handle_subscription_update",
454 );
455 conn.handle_subscription_update(sub_update);
456 }
457 }
458 Ok(false)
459 }
460
461 if next_update(conn, subs, incoming).await? {
463 return Ok(true);
464 }
465
466 loop {
470 let Some(should_exit) = next_update(conn, subs, incoming).now_or_never() else {
471 break;
472 };
473
474 if should_exit? {
475 return Ok(true);
476 }
477 }
478
479 Ok(false)
480}
481
482#[inline]
483fn update_dns(
484 dns: &StdlibResolver,
485 add: BTreeSet<(Hostname, u16)>,
486 remove: BTreeSet<(Hostname, u16)>,
487) {
488 for (name, port) in add {
489 dns.subscribe(name, port);
490 }
491 for (name, port) in remove {
492 dns.unsubscribe(&name, port);
493 }
494}
495
/// The ways a single ADS connection can fail. `AdsTask::run` uses the variant
/// to decide how long to wait before reconnecting.
#[derive(Debug, thiserror::Error)]
enum ConnectionError {
    /// Establishing the transport failed. `run` retries after two seconds.
    #[error(transparent)]
    Connect(#[from] tonic::transport::Error),

    /// A gRPC call or the response stream returned an error status. `run`
    /// retries after 50ms for a broken pipe and after two seconds otherwise.
    #[error(transparent)]
    Status(#[from] tonic::Status),

    /// The response stream ended or the outgoing request channel refused a
    /// message. `run` reconnects immediately.
    #[error("ADS server closed the stream")]
    AdsDisconnected,
}
507
508fn unwrap_io_error(status: &tonic::Status) -> Option<&std::io::Error> {
514 let mut err: &(dyn std::error::Error + 'static) = status;
515
516 loop {
517 if let Some(e) = err.downcast_ref::<std::io::Error>() {
518 return Some(e);
519 }
520
521 if let Some(e) = err.downcast_ref::<h2::Error>().and_then(|e| e.get_io()) {
523 return Some(e);
524 }
525
526 err = err.source()?;
527 }
528}
529
/// The protocol state of one ADS stream. It turns incoming responses and
/// subscription updates into cache changes, and cache changes into outgoing
/// requests.
struct AdsConnection<'a> {
    /// The cache this connection reads subscriptions from and writes
    /// resources to.
    cache: &'a mut Cache,
    /// The node to attach to a request. It is taken by the first request that
    /// is built for a known resource type and is `None` afterwards.
    node: Option<xds_core::Node>,
    /// The pending ACK/NACK for each resource type, if any. A newer response
    /// of the same type overwrites an older pending entry.
    acks: EnumMap<ResourceType, Option<AckState>>,
    /// `(nonce, type_url)` pairs of responses with an unrecognized type URL,
    /// waiting to be NACKed.
    unknown_types: Vec<(String, String)>,
}
536
/// A pending acknowledgement of a response: an ACK when `error` is `None`, a
/// NACK otherwise.
#[derive(Debug, Default)]
struct AckState {
    /// The nonce of the response being acknowledged.
    nonce: String,
    /// The reason the response was rejected, if it was.
    error: Option<Cow<'static, str>>,
}
542
543impl AckState {
544 fn into_ack(self) -> (String, Option<GrpcStatus>) {
545 let nonce = self.nonce;
546 let error = self.error.map(|message| GrpcStatus {
547 message: message.to_string(),
548 code: tonic::Code::InvalidArgument.into(),
549 ..Default::default()
550 });
551
552 (nonce, error)
553 }
554}
555
556impl<'a> AdsConnection<'a> {
557 fn new(node: xds_core::Node, cache: &'a mut Cache) -> (Self, Vec<DeltaDiscoveryRequest>) {
558 let mut requests = Vec::with_capacity(ResourceType::all().len());
559
560 let mut node = Some(node);
561 for &rtype in ResourceType::all() {
562 let initial_versions = cache.versions(rtype);
563 let mut subscribe = cache.initial_subscriptions(rtype);
564 if cache.is_wildcard(rtype) && !subscribe.is_empty() {
565 subscribe.push("*".to_string());
566 }
567
568 if !cache.is_wildcard(rtype) && subscribe.is_empty() && initial_versions.is_empty() {
569 continue;
570 }
571
572 requests.push(DeltaDiscoveryRequest {
573 node: node.take(),
574 type_url: rtype.type_url().to_string(),
575 resource_names_subscribe: subscribe,
576 initial_resource_versions: initial_versions,
577 ..Default::default()
578 });
579 }
580
581 let conn = Self {
582 cache,
583 node,
584 acks: Default::default(),
585 unknown_types: Vec::new(),
586 };
587 (conn, requests)
588 }
589
590 fn outgoing(&mut self) -> (Vec<DeltaDiscoveryRequest>, DnsUpdates) {
591 let mut responses = Vec::with_capacity(ResourceType::all().len());
592
593 for (response_nonce, type_url) in std::mem::take(&mut self.unknown_types) {
598 let error_detail = Some(xds_api::pb::google::rpc::Status {
599 code: tonic::Code::InvalidArgument.into(),
600 message: "unknown type".to_string(),
601 ..Default::default()
602 });
603 responses.push(DeltaDiscoveryRequest {
604 type_url,
605 response_nonce,
606 error_detail,
607 ..Default::default()
608 })
609 }
610
611 let (resources, dns) = self.cache.collect();
613
614 for (rtype, changes) in resources {
618 let ack = self.get_ack(rtype);
619
620 if ack.is_none() && changes.is_empty() {
621 continue;
622 }
623
624 let node = self.node.take();
625 let (response_nonce, error_detail) = ack.map(|a| a.into_ack()).unwrap_or_default();
626 let resource_names_subscribe = changes.added.into_iter().collect();
627 let resource_names_unsubscribe = changes.removed.into_iter().collect();
628
629 responses.push(DeltaDiscoveryRequest {
630 node,
631 type_url: rtype.type_url().to_string(),
632 response_nonce,
633 error_detail,
634 resource_names_subscribe,
635 resource_names_unsubscribe,
636 ..Default::default()
637 })
638 }
639
640 (responses, dns)
641 }
642
643 fn handle_ads_message(&mut self, resp: DeltaDiscoveryResponse) {
644 let Some(rtype) = ResourceType::from_type_url(&resp.type_url) else {
645 tracing::trace!(type_url = %resp.type_url, "unknown type url");
646 self.set_unknown(resp.nonce, resp.type_url);
647 return;
648 };
649
650 let resources = match ResourceVec::from_resources(rtype, resp.resources) {
652 Ok(r) => r,
653 Err(e) => {
654 tracing::trace!(err = %e, "invalid proto");
655 self.set_ack(
656 rtype,
657 resp.nonce,
658 Some(format!("invalid resource: {e}").into()),
659 );
660 return;
661 }
662 };
663
664 let resource_errors = self.cache.insert(resources);
665 let error = match &resource_errors[..] {
666 &[] => None,
667 _ => Some("invalid resources".into()),
669 };
670 self.set_ack(rtype, resp.nonce, error);
671
672 self.cache.remove(rtype, &resp.removed_resources);
674 }
675
676 fn handle_subscription_update(&mut self, update: SubscriptionUpdate) {
677 match update {
678 SubscriptionUpdate::AddHosts(hosts) => {
679 for host in hosts {
680 self.cache.subscribe(ResourceType::Listener, &host);
681 }
682 }
683 SubscriptionUpdate::RemoveHosts(hosts) => {
684 for host in hosts {
685 self.cache.unsubscribe(ResourceType::Listener, &host);
686 }
687 }
688 SubscriptionUpdate::AddBackends(backends) => {
689 for backend in backends {
690 if let Service::Dns(dns) = &backend.service {
691 self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
692 }
693 self.cache.subscribe(ResourceType::Cluster, &backend.name());
694 }
695 }
696 SubscriptionUpdate::RemoveBackends(backends) => {
697 for backend in backends {
698 if let Service::Dns(dns) = &backend.service {
699 self.cache
700 .unsubscribe_dns(dns.hostname.clone(), backend.port);
701 }
702 self.cache
703 .unsubscribe(ResourceType::Cluster, &backend.name());
704 }
705 }
706 SubscriptionUpdate::AddEndpoints(backends) => {
707 for backend in backends {
708 match &backend.service {
709 Service::Dns(dns) => {
710 self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
711 }
712 _ => self
713 .cache
714 .subscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
715 }
716 }
717 }
718 SubscriptionUpdate::RemoveEndpoints(backends) => {
719 for backend in backends {
720 match &backend.service {
721 Service::Dns(dns) => {
722 self.cache
723 .unsubscribe_dns(dns.hostname.clone(), backend.port);
724 }
725 _ => self
726 .cache
727 .unsubscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
728 }
729 }
730 }
731 }
732 }
733
734 fn set_unknown(&mut self, nonce: String, type_url: String) {
735 self.unknown_types.push((nonce, type_url))
736 }
737
738 fn set_ack(&mut self, rtype: ResourceType, nonce: String, error: Option<Cow<'static, str>>) {
739 self.acks[rtype] = Some(AckState { nonce, error })
740 }
741
742 fn get_ack(&mut self, rtype: ResourceType) -> Option<AckState> {
743 self.acks[rtype].take()
744 }
745}
746
/// DNS subscription changes produced alongside the outgoing xDS requests.
#[derive(Debug, Default, PartialEq, Eq)]
struct DnsUpdates {
    /// `(hostname, port)` pairs to start resolving.
    add: BTreeSet<(Hostname, u16)>,
    /// `(hostname, port)` pairs to stop resolving.
    remove: BTreeSet<(Hostname, u16)>,
    // NOTE(review): nothing in this file reads `sync` except the test-only
    // `is_noop`; presumably it asks for a full resync of DNS names - confirm
    // against the cache, which produces this value.
    sync: bool,
}
753
#[cfg(test)]
impl DnsUpdates {
    /// True when this update adds nothing, removes nothing, and does not
    /// request a sync.
    fn is_noop(&self) -> bool {
        let Self { add, remove, sync } = self;
        !*sync && add.is_empty() && remove.is_empty()
    }
}
760
761#[cfg(test)]
762mod test_ads_conn {
763 use std::collections::HashMap;
764
765 use cache::Cache;
766 use once_cell::sync::Lazy;
767 use pretty_assertions::assert_eq;
768 use xds_api::pb::envoy::service::discovery::v3 as xds_discovery;
769
770 use super::test as xds_test;
771 use super::*;
772
773 static TEST_NODE: Lazy<xds_core::Node> = Lazy::new(|| xds_core::Node {
774 id: "unit-test".to_string(),
775 ..Default::default()
776 });
777
778 #[track_caller]
781 fn new_conn(cache: &mut Cache) -> (AdsConnection, Vec<DeltaDiscoveryRequest>) {
782 let (conn, mut outgoing) = AdsConnection::new(TEST_NODE.clone(), cache);
783
784 if let Some(first) = outgoing.first_mut() {
786 let node = first
787 .node
788 .take()
789 .expect("expected first outgoing request to have a node");
790
791 assert_eq!(node, *TEST_NODE);
792 };
793
794 (conn, outgoing)
795 }
796
797 #[test]
798 fn test_init_empty_wildcard() {
799 let mut cache = Cache::default();
800 cache.set_wildcard(ResourceType::Listener, true);
801 cache.set_wildcard(ResourceType::Cluster, true);
802
803 let (_, outgoing) = new_conn(&mut cache);
804
805 assert_eq!(
806 outgoing,
807 vec![
808 xds_test::req!(t = ResourceType::Cluster),
809 xds_test::req!(t = ResourceType::Listener),
810 ]
811 )
812 }
813
814 #[test]
815 fn test_init_empty_explicit() {
816 let mut cache = Cache::default();
817 cache.set_wildcard(ResourceType::Listener, false);
818 cache.set_wildcard(ResourceType::Cluster, false);
819
820 let (_, outgoing) = new_conn(&mut cache);
821 assert!(outgoing.is_empty());
822 }
823
824 #[test]
825 fn test_init_subscription_wildcard() {
826 let mut cache = Cache::default();
827 cache.set_wildcard(ResourceType::Listener, false);
828 cache.set_wildcard(ResourceType::Cluster, true);
829
830 cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
831 cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
832
833 let (_, outgoing) = new_conn(&mut cache);
836 assert_eq!(
837 outgoing,
838 vec![
839 xds_test::req!(
840 t = ResourceType::Cluster,
841 add = vec!["cluster.example:7891", "*"],
842 init = vec![],
843 ),
844 xds_test::req!(
845 t = ResourceType::ClusterLoadAssignment,
846 add = vec!["cluster.example:7891",],
847 init = vec![],
848 )
849 ]
850 );
851 }
852
853 #[test]
854 fn test_init_subscription_explicit() {
855 let mut cache = Cache::default();
856 cache.set_wildcard(ResourceType::Listener, false);
857 cache.set_wildcard(ResourceType::Cluster, false);
858
859 cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
860 cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
861
862 let (_, outgoing) = new_conn(&mut cache);
863 assert_eq!(
864 outgoing,
865 vec![
866 xds_test::req!(
867 t = ResourceType::Cluster,
868 add = vec!["cluster.example:7891",],
869 init = vec![],
870 ),
871 xds_test::req!(
872 t = ResourceType::ClusterLoadAssignment,
873 add = vec!["cluster.example:7891",],
874 init = vec![],
875 ),
876 ]
877 );
878 }
879
880 #[test]
881 fn test_init_initial_versions() {
882 let mut cache = Cache::default();
883 assert!(cache.is_wildcard(ResourceType::Listener));
884 assert!(!cache.is_wildcard(ResourceType::RouteConfiguration));
885
886 cache.insert(ResourceVec::from_listeners(
887 "123".into(),
888 vec![xds_test::listener!("cooler.example.org", "cool-route")],
889 ));
890 cache.insert(ResourceVec::from_listeners(
891 "456".into(),
892 vec![xds_test::listener!("warmer.example.org", "warm-route")],
893 ));
894 cache.insert(ResourceVec::from_route_configs(
895 "789".into(),
896 vec![xds_test::route_config!(
897 "cool-route",
898 vec![xds_test::vhost!(
899 "an-vhost",
900 ["cooler.example.org"],
901 [xds_test::route!(default "cooler.example.internal:8008")]
902 )]
903 )],
904 ));
905
906 let (_, outgoing) = new_conn(&mut cache);
909 assert_eq!(
910 outgoing,
911 vec![
912 xds_test::req!(
913 t = ResourceType::Cluster,
914 add = vec!["cooler.example.internal:8008", "*"],
915 init = vec![],
916 ),
917 xds_test::req!(
918 t = ResourceType::Listener,
919 add = vec![],
920 init = vec![("cooler.example.org", "123"), ("warmer.example.org", "456"),]
921 ),
922 xds_test::req!(
923 t = ResourceType::RouteConfiguration,
924 add = vec!["warm-route"],
925 init = vec![("cool-route", "789")]
926 ),
927 ],
928 );
929 }
930
931 #[test]
932 fn test_handle_subscribe_hostname() {
933 let mut cache = Cache::default();
934 let (mut conn, _) = new_conn(&mut cache);
935
936 conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![
937 Service::dns("website.internal").unwrap().name(),
938 Service::kube("default", "nginx")
939 .unwrap()
940 .as_backend_id(4443)
941 .name(),
942 ]));
943
944 let (outgoing, dns) = conn.outgoing();
945 assert!(dns.is_noop());
947 assert_eq!(
948 outgoing,
949 vec![xds_test::req!(
950 t = ResourceType::Listener,
951 add = vec!["nginx.default.svc.cluster.local:4443", "website.internal"],
952 )]
953 );
954 }
955
956 #[test]
957 fn test_handle_subscribe_backend() {
958 let mut cache = Cache::default();
959 let (mut conn, _) = new_conn(&mut cache);
960
961 conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![
962 Service::dns("website.internal").unwrap().as_backend_id(80),
963 Service::kube("default", "nginx")
964 .unwrap()
965 .as_backend_id(4443),
966 ]));
967
968 let (outgoing, dns) = conn.outgoing();
969 assert_eq!(
971 dns,
972 DnsUpdates {
973 add: [(Hostname::from_static("website.internal"), 80)]
974 .into_iter()
975 .collect(),
976 ..Default::default()
977 }
978 );
979
980 assert_eq!(
982 outgoing,
983 vec![xds_test::req!(
984 t = ResourceType::Cluster,
985 add = vec![
986 "nginx.default.svc.cluster.local:4443",
987 "website.internal:80"
988 ],
989 )]
990 );
991 }
992
993 #[test]
994 fn test_handle_ads_message_listener_route() {
995 let mut cache = Cache::default();
996 assert!(cache.is_wildcard(ResourceType::Listener));
997
998 let (mut conn, _) = new_conn(&mut cache);
999
1000 conn.handle_ads_message(xds_test::resp!(
1001 n = "1",
1002 add = ResourceVec::from_listeners(
1003 "123".into(),
1004 vec![xds_test::listener!("cooler.example.org", "cool-route")],
1005 ),
1006 remove = vec![],
1007 ));
1008 conn.handle_ads_message(xds_test::resp!(
1009 n = "2",
1010 add = ResourceVec::from_listeners(
1011 "456".into(),
1012 vec![xds_test::listener!("warmer.example.org", "warm-route")],
1013 ),
1014 remove = vec![],
1015 ));
1016 conn.handle_ads_message(xds_test::resp!(
1017 n = "3",
1018 add = ResourceVec::from_route_configs(
1019 "789".into(),
1020 vec![xds_test::route_config!(
1021 "cool-route",
1022 vec![xds_test::vhost!(
1023 "an-vhost",
1024 ["cooler.example.org"],
1025 [xds_test::route!(default "cooler.example.internal:8008")]
1026 )]
1027 )],
1028 ),
1029 remove = vec![],
1030 ));
1031
1032 let (outgoing, dns) = conn.outgoing();
1033 assert!(dns.is_noop());
1035
1036 assert_eq!(
1037 outgoing,
1038 vec![
1039 xds_test::req!(
1041 t = ResourceType::Cluster,
1042 add = vec!["cooler.example.internal:8008"]
1043 ),
1044 xds_test::req!(t = ResourceType::Listener, n = "2"),
1046 xds_test::req!(
1048 t = ResourceType::RouteConfiguration,
1049 n = "3",
1050 add = vec!["warm-route"]
1051 ),
1052 ],
1053 );
1054 }
1055
1056 #[test]
1057 fn test_handle_ads_message_listener_swap_route() {
1058 let mut cache = Cache::default();
1059 assert!(cache.is_wildcard(ResourceType::Listener));
1060
1061 let (mut conn, _) = new_conn(&mut cache);
1062
1063 conn.handle_ads_message(xds_test::resp!(
1065 n = "1",
1066 add = ResourceVec::from_listeners(
1067 "111".into(),
1068 vec![xds_test::listener!("cooler.example.org", "cool-route")],
1069 ),
1070 remove = vec![],
1071 ));
1072 conn.handle_ads_message(xds_test::resp!(
1073 n = "2",
1074 add = ResourceVec::from_route_configs(
1075 "222".into(),
1076 vec![xds_test::route_config!(
1077 "cool-route",
1078 vec![xds_test::vhost!(
1079 "an-vhost",
1080 ["cooler.example.org"],
1081 [xds_test::route!(default "cooler.example.internal:8008")]
1082 )]
1083 )],
1084 ),
1085 remove = vec![],
1086 ));
1087
1088 let (outgoing, dns) = conn.outgoing();
1089 assert!(dns.is_noop());
1090 assert_eq!(
1091 outgoing,
1092 vec![
1093 xds_test::req!(
1095 t = ResourceType::Cluster,
1096 add = vec!["cooler.example.internal:8008"]
1097 ),
1098 xds_test::req!(t = ResourceType::Listener, n = "1"),
1100 xds_test::req!(t = ResourceType::RouteConfiguration, n = "2"),
1102 ],
1103 );
1104
1105 assert_eq!(
1106 conn.cache.versions(ResourceType::Listener),
1107 HashMap::from_iter([("cooler.example.org".to_string(), "111".to_string())])
1108 );
1109 assert_eq!(
1110 conn.cache.versions(ResourceType::RouteConfiguration),
1111 HashMap::from_iter([("cool-route".to_string(), "222".to_string())])
1112 );
1113
1114 conn.handle_ads_message(xds_test::resp!(
1119 n = "3",
1120 add = ResourceVec::from_listeners(
1121 "333".into(),
1122 vec![xds_test::listener!("cooler.example.org", "lame-route")],
1123 ),
1124 remove = vec![],
1125 ));
1126 conn.handle_ads_message(xds_test::resp!(
1127 n = "4",
1128 add = ResourceVec::from_listeners(
1129 "444".into(),
1130 vec![xds_test::listener!("cooler.example.org", "cool-route")],
1131 ),
1132 remove = vec![],
1133 ));
1134
1135 let (outgoing, dns) = conn.outgoing();
1136 assert!(dns.is_noop());
1137 assert_eq!(
1138 outgoing,
1139 vec![
1140 xds_test::req!(t = ResourceType::Listener, n = "4"),
1142 xds_test::req!(
1144 t = ResourceType::RouteConfiguration,
1145 n = "",
1146 add = vec!["lame-route"],
1147 remove = vec!["lame-route"]
1148 ),
1149 ]
1150 );
1151
1152 assert_eq!(
1153 conn.cache.versions(ResourceType::Listener),
1154 HashMap::from_iter([("cooler.example.org".to_string(), "444".to_string())])
1155 );
1156 assert_eq!(
1157 conn.cache.versions(ResourceType::RouteConfiguration),
1158 HashMap::from_iter([("cool-route".to_string(), "222".to_string())])
1159 );
1160 }
1161
1162 #[test]
1163 fn test_handle_ads_message_add_remove_add() {
1164 tracing_subscriber::fmt::init();
1165 tracing::trace!("HELLO?");
1166
1167 let mut cache = Cache::default();
1168 assert!(cache.is_wildcard(ResourceType::Listener));
1169 assert!(cache.is_wildcard(ResourceType::Cluster));
1170
1171 let (mut conn, _) = new_conn(&mut cache);
1172
1173 conn.handle_ads_message(xds_test::resp!(
1175 n = "1",
1176 add = ResourceVec::from_listeners(
1177 "111".into(),
1178 vec![xds_test::listener!("cooler.example.org", "cool-route")],
1179 ),
1180 remove = vec![],
1181 ));
1182 conn.handle_ads_message(xds_test::resp!(
1183 n = "2",
1184 add = ResourceVec::from_route_configs(
1185 "222".into(),
1186 vec![xds_test::route_config!(
1187 "cool-route",
1188 vec![xds_test::vhost!(
1189 "an-vhost",
1190 ["cooler.example.org"],
1191 [xds_test::route!(default "cooler.example.internal:8008")]
1192 )]
1193 )],
1194 ),
1195 remove = vec![],
1196 ));
1197 conn.handle_ads_message(xds_test::resp!(
1198 n = "3",
1199 add = ResourceVec::from_clusters(
1200 "333".into(),
1201 vec![xds_test::cluster!("cooler.example.internal:8008"),],
1202 ),
1203 remove = vec![],
1204 ));
1205 let (outgoing, _dns) = conn.outgoing();
1206 assert_eq!(
1207 outgoing,
1208 vec![
1209 xds_test::req!(t = ResourceType::Cluster, n = "3"),
1211 xds_test::req!(
1213 t = ResourceType::Listener,
1214 n = "1",
1215 add = vec!["cooler.example.internal.lb.jct:8008"]
1216 ),
1217 xds_test::req!(t = ResourceType::RouteConfiguration, n = "2"),
1219 ],
1220 );
1221
1222 conn.handle_ads_message(xds_test::resp!(
1224 n = "4",
1225 add = ResourceVec::from_listeners(
1226 "444".into(),
1227 vec![xds_test::listener!("cooler.example.org", "very-cool-route")],
1228 ),
1229 remove = vec![],
1230 ));
1231 conn.handle_ads_message(xds_test::resp!(
1232 n = "5",
1233 add = ResourceVec::from_route_configs("444".into(), vec![]),
1234 remove = vec!["very-cool-route"],
1235 ));
1236 let (outgoing, _dns) = conn.outgoing();
1237 assert_eq!(
1238 outgoing,
1239 vec![
1240 xds_test::req!(
1242 t = ResourceType::Cluster,
1243 remove = vec!["cooler.example.internal:8008"]
1244 ),
1245 xds_test::req!(
1247 t = ResourceType::Listener,
1248 n = "4",
1249 add = vec![],
1250 remove = vec!["cooler.example.internal.lb.jct:8008"]
1251 ),
1252 xds_test::req!(
1254 t = ResourceType::RouteConfiguration,
1255 n = "5",
1256 add = vec!["very-cool-route"],
1257 remove = vec!["cool-route"]
1258 ),
1259 ],
1260 );
1261
1262 conn.handle_ads_message(xds_test::resp!(
1265 n = "6",
1266 add = ResourceVec::from_clusters("444".into(), vec![]),
1267 remove = vec!["cooler.example.internal:8008"],
1268 ));
1269 conn.handle_ads_message(xds_test::resp!(
1270 n = "7",
1271 add = ResourceVec::from_listeners("444".into(), vec![]),
1272 remove = vec!["cooler.example.internal.lb.jct:8008"],
1273 ));
1274 conn.handle_ads_message(xds_test::resp!(
1275 n = "8",
1276 add = ResourceVec::from_route_configs(
1277 "555".into(),
1278 vec![xds_test::route_config!(
1279 "very-cool-route",
1280 vec![xds_test::vhost!(
1281 "an-vhost",
1282 ["cooler.example.org"],
1283 [xds_test::route!(default "cooler.example.internal:8008")]
1284 )]
1285 )],
1286 ),
1287 remove = vec![],
1288 ));
1289 let (outgoing, _dns) = conn.outgoing();
1290 assert_eq!(
1291 outgoing,
1292 vec![
1293 xds_test::req!(
1295 t = ResourceType::Cluster,
1296 n = "6",
1297 add = vec!["cooler.example.internal:8008"]
1298 ),
1299 xds_test::req!(t = ResourceType::Listener, n = "7"),
1301 xds_test::req!(t = ResourceType::RouteConfiguration, n = "8"),
1303 ],
1304 );
1305 }
1306
#[test]
fn test_handle_ads_message_listener_removed() {
    // When the server removes a Listener, the connection is expected to
    // unsubscribe from the RouteConfiguration that only that Listener named.
    let mut cache = Cache::default();
    assert!(cache.is_wildcard(ResourceType::Listener));

    let (mut conn, _) = new_conn(&mut cache);

    // Two Listeners arrive, one naming "cool-route" and one naming "warm-route".
    let cooler_listener = xds_test::resp!(
        n = "1",
        add = ResourceVec::from_listeners(
            "123".into(),
            vec![xds_test::listener!("cooler.example.org", "cool-route")],
        ),
        remove = vec![],
    );
    conn.handle_ads_message(cooler_listener);

    let warmer_listener = xds_test::resp!(
        n = "2",
        add = ResourceVec::from_listeners(
            "456".into(),
            vec![xds_test::listener!("warmer.example.org", "warm-route")],
        ),
        remove = vec![],
    );
    conn.handle_ads_message(warmer_listener);

    // Only "cool-route" is delivered as a RouteConfiguration.
    let cool_route = xds_test::resp!(
        n = "3",
        add = ResourceVec::from_route_configs(
            "789".into(),
            vec![xds_test::route_config!(
                "cool-route",
                vec![xds_test::vhost!(
                    "an-vhost",
                    ["cooler.example.org"],
                    [xds_test::route!(default "cooler.example.internal:8008")]
                )]
            )],
        ),
        remove = vec![],
    );
    conn.handle_ads_message(cool_route);

    let (requests, dns_updates) = conn.outgoing();
    assert!(dns_updates.is_noop());

    // Expected so far: a Cluster request for the route's target, a Listener
    // request carrying the latest Listener nonce, and a RouteConfiguration
    // request that still asks for the undelivered "warm-route".
    let expected_after_adds = vec![
        xds_test::req!(
            t = ResourceType::Cluster,
            add = vec!["cooler.example.internal:8008"]
        ),
        xds_test::req!(t = ResourceType::Listener, n = "2"),
        xds_test::req!(
            t = ResourceType::RouteConfiguration,
            n = "3",
            add = vec!["warm-route"]
        ),
    ];
    assert_eq!(requests, expected_after_adds);

    // The server now removes the Listener that named "warm-route".
    let listener_removed = xds_test::resp!(
        n = "4",
        add = ResourceVec::from_listeners("123".into(), vec![]),
        remove = vec!["warmer.example.org"],
    );
    conn.handle_ads_message(listener_removed);

    let (requests, dns_updates) = conn.outgoing();
    assert!(dns_updates.is_noop());

    // Expected: a Listener request with nonce "4" plus a RouteConfiguration
    // request removing "warm-route".
    let expected_after_removal = vec![
        xds_test::req!(t = ResourceType::Listener, n = "4"),
        xds_test::req!(
            t = ResourceType::RouteConfiguration,
            remove = vec!["warm-route"],
        ),
    ];
    assert_eq!(requests, expected_after_removal);
}
1391
#[test]
fn test_handle_ads_message_cluster_cla() {
    // Feeds Clusters, a ClusterLoadAssignment and Listeners through the
    // connection, then checks the resulting DNS updates and outgoing requests.
    let mut cache = Cache::default();
    assert!(cache.is_wildcard(ResourceType::Cluster));

    let (mut conn, _) = new_conn(&mut cache);

    // Response 1: two Clusters.
    let clusters = xds_test::resp!(
        n = "1",
        add = ResourceVec::from_clusters(
            "123".into(),
            vec![
                xds_test::cluster!("cooler.example.org:2345"),
                xds_test::cluster!("thing.default.svc.cluster.local:9876"),
            ],
        ),
        remove = vec![],
    );
    conn.handle_ads_message(clusters);

    // Response 2: a load assignment for only one of the two Clusters.
    let load_assignments = xds_test::resp!(
        n = "2",
        add = ResourceVec::from_load_assignments(
            "123".into(),
            vec![xds_test::cla!(
                "thing.default.svc.cluster.local:9876" => {
                    "zone1" => ["1.1.1.1"]
                }
            )],
        ),
        remove = vec![],
    );
    conn.handle_ads_message(load_assignments);

    // Response 3: one ".lb.jct" Listener per Cluster, each with an inline
    // ring-hash route.
    // NOTE(review): the second listener's vhost reuses the
    // "cooler.example.org.lb.jct:2345" domain and both listeners share the
    // "lb-route" name; this looks like copy-paste. Confirm it is intentional.
    let lb_listeners = xds_test::resp!(
        n = "3",
        add = ResourceVec::from_listeners("555".into(), vec![
            xds_test::listener!("cooler.example.org.lb.jct:2345", "lb-route" => [xds_test::vhost!(
                "lb-vhost",
                ["cooler.example.org.lb.jct:2345"],
                [xds_test::route!(default ring_hash = "x-user", "cooler.example.org:2345")],
            )]),
            xds_test::listener!("thing.default.svc.cluster.local.lb.jct:9876", "lb-route" => [xds_test::vhost!(
                "lb-vhost",
                ["cooler.example.org.lb.jct:2345"],
                [xds_test::route!(default ring_hash = "x-user", "thing.default.svc.cluster.local:9876")],
            )])
        ]),
        remove = vec![],
    );
    conn.handle_ads_message(lb_listeners);

    let (requests, dns_updates) = conn.outgoing();

    // The only DNS addition expected is cooler.example.org on port 2345.
    let expected_dns = DnsUpdates {
        add: [(Hostname::from_static("cooler.example.org"), 2345)]
            .into_iter()
            .collect(),
        ..Default::default()
    };
    assert_eq!(dns_updates, expected_dns);

    // One request per resource type, each carrying the nonce of the response
    // that delivered that type.
    let expected_requests = vec![
        xds_test::req!(t = ResourceType::Cluster, n = "1"),
        xds_test::req!(t = ResourceType::ClusterLoadAssignment, n = "2"),
        xds_test::req!(t = ResourceType::Listener, n = "3"),
    ];
    assert_eq!(requests, expected_requests);
}
1460
#[test]
fn test_set_node_after_init() {
    // With every resource type's wildcard flag turned off, a new connection
    // has nothing to send up front. The first request produced by a later
    // subscription is expected to carry the node.
    let mut cache = Cache::default();
    for &rtype in ResourceType::all() {
        cache.set_wildcard(rtype, false);
    }

    let (mut conn, initial_requests) = new_conn(&mut cache);
    assert!(initial_requests.is_empty());

    let backend = Service::dns("website.internal").unwrap().as_backend_id(80);
    conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![backend]));

    let (requests, _) = conn.outgoing();
    let first_request = &requests[0];
    assert_eq!(first_request.node.as_ref(), Some(&*TEST_NODE));
}
1477
#[test]
fn test_handle_unknown_type_url() {
    // A response with an unrecognised type URL is expected to produce a
    // single request for that same type URL carrying an InvalidArgument
    // "unknown type" error.
    let bogus_type_url = "made.up.type_url/Potato";

    let mut cache = Cache::default();
    let (mut conn, _) = new_conn(&mut cache);

    let unknown_response = DeltaDiscoveryResponse {
        type_url: bogus_type_url.to_string(),
        ..Default::default()
    };
    conn.handle_ads_message(unknown_response);

    let (requests, dns_updates) = conn.outgoing();
    assert!(dns_updates.is_noop());

    let unknown_type_error = xds_api::pb::google::rpc::Status {
        code: tonic::Code::InvalidArgument.into(),
        message: "unknown type".to_string(),
        ..Default::default()
    };
    let expected_rejection = DeltaDiscoveryRequest {
        type_url: bogus_type_url.to_string(),
        error_detail: Some(unknown_type_error),
        ..Default::default()
    };
    assert_eq!(requests, vec![expected_rejection]);
}
1503
#[test]
fn test_handle_invalid_resource() {
    // A Listener-typed response whose payload is actually a Node message is
    // expected to yield exactly one Listener request whose error message
    // starts with "invalid resource".
    let mut cache = Cache::default();
    let (mut conn, _) = new_conn(&mut cache);

    // Deliberately the wrong message type for a Listener response.
    let node = xds_core::Node {
        id: "some-node".to_string(),
        ..Default::default()
    };
    let mistyped_resource = xds_discovery::Resource {
        resource: Some(protobuf::Any::from_msg(&node).unwrap()),
        ..Default::default()
    };
    let bad_response = DeltaDiscoveryResponse {
        type_url: ResourceType::Listener.type_url().to_string(),
        resources: vec![mistyped_resource],
        ..Default::default()
    };
    conn.handle_ads_message(bad_response);

    let (requests, dns_updates) = conn.outgoing();
    assert!(dns_updates.is_noop());

    // Exactly one request must come back, and it must be the Listener
    // rejection; any other shape counts as a failure.
    let rejected_as_invalid = match &requests[..] {
        [DeltaDiscoveryRequest { type_url, error_detail, .. }] => {
            type_url == ResourceType::Listener.type_url()
                && error_detail
                    .as_ref()
                    .is_some_and(|e| e.message.starts_with("invalid resource"))
        }
        _ => false,
    };
    assert!(rejected_as_invalid);
}
1531
#[test]
fn test_handle_does_not_exist() {
    // Subscribes to a host, then has the server report that name under
    // `removed_resources`. The connection is expected to answer with a
    // Listener request echoing the response nonce, and the cache reader must
    // resolve the route lookup immediately to `None`.
    let mut cache = Cache::default();
    let (mut conn, _) = new_conn(&mut cache);

    let does_not_exist = Service::dns("website.internal").unwrap().name();

    // The name is needed again for the server response below, so clone here.
    conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![does_not_exist.clone()]));
    // Discard whatever the subscription itself produced; presumably this
    // drains the pending requests so the next `outgoing()` reflects only the
    // response handled below.
    let _ = conn.outgoing();

    conn.handle_ads_message(DeltaDiscoveryResponse {
        nonce: "boo".to_string(),
        type_url: ResourceType::Listener.type_url().to_string(),
        // Last use of the name: move it instead of cloning again.
        removed_resources: vec![does_not_exist],
        ..Default::default()
    });

    let (outgoing, dns) = conn.outgoing();
    assert!(dns.is_noop());
    assert_eq!(
        outgoing,
        vec![xds_test::req!(t = ResourceType::Listener, n = "boo")],
    );

    // `now_or_never().unwrap()` requires the lookup future to be ready on its
    // first poll; a lookup that stays pending would panic here.
    let route = cache
        .reader()
        .get_route("website.internal")
        .now_or_never()
        .unwrap();
    assert_eq!(route, None);
}
1565}