junction_core/
xds.rs

1//! Junction XDS.
2//!
3//! This module contains [AdsClient], the interface between Junction and XDS. A
4//! client is a long-lived pile of state that gets wrapped by one or more Junction
5//! Clients to present Route and Backend data to the world.
6//!
7//! The stateful internals of a client are written in a sans-io way as much as
8//! possible to make it easier to test and verify the complexity of ADS. Most of
9//! the nasty bits are wrapped up in the [cache] module - a Cache is responsible
10//! for parsing and storing raw xDS and its Junction equivalent, and for tracking
11//! the relationship between resources.
12//!
13//! An [AdsConnection] wraps a cache and a DNS resolver and multiplexes the
14//! input from remote connections, subscriptions from clients and uses the
15//! state of the cache to register interest in DNS names and to subscribe to
16//! xDS resources.
17//!
18//! The [AdsTask] returned from a client is the actual io in this module - an
19//! [AdsTask] actually does gRPC and listens on sockets and drives a new
20//! [AdsConnection] every time it reconnects.
21
22//  # TODO
23//
24// - Figure out how to run a Client without an upstream ADS server. Right now
25//   we don't process subscription updates until a gRPC connection gets
26//   established which seems bad.
27//
28//  - XDS client features:
29//    `envoy.lb.does_not_support_overprovisioning` and friends. See
30//    <https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md>.
31
32use bytes::Bytes;
33use cache::{Cache, CacheReader};
34use enum_map::EnumMap;
35use futures::{FutureExt, TryStreamExt};
36use junction_api::{backend::BackendId, http::Route, Hostname, Service};
37use std::{
38    borrow::Cow, collections::BTreeSet, future::Future, io::ErrorKind, sync::Arc, time::Duration,
39};
40use tokio::sync::mpsc::{self, Receiver};
41use tokio_stream::wrappers::ReceiverStream;
42use tonic::{transport::Endpoint, Streaming};
43use tracing::debug;
44use xds_api::pb::{
45    envoy::{
46        config::core::v3 as xds_core,
47        service::discovery::v3::{
48            aggregated_discovery_service_client::AggregatedDiscoveryServiceClient,
49            DeltaDiscoveryRequest, DeltaDiscoveryResponse,
50        },
51    },
52    google::{protobuf, rpc::Status as GrpcStatus},
53};
54
55mod cache;
56
57mod resources;
58pub use resources::ResourceVersion;
59pub(crate) use resources::{ResourceType, ResourceVec};
60
61use crate::{dns::StdlibResolver, BackendLb, ConfigCache};
62
63mod csds;
64
65#[cfg(test)]
66mod test;
67
68/// A single xDS configuration object, with additional metadata about when it
69/// was fetched and processed.
70#[derive(Debug, Default, Clone)]
71pub struct XdsConfig {
72    pub name: String,
73    pub type_url: String,
74    pub version: Option<ResourceVersion>,
75    pub xds: Option<protobuf::Any>,
76    pub last_error: Option<(ResourceVersion, String)>,
77}
78
79#[derive(Debug)]
80enum SubscriptionUpdate {
81    AddHosts(Vec<String>),
82    AddBackends(Vec<BackendId>),
83    AddEndpoints(Vec<BackendId>),
84
85    #[allow(unused)]
86    RemoveHosts(Vec<String>),
87    #[allow(unused)]
88    RemoveBackends(Vec<BackendId>),
89    #[allow(unused)]
90    RemoveEndpoints(Vec<BackendId>),
91}
92
93/// A Junction ADS client that manages long-lived xDS state by connecting to a
94/// remote server.
95///
96/// The client presents downstream as a [ConfigCache] so that a client can query
97/// `Route` and `Backend` data. It also exposes a subscription interface for
98/// both so that clients can register interest without having to know about the
99/// details of xDS.
100///
101/// See the module docs for the general design of this whole module and how the
102/// client pulls it all together.
103#[derive(Clone)]
104pub(super) struct AdsClient {
105    subs: mpsc::Sender<SubscriptionUpdate>,
106    cache: CacheReader,
107    dns: StdlibResolver,
108}
109
110impl AdsClient {
111    /// Create a new paired `AdsClient`` and `AdsTask`.
112    ///
113    /// A single `AdsTask` is expected to run in the background and communicate
114    /// with an ADS service, while any number of `AdsClient`s can use it to read
115    /// and request discovery data.
116    ///
117    /// This method doesn't start the background work necessary to communicate with
118    /// an ADS server. To do that, call the [run][AdsTask::run] method on the returned
119    /// `AdsTask`.
120    pub(super) fn build(
121        address: impl Into<Bytes>,
122        node_id: String,
123        cluster: String,
124    ) -> Result<(AdsClient, AdsTask), tonic::transport::Error> {
125        // FIXME: make this configurable
126        let endpoint = Endpoint::from_shared(address)?
127            .connect_timeout(Duration::from_secs(5))
128            .tcp_nodelay(true);
129
130        let node_info = xds_core::Node {
131            id: node_id,
132            cluster,
133            client_features: vec![
134                "envoy.lb.does_not_support_overprovisioning".to_string(),
135                "envoy.lrs.supports_send_all_clusters".to_string(),
136            ],
137            ..Default::default()
138        };
139
140        // TODO: how should we pick this number?
141        let (sub_tx, sub_rx) = mpsc::channel(10);
142        let cache = Cache::default();
143
144        // FIXME: make this configurable
145        let dns = StdlibResolver::new_with(Duration::from_secs(5), Duration::from_millis(500), 2);
146
147        let client = AdsClient {
148            subs: sub_tx,
149            cache: cache.reader(),
150            dns: dns.clone(),
151        };
152        let task = AdsTask {
153            endpoint,
154            initial_channel: None,
155            node_info,
156            cache,
157            dns,
158            subs: sub_rx,
159        };
160
161        Ok((client, task))
162    }
163
164    pub(super) fn csds_server(
165        &self,
166        port: u16,
167    ) -> impl Future<Output = Result<(), tonic::transport::Error>> + Send + 'static {
168        csds::local_server(self.cache.clone(), port)
169    }
170
171    pub(super) fn iter_routes(&self) -> impl Iterator<Item = Arc<Route>> + '_ {
172        self.cache.iter_routes()
173    }
174
175    pub(super) fn iter_backends(&self) -> impl Iterator<Item = Arc<BackendLb>> + '_ {
176        self.cache.iter_backends()
177    }
178
179    pub(super) fn iter_xds(&self) -> impl Iterator<Item = XdsConfig> + '_ {
180        self.cache.iter_xds()
181    }
182}
183
184// TODO: the whole add-a-subscription-on-get thing is a bit werid but we don't
185// have a better signal yet. there probably is one, but we need some way to
186// distinguish between "get_endpoints was called because client.resolve_http was
187// called and its downstream of a listener" and "get_endpoints was called
188// because there is a DNS cluster in a static config".
189impl ConfigCache for AdsClient {
190    async fn get_route<S: AsRef<str>>(&self, host: S) -> Option<Arc<Route>> {
191        let hosts = vec![host.as_ref().to_string()];
192        let _ = self.subs.send(SubscriptionUpdate::AddHosts(hosts)).await;
193
194        self.cache.get_route(host).await
195    }
196
197    async fn get_backend(
198        &self,
199        backend: &junction_api::backend::BackendId,
200    ) -> Option<std::sync::Arc<crate::BackendLb>> {
201        let bs = vec![backend.clone()];
202        let _ = self.subs.send(SubscriptionUpdate::AddBackends(bs)).await;
203
204        self.cache.get_backend(backend).await
205    }
206
207    async fn get_endpoints(
208        &self,
209        backend: &junction_api::backend::BackendId,
210    ) -> Option<std::sync::Arc<crate::EndpointGroup>> {
211        let bs = vec![backend.clone()];
212        let _ = self.subs.send(SubscriptionUpdate::AddEndpoints(bs)).await;
213
214        match &backend.service {
215            junction_api::Service::Dns(dns) => {
216                self.dns
217                    .get_endpoints_await(&dns.hostname, backend.port)
218                    .await
219            }
220            _ => self.cache.get_endpoints(backend).await,
221        }
222    }
223}
224
225/// The IO-doing, gRPC adjacent part of running an ADS client.
226pub(crate) struct AdsTask {
227    endpoint: tonic::transport::Endpoint,
228    initial_channel: Option<tonic::transport::Channel>,
229    node_info: xds_core::Node,
230    cache: Cache,
231    dns: StdlibResolver,
232    subs: mpsc::Receiver<SubscriptionUpdate>,
233}
234
235#[derive(Debug, thiserror::Error)]
236struct ShutdownError;
237
238impl std::fmt::Display for ShutdownError {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        write!(f, "AdsTask started after shutdown")
241    }
242}
243
244macro_rules! log_request {
245    ($request:expr) => {
246        tracing::debug!(
247            nack = $request.error_detail.is_some(),
248            "DeltaDiscoveryRequest(n={:?}, ty={:?}, r={:?}, u={:?} init={:?})",
249            $request.response_nonce,
250            $request.type_url,
251            $request.resource_names_subscribe,
252            $request.resource_names_unsubscribe,
253            $request.initial_resource_versions,
254        );
255    };
256}
257
258macro_rules! log_response {
259    ($response:expr) => {
260        if tracing::enabled!(tracing::Level::DEBUG) {
261            let names_and_versions = names_and_versions(&$response);
262            tracing::debug!(
263                "DeltaDiscoveryResponse(n={:?}, ty={:?}, r={:?}, removed={:?})",
264                $response.nonce,
265                $response.type_url,
266                names_and_versions,
267                $response.removed_resources,
268            );
269        }
270    };
271}
272
273fn names_and_versions(response: &DeltaDiscoveryResponse) -> Vec<(String, String)> {
274    response
275        .resources
276        .iter()
277        .map(|r| (r.name.clone(), r.version.clone()))
278        .collect()
279}
280
281impl AdsTask {
282    pub(super) fn is_shutdown(&self) -> bool {
283        self.subs.is_closed()
284    }
285
286    pub(super) async fn run(&mut self) -> Result<(), &(dyn std::error::Error + 'static)> {
287        if self.is_shutdown() {
288            return Err(&ShutdownError);
289        }
290
291        loop {
292            match self.run_connection().await {
293                Ok(()) => break,
294                // on an ADS disconnect, just reconnect
295                Err(ConnectionError::AdsDisconnected) => continue,
296                // On a connection error, reconnect with a backoff and try to
297                // find a new ADS server.
298                //
299                // There's no great way to distingush between a connection
300                // that's never going to work and a temporary (but long) outage,
301                // so we'll just patiently keep trying.
302                Err(ConnectionError::Connect(e)) => {
303                    debug!(err = %e, "failed to connect to ADS server");
304                    tokio::time::sleep(Duration::from_secs(2)).await;
305                }
306                // The stream closed with a Tonic error. This is usually either
307                // a broken pipe or some other kind of IO error.
308                //
309                // There's also nothing to do here but log an error and
310                // continue, but don't wait too long on broken pipe.
311                Err(ConnectionError::Status(status)) => {
312                    // FIXME: emit an event with tracing or metrics or something here
313                    let is_broken_pipe =
314                        unwrap_io_error(&status).is_some_and(|e| e.kind() == ErrorKind::BrokenPipe);
315
316                    if !is_broken_pipe {
317                        debug!(err = %status, "ADS connection closed unexpectedly");
318                    }
319
320                    tokio::time::sleep(if is_broken_pipe {
321                        Duration::from_millis(50)
322                    } else {
323                        Duration::from_secs(2)
324                    })
325                    .await;
326                }
327            };
328        }
329
330        Ok(())
331    }
332
333    // TODO: can we split this even further from IO so we can run it without an
334    // active server? it would be nice to process subscription updates even
335    // while the connection is dead, and might allow adding static resources
336    // directly to a cache instead of keeping a separate static cache.
337    //
338    // To do it in a resasonable way, we need to pull the GRPC connection out
339    // of here. right now this async fn is implicitly a single-connection state
340    // machine - we could keep that and have a separate disconnected loop that
341    // we transition into, or we could pass a "NewConnection" message into here
342    // and manually manage connected vs. disconnected state.
343    async fn run_connection(&mut self) -> Result<(), ConnectionError> {
344        let (xds_tx, xds_rx) = tokio::sync::mpsc::channel(10);
345
346        // set up the gRPC stream
347        let channel = self.new_connection().await?;
348        let mut client = AggregatedDiscoveryServiceClient::new(channel);
349        let stream_response = client
350            .delta_aggregated_resources(ReceiverStream::new(xds_rx))
351            .await?;
352        let mut incoming = stream_response.into_inner();
353
354        // set DNS names
355        self.dns.set_names(self.cache.dns_names());
356
357        // set up the xDS connection and start sending messages
358        let (mut conn, initial_requests) =
359            AdsConnection::new(self.node_info.clone(), &mut self.cache);
360
361        for msg in initial_requests {
362            log_request!(msg);
363            if xds_tx.send(msg).await.is_err() {
364                return Err(ConnectionError::AdsDisconnected);
365            }
366        }
367
368        loop {
369            tracing::trace!("handle_update_batch");
370            let is_eof = handle_update_batch(&mut conn, &mut self.subs, &mut incoming).await?;
371            if is_eof {
372                return Ok(());
373            }
374
375            let (outgoing, dns_updates) = conn.outgoing();
376            for msg in outgoing {
377                log_request!(msg);
378                if xds_tx.send(msg).await.is_err() {
379                    return Err(ConnectionError::AdsDisconnected);
380                }
381            }
382            update_dns(&self.dns, dns_updates.add, dns_updates.remove);
383        }
384    }
385
386    pub(super) async fn connect(&mut self) -> Result<(), tonic::transport::Error> {
387        if self.initial_channel.is_none() {
388            let channel = self.endpoint.connect().await?;
389            self.initial_channel = Some(channel)
390        }
391
392        Ok(())
393    }
394
395    async fn new_connection(
396        &mut self,
397    ) -> Result<tonic::transport::Channel, tonic::transport::Error> {
398        match self.initial_channel.take() {
399            Some(channel) => Ok(channel),
400            None => self.endpoint.connect().await,
401        }
402    }
403}
404
405// handle a batch of incoming messages/subscriptions.
406//
407// awaits until an update is recvd from either subscriptions or xds, and then
408// immediately grabs any pending updates as well. returns as soon as there's
409// nothing to immediately do and handling updates would block.
410async fn handle_update_batch(
411    conn: &mut AdsConnection<'_>,
412    subs: &mut Receiver<SubscriptionUpdate>,
413    incoming: &mut Streaming<DeltaDiscoveryResponse>,
414) -> Result<bool, ConnectionError> {
415    // handle the next possible input. runs a biased select over gRPC and
416    // subscription inputs.
417    //
418    // this function is inlined here because:
419    // - abstracting a handle_batch method is miserable, the type system makes
420    //   it hard to abstract over a bunch of mut references like this.
421    // - there is no reason, even just testing, to run this function by
422    //   itself
423    //
424    // it's a bit weird to inline, but only a bit
425    async fn next_update(
426        conn: &mut AdsConnection<'_>,
427        subs: &mut Receiver<SubscriptionUpdate>,
428        incoming: &mut Streaming<DeltaDiscoveryResponse>,
429    ) -> Result<bool, ConnectionError> {
430        tokio::select! {
431            biased;
432
433            xds_msg = incoming.try_next() => {
434                // on GRPC status errors, the connection has died and we're
435                // going to reconnect. pass the error up to reset things
436                // and move on.
437                let response = match xds_msg? {
438                    Some(response) => response,
439                    None => return Err(ConnectionError::AdsDisconnected),
440                };
441                log_response!(response);
442
443                tracing::trace!("ads connection: handle_ads_message");
444                conn.handle_ads_message(response);
445            }
446            sub_update = subs.recv() => {
447                let Some(sub_update) = sub_update else {
448                    return Ok(true)
449                };
450
451                tracing::trace!(
452                    ?sub_update,
453                    "ads connection: handle_subscription_update",
454                );
455                conn.handle_subscription_update(sub_update);
456            }
457        }
458        Ok(false)
459    }
460
461    // await the next update
462    if next_update(conn, subs, incoming).await? {
463        return Ok(true);
464    }
465
466    // try to handle any immediately pending updates. do not await, there is
467    // probably some work to be done to handle effects now, so we should
468    // return back to the caller.
469    loop {
470        let Some(should_exit) = next_update(conn, subs, incoming).now_or_never() else {
471            break;
472        };
473
474        if should_exit? {
475            return Ok(true);
476        }
477    }
478
479    Ok(false)
480}
481
482#[inline]
483fn update_dns(
484    dns: &StdlibResolver,
485    add: BTreeSet<(Hostname, u16)>,
486    remove: BTreeSet<(Hostname, u16)>,
487) {
488    for (name, port) in add {
489        dns.subscribe(name, port);
490    }
491    for (name, port) in remove {
492        dns.unsubscribe(&name, port);
493    }
494}
495
496#[derive(Debug, thiserror::Error)]
497enum ConnectionError {
498    #[error(transparent)]
499    Connect(#[from] tonic::transport::Error),
500
501    #[error(transparent)]
502    Status(#[from] tonic::Status),
503
504    #[error("ADS server closed the stream")]
505    AdsDisconnected,
506}
507
508/// Returns `true` if this tonic [Status] was caused by a [std::io::Error].
509///
510/// Adapted from the `tonic` examples.
511///
512/// https://github.com/hyperium/tonic/blob/941726cc46b995dcc393c9d2b462d440bd3514f3/examples/src/streaming/server.rs#L15
513fn unwrap_io_error(status: &tonic::Status) -> Option<&std::io::Error> {
514    let mut err: &(dyn std::error::Error + 'static) = status;
515
516    loop {
517        if let Some(e) = err.downcast_ref::<std::io::Error>() {
518            return Some(e);
519        }
520
521        // https://github.com/hyperium/h2/pull/462
522        if let Some(e) = err.downcast_ref::<h2::Error>().and_then(|e| e.get_io()) {
523            return Some(e);
524        }
525
526        err = err.source()?;
527    }
528}
529
530struct AdsConnection<'a> {
531    cache: &'a mut Cache,
532    node: Option<xds_core::Node>,
533    acks: EnumMap<ResourceType, Option<AckState>>,
534    unknown_types: Vec<(String, String)>,
535}
536
537#[derive(Debug, Default)]
538struct AckState {
539    nonce: String,
540    error: Option<Cow<'static, str>>,
541}
542
543impl AckState {
544    fn into_ack(self) -> (String, Option<GrpcStatus>) {
545        let nonce = self.nonce;
546        let error = self.error.map(|message| GrpcStatus {
547            message: message.to_string(),
548            code: tonic::Code::InvalidArgument.into(),
549            ..Default::default()
550        });
551
552        (nonce, error)
553    }
554}
555
556impl<'a> AdsConnection<'a> {
557    fn new(node: xds_core::Node, cache: &'a mut Cache) -> (Self, Vec<DeltaDiscoveryRequest>) {
558        let mut requests = Vec::with_capacity(ResourceType::all().len());
559
560        let mut node = Some(node);
561        for &rtype in ResourceType::all() {
562            let initial_versions = cache.versions(rtype);
563            let mut subscribe = cache.initial_subscriptions(rtype);
564            if cache.is_wildcard(rtype) && !subscribe.is_empty() {
565                subscribe.push("*".to_string());
566            }
567
568            if !cache.is_wildcard(rtype) && subscribe.is_empty() && initial_versions.is_empty() {
569                continue;
570            }
571
572            requests.push(DeltaDiscoveryRequest {
573                node: node.take(),
574                type_url: rtype.type_url().to_string(),
575                resource_names_subscribe: subscribe,
576                initial_resource_versions: initial_versions,
577                ..Default::default()
578            });
579        }
580
581        let conn = Self {
582            cache,
583            node,
584            acks: Default::default(),
585            unknown_types: Vec::new(),
586        };
587        (conn, requests)
588    }
589
590    fn outgoing(&mut self) -> (Vec<DeltaDiscoveryRequest>, DnsUpdates) {
591        let mut responses = Vec::with_capacity(ResourceType::all().len());
592
593        // tee up invalid type messages.
594        //
595        // this should be a hyper rare ocurrence, so `take` the vec to reset the
596        // allocation to nothing instead of `drain` which keeps the capacity.
597        for (response_nonce, type_url) in std::mem::take(&mut self.unknown_types) {
598            let error_detail = Some(xds_api::pb::google::rpc::Status {
599                code: tonic::Code::InvalidArgument.into(),
600                message: "unknown type".to_string(),
601                ..Default::default()
602            });
603            responses.push(DeltaDiscoveryRequest {
604                type_url,
605                response_nonce,
606                error_detail,
607                ..Default::default()
608            })
609        }
610
611        // map changes into responses. DNS updates get passed through directly
612        let (resources, dns) = self.cache.collect();
613
614        // EnumMap::into_iter will always cover all variants as keys in xDS
615        // make-before-break order, so just iterating over `resources` here gets
616        // us responses in an appropriate order.
617        for (rtype, changes) in resources {
618            let ack = self.get_ack(rtype);
619
620            if ack.is_none() && changes.is_empty() {
621                continue;
622            }
623
624            let node = self.node.take();
625            let (response_nonce, error_detail) = ack.map(|a| a.into_ack()).unwrap_or_default();
626            let resource_names_subscribe = changes.added.into_iter().collect();
627            let resource_names_unsubscribe = changes.removed.into_iter().collect();
628
629            responses.push(DeltaDiscoveryRequest {
630                node,
631                type_url: rtype.type_url().to_string(),
632                response_nonce,
633                error_detail,
634                resource_names_subscribe,
635                resource_names_unsubscribe,
636                ..Default::default()
637            })
638        }
639
640        (responses, dns)
641    }
642
643    fn handle_ads_message(&mut self, resp: DeltaDiscoveryResponse) {
644        let Some(rtype) = ResourceType::from_type_url(&resp.type_url) else {
645            tracing::trace!(type_url = %resp.type_url, "unknown type url");
646            self.set_unknown(resp.nonce, resp.type_url);
647            return;
648        };
649
650        // add resources
651        let resources = match ResourceVec::from_resources(rtype, resp.resources) {
652            Ok(r) => r,
653            Err(e) => {
654                tracing::trace!(err = %e, "invalid proto");
655                self.set_ack(
656                    rtype,
657                    resp.nonce,
658                    Some(format!("invalid resource: {e}").into()),
659                );
660                return;
661            }
662        };
663
664        let resource_errors = self.cache.insert(resources);
665        let error = match &resource_errors[..] {
666            &[] => None,
667            // TOOD: actually generate a useful error message here
668            _ => Some("invalid resources".into()),
669        };
670        self.set_ack(rtype, resp.nonce, error);
671
672        // remove resources
673        self.cache.remove(rtype, &resp.removed_resources);
674    }
675
676    fn handle_subscription_update(&mut self, update: SubscriptionUpdate) {
677        match update {
678            SubscriptionUpdate::AddHosts(hosts) => {
679                for host in hosts {
680                    self.cache.subscribe(ResourceType::Listener, &host);
681                }
682            }
683            SubscriptionUpdate::RemoveHosts(hosts) => {
684                for host in hosts {
685                    self.cache.unsubscribe(ResourceType::Listener, &host);
686                }
687            }
688            SubscriptionUpdate::AddBackends(backends) => {
689                for backend in backends {
690                    if let Service::Dns(dns) = &backend.service {
691                        self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
692                    }
693                    self.cache.subscribe(ResourceType::Cluster, &backend.name());
694                }
695            }
696            SubscriptionUpdate::RemoveBackends(backends) => {
697                for backend in backends {
698                    if let Service::Dns(dns) = &backend.service {
699                        self.cache
700                            .unsubscribe_dns(dns.hostname.clone(), backend.port);
701                    }
702                    self.cache
703                        .unsubscribe(ResourceType::Cluster, &backend.name());
704                }
705            }
706            SubscriptionUpdate::AddEndpoints(backends) => {
707                for backend in backends {
708                    match &backend.service {
709                        Service::Dns(dns) => {
710                            self.cache.subscribe_dns(dns.hostname.clone(), backend.port);
711                        }
712                        _ => self
713                            .cache
714                            .subscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
715                    }
716                }
717            }
718            SubscriptionUpdate::RemoveEndpoints(backends) => {
719                for backend in backends {
720                    match &backend.service {
721                        Service::Dns(dns) => {
722                            self.cache
723                                .unsubscribe_dns(dns.hostname.clone(), backend.port);
724                        }
725                        _ => self
726                            .cache
727                            .unsubscribe(ResourceType::ClusterLoadAssignment, &backend.name()),
728                    }
729                }
730            }
731        }
732    }
733
734    fn set_unknown(&mut self, nonce: String, type_url: String) {
735        self.unknown_types.push((nonce, type_url))
736    }
737
738    fn set_ack(&mut self, rtype: ResourceType, nonce: String, error: Option<Cow<'static, str>>) {
739        self.acks[rtype] = Some(AckState { nonce, error })
740    }
741
742    fn get_ack(&mut self, rtype: ResourceType) -> Option<AckState> {
743        self.acks[rtype].take()
744    }
745}
746
747#[derive(Debug, Default, PartialEq, Eq)]
748struct DnsUpdates {
749    add: BTreeSet<(Hostname, u16)>,
750    remove: BTreeSet<(Hostname, u16)>,
751    sync: bool,
752}
753
754#[cfg(test)]
755impl DnsUpdates {
756    fn is_noop(&self) -> bool {
757        self.add.is_empty() && self.remove.is_empty() && !self.sync
758    }
759}
760
761#[cfg(test)]
762mod test_ads_conn {
763    use std::collections::HashMap;
764
765    use cache::Cache;
766    use once_cell::sync::Lazy;
767    use pretty_assertions::assert_eq;
768    use xds_api::pb::envoy::service::discovery::v3 as xds_discovery;
769
770    use super::test as xds_test;
771    use super::*;
772
773    static TEST_NODE: Lazy<xds_core::Node> = Lazy::new(|| xds_core::Node {
774        id: "unit-test".to_string(),
775        ..Default::default()
776    });
777
778    /// create a new connection with TEST_NODE and the given cache. asserts that
779    /// the first outgoing message has its Node set to TEST_NODE.
780    #[track_caller]
781    fn new_conn(cache: &mut Cache) -> (AdsConnection, Vec<DeltaDiscoveryRequest>) {
782        let (conn, mut outgoing) = AdsConnection::new(TEST_NODE.clone(), cache);
783
784        // assert the node is there
785        if let Some(first) = outgoing.first_mut() {
786            let node = first
787                .node
788                .take()
789                .expect("expected first outgoing request to have a node");
790
791            assert_eq!(node, *TEST_NODE);
792        };
793
794        (conn, outgoing)
795    }
796
797    #[test]
798    fn test_init_empty_wildcard() {
799        let mut cache = Cache::default();
800        cache.set_wildcard(ResourceType::Listener, true);
801        cache.set_wildcard(ResourceType::Cluster, true);
802
803        let (_, outgoing) = new_conn(&mut cache);
804
805        assert_eq!(
806            outgoing,
807            vec![
808                xds_test::req!(t = ResourceType::Cluster),
809                xds_test::req!(t = ResourceType::Listener),
810            ]
811        )
812    }
813
814    #[test]
815    fn test_init_empty_explicit() {
816        let mut cache = Cache::default();
817        cache.set_wildcard(ResourceType::Listener, false);
818        cache.set_wildcard(ResourceType::Cluster, false);
819
820        let (_, outgoing) = new_conn(&mut cache);
821        assert!(outgoing.is_empty());
822    }
823
824    #[test]
825    fn test_init_subscription_wildcard() {
826        let mut cache = Cache::default();
827        cache.set_wildcard(ResourceType::Listener, false);
828        cache.set_wildcard(ResourceType::Cluster, true);
829
830        cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
831        cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
832
833        // only the Clusters should have the wildcard sub, CLA should not, since it's
834        // not a wildcard-capable resource type
835        let (_, outgoing) = new_conn(&mut cache);
836        assert_eq!(
837            outgoing,
838            vec![
839                xds_test::req!(
840                    t = ResourceType::Cluster,
841                    add = vec!["cluster.example:7891", "*"],
842                    init = vec![],
843                ),
844                xds_test::req!(
845                    t = ResourceType::ClusterLoadAssignment,
846                    add = vec!["cluster.example:7891",],
847                    init = vec![],
848                )
849            ]
850        );
851    }
852
853    #[test]
854    fn test_init_subscription_explicit() {
855        let mut cache = Cache::default();
856        cache.set_wildcard(ResourceType::Listener, false);
857        cache.set_wildcard(ResourceType::Cluster, false);
858
859        cache.subscribe(ResourceType::Cluster, "cluster.example:7891");
860        cache.subscribe(ResourceType::ClusterLoadAssignment, "cluster.example:7891");
861
862        let (_, outgoing) = new_conn(&mut cache);
863        assert_eq!(
864            outgoing,
865            vec![
866                xds_test::req!(
867                    t = ResourceType::Cluster,
868                    add = vec!["cluster.example:7891",],
869                    init = vec![],
870                ),
871                xds_test::req!(
872                    t = ResourceType::ClusterLoadAssignment,
873                    add = vec!["cluster.example:7891",],
874                    init = vec![],
875                ),
876            ]
877        );
878    }
879
880    #[test]
881    fn test_init_initial_versions() {
882        let mut cache = Cache::default();
883        assert!(cache.is_wildcard(ResourceType::Listener));
884        assert!(!cache.is_wildcard(ResourceType::RouteConfiguration));
885
886        cache.insert(ResourceVec::from_listeners(
887            "123".into(),
888            vec![xds_test::listener!("cooler.example.org", "cool-route")],
889        ));
890        cache.insert(ResourceVec::from_listeners(
891            "456".into(),
892            vec![xds_test::listener!("warmer.example.org", "warm-route")],
893        ));
894        cache.insert(ResourceVec::from_route_configs(
895            "789".into(),
896            vec![xds_test::route_config!(
897                "cool-route",
898                vec![xds_test::vhost!(
899                    "an-vhost",
900                    ["cooler.example.org"],
901                    [xds_test::route!(default "cooler.example.internal:8008")]
902                )]
903            )],
904        ));
905
906        // both wildcard and non-wildcard should start with an empty add list
907        // but resources in init
908        let (_, outgoing) = new_conn(&mut cache);
909        assert_eq!(
910            outgoing,
911            vec![
912                xds_test::req!(
913                    t = ResourceType::Cluster,
914                    add = vec!["cooler.example.internal:8008", "*"],
915                    init = vec![],
916                ),
917                xds_test::req!(
918                    t = ResourceType::Listener,
919                    add = vec![],
920                    init = vec![("cooler.example.org", "123"), ("warmer.example.org", "456"),]
921                ),
922                xds_test::req!(
923                    t = ResourceType::RouteConfiguration,
924                    add = vec!["warm-route"],
925                    init = vec![("cool-route", "789")]
926                ),
927            ],
928        );
929    }
930
931    #[test]
932    fn test_handle_subscribe_hostname() {
933        let mut cache = Cache::default();
934        let (mut conn, _) = new_conn(&mut cache);
935
936        conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![
937            Service::dns("website.internal").unwrap().name(),
938            Service::kube("default", "nginx")
939                .unwrap()
940                .as_backend_id(4443)
941                .name(),
942        ]));
943
944        let (outgoing, dns) = conn.outgoing();
945        // dns should not update on listeners
946        assert!(dns.is_noop());
947        assert_eq!(
948            outgoing,
949            vec![xds_test::req!(
950                t = ResourceType::Listener,
951                add = vec!["nginx.default.svc.cluster.local:4443", "website.internal"],
952            )]
953        );
954    }
955
956    #[test]
957    fn test_handle_subscribe_backend() {
958        let mut cache = Cache::default();
959        let (mut conn, _) = new_conn(&mut cache);
960
961        conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![
962            Service::dns("website.internal").unwrap().as_backend_id(80),
963            Service::kube("default", "nginx")
964                .unwrap()
965                .as_backend_id(4443),
966        ]));
967
968        let (outgoing, dns) = conn.outgoing();
969        // dns shouldn't preemptively update on dns backends
970        assert_eq!(
971            dns,
972            DnsUpdates {
973                add: [(Hostname::from_static("website.internal"), 80)]
974                    .into_iter()
975                    .collect(),
976                ..Default::default()
977            }
978        );
979
980        // should generate xds for clusters
981        assert_eq!(
982            outgoing,
983            vec![xds_test::req!(
984                t = ResourceType::Cluster,
985                add = vec![
986                    "nginx.default.svc.cluster.local:4443",
987                    "website.internal:80"
988                ],
989            )]
990        );
991    }
992
993    #[test]
994    fn test_handle_ads_message_listener_route() {
995        let mut cache = Cache::default();
996        assert!(cache.is_wildcard(ResourceType::Listener));
997
998        let (mut conn, _) = new_conn(&mut cache);
999
1000        conn.handle_ads_message(xds_test::resp!(
1001            n = "1",
1002            add = ResourceVec::from_listeners(
1003                "123".into(),
1004                vec![xds_test::listener!("cooler.example.org", "cool-route")],
1005            ),
1006            remove = vec![],
1007        ));
1008        conn.handle_ads_message(xds_test::resp!(
1009            n = "2",
1010            add = ResourceVec::from_listeners(
1011                "456".into(),
1012                vec![xds_test::listener!("warmer.example.org", "warm-route")],
1013            ),
1014            remove = vec![],
1015        ));
1016        conn.handle_ads_message(xds_test::resp!(
1017            n = "3",
1018            add = ResourceVec::from_route_configs(
1019                "789".into(),
1020                vec![xds_test::route_config!(
1021                    "cool-route",
1022                    vec![xds_test::vhost!(
1023                        "an-vhost",
1024                        ["cooler.example.org"],
1025                        [xds_test::route!(default "cooler.example.internal:8008")]
1026                    )]
1027                )],
1028            ),
1029            remove = vec![],
1030        ));
1031
1032        let (outgoing, dns) = conn.outgoing();
1033        // no dns changes until we get a cluster
1034        assert!(dns.is_noop());
1035
1036        assert_eq!(
1037            outgoing,
1038            vec![
1039                // new resource subs
1040                xds_test::req!(
1041                    t = ResourceType::Cluster,
1042                    add = vec!["cooler.example.internal:8008"]
1043                ),
1044                // listener ack
1045                xds_test::req!(t = ResourceType::Listener, n = "2"),
1046                // route config acks and new sub
1047                xds_test::req!(
1048                    t = ResourceType::RouteConfiguration,
1049                    n = "3",
1050                    add = vec!["warm-route"]
1051                ),
1052            ],
1053        );
1054    }
1055
1056    #[test]
1057    fn test_handle_ads_message_listener_swap_route() {
1058        let mut cache = Cache::default();
1059        assert!(cache.is_wildcard(ResourceType::Listener));
1060
1061        let (mut conn, _) = new_conn(&mut cache);
1062
1063        // get set up with a listener pointing to a route
1064        conn.handle_ads_message(xds_test::resp!(
1065            n = "1",
1066            add = ResourceVec::from_listeners(
1067                "111".into(),
1068                vec![xds_test::listener!("cooler.example.org", "cool-route")],
1069            ),
1070            remove = vec![],
1071        ));
1072        conn.handle_ads_message(xds_test::resp!(
1073            n = "2",
1074            add = ResourceVec::from_route_configs(
1075                "222".into(),
1076                vec![xds_test::route_config!(
1077                    "cool-route",
1078                    vec![xds_test::vhost!(
1079                        "an-vhost",
1080                        ["cooler.example.org"],
1081                        [xds_test::route!(default "cooler.example.internal:8008")]
1082                    )]
1083                )],
1084            ),
1085            remove = vec![],
1086        ));
1087
1088        let (outgoing, dns) = conn.outgoing();
1089        assert!(dns.is_noop());
1090        assert_eq!(
1091            outgoing,
1092            vec![
1093                // new resource subs
1094                xds_test::req!(
1095                    t = ResourceType::Cluster,
1096                    add = vec!["cooler.example.internal:8008"]
1097                ),
1098                // listener ack
1099                xds_test::req!(t = ResourceType::Listener, n = "1"),
1100                // route config ack
1101                xds_test::req!(t = ResourceType::RouteConfiguration, n = "2"),
1102            ],
1103        );
1104
1105        assert_eq!(
1106            conn.cache.versions(ResourceType::Listener),
1107            HashMap::from_iter([("cooler.example.org".to_string(), "111".to_string())])
1108        );
1109        assert_eq!(
1110            conn.cache.versions(ResourceType::RouteConfiguration),
1111            HashMap::from_iter([("cool-route".to_string(), "222".to_string())])
1112        );
1113
1114        // swap the route and immediately swap it back
1115        //
1116        // the cached resources should not change, but the Listener version should
1117        // update and the outgoing messages should include a Listener ACK.
1118        conn.handle_ads_message(xds_test::resp!(
1119            n = "3",
1120            add = ResourceVec::from_listeners(
1121                "333".into(),
1122                vec![xds_test::listener!("cooler.example.org", "lame-route")],
1123            ),
1124            remove = vec![],
1125        ));
1126        conn.handle_ads_message(xds_test::resp!(
1127            n = "4",
1128            add = ResourceVec::from_listeners(
1129                "444".into(),
1130                vec![xds_test::listener!("cooler.example.org", "cool-route")],
1131            ),
1132            remove = vec![],
1133        ));
1134
1135        let (outgoing, dns) = conn.outgoing();
1136        assert!(dns.is_noop());
1137        assert_eq!(
1138            outgoing,
1139            vec![
1140                // listener ack
1141                xds_test::req!(t = ResourceType::Listener, n = "4"),
1142                // route config for lame-route was both added and removed
1143                xds_test::req!(
1144                    t = ResourceType::RouteConfiguration,
1145                    n = "",
1146                    add = vec!["lame-route"],
1147                    remove = vec!["lame-route"]
1148                ),
1149            ]
1150        );
1151
1152        assert_eq!(
1153            conn.cache.versions(ResourceType::Listener),
1154            HashMap::from_iter([("cooler.example.org".to_string(), "444".to_string())])
1155        );
1156        assert_eq!(
1157            conn.cache.versions(ResourceType::RouteConfiguration),
1158            HashMap::from_iter([("cool-route".to_string(), "222".to_string())])
1159        );
1160    }
1161
1162    #[test]
1163    fn test_handle_ads_message_add_remove_add() {
1164        tracing_subscriber::fmt::init();
1165        tracing::trace!("HELLO?");
1166
1167        let mut cache = Cache::default();
1168        assert!(cache.is_wildcard(ResourceType::Listener));
1169        assert!(cache.is_wildcard(ResourceType::Cluster));
1170
1171        let (mut conn, _) = new_conn(&mut cache);
1172
1173        // set up a listener -> route -> cluster resource chain
1174        conn.handle_ads_message(xds_test::resp!(
1175            n = "1",
1176            add = ResourceVec::from_listeners(
1177                "111".into(),
1178                vec![xds_test::listener!("cooler.example.org", "cool-route")],
1179            ),
1180            remove = vec![],
1181        ));
1182        conn.handle_ads_message(xds_test::resp!(
1183            n = "2",
1184            add = ResourceVec::from_route_configs(
1185                "222".into(),
1186                vec![xds_test::route_config!(
1187                    "cool-route",
1188                    vec![xds_test::vhost!(
1189                        "an-vhost",
1190                        ["cooler.example.org"],
1191                        [xds_test::route!(default "cooler.example.internal:8008")]
1192                    )]
1193                )],
1194            ),
1195            remove = vec![],
1196        ));
1197        conn.handle_ads_message(xds_test::resp!(
1198            n = "3",
1199            add = ResourceVec::from_clusters(
1200                "333".into(),
1201                vec![xds_test::cluster!("cooler.example.internal:8008"),],
1202            ),
1203            remove = vec![],
1204        ));
1205        let (outgoing, _dns) = conn.outgoing();
1206        assert_eq!(
1207            outgoing,
1208            vec![
1209                // cluster ack
1210                xds_test::req!(t = ResourceType::Cluster, n = "3"),
1211                // should try sub to the cluster lb route
1212                xds_test::req!(
1213                    t = ResourceType::Listener,
1214                    n = "1",
1215                    add = vec!["cooler.example.internal.lb.jct:8008"]
1216                ),
1217                // route config ack
1218                xds_test::req!(t = ResourceType::RouteConfiguration, n = "2"),
1219            ],
1220        );
1221
1222        // swap out the listener for a new route and remove the old route
1223        conn.handle_ads_message(xds_test::resp!(
1224            n = "4",
1225            add = ResourceVec::from_listeners(
1226                "444".into(),
1227                vec![xds_test::listener!("cooler.example.org", "very-cool-route")],
1228            ),
1229            remove = vec![],
1230        ));
1231        conn.handle_ads_message(xds_test::resp!(
1232            n = "5",
1233            add = ResourceVec::from_route_configs("444".into(), vec![]),
1234            remove = vec!["very-cool-route"],
1235        ));
1236        let (outgoing, _dns) = conn.outgoing();
1237        assert_eq!(
1238            outgoing,
1239            vec![
1240                // cluster remove
1241                xds_test::req!(
1242                    t = ResourceType::Cluster,
1243                    remove = vec!["cooler.example.internal:8008"]
1244                ),
1245                // listener ACK, also removes cluster lb listener
1246                xds_test::req!(
1247                    t = ResourceType::Listener,
1248                    n = "4",
1249                    add = vec![],
1250                    remove = vec!["cooler.example.internal.lb.jct:8008"]
1251                ),
1252                // route config add and remove
1253                xds_test::req!(
1254                    t = ResourceType::RouteConfiguration,
1255                    n = "5",
1256                    add = vec!["very-cool-route"],
1257                    remove = vec!["cool-route"]
1258                ),
1259            ],
1260        );
1261
1262        // server is required to re-send the remove on a wildcard. it sends the
1263        // removes and adds the new route config.
1264        conn.handle_ads_message(xds_test::resp!(
1265            n = "6",
1266            add = ResourceVec::from_clusters("444".into(), vec![]),
1267            remove = vec!["cooler.example.internal:8008"],
1268        ));
1269        conn.handle_ads_message(xds_test::resp!(
1270            n = "7",
1271            add = ResourceVec::from_listeners("444".into(), vec![]),
1272            remove = vec!["cooler.example.internal.lb.jct:8008"],
1273        ));
1274        conn.handle_ads_message(xds_test::resp!(
1275            n = "8",
1276            add = ResourceVec::from_route_configs(
1277                "555".into(),
1278                vec![xds_test::route_config!(
1279                    "very-cool-route",
1280                    vec![xds_test::vhost!(
1281                        "an-vhost",
1282                        ["cooler.example.org"],
1283                        [xds_test::route!(default "cooler.example.internal:8008")]
1284                    )]
1285                )],
1286            ),
1287            remove = vec![],
1288        ));
1289        let (outgoing, _dns) = conn.outgoing();
1290        assert_eq!(
1291            outgoing,
1292            vec![
1293                // re-sub to the cluster
1294                xds_test::req!(
1295                    t = ResourceType::Cluster,
1296                    n = "6",
1297                    add = vec!["cooler.example.internal:8008"]
1298                ),
1299                // ack the listener
1300                xds_test::req!(t = ResourceType::Listener, n = "7"),
1301                // ack the route config
1302                xds_test::req!(t = ResourceType::RouteConfiguration, n = "8"),
1303            ],
1304        );
1305    }
1306
1307    #[test]
1308    fn test_handle_ads_message_listener_removed() {
1309        let mut cache = Cache::default();
1310        assert!(cache.is_wildcard(ResourceType::Listener));
1311
1312        let (mut conn, _) = new_conn(&mut cache);
1313
1314        conn.handle_ads_message(xds_test::resp!(
1315            n = "1",
1316            add = ResourceVec::from_listeners(
1317                "123".into(),
1318                vec![xds_test::listener!("cooler.example.org", "cool-route")],
1319            ),
1320            remove = vec![],
1321        ));
1322        conn.handle_ads_message(xds_test::resp!(
1323            n = "2",
1324            add = ResourceVec::from_listeners(
1325                "456".into(),
1326                vec![xds_test::listener!("warmer.example.org", "warm-route")],
1327            ),
1328            remove = vec![],
1329        ));
1330        conn.handle_ads_message(xds_test::resp!(
1331            n = "3",
1332            add = ResourceVec::from_route_configs(
1333                "789".into(),
1334                vec![xds_test::route_config!(
1335                    "cool-route",
1336                    vec![xds_test::vhost!(
1337                        "an-vhost",
1338                        ["cooler.example.org"],
1339                        [xds_test::route!(default "cooler.example.internal:8008")]
1340                    )]
1341                )],
1342            ),
1343            remove = vec![],
1344        ));
1345
1346        let (outgoing, dns) = conn.outgoing();
1347        // no dns changes until we get a cluster
1348        assert!(dns.is_noop());
1349
1350        assert_eq!(
1351            outgoing,
1352            vec![
1353                // new resource subs
1354                xds_test::req!(
1355                    t = ResourceType::Cluster,
1356                    add = vec!["cooler.example.internal:8008"]
1357                ),
1358                // listener ack
1359                xds_test::req!(t = ResourceType::Listener, n = "2"),
1360                // route config acks and new sub
1361                xds_test::req!(
1362                    t = ResourceType::RouteConfiguration,
1363                    n = "3",
1364                    add = vec!["warm-route"]
1365                ),
1366            ],
1367        );
1368
1369        // the server gets a delete for the listener we already have
1370        conn.handle_ads_message(xds_test::resp!(
1371            n = "4",
1372            add = ResourceVec::from_listeners("123".into(), vec![]),
1373            remove = vec!["warmer.example.org"],
1374        ));
1375
1376        let (outgoing, dns) = conn.outgoing();
1377        assert!(dns.is_noop());
1378        assert_eq!(
1379            outgoing,
1380            vec![
1381                // listener ack
1382                xds_test::req!(t = ResourceType::Listener, n = "4"),
1383                // route config remove
1384                xds_test::req!(
1385                    t = ResourceType::RouteConfiguration,
1386                    remove = vec!["warm-route"],
1387                ),
1388            ]
1389        );
1390    }
1391
1392    #[test]
1393    fn test_handle_ads_message_cluster_cla() {
1394        let mut cache = Cache::default();
1395        assert!(cache.is_wildcard(ResourceType::Cluster));
1396
1397        let (mut conn, _) = new_conn(&mut cache);
1398
1399        conn.handle_ads_message(xds_test::resp!(
1400            n = "1",
1401            add = ResourceVec::from_clusters(
1402                "123".into(),
1403                vec![
1404                    xds_test::cluster!("cooler.example.org:2345"),
1405                    xds_test::cluster!("thing.default.svc.cluster.local:9876"),
1406                ],
1407            ),
1408            remove = vec![],
1409        ));
1410        conn.handle_ads_message(xds_test::resp!(
1411            n = "2",
1412            add = ResourceVec::from_load_assignments(
1413                "123".into(),
1414                vec![xds_test::cla!(
1415                    "thing.default.svc.cluster.local:9876" => {
1416                        "zone1" => ["1.1.1.1"]
1417                    }
1418                )],
1419            ),
1420            remove = vec![],
1421        ));
1422        conn.handle_ads_message(xds_test::resp!(
1423            n = "3",
1424            add = ResourceVec::from_listeners("555".into(), vec![
1425                xds_test::listener!("cooler.example.org.lb.jct:2345", "lb-route" => [xds_test::vhost!(
1426                    "lb-vhost",
1427                    ["cooler.example.org.lb.jct:2345"],
1428                    [xds_test::route!(default ring_hash = "x-user", "cooler.example.org:2345")],
1429                )]),
1430                xds_test::listener!("thing.default.svc.cluster.local.lb.jct:9876", "lb-route" => [xds_test::vhost!(
1431                    "lb-vhost",
1432                    ["cooler.example.org.lb.jct:2345"],
1433                    [xds_test::route!(default ring_hash = "x-user", "thing.default.svc.cluster.local:9876")],
1434                )])
1435            ]),
1436            remove = vec![],
1437        ));
1438
1439        let (outgoing, dns) = conn.outgoing();
1440        // dns changes, we got a dns cluster
1441        assert_eq!(
1442            dns,
1443            DnsUpdates {
1444                add: [(Hostname::from_static("cooler.example.org"), 2345)]
1445                    .into_iter()
1446                    .collect(),
1447                ..Default::default()
1448            }
1449        );
1450        // should generate ACKs
1451        assert_eq!(
1452            outgoing,
1453            vec![
1454                xds_test::req!(t = ResourceType::Cluster, n = "1"),
1455                xds_test::req!(t = ResourceType::ClusterLoadAssignment, n = "2"),
1456                xds_test::req!(t = ResourceType::Listener, n = "3"),
1457            ]
1458        );
1459    }
1460
1461    #[test]
1462    fn test_set_node_after_init() {
1463        let mut cache = Cache::default();
1464        for rtype in ResourceType::all() {
1465            cache.set_wildcard(*rtype, false);
1466        }
1467
1468        let (mut conn, outgoing) = new_conn(&mut cache);
1469        assert!(outgoing.is_empty());
1470
1471        let svc = Service::dns("website.internal").unwrap().as_backend_id(80);
1472        conn.handle_subscription_update(SubscriptionUpdate::AddBackends(vec![svc]));
1473
1474        let (outgoing, _) = conn.outgoing();
1475        assert_eq!(outgoing[0].node.as_ref(), Some(&*TEST_NODE));
1476    }
1477
1478    #[test]
1479    fn test_handle_unknown_type_url() {
1480        let mut cache = Cache::default();
1481        let (mut conn, _) = new_conn(&mut cache);
1482
1483        conn.handle_ads_message(DeltaDiscoveryResponse {
1484            type_url: "made.up.type_url/Potato".to_string(),
1485            ..Default::default()
1486        });
1487
1488        let (outgoing, dns) = conn.outgoing();
1489        assert!(dns.is_noop());
1490        assert_eq!(
1491            outgoing,
1492            vec![DeltaDiscoveryRequest {
1493                type_url: "made.up.type_url/Potato".to_string(),
1494                error_detail: Some(xds_api::pb::google::rpc::Status {
1495                    code: tonic::Code::InvalidArgument.into(),
1496                    message: "unknown type".to_string(),
1497                    ..Default::default()
1498                }),
1499                ..Default::default()
1500            }]
1501        );
1502    }
1503
1504    #[test]
1505    fn test_handle_invalid_resource() {
1506        let mut cache = Cache::default();
1507        let (mut conn, _) = new_conn(&mut cache);
1508
1509        let node = xds_core::Node {
1510            id: "some-node".to_string(),
1511            ..Default::default()
1512        };
1513        conn.handle_ads_message(DeltaDiscoveryResponse {
1514            type_url: ResourceType::Listener.type_url().to_string(),
1515            resources: vec![xds_discovery::Resource {
1516                resource: Some(protobuf::Any::from_msg(&node).unwrap()),
1517                ..Default::default()
1518            }],
1519            ..Default::default()
1520        });
1521
1522        let (outgoing, dns) = conn.outgoing();
1523        assert!(dns.is_noop());
1524        assert!(matches!(
1525            &outgoing[..],
1526            [DeltaDiscoveryRequest { type_url, error_detail, ..}] if
1527                type_url == ResourceType::Listener.type_url() &&
1528                error_detail.as_ref().is_some_and(|e| e.message.starts_with("invalid resource"))
1529        ));
1530    }
1531
1532    #[test]
1533    fn test_handle_does_not_exist() {
1534        let mut cache = Cache::default();
1535        let (mut conn, _) = new_conn(&mut cache);
1536
1537        // handle a subscription update
1538        let does_not_exist = Service::dns("website.internal").unwrap().name();
1539        conn.handle_subscription_update(SubscriptionUpdate::AddHosts(vec![does_not_exist.clone()]));
1540        let _ = conn.outgoing();
1541
1542        conn.handle_ads_message(DeltaDiscoveryResponse {
1543            nonce: "boo".to_string(),
1544            type_url: ResourceType::Listener.type_url().to_string(),
1545            removed_resources: vec![does_not_exist.clone()],
1546            ..Default::default()
1547        });
1548
1549        // should generate an ACK immediately
1550        let (outgoing, dns) = conn.outgoing();
1551        assert!(dns.is_noop());
1552        assert_eq!(
1553            outgoing,
1554            vec![xds_test::req!(t = ResourceType::Listener, n = "boo")],
1555        );
1556
1557        // route should be tombstoned
1558        let route = cache
1559            .reader()
1560            .get_route("website.internal")
1561            .now_or_never()
1562            .unwrap();
1563        assert_eq!(route, None);
1564    }
1565}