junction_core/
client.rs

1use crate::{
2    dns,
3    endpoints::{EndpointGroup, EndpointIter},
4    error::Trace,
5    load_balancer::BackendLb,
6    xds::AdsClient,
7    ConfigCache, Endpoint, Error, StaticConfig,
8};
9use futures::{stream::FuturesOrdered, FutureExt, StreamExt};
10use junction_api::{
11    backend::{Backend, BackendId},
12    http::{HeaderMatch, PathMatch, QueryParamMatch, Route, RouteMatch, RouteRule},
13    Hostname,
14};
15use rand::distributions::WeightedError;
16use serde::Deserialize;
17use std::{
18    borrow::Cow,
19    time::{Duration, Instant},
20};
21use std::{net::SocketAddr, sync::Arc};
22
/// An outgoing HTTP Request, before any rewrites or modifications have been
/// made.
///
/// Requests are a collection of references and are cheap to clone.
#[derive(Debug, Clone)]
pub struct HttpRequest<'a> {
    /// The HTTP Method of the request.
    method: &'a http::Method,

    /// The request URL, before any rewrites or modifications have been made.
    url: &'a crate::Url,

    /// The request headers, before any rewrites or modifications have been
    /// made.
    headers: &'a http::HeaderMap,
}
38
39impl<'a> HttpRequest<'a> {
40    /// Create a request from individual parts.
41    pub fn from_parts(
42        method: &'a http::Method,
43        url: &'a crate::Url,
44        headers: &'a http::HeaderMap,
45    ) -> crate::Result<Self> {
46        Ok(Self {
47            method,
48            url,
49            headers,
50        })
51    }
52}
53
/// The result of resolving a route (see [Client::resolve_route]).
#[derive(Debug, Clone)]
pub struct ResolvedRoute {
    /// The resolved route.
    pub route: Arc<Route>,

    /// The index into `route.rules` of the rule that matched the request.
    // TODO: doesn't need to be optional? remove it for the request trace anyway
    pub rule: usize,

    /// The backend selected as part of route resolution.
    pub backend: BackendId,

    /// The request trace recorded so far. Kept private and smuggled through
    /// here so that it can be handed on to endpoint selection.
    trace: Trace,
}
70
/// The context required to select an address from a backend. Includes the URL
/// and headers from an outgoing request.
#[derive(Debug, Clone)]
pub struct LbContext<'a> {
    /// The URL of the outgoing request.
    url: &'a crate::Url,

    /// The headers of the outgoing request.
    headers: &'a http::HeaderMap,

    /// Addresses already used by earlier attempts of this request. Empty for
    /// a first attempt.
    previous_addrs: &'a [SocketAddr],

    /// The request trace recorded so far. Kept private and smuggled through
    /// here so that load balancing can keep appending to it.
    trace: Trace,
}
84
85impl<'a> LbContext<'a> {
86    // unused, allowed so that we can make select_endpoint public without exposing Trace
87    #[allow(unused)]
88    pub fn from_parts(url: &'a crate::Url, headers: &'a http::HeaderMap) -> Self {
89        let trace = Trace::new();
90        Self {
91            url,
92            headers,
93            previous_addrs: &[],
94            trace,
95        }
96    }
97
98    fn new(trace: Trace, url: &'a crate::Url, headers: &'a http::HeaderMap) -> Self {
99        Self {
100            url,
101            headers,
102            previous_addrs: &[],
103            trace,
104        }
105    }
106}
107
/// The result of selecting an endpoint (see [Client::select_endpoint]).
pub struct SelectedEndpoint {
    /// The selected endpoint address
    pub addr: SocketAddr,

    // the request trace, including whatever endpoint selection recorded.
    // kept private so trace data can be smuggled back out to the caller.
    trace: Trace,
}
116
/// The result of making an HTTP request.
///
/// Used to report the outcome of a request back to the client, see
/// [Client::report_status].
#[derive(Debug, Clone)]
pub enum HttpResult {
    /// The client received a complete HTTP response with a status code that was
    /// not a client error (4xx) or a server error (5xx).
    StatusOk(http::StatusCode),

    /// The client received a complete HTTP response with a status code that was
    /// a client error (4xx) or a server error (5xx).
    StatusError(http::StatusCode),

    /// The client didn't receive a complete HTTP response. This covers any IO
    /// error or protocol error. From the Junction client's point of view, there
    /// is no point in distinguishing them.
    StatusFailed,
}
133
134impl HttpResult {
135    pub fn is_ok(&self) -> bool {
136        matches!(self, Self::StatusOk(_))
137    }
138
139    pub fn from_u16(code: u16) -> Result<Self, http::status::InvalidStatusCode> {
140        let code = http::StatusCode::from_u16(code)?;
141        Ok(Self::from_code(code))
142    }
143
144    pub fn from_code(code: http::StatusCode) -> Self {
145        if code.is_client_error() || code.is_server_error() {
146            Self::StatusError(code)
147        } else {
148            Self::StatusOk(code)
149        }
150    }
151}
152
/// A service discovery client that looks up URL information based on URLs,
/// headers, and methods.
///
/// Clients use a shared in-memory cache to keep data warm so that a request
/// never has to block on a remote service.
///
/// Clients are cheaply cloneable, and should be cloned to create multiple
/// clients that share the same in-memory cache.
#[derive(Clone)]
pub struct Client {
    // resolve options
    //
    // the time budget used to compute the deadline for route resolution and
    // endpoint selection in `resolve_http` and `report_status`.
    //
    // TODO: make configurable with a builder or something, not sure if they
    // will survive.
    resolve_timeout: Duration,

    // configuration used when searching for additional possible resolved
    // routes. currently this expands the search over the set of possible
    // authority matches.
    search_config: SearchConfig,

