junction_api/kube/
backend.rs

1use std::collections::BTreeMap;
2use std::str::FromStr;
3
4use crate::backend::{Backend, LbPolicy};
5use crate::error::{Error, ErrorContext};
6use crate::{Name, Service};
7
8use k8s_openapi::api::core::v1 as core_v1;
9use kube::api::ObjectMeta;
10use kube::{Resource, ResourceExt};
11
12const LB_ANNOTATION: &str = "junctionlabs.io/backend.lb";
13
14fn lb_policy_annotation(port: u16) -> String {
15    format!("{LB_ANNOTATION}.{port}")
16}
17
18impl Backend {
19    /// Generate a partial [Service] from this backend.
20    ///
21    /// This service can be used to patch and overwrite an existing Service
22    /// using the `kube` crate or saved as json/yaml and used to patch an
23    /// existing service with `kubectl patch`.
24    pub fn to_service_patch(&self) -> core_v1::Service {
25        let mut svc = core_v1::Service {
26            metadata: ObjectMeta {
27                annotations: Some(BTreeMap::new()),
28                ..Default::default()
29            },
30            ..Default::default()
31        };
32
33        let lb_annotation = lb_policy_annotation(self.id.port);
34        let lb_json = serde_json::to_string(&self.lb)
35            .expect("Failed to serialize Backend. this is a bug in Junction, not your code");
36        svc.annotations_mut()
37            .insert(lb_annotation.to_string(), lb_json);
38
39        match &self.id.service {
40            Service::Dns(dns) => {
41                svc.spec = Some(core_v1::ServiceSpec {
42                    type_: Some("ExternalName".to_string()),
43                    external_name: Some(dns.hostname.to_string()),
44                    ..Default::default()
45                })
46            }
47            Service::Kube(service) => {
48                let meta = svc.meta_mut();
49                meta.name = Some(service.name.to_string());
50                meta.namespace = Some(service.namespace.to_string());
51
52                svc.spec = Some(core_v1::ServiceSpec {
53                    type_: Some("ClusterIP".to_string()),
54                    ports: Some(vec![core_v1::ServicePort {
55                        port: self.id.port as i32,
56                        protocol: Some("TCP".to_string()),
57                        ..Default::default()
58                    }]),
59                    ..Default::default()
60                })
61            }
62        };
63
64        svc
65    }
66
67    /// Read one or more [Backend]s from a Kubernetes [Service]. A backend will
68    /// be generated for every distinct port the [Service] is configured with.
69    ///
70    /// The type of [Backend] generated depends on the Service.
71    ///
72    /// - `ClusterIP` Services are translated to backends with a KubeService
73    ///   target and that use the `port` of the Service and the address of each
74    ///   endpoint. `ClusterIP` services must *not* be configured as headless
75    ///   services, so that endpoint information is available.
76    ///
77    /// - `ExternalName` Services are translated to backends with a Dns target,
78    ///   and uses the service port as the target port. If no port is specified,
79    ///   backends are generated for ports 80 and 443.
80    ///
81    /// All other Service types are currently unsupported.
82    pub fn from_service(svc: &core_v1::Service) -> Result<Vec<Self>, Error> {
83        let (namespace, name) = (
84            as_ref_or_else(&svc.meta().namespace, "missing namespace")
85                .with_fields("meta", "name")?,
86            as_ref_or_else(&svc.meta().name, "missing name").with_fields("meta", "name")?,
87        );
88
89        let spec = as_ref_or_else(&svc.spec, "missing spec").with_field("spec")?;
90        let svc_type = spec
91            .type_
92            .as_deref()
93            .ok_or_else(|| Error::new_static("missing type"))
94            .with_fields("spec", "type")?;
95
96        let mut backends = vec![];
97
98        // generate the target from the kube Service type.
99        let (service, svc_ports) = match svc_type {
100            "ClusterIP" => {
101                let name = Name::from_str(name).with_fields("meta", "name")?;
102                let namespace = Name::from_str(namespace).with_fields("meta", "namespace")?;
103                let service = Service::kube(&namespace, &name)?;
104
105                let svc_ports =
106                    as_ref_or_else(&spec.ports, "missing ports").with_fields("spec", "ports")?;
107
108                let mut ports = Vec::with_capacity(svc_ports.len());
109                for (i, svc_port) in svc_ports.iter().enumerate() {
110                    let port: u16 = convert_port(svc_port.port)
111                        .with_field("port")
112                        .with_field_index("ports", i)?;
113                    ports.push(port);
114                }
115
116                (service, ports)
117            }
118            "ExternalName" => {
119                let external_name = as_ref_or_else(&spec.external_name, "missing externalName")
120                    .with_fields("spec", "externalName")?;
121
122                let service = Service::dns(external_name).with_fields("spec", "externalName")?;
123                let svc_ports = spec.ports.as_deref().unwrap_or_default();
124
125                let mut ports = Vec::with_capacity(svc_ports.len());
126                for (i, svc_port) in svc_ports.iter().enumerate() {
127                    let port: u16 = convert_port(svc_port.port)
128                        .with_field("port")
129                        .with_field_index("ports", i)?;
130                    ports.push(port);
131                }
132
133                if ports.is_empty() {
134                    ports.extend([80, 443]);
135                }
136
137                (service, ports)
138            }
139            svc_type => return Err(Error::new(format!("{svc_type} Services are unsupported"))),
140        };
141
142        // generate a new Backend for every service port
143        for port in svc_ports {
144            let lb =
145                get_lb_policy(svc.annotations(), &lb_policy_annotation(port))?.unwrap_or_default();
146            backends.push(Backend {
147                id: crate::BackendId {
148                    service: service.clone(),
149                    port,
150                },
151                lb,
152            })
153        }
154
155        Ok(backends)
156    }
157}
158
159fn get_lb_policy(
160    annotations: &BTreeMap<String, String>,
161    key: &str,
162) -> Result<Option<LbPolicy>, Error> {
163    match annotations.get(key) {
164        Some(s) => {
165            let lb_policy = serde_json::from_str(s)
166                .map_err(|e| Error::new(format!("failed to deserialize {key}: {e}")))?;
167            Ok(Some(lb_policy))
168        }
169        None => Ok(None),
170    }
171}
172
173#[inline]
174fn convert_port(port: i32) -> Result<u16, Error> {
175    port.try_into()
176        .map_err(|_| Error::new(format!("port value '{port}' is out of range")))
177}
178
179#[inline]
180fn as_ref_or_else<'a, T>(f: &'a Option<T>, message: &'static str) -> Result<&'a T, Error> {
181    f.as_ref().ok_or_else(|| Error::new_static(message))
182}
183
184#[cfg(test)]
185mod test {
186    use k8s_openapi::api::core::v1 as core_v1;
187    use kube::api::ObjectMeta;
188
189    use crate::backend::{RequestHashPolicy, RequestHasher, RingHashParams};
190
191    use super::*;
192
193    macro_rules! annotations {
194        ($($k:expr => $v:expr),* $(,)*) => {{
195            let mut annotations = BTreeMap::new();
196            $(
197                annotations.insert($k.to_string(), $v.to_string());
198            )*
199            annotations
200        }}
201    }
202
203    const CLUSTER_IP: Option<&str> = Some("ClusterIP");
204    const EXTERNAL_NAME: Option<&str> = Some("ExternalName");
205
206    #[test]
207    fn test_to_service_patch() {
208        let backend = Backend {
209            id: Service::kube("bar", "foo").unwrap().as_backend_id(1212),
210            lb: LbPolicy::RoundRobin,
211        };
212        assert_eq!(
213            backend.to_service_patch(),
214            core_v1::Service {
215                metadata: ObjectMeta {
216                    namespace: Some("bar".to_string()),
217                    name: Some("foo".to_string()),
218                    annotations: Some(
219                        annotations! { "junctionlabs.io/backend.lb.1212" => r#"{"type":"RoundRobin"}"# }
220                    ),
221                    ..Default::default()
222                },
223                spec: Some(core_v1::ServiceSpec {
224                    type_: CLUSTER_IP.map(str::to_string),
225                    ports: Some(vec![core_v1::ServicePort {
226                        port: 1212,
227                        protocol: Some("TCP".to_string()),
228                        ..Default::default()
229                    }]),
230                    ..Default::default()
231                }),
232                status: None,
233            }
234        );
235
236        let backend = Backend {
237            id: Service::dns("example.com").unwrap().as_backend_id(4430),
238            lb: LbPolicy::RoundRobin,
239        };
240        assert_eq!(
241            backend.to_service_patch(),
242            core_v1::Service {
243                metadata: ObjectMeta {
244                    annotations: Some(
245                        annotations! { "junctionlabs.io/backend.lb.4430" => r#"{"type":"RoundRobin"}"# }
246                    ),
247                    ..Default::default()
248                },
249                spec: Some(core_v1::ServiceSpec {
250                    type_: Some("ExternalName".to_string()),
251                    external_name: Some("example.com".to_string()),
252                    ..Default::default()
253                }),
254                status: None,
255            }
256        );
257    }
258
259    #[test]
260    fn test_from_clusterip() {
261        // should generate a backend for each port
262        let svc = core_v1::Service {
263            metadata: ObjectMeta {
264                namespace: Some("bar".to_string()),
265                name: Some("foo".to_string()),
266                ..Default::default()
267            },
268            spec: Some(core_v1::ServiceSpec {
269                type_: CLUSTER_IP.map(str::to_string),
270                ports: Some(vec![core_v1::ServicePort {
271                    port: 8910,
272                    protocol: Some("TCP".to_string()),
273                    ..Default::default()
274                }]),
275                ..Default::default()
276            }),
277            status: None,
278        };
279
280        assert_eq!(
281            Backend::from_service(&svc).unwrap(),
282            vec![Backend {
283                id: Service::kube("bar", "foo").unwrap().as_backend_id(8910),
284                lb: LbPolicy::Unspecified,
285            },]
286        );
287
288        // should error with no ports
289        let no_ports = core_v1::Service {
290            metadata: ObjectMeta {
291                namespace: Some("bar".to_string()),
292                name: Some("foo".to_string()),
293                ..Default::default()
294            },
295            spec: Some(core_v1::ServiceSpec {
296                type_: CLUSTER_IP.map(str::to_string),
297                ..Default::default()
298            }),
299            status: None,
300        };
301        assert!(Backend::from_service(&no_ports).is_err());
302
303        // multiple ports and some LB config, should generate different backends
304        // with different LB policies.
305        let svc = core_v1::Service {
306            metadata: ObjectMeta {
307                namespace: Some("bar".to_string()),
308                name: Some("foo".to_string()),
309                annotations: Some(annotations! {
310                    "junctionlabs.io/backend.lb.443" => r#"{"type":"RingHash", "min_ring_size": 1024, "hash_params": [{"type": "Header", "name": "x-user"}]}"#,
311                    "junctionlabs.io/backend.lb.4430" => r#"{"type":"RoundRobin"}"#,
312                }),
313                ..Default::default()
314            },
315            spec: Some(core_v1::ServiceSpec {
316                type_: CLUSTER_IP.map(str::to_string),
317                ports: Some(vec![
318                    core_v1::ServicePort {
319                        name: Some("http".to_string()),
320                        port: 80,
321                        protocol: Some("TCP".to_string()),
322                        ..Default::default()
323                    },
324                    core_v1::ServicePort {
325                        name: Some("https".to_string()),
326                        port: 443,
327                        protocol: Some("TCP".to_string()),
328                        ..Default::default()
329                    },
330                    core_v1::ServicePort {
331                        name: Some("health".to_string()),
332                        port: 4430,
333                        protocol: Some("TCP".to_string()),
334                        ..Default::default()
335                    },
336                ]),
337                ..Default::default()
338            }),
339            status: None,
340        };
341
342        assert_eq!(
343            Backend::from_service(&svc).unwrap(),
344            vec![
345                Backend {
346                    id: Service::kube("bar", "foo").unwrap().as_backend_id(80),
347                    lb: LbPolicy::Unspecified,
348                },
349                Backend {
350                    id: Service::kube("bar", "foo").unwrap().as_backend_id(443),
351                    lb: LbPolicy::RingHash(RingHashParams {
352                        min_ring_size: 1024,
353                        hash_params: vec![RequestHashPolicy {
354                            terminal: false,
355                            hasher: RequestHasher::Header {
356                                name: "x-user".to_string()
357                            }
358                        }]
359                    }),
360                },
361                Backend {
362                    id: Service::kube("bar", "foo").unwrap().as_backend_id(4430),
363                    lb: LbPolicy::RoundRobin,
364                },
365            ]
366        )
367    }
368
369    #[test]
370    fn test_from_external_name() {
371        // without explicit ports, should generate backends for both 443 and 80.
372        // annotations should still get picked up.
373        let svc = core_v1::Service {
374            metadata: ObjectMeta {
375                namespace: Some("bar".to_string()),
376                name: Some("foo".to_string()),
377                annotations: Some(annotations! {
378                    "junctionlabs.io/backend.lb.443" => r#"{"type":"RoundRobin"}"#,
379                }),
380                ..Default::default()
381            },
382            spec: Some(core_v1::ServiceSpec {
383                type_: EXTERNAL_NAME.map(str::to_string),
384                external_name: Some("www.junctionlabs.io".to_string()),
385                ..Default::default()
386            }),
387            status: None,
388        };
389
390        assert_eq!(
391            Backend::from_service(&svc).unwrap(),
392            vec![
393                Backend {
394                    id: Service::dns("www.junctionlabs.io")
395                        .unwrap()
396                        .as_backend_id(80),
397                    lb: LbPolicy::Unspecified,
398                },
399                Backend {
400                    id: Service::dns("www.junctionlabs.io")
401                        .unwrap()
402                        .as_backend_id(443),
403                    lb: LbPolicy::RoundRobin,
404                },
405            ]
406        );
407
408        // with explicit ports, we should use the given port and pick up an lb policy
409        let svc = core_v1::Service {
410            metadata: ObjectMeta {
411                namespace: Some("bar".to_string()),
412                name: Some("foo".to_string()),
413                annotations: Some(annotations! {
414                    "junctionlabs.io/backend.lb.7777" => r#"{"type":"RoundRobin"}"#,
415                }),
416                ..Default::default()
417            },
418            spec: Some(core_v1::ServiceSpec {
419                type_: EXTERNAL_NAME.map(str::to_string),
420                external_name: Some("www.junctionlabs.io".to_string()),
421                ports: Some(vec![core_v1::ServicePort {
422                    port: 7777,
423                    protocol: Some("TCP".to_string()),
424                    ..Default::default()
425                }]),
426                ..Default::default()
427            }),
428            status: None,
429        };
430
431        assert_eq!(
432            Backend::from_service(&svc).unwrap(),
433            vec![Backend {
434                id: Service::dns("www.junctionlabs.io")
435                    .unwrap()
436                    .as_backend_id(7777),
437                lb: LbPolicy::RoundRobin,
438            },]
439        )
440    }
441
442    #[test]
443    fn test_svc_patch_roundtrip() {
444        let backend = Backend {
445            id: Service::kube("bar", "foo").unwrap().as_backend_id(8888),
446            lb: LbPolicy::RoundRobin,
447        };
448
449        assert_eq!(
450            Backend::from_service(&backend.to_service_patch()).unwrap(),
451            vec![backend.clone()]
452        )
453    }
454}