    // junction data
    config: Config,
}
176
/// Search-path settings used to expand a request hostname into a list of
/// candidate names to look up routes for.
#[derive(Clone, Default, Deserialize)]
pub struct SearchConfig {
    /// ndots, like dns, but for resolving junction names.
    ///
    /// like dns, names only use the search path if they contain fewer than
    /// `ndots` dots. unlike dns, names are all resolved in-order.
    pub ndots: u8,

    /// the list of suffixes searched during hostname lookup. only consulted if
    /// the number of dots in a url's hostname is less than `ndots`.
    pub search: Vec<Hostname>,
}
189
190impl SearchConfig {
191    pub fn new(ndots: u8, search: Vec<Hostname>) -> Self {
192        Self { ndots, search }
193    }
194}
// the entire static config thing might be a mistake and worth revisiting - we
// could insert resources into cache and let multiple clients in the same process
// all see static resources. is that good? idk. we already have to_xds() on every
// resource type, and it would remove a lot of code.
//
// revisit this when we hit the problem of static bootstrapping/fallback for
// clients.
#[derive(Clone)]
enum Config {
    // routes, backends and endpoints all come from the static config.
    Static(Arc<StaticConfig>),
    // routes and backends come from the static config, endpoints come from
    // the ADS client.
    DynamicEndpoints(Arc<StaticConfig>, Arc<DynamicConfig>),
    // routes, backends and endpoints all come from the ADS client.
    Dynamic(Arc<DynamicConfig>),
}
208
/// The dynamic half of a client's configuration: an ADS client plus the
/// background task that drives it.
struct DynamicConfig {
    /// the client used to look up routes, backends and endpoints.
    ads_client: AdsClient,

    /// the shared handle to the task that's actually running the client in
    /// the background. should not drop until every active client drops.
    ///
    /// TODO: should this get bundled into AdsClient? shrug emoji?
    // never read, only held on to.
    #[allow(unused)]
    ads_task: tokio::task::JoinHandle<()>,
}
219
220impl Config {
221    fn ads(&self) -> Option<&AdsClient> {
222        match self {
223            Config::Static(_) => None,
224            Config::DynamicEndpoints(_, d) | Config::Dynamic(d) => Some(&d.ads_client),
225        }
226    }
227}
228
229impl ConfigCache for Config {
230    async fn get_route<S: AsRef<str>>(&self, host: S) -> Option<Arc<Route>> {
231        match &self {
232            Config::Static(s) => s.get_route(host).await,
233            Config::DynamicEndpoints(s, _) => s.get_route(host).await,
234            Config::Dynamic(d) => d.ads_client.get_route(host).await,
235        }
236    }
237
238    async fn get_backend(&self, target: &BackendId) -> Option<Arc<BackendLb>> {
239        match &self {
240            Config::Static(s) => s.get_backend(target).await,
241            Config::DynamicEndpoints(s, _) => s.get_backend(target).await,
242            Config::Dynamic(d) => d.ads_client.get_backend(target).await,
243        }
244    }
245
246    async fn get_endpoints(&self, backend: &BackendId) -> Option<Arc<EndpointGroup>> {
247        match &self {
248            Config::Static(s) => s.get_endpoints(backend).await,
249            Config::DynamicEndpoints(_, d) => d.ads_client.get_endpoints(backend).await,
250            Config::Dynamic(d) => d.ads_client.get_endpoints(backend).await,
251        }
252    }
253}
254
255// FIXME: Vec<Endpoints> is probably the wrong thing to return from all our
256// resolve methods. We probably need a struct that has something like a list
257// of primary endpoints to cycle through on retries, and a separate list of
258// endpoints to mirror traffic to. Figure that out once we support mirroring.
259
260impl Client {
261    /// Build a new dynamic client, spawning a new ADS client in the background.
262    ///
263    ///This method creates a new ADS client and ADS connection. Dynamic data
264    ///will not be shared with existing clients. To create a client that shares
265    ///data with existing clients, [clone][Client::clone] an existing client.
266    ///
267    /// This function assumes that you're currently running the context of a
268    /// `tokio` runtime and spawns background work on a tokio executor.
269    pub async fn build(
270        address: String,
271        node_id: String,
272        cluster: String,
273    ) -> Result<Self, Box<dyn std::error::Error>> {
274        let (ads_client, mut ads_task) = AdsClient::build(address, node_id, cluster).unwrap();
275
276        // try to start the ADS connection while blocking. if it fails, fail
277        // fast here instead of letting the client start.
278        //
279        // once it's started, hand off the task to the executor in the
280        // background.
281        ads_task.connect().await?;
282        let handle = tokio::spawn(async move {
283            match ads_task.run().await {
284                Ok(()) => (),
285                Err(e) => panic!(
286                    "junction-core: ads client exited with an unexpected error: {e}. this is a bug in Junction!"
287                ),
288            }
289        });
290
291        // load search-path config from the system.
292        //
293        // this should eventually be configurable, but for now we're trying
294        // resolv.conf to match kube's default behavior out of the box. on other
295        // systems this may not be useful yet - that's ok.
296        let search_config = match dns::load_config("/etc/resolv.conf") {
297            Ok(config) => SearchConfig::new(config.ndots, config.search),
298            // ignore any errors and set this to defaults
299            Err(_) => SearchConfig::default(),
300        };
301
302        // wrap it all up in a dynamic config and return
303        let config = Config::Dynamic(Arc::new(DynamicConfig {
304            ads_client,
305            ads_task: handle,
306        }));
307        let client = Self {
308            resolve_timeout: Duration::from_secs(5),
309            config,
310            search_config,
311        };
312
313        Ok(client)
314    }
315
316    /// Build a client with static configuration. This client will use the
317    /// passed configuration to resolve routes and backends, but will still
318    /// fetch endpoints dynamically.
319    ///
320    /// This method will panic if the client being cloned is fully static. To
321    /// convert a static client to a client that uses dynamic config, create a
322    /// new client.
323    pub fn with_static_config(self, routes: Vec<Route>, backends: Vec<Backend>) -> Client {
324        let static_config = Arc::new(StaticConfig::with_inferred(routes, backends));
325
326        let dyn_config = match &self.config {
327            Config::Static(_) => panic!("can't use dynamic endpoints with a fully static client"),
328            Config::DynamicEndpoints(_, d) => Arc::clone(d),
329            Config::Dynamic(d) => Arc::clone(d),
330        };
331
332        let config = Config::DynamicEndpoints(static_config, dyn_config);
333        Client { config, ..self }
334    }
335
336    /// Construct a client that uses fully static configuration and does not
337    /// connect to a control plane at all.
338    ///
339    /// This is intended to be used to test configuration in controlled settings
340    /// or to use Junction an offline mode. Once a client has been converted to
341    /// fully static, it's not possible to convert it back to using dynamic
342    /// discovery data.
343    pub fn with_static_endpoints(self, routes: Vec<Route>, backends: Vec<Backend>) -> Client {
344        let static_config = Arc::new(StaticConfig::with_inferred(routes, backends));
345        let config = Config::Static(static_config);
346        Client { config, ..self }
347    }
348
349    /// Resolve an HTTP method, URL, and headers into an [Endpoint].
350    ///
351    /// This is the main entry point into Junction. When building an
352    /// integration, use this method to fetch an initial endpoint. After making
353    /// an initial request, use [report_status][Self::report_status] to report
354    /// the status of the request and to retry on failure.
355    ///
356    /// The endpoint returned from this method should be a complete description
357    /// of how to make an HTTP request - it contains the IP address to use, the
358    /// full URL and hostname, the complete set of headers, and retry and timeout
359    /// policy the client should use to make a request.
360    pub async fn resolve_http(
361        &self,
362        method: &http::Method,
363        url: &crate::Url,
364        headers: &http::HeaderMap,
365    ) -> crate::Result<Endpoint> {
366        let deadline = Instant::now() + self.resolve_timeout;
367
368        let request = HttpRequest::from_parts(method, url, headers)?;
369
370        let resolved = self.resolve_route(request, Some(deadline)).await?;
371
372        let lb_context = LbContext::new(resolved.trace, url, headers);
373        let selected = self
374            .select_endpoint(&resolved.backend, lb_context, Some(deadline))
375            .await?;
376
377        let address = selected.addr;
378        let trace = selected.trace;
379        let (timeouts, retry) = {
380            let rule = &resolved.route.rules[resolved.rule];
381            (rule.timeouts.clone(), rule.retry.clone())
382        };
383
384        Ok(Endpoint {
385            method: method.clone(),
386            url: url.clone(),
387            headers: headers.clone(),
388            address,
389            timeouts,
390            retry,
391            backend: resolved.backend,
392            trace,
393            previous_addrs: vec![],
394        })
395    }
396
397    /// Report the status of an externally made HTTP request made against an
398    /// [Endpoint] returned from `resolve_http`.
399    ///
400    /// If retrying the response is appropriate, a new Endpoint will be returned
401    /// with updated address and host info set - calling `resolve_http` to start
402    /// a retry attempt will drop request history and may result in too many
403    /// retries.
404    ///
405    /// If a retry is not appropriate, the returned Endpoint will have updated
406    /// history information, but request details will remain the same. Clients
407    /// may use that value for status or error reporting.
408    pub async fn report_status(
409        &self,
410        endpoint: Endpoint,
411        response: HttpResult,
412    ) -> crate::Result<Endpoint> {
413        // TODO: track response stats
414
415        // if there's no reason to pick a new endpoint, just return the existing one as-is
416        if response.is_ok() || !endpoint.should_retry(response) {
417            return Ok(endpoint);
418        }
419
420        // redo endpoint selection
421        // FIXME: real deadline here
422        let deadline = Instant::now() + self.resolve_timeout;
423        let lb_context = LbContext {
424            url: &endpoint.url,
425            headers: &endpoint.headers,
426            previous_addrs: &endpoint.previous_addrs,
427            trace: endpoint.trace,
428        };
429        let next = self
430            .select_endpoint(&endpoint.backend, lb_context, Some(deadline))
431            .await?;
432        let address = next.addr;
433        let trace = next.trace;
434
435        // track address history
436        let mut previous_addrs = endpoint.previous_addrs;
437        previous_addrs.push(endpoint.address);
438
439        Ok(Endpoint {
440            address,
441            trace,
442            previous_addrs,
443            ..endpoint
444        })
445    }
446
447    /// Resolve an HTTP method, URL, and headers to a target backend, returning
448    /// the Route that matched, the index of the rule that matched, and the
449    /// backend that was chosen - to make backend choice determinstic with
450    /// multiple backends, set the `JUNCTION_SEED` environment variable.
451    ///
452    /// This is a lower-level method that only performs the Route matching part
453    /// of resolution. It's intended for debugging or querying a client for
454    /// specific information. For everyday use, prefer [Client::resolve_http].
455    pub async fn resolve_route(
456        &self,
457        request: HttpRequest<'_>,
458        deadline: Option<Instant>,
459    ) -> crate::Result<ResolvedRoute> {
460        let trace = Trace::new();
461        resolve_routes(&self.config, trace, request, deadline, &self.search_config).await
462    }
463
464    /// Select an endpoint address for this backend from the set of currently
465    /// available endpoints.
466    ///
467    /// This is a lower level method that only performs part of route
468    /// resolution, and is intended for debugging and testing. For everyday use,
469    /// prefer [Client::resolve_http].
470    pub async fn select_endpoint(
471        &self,
472        backend: &BackendId,
473        ctx: LbContext<'_>,
474        deadline: Option<Instant>,
475    ) -> crate::Result<SelectedEndpoint> {
476        select_endpoint(&self.config, backend, ctx, deadline).await
477    }
478
479    /// Start a gRPC CSDS server on the given port. To run the server, you must
480    /// `await` this future.
481    ///
482    /// For static clients, this does nothing.
483    pub async fn csds_server(self, port: u16) -> Result<(), tonic::transport::Error> {
484        match self.config.ads() {
485            Some(ads) => ads.csds_server(port).await,
486            None => std::future::pending().await,
487        }
488    }
489
490    /// Dump the client's current cache of xDS resources, as fetched from the
491    /// config server.
492    ///
493    /// This is a programmatic view of the same data that you can fetch over
494    /// gRPC by starting a [Client::csds_server].
495    pub fn dump_xds(&self, not_found: bool) -> Vec<crate::XdsConfig> {
496        match self.config.ads() {
497            Some(ads) => {
498                if not_found {
499                    ads.iter_xds().collect()
500                } else {
501                    ads.iter_xds().filter(|c| c.xds.is_some()).collect()
502                }
503            }
504            None => Vec::new(),
505        }
506    }
507
508    /// Dump xDS resources that failed to update. This is a view of the data
509    /// returned by [Client::dump_xds] that only contains resources with
510    /// errors.
511    pub fn dump_xds_errors(&self) -> Vec<crate::XdsConfig> {
512        match self.config.ads() {
513            Some(ads) => ads
514                .iter_xds()
515                .filter(|xds| xds.last_error.is_some())
516                .collect(),
517            None => Vec::new(),
518        }
519    }
520
521    /// Dump the Client's current table of [Route]s, merging together any
522    /// default routes and remotely fetched routes the same way the client would
523    /// when resolving endpoints.
524    pub fn dump_routes(&self) -> Vec<Arc<Route>> {
525        match &self.config {
526            Config::Static(c) | Config::DynamicEndpoints(c, _) => c.routes.clone(),
527            Config::Dynamic(d) => d.ads_client.iter_routes().collect(),
528        }
529    }
530
531    /// Dump the Client's current table of [BackendLb]s, merging together any
532    /// default configuration and remotely fetched config the same way the
533    /// client would when resolving endpoints.
534    pub fn dump_backends(&self) -> Vec<Arc<BackendLb>> {
535        match &self.config {
536            Config::Static(c) | Config::DynamicEndpoints(c, _) => {
537                c.backends.values().cloned().collect()
538            }
539            Config::Dynamic(d) => d.ads_client.iter_backends().collect(),
540        }
541    }
542
543    /// Return the endpoints currently in cache for this backend.
544    ///
545    /// The returned endpoints are a snapshot of what is currently in cache and
546    /// will not update as new discovery information is pushed.
547    pub fn dump_endpoints(&self, backend: &BackendId) -> Option<EndpointIter> {
548        self.config
549            .get_endpoints(backend)
550            .now_or_never()
551            .flatten()
552            .map(EndpointIter::from)
553    }
554}
555
// Await `$fut`, giving up once `$deadline` (an `Option<Instant>`) has passed.
//
// Evaluates to the output of `$fut` if it finishes first. If the deadline
// fires first, this `return`s from the *enclosing function* with
// `Error::timed_out($msg, $trace)`, so it can only be used inside functions
// that return a `Result` with that error type. `$trace` is moved into the
// error on the timeout path.
macro_rules! with_deadline {
    ($fut:expr, $deadline:expr, $msg:expr, $trace:expr $(,)*) => {
        tokio::select! {
            // poll the branches in the order they're written, so `$fut` gets
            // checked before the timer.
            biased;

            res = $fut => res,
            _ = sleep_until($deadline) => {
                return Err(Error::timed_out($msg, $trace));
            }
        }
    };
}
568
569pub(crate) async fn resolve_routes(
570    cache: &impl ConfigCache,
571    mut trace: Trace,
572    request: HttpRequest<'_>,
573    deadline: Option<Instant>,
574    search_config: &SearchConfig,
575) -> crate::Result<ResolvedRoute> {
576    use rand::seq::SliceRandom;
577
578    let uris_to_search = search(search_config, request.url);
579    assert!(
580        !uris_to_search.is_empty(),
581        "URI search is empty, this is a bug in Junction."
582    );
583
584    let mut futures_ordered = FuturesOrdered::new();
585    for url in uris_to_search {
586        futures_ordered.push_back(cache.get_route(url.authority().to_string()));
587    }
588
589    // NB[pt): two potential surprises below:
590    //  1. we do not surface any errors that occur subsequent to the first success in the list of
591    //     routes.
592    //  2. we rely on .next() called on FuturesOrdered to start all the futures contained in
593    //     futures_ordered. We expect all routes to be checked in parallel depending on load in
594    //     tokio.
595    let msg = "timed out fetching route";
596    let route = loop {
597        match with_deadline!(futures_ordered.next(), deadline, msg, trace) {
598            Some(Some(route)) => break route,
599            Some(None) => {
600                continue;
601            }
602            None => {
603                return Err(Error::no_route_matched(
604                    request.url.authority().to_string(),
605                    trace,
606                ))
607            }
608        }
609    };
610
611    trace.lookup_route(&route);
612
613    // match the request against the list of RouteRules that are part of this
614    // request. the hostname and port of the request have already matched but we
615    // need to match headers/url params/method and so on.
616    let (rule, matching_rule) = match find_matching_rule(&route, request.clone()) {
617        Some((idx, r)) => (idx, r),
618        None => return Err(Error::no_rule_matched(route.id.clone(), trace)),
619    };
620    trace.matched_rule(
621        rule,
622        route.rules.get(rule).and_then(|rule| rule.name.as_ref()),
623    );
624
625    // pick a target at random from the list, respecting weights. if there are
626    // no backends listed we should blackhole here.
627    let weighted_backend = &crate::rand::with_thread_rng(|rng| {
628        matching_rule.backends.choose_weighted(rng, |wc| wc.weight)
629    });
630    let backend_ref = match weighted_backend {
631        Ok(backend_ref) => backend_ref,
632        Err(WeightedError::NoItem) => {
633            // TODO: should this just return a special endpoint that 500s?
634            return Err(Error::invalid_route(
635                "route has no backends",
636                route.id.clone(),
637                rule,
638                trace,
639            ));
640        }
641        Err(_) => {
642            return Err(Error::invalid_route(
643                "backends weights are invalid: total weights must be greater than zero",
644                route.id.clone(),
645                rule,
646                trace,
647            ))
648        }
649    };
650    let backend = backend_ref.into_backend_id(request.url.default_port());
651    trace.select_backend(&backend);
652
653    Ok(ResolvedRoute {
654        route,
655        rule,
656        backend,
657        trace,
658    })
659}
660
661async fn select_endpoint(
662    cache: &impl ConfigCache,
663    backend: &BackendId,
664    mut ctx: LbContext<'_>,
665    deadline: Option<Instant>,
666) -> crate::Result<SelectedEndpoint> {
667    // start the next trace phase
668    ctx.trace.start_endpoint_selection();
669
670    // lookup backend and endpoints
671    //
672    // these lookups are done sequentially, even though they could be raced, so
673    // that we can tell what step a lookup timed out on. we're assuming that
674    // this is okay because in the background fetching a backend for a cache
675    // will also trigger fetching its endpoints and parallelism here won't do
676    // much for us.
677    let blb = with_deadline!(
678        cache.get_backend(backend),
679        deadline,
680        "timed out fetching backend",
681        ctx.trace,
682    );
683    let Some(blb) = blb else {
684        return Err(Error::no_backend(backend.clone(), ctx.trace));
685    };
686    ctx.trace.lookup_backend(backend);
687
688    let endpoints = with_deadline!(
689        cache.get_endpoints(backend),
690        deadline,
691        "timed out fetching endpoints",
692        ctx.trace,
693    );
694    let Some(endpoints) = endpoints else {
695        return Err(Error::no_reachable_endpoints(backend.clone(), ctx.trace));
696    };
697    ctx.trace.lookup_endpoints(backend);
698
699    // load balance.
700    //
701    // no trace is done here, the load balancer impls stamp the traces themselves
702    let addr = blb.load_balancer.load_balance(
703        &mut ctx.trace,
704        &endpoints,
705        ctx.url,
706        ctx.headers,
707        ctx.previous_addrs,
708    );
709    let Some(addr) = addr else {
710        return Err(Error::no_reachable_endpoints(backend.clone(), ctx.trace));
711    };
712
713    Ok(SelectedEndpoint {
714        addr: *addr,
715        trace: ctx.trace,
716    })
717}
718
719async fn sleep_until(deadline: Option<Instant>) {
720    match deadline {
721        Some(d) => tokio::time::sleep_until(d.into()).await,
722        None => std::future::pending().await,
723    }
724}
725
726//FIXME(routing): picking between these is way more complicated than finding the
727//first match
728fn find_matching_rule<'a>(
729    route: &'a Route,
730    request: HttpRequest<'_>,
731) -> Option<(usize, &'a RouteRule)> {
732    let rule_idx = route
733        .rules
734        .iter()
735        .position(|rule| is_route_rule_match(rule, request.method, request.url, request.headers))?;
736
737    let rule = &route.rules[rule_idx];
738    Some((rule_idx, rule))
739}
740
741pub fn is_route_rule_match(
742    rule: &RouteRule,
743    method: &http::Method,
744    url: &crate::Url,
745    headers: &http::HeaderMap,
746) -> bool {
747    if rule.matches.is_empty() {
748        return true;
749    }
750    rule.matches
751        .iter()
752        .any(|m| is_route_match_match(m, method, url, headers))
753}
754
755pub fn is_route_match_match(
756    rule: &RouteMatch,
757    method: &http::Method,
758    url: &crate::Url,
759    headers: &http::HeaderMap,
760) -> bool {
761    let mut method_matches = true;
762    if let Some(rule_method) = &rule.method {
763        method_matches = rule_method.eq(&method.to_string());
764    }
765
766    let mut path_matches = true;
767    if let Some(rule_path) = &rule.path {
768        path_matches = match &rule_path {
769            PathMatch::Exact { value } => value == url.path(),
770            PathMatch::Prefix { value } => url.path().starts_with(value),
771            PathMatch::RegularExpression { value } => value.is_match(url.path()),
772        }
773    }
774
775    let headers_matches = rule.headers.iter().all(|m| is_header_match(m, headers));
776    let qp_matches = rule
777        .query_params
778        .iter()
779        .all(|m| is_query_params_match(m, url.query()));
780
781    method_matches && path_matches && headers_matches && qp_matches
782}
783
784pub fn is_header_match(rule: &HeaderMatch, headers: &http::HeaderMap) -> bool {
785    let Some(header_val) = headers.get(rule.name()) else {
786        return false;
787    };
788    let Ok(header_val) = header_val.to_str() else {
789        return false;
790    };
791    rule.is_match(header_val)
792}
793
794pub fn is_query_params_match(rule: &QueryParamMatch, query: Option<&str>) -> bool {
795    let Some(query) = query else {
796        return false;
797    };
798    for (param, value) in form_urlencoded::parse(query.as_bytes()) {
799        if param == rule.name() {
800            return rule.is_match(&value);
801        }
802    }
803    false
804}
805
806/// generate a URL search path for this url.
807///
808/// the resturned Vec will always contain either:
809///
810/// - a single element, a ref to the original URL
811///
812/// - `search.len() + 1` elements, where the first element is the original
813///   URl and the rest of the entries are the result of appending the URL's
814///   hostname to the suffixes in search_config.search. the order of the suffixes in
815///   search is preserved.
816fn search<'a>(search_config: &SearchConfig, url: &'a crate::Url) -> Vec<Cow<'a, crate::Url>> {
817    // TODO: this could return an enum { Original(url), Search(url, path) } that
818    // implements Iterator and lazily generates Cow<Url>. there's no reason to
819    // do that at the moment but it'd be a little more correct.
820
821    let hostname = url.hostname();
822    let dots = hostname.as_bytes().iter().filter(|&&b| b == b'.').count();
823
824    let mut urls = vec![Cow::Borrowed(url)];
825
826    if dots < search_config.ndots as usize {
827        for suffix in &search_config.search {
828            let mut new_hostname = String::with_capacity(hostname.len() + hostname.len() + 1);
829            new_hostname.push_str(hostname);
830            new_hostname.push('.');
831            new_hostname.push_str(suffix);
832
833            let new_url = url
834                .with_hostname(&new_hostname)
835                .expect("SearchConfig search produced an invalid URL. this is a bug in Junction");
836            urls.push(Cow::Owned(new_url));
837        }
838    }
839
840    urls
841}
842
843// TODO: thorough tests for matching
844
845#[cfg(test)]
846mod test {
847    use crate::Url;
848    use junction_api::{http::BackendRef, Hostname, Name, Regex, Service};
849    use std::str::FromStr;
850
851    use pretty_assertions::assert_eq;
852
853    use super::*;
854
855    fn assert_send<T: Send>() {}
856    fn assert_sync<T: Sync>() {}
857
858    #[test]
859    fn assert_send_sync() {
860        assert_send::<HttpRequest<'_>>();
861        assert_sync::<HttpRequest<'_>>();
862    }
863
864    #[test]
865    fn test_search() {
866        let url = Url::from_str("https://tasty.potato.tomato:9876").unwrap();
867        let search_setup: Vec<Hostname> = vec![
868            Hostname::from_static("foo.bar.baz"),
869            Hostname::from_static("bar.baz"),
870            Hostname::from_static("baz"),
871        ];
872
873        // with ndots < dots, should just return the original url
874        assert_eq!(
875            search(&SearchConfig::new(0, search_setup.clone()), &url),
876            vec![Cow::Borrowed(&url)]
877        );
878        assert_eq!(
879            search(&SearchConfig::new(1, search_setup.clone()), &url),
880            vec![Cow::Borrowed(&url)]
881        );
882        assert_eq!(
883            search(&SearchConfig::new(2, search_setup.clone()), &url),
884            vec![Cow::Borrowed(&url)]
885        );
886
887        // with high-enough ndots should return a borrowed URL and owned URLs
888        assert_eq!(
889            search(&SearchConfig::new(3, search_setup), &url),
890            vec![
891                Cow::Borrowed(&url),
892                Cow::Owned(
893                    "https://tasty.potato.tomato.foo.bar.baz:9876"
894                        .parse()
895                        .unwrap()
896                ),
897                Cow::Owned("https://tasty.potato.tomato.bar.baz:9876".parse().unwrap()),
898                Cow::Owned("https://tasty.potato.tomato.baz:9876".parse().unwrap()),
899            ],
900        );
901    }
902
903    #[track_caller]
904    fn assert_resolve_routes(cache: &impl ConfigCache, request: HttpRequest<'_>) -> ResolvedRoute {
905        resolve_routes(cache, Trace::new(), request, None, &SearchConfig::default())
906            .now_or_never()
907            .unwrap()
908            .unwrap()
909    }
910
911    #[track_caller]
912    fn assert_resolve_err(cache: &impl ConfigCache, request: HttpRequest<'_>) -> crate::Error {
913        resolve_routes(cache, Trace::new(), request, None, &SearchConfig::default())
914            .now_or_never()
915            .unwrap()
916            .unwrap_err()
917    }
918
919    #[test]
920    fn test_resolve_passthrough_route() {
921        let svc = Service::dns("example.com").unwrap();
922
923        let routes = StaticConfig::new(
924            vec![Route::passthrough_route(
925                Name::from_static("example"),
926                svc.clone(),
927            )],
928            vec![],
929        );
930
931        // check with no port
932        let url = Url::from_str("http://example.com/test-path").unwrap();
933        let headers = http::HeaderMap::default();
934        let request = HttpRequest::from_parts(&http::Method::GET, &url, &headers).unwrap();
935
936        let resolved = assert_resolve_routes(&routes, request);
937        assert_eq!(resolved.backend, svc.as_backend_id(80));
938
939        // check with explicit ports
940        for port in [443, 8008] {
941            let url = Url::from_str(&format!("http://example.com:{port}/test-path")).unwrap();
942            let headers = http::HeaderMap::default();
943            let request = HttpRequest::from_parts(&http::Method::GET, &url, &headers).unwrap();
944
945            let resolved = assert_resolve_routes(&routes, request);
946            assert_eq!(resolved.backend, svc.as_backend_id(port));
947        }
948    }
949
950    #[test]
951    fn test_resolve_route_no_rules() {
952        let route = Route {
953            id: Name::from_static("no-rules"),
954            hostnames: vec![Hostname::from_static("example.com").into()],
955            ports: vec![],
956            tags: Default::default(),
957            rules: vec![],
958        };
959
960        let routes = StaticConfig::new(vec![route], vec![]);
961
962        let url = Url::from_str("http://example.com:3214/users/123").unwrap();
963        let headers = http::HeaderMap::default();
964        let request = HttpRequest::from_parts(&http::Method::GET, &url, &headers).unwrap();
965
966        let err = assert_resolve_err(&routes, request);
967        assert!(err.to_string().contains("no rules matched the request"));
968        assert!(!err.is_temporary());
969    }
970
971    #[test]
972    fn test_resolve_route_no_rules_with_search_config() {
973        let route = Route {
974            id: Name::from_static("no-rules"),
975            hostnames: vec![Hostname::from_static("example.com").into()],
976            ports: vec![],
977            tags: Default::default(),
978            rules: vec![],
979        };
980
981        let routes = StaticConfig::new(vec![route], vec![]);
982
983        let url = Url::from_str("http://example.com:3214/users/123").unwrap();
984        let headers = http::HeaderMap::default();
985        let request = HttpRequest::from_parts(&http::Method::GET, &url, &headers).unwrap();
986
987        let err = resolve_routes(
988            &routes,
989            Trace::new(),
990            request,
991            None,
992            &SearchConfig::new(2, vec![Hostname::from_static("example.com")]),
993        )
994        .now_or_never()
995        .unwrap()
996        .unwrap_err();
997
998        assert!(err.to_string().contains("no rules matched the request"));
999        assert!(!err.is_temporary());
1000    }
1001
1002    #[test]
1003    fn test_resolve_route_no_backends() {
1004        let route = Route {
1005            id: Name::from_static("no-backends"),
1006            hostnames: vec![Hostname::from_static("example.com").into()],
1007            ports: vec![],
1008            tags: Default::default(),
1009            rules: vec![RouteRule {
1010                matches: vec![RouteMatch {
1011                    path: Some(PathMatch::Prefix {
1012                        value: "".to_string(),
1013                    }),
1014                    ..Default::default()
1015                }],
1016                ..Default::default()
1017            }],
1018        };
1019
1020        let routes = StaticConfig::new(vec![route], vec![]);
1021
1022        for port in [80, 7887] {
1023            let method = &http::Method::GET;
1024            let url = &Url::from_str(&format!("http://example.com:{port}/users/123")).unwrap();
1025            let headers = &http::HeaderMap::default();
1026            let request = HttpRequest::from_parts(method, url, headers).unwrap();
1027
1028            let err = assert_resolve_err(&routes, request);
1029            assert_eq!(err.to_string(), "invalid route configuration");
1030            assert!(!err.is_temporary());
1031        }
1032    }
1033
1034    #[test]
1035    fn test_resolve_path_match() {
1036        let backend_one = Service::kube("web", "svc1").unwrap();
1037        let backend_two = Service::kube("web", "svc2").unwrap();
1038
1039        let route = Route {
1040            id: Name::from_static("path-match"),
1041            hostnames: vec![Hostname::from_static("example.com").into()],
1042            ports: vec![],
1043            tags: Default::default(),
1044            rules: vec![
1045                RouteRule {
1046                    matches: vec![RouteMatch {
1047                        path: Some(PathMatch::Prefix {
1048                            value: "/users".to_string(),
1049                        }),
1050                        ..Default::default()
1051                    }],
1052                    backends: vec![BackendRef {
1053                        weight: 1,
1054                        service: backend_one.clone(),
1055                        port: Some(8910),
1056                    }],
1057                    ..Default::default()
1058                },
1059                RouteRule {
1060                    backends: vec![BackendRef {
1061                        weight: 1,
1062                        service: backend_two.clone(),
1063                        port: Some(8919),
1064                    }],
1065                    ..Default::default()
1066                },
1067            ],
1068        };
1069
1070        let routes = StaticConfig::new(vec![route], vec![]);
1071
1072        let url = &Url::from_str("http://example.com/test-path").unwrap();
1073        let headers = &http::HeaderMap::default();
1074        let request = HttpRequest::from_parts(&http::Method::GET, url, headers).unwrap();
1075        let resolved = assert_resolve_routes(&routes, request);
1076
1077        // should match the fallthrough rule
1078        assert_eq!(resolved.rule, 1);
1079        assert_eq!(resolved.backend, backend_two.as_backend_id(8919));
1080
1081        let url = Url::from_str("http://example.com/users/123").unwrap();
1082        let headers = &http::HeaderMap::default();
1083        let request = HttpRequest::from_parts(&http::Method::GET, &url, headers).unwrap();
1084        let resolved = assert_resolve_routes(&routes, request);
1085
1086        // should match the first rule, with the path match
1087        assert_eq!(resolved.backend, backend_one.as_backend_id(8910));
1088        assert!(!resolved.route.rules[resolved.rule].matches.is_empty());
1089
1090        let url = Url::from_str("http://example.com/users/123").unwrap();
1091        let headers = &http::HeaderMap::default();
1092        let request = HttpRequest::from_parts(&http::Method::GET, &url, headers).unwrap();
1093
1094        let resolved = assert_resolve_routes(&routes, request);
1095        // should match the first rule, with the path match
1096        assert_eq!(resolved.rule, 0);
1097        assert_eq!(resolved.backend, backend_one.as_backend_id(8910));
1098    }
1099
1100    #[test]
1101    fn test_resolve_query_match() {
1102        let backend_one = Service::kube("web", "svc1").unwrap();
1103        let backend_two = Service::kube("web", "svc2").unwrap();
1104
1105        let route = Route {
1106            id: Name::from_static("query-match"),
1107            hostnames: vec![Hostname::from_static("example.com").into()],
1108            ports: vec![],
1109            tags: Default::default(),
1110            rules: vec![
1111                RouteRule {
1112                    matches: vec![RouteMatch {
1113                        query_params: vec![
1114                            QueryParamMatch::Exact {
1115                                name: "qp1".to_string(),
1116                                value: "potato".to_string(),
1117                            },
1118                            QueryParamMatch::RegularExpression {
1119                                name: "qp2".to_string(),
1120                                value: Regex::from_str("foo.*bar").unwrap(),
1121                            },
1122                        ],
1123                        ..Default::default()
1124                    }],
1125                    backends: vec![BackendRef {
1126                        weight: 1,
1127                        service: backend_one.clone(),
1128                        port: Some(8910),
1129                    }],
1130                    ..Default::default()
1131                },
1132                RouteRule {
1133                    backends: vec![BackendRef {
1134                        weight: 1,
1135                        service: backend_two.clone(),
1136                        port: Some(8919),
1137                    }],
1138                    ..Default::default()
1139                },
1140            ],
1141        };
1142
1143        let routes = StaticConfig::new(vec![route], vec![]);
1144
1145        let wont_match = [
1146            "http://example.com?qp1=tomato",
1147            "http://example.com?qp1=potatooo",
1148            "http://example.com?qp2=barfoo",
1149            "http://example.com?qp2=fobar",
1150            "http://example.com?qp1=potat&qp2=foobar",
1151            "http://example.com?qp1=potato&qp2=fbar",
1152        ];
1153
1154        for url in wont_match {
1155            let url = Url::from_str(url).unwrap();
1156            let headers = &http::HeaderMap::default();
1157            let request = HttpRequest::from_parts(&http::Method::GET, &url, headers).unwrap();
1158
1159            let resolved = assert_resolve_routes(&routes, request);
1160            // should match the fallthrough rule
1161            assert_eq!(resolved.rule, 1);
1162            assert_eq!(resolved.backend, backend_two.as_backend_id(8919));
1163        }
1164
1165        let will_match = [
1166            "http://example.com?qp1=potato&qp2=foobar",
1167            "http://example.com?qp1=potato&qp2=foobazbar",
1168            "http://example.com?qp1=potato&qp2=fooooooooooooooobar",
1169        ];
1170
1171        for url in will_match {
1172            let url = Url::from_str(url).unwrap();
1173            let headers = &http::HeaderMap::default();
1174            let request = HttpRequest::from_parts(&http::Method::GET, &url, headers).unwrap();
1175
1176            let resolved = assert_resolve_routes(&routes, request);
1177            // should match one of the query matches
1178            assert_eq!(
1179                (resolved.rule, &resolved.backend),
1180                (0, &backend_one.as_backend_id(8910)),
1181                "should match the first rule: {url}"
1182            );
1183        }
1184    }
1185
1186    #[test]
1187    fn test_resolve_routes_resolves_ndots() {
1188        let backend = Service::kube("web", "svc1").unwrap();
1189
1190        let route = Route {
1191            id: Name::from_static("ndots-match"),
1192            hostnames: vec![Hostname::from_static("example.foo.bar.com").into()],
1193            ports: vec![],
1194            tags: Default::default(),
1195            rules: vec![RouteRule {
1196                matches: vec![],
1197                backends: vec![BackendRef {
1198                    weight: 1,
1199                    service: backend.clone(),
1200                    port: Some(8910),
1201                }],
1202                ..Default::default()
1203            }],
1204        };
1205
1206        let routes = StaticConfig::new(vec![route], vec![]);
1207
1208        let will_match = [
1209            "http://example",
1210            "http://example.foo",
1211            "http://example.foo.bar",
1212            "http://example.foo.bar.com",
1213        ];
1214        let will_match_hostnames = vec![
1215            Hostname::from_static("foo.bar.com"),
1216            Hostname::from_static("bar.com"),
1217            Hostname::from_static("com"),
1218        ];
1219
1220        for url in will_match {
1221            let url = crate::Url::from_str(url).unwrap();
1222            let headers = &http::HeaderMap::default();
1223            let request = HttpRequest::from_parts(&http::Method::GET, &url, headers).unwrap();
1224
1225            let resolved = resolve_routes(
1226                &routes,
1227                Trace::new(),
1228                request,
1229                None,
1230                &SearchConfig::new(3, will_match_hostnames.clone()),
1231            )
1232            .now_or_never()
1233            .unwrap()
1234            .unwrap();
1235
1236            // should match one of the query matches
1237            assert_eq!(
1238                (resolved.rule, &resolved.backend),
1239                (0, &backend.as_backend_id(8910)),
1240                "should match the first rule: {url}"
1241            );
1242        }
1243    }
1244
1245    #[test]
1246    fn test_resolve_routes_resolves_ndots_no_search() {
1247        let backend = Service::kube("web", "svc1").unwrap();
1248
1249        let will_match = [
1250            "http://example.com",
1251            "http://example.foo.com",
1252            "http://example.foo.bar.com",
1253        ];
1254
1255        let route = Route {
1256            id: Name::from_static("ndots-match"),
1257            hostnames: vec![
1258                Hostname::from_static("example.com").into(),
1259                Hostname::from_static("example.foo.com").into(),
1260                Hostname::from_static("example.foo.bar.com").into(),
1261            ],
1262            ports: vec![],
1263            tags: Default::default(),
1264            rules: vec![RouteRule {
1265                matches: vec![],
1266                backends: vec![BackendRef {
1267                    weight: 1,
1268                    service: backend.clone(),
1269                    port: Some(8910),
1270                }],
1271                ..Default::default()
1272            }],
1273        };
1274
1275        let routes = StaticConfig::new(vec![route], vec![]);
1276
1277        for url in will_match {
1278            let url = crate::Url::from_str(url).unwrap();
1279            let headers = &http::HeaderMap::default();
1280            let request = HttpRequest::from_parts(&http::Method::GET, &url, headers).unwrap();
1281
1282            let resolved = resolve_routes(
1283                &routes,
1284                Trace::new(),
1285                request,
1286                None,
1287                &SearchConfig::new(3, vec![]),
1288            )
1289            .now_or_never()
1290            .unwrap()
1291            .unwrap();
1292
1293            // should match one of the query matches
1294            assert_eq!(
1295                (resolved.rule, &resolved.backend),
1296                (0, &backend.as_backend_id(8910)),
1297                "should match the first rule: {url}"
1298            );
1299        }
1300    }
1301}