1use std::collections::BTreeMap;
2use std::str::FromStr;
3
4use crate::backend::{Backend, LbPolicy};
5use crate::error::{Error, ErrorContext};
6use crate::{Name, Service};
7
8use k8s_openapi::api::core::v1 as core_v1;
9use kube::api::ObjectMeta;
10use kube::{Resource, ResourceExt};
11
/// Common prefix of the annotation keys that carry a backend's load balancer
/// policy. The full key is this prefix followed by `.` and a port number.
const LB_ANNOTATION: &str = "junctionlabs.io/backend.lb";

/// Returns the annotation key that holds the load balancer policy for `port`,
/// i.e. `"{LB_ANNOTATION}.{port}"`.
fn lb_policy_annotation(port: u16) -> String {
    let suffix = port.to_string();
    [LB_ANNOTATION, suffix.as_str()].join(".")
}
17
18impl Backend {
19 pub fn to_service_patch(&self) -> core_v1::Service {
25 let mut svc = core_v1::Service {
26 metadata: ObjectMeta {
27 annotations: Some(BTreeMap::new()),
28 ..Default::default()
29 },
30 ..Default::default()
31 };
32
33 let lb_annotation = lb_policy_annotation(self.id.port);
34 let lb_json = serde_json::to_string(&self.lb)
35 .expect("Failed to serialize Backend. this is a bug in Junction, not your code");
36 svc.annotations_mut()
37 .insert(lb_annotation.to_string(), lb_json);
38
39 match &self.id.service {
40 Service::Dns(dns) => {
41 svc.spec = Some(core_v1::ServiceSpec {
42 type_: Some("ExternalName".to_string()),
43 external_name: Some(dns.hostname.to_string()),
44 ..Default::default()
45 })
46 }
47 Service::Kube(service) => {
48 let meta = svc.meta_mut();
49 meta.name = Some(service.name.to_string());
50 meta.namespace = Some(service.namespace.to_string());
51
52 svc.spec = Some(core_v1::ServiceSpec {
53 type_: Some("ClusterIP".to_string()),
54 ports: Some(vec![core_v1::ServicePort {
55 port: self.id.port as i32,
56 protocol: Some("TCP".to_string()),
57 ..Default::default()
58 }]),
59 ..Default::default()
60 })
61 }
62 };
63
64 svc
65 }
66
67 pub fn from_service(svc: &core_v1::Service) -> Result<Vec<Self>, Error> {
83 let (namespace, name) = (
84 as_ref_or_else(&svc.meta().namespace, "missing namespace")
85 .with_fields("meta", "name")?,
86 as_ref_or_else(&svc.meta().name, "missing name").with_fields("meta", "name")?,
87 );
88
89 let spec = as_ref_or_else(&svc.spec, "missing spec").with_field("spec")?;
90 let svc_type = spec
91 .type_
92 .as_deref()
93 .ok_or_else(|| Error::new_static("missing type"))
94 .with_fields("spec", "type")?;
95
96 let mut backends = vec![];
97
98 let (service, svc_ports) = match svc_type {
100 "ClusterIP" => {
101 let name = Name::from_str(name).with_fields("meta", "name")?;
102 let namespace = Name::from_str(namespace).with_fields("meta", "namespace")?;
103 let service = Service::kube(&namespace, &name)?;
104
105 let svc_ports =
106 as_ref_or_else(&spec.ports, "missing ports").with_fields("spec", "ports")?;
107
108 let mut ports = Vec::with_capacity(svc_ports.len());
109 for (i, svc_port) in svc_ports.iter().enumerate() {
110 let port: u16 = convert_port(svc_port.port)
111 .with_field("port")
112 .with_field_index("ports", i)?;
113 ports.push(port);
114 }
115
116 (service, ports)
117 }
118 "ExternalName" => {
119 let external_name = as_ref_or_else(&spec.external_name, "missing externalName")
120 .with_fields("spec", "externalName")?;
121
122 let service = Service::dns(external_name).with_fields("spec", "externalName")?;
123 let svc_ports = spec.ports.as_deref().unwrap_or_default();
124
125 let mut ports = Vec::with_capacity(svc_ports.len());
126 for (i, svc_port) in svc_ports.iter().enumerate() {
127 let port: u16 = convert_port(svc_port.port)
128 .with_field("port")
129 .with_field_index("ports", i)?;
130 ports.push(port);
131 }
132
133 if ports.is_empty() {
134 ports.extend([80, 443]);
135 }
136
137 (service, ports)
138 }
139 svc_type => return Err(Error::new(format!("{svc_type} Services are unsupported"))),
140 };
141
142 for port in svc_ports {
144 let lb =
145 get_lb_policy(svc.annotations(), &lb_policy_annotation(port))?.unwrap_or_default();
146 backends.push(Backend {
147 id: crate::BackendId {
148 service: service.clone(),
149 port,
150 },
151 lb,
152 })
153 }
154
155 Ok(backends)
156 }
157}
158
159fn get_lb_policy(
160 annotations: &BTreeMap<String, String>,
161 key: &str,
162) -> Result<Option<LbPolicy>, Error> {
163 match annotations.get(key) {
164 Some(s) => {
165 let lb_policy = serde_json::from_str(s)
166 .map_err(|e| Error::new(format!("failed to deserialize {key}: {e}")))?;
167 Ok(Some(lb_policy))
168 }
169 None => Ok(None),
170 }
171}
172
173#[inline]
174fn convert_port(port: i32) -> Result<u16, Error> {
175 port.try_into()
176 .map_err(|_| Error::new(format!("port value '{port}' is out of range")))
177}
178
179#[inline]
180fn as_ref_or_else<'a, T>(f: &'a Option<T>, message: &'static str) -> Result<&'a T, Error> {
181 f.as_ref().ok_or_else(|| Error::new_static(message))
182}
183
#[cfg(test)]
mod test {
    use k8s_openapi::api::core::v1 as core_v1;
    use kube::api::ObjectMeta;

    use crate::backend::{RequestHashPolicy, RequestHasher, RingHashParams};

    use super::*;

    /// Builds a `BTreeMap<String, String>` from `key => value` pairs.
    macro_rules! annotations {
        ($($k:expr => $v:expr),* $(,)*) => {{
            let mut map = BTreeMap::new();
            $(
                map.insert($k.to_string(), $v.to_string());
            )*
            map
        }}
    }

    const CLUSTER_IP: Option<&str> = Some("ClusterIP");
    const EXTERNAL_NAME: Option<&str> = Some("ExternalName");

    /// Metadata for the `bar/foo` Service used throughout these tests.
    fn foo_bar_meta(annotations: Option<BTreeMap<String, String>>) -> ObjectMeta {
        ObjectMeta {
            namespace: Some("bar".to_string()),
            name: Some("foo".to_string()),
            annotations,
            ..Default::default()
        }
    }

    /// A TCP `ServicePort` with an optional name.
    fn tcp_port(name: Option<&str>, port: i32) -> core_v1::ServicePort {
        core_v1::ServicePort {
            name: name.map(str::to_string),
            port,
            protocol: Some("TCP".to_string()),
            ..Default::default()
        }
    }

    /// A `Service` with the given metadata and spec and no status.
    fn service(metadata: ObjectMeta, spec: core_v1::ServiceSpec) -> core_v1::Service {
        core_v1::Service {
            metadata,
            spec: Some(spec),
            status: None,
        }
    }

    #[test]
    fn test_to_service_patch() {
        // A kube backend becomes a named ClusterIP Service with a single TCP port.
        let kube_backend = Backend {
            id: Service::kube("bar", "foo").unwrap().as_backend_id(1212),
            lb: LbPolicy::RoundRobin,
        };
        let expected = service(
            foo_bar_meta(Some(
                annotations! { "junctionlabs.io/backend.lb.1212" => r#"{"type":"RoundRobin"}"# },
            )),
            core_v1::ServiceSpec {
                type_: CLUSTER_IP.map(str::to_string),
                ports: Some(vec![tcp_port(None, 1212)]),
                ..Default::default()
            },
        );
        assert_eq!(kube_backend.to_service_patch(), expected);

        // A DNS backend becomes an ExternalName Service with no name, namespace or ports.
        let dns_backend = Backend {
            id: Service::dns("example.com").unwrap().as_backend_id(4430),
            lb: LbPolicy::RoundRobin,
        };
        let expected = service(
            ObjectMeta {
                annotations: Some(
                    annotations! { "junctionlabs.io/backend.lb.4430" => r#"{"type":"RoundRobin"}"# },
                ),
                ..Default::default()
            },
            core_v1::ServiceSpec {
                type_: EXTERNAL_NAME.map(str::to_string),
                external_name: Some("example.com".to_string()),
                ..Default::default()
            },
        );
        assert_eq!(dns_backend.to_service_patch(), expected);
    }

    #[test]
    fn test_from_clusterip() {
        // A single port without annotations yields one backend with an unspecified policy.
        let single_port = service(
            foo_bar_meta(None),
            core_v1::ServiceSpec {
                type_: CLUSTER_IP.map(str::to_string),
                ports: Some(vec![tcp_port(None, 8910)]),
                ..Default::default()
            },
        );
        let expected = vec![Backend {
            id: Service::kube("bar", "foo").unwrap().as_backend_id(8910),
            lb: LbPolicy::Unspecified,
        }];
        assert_eq!(Backend::from_service(&single_port).unwrap(), expected);

        // A ClusterIP Service without any ports is rejected.
        let no_ports = service(
            foo_bar_meta(None),
            core_v1::ServiceSpec {
                type_: CLUSTER_IP.map(str::to_string),
                ..Default::default()
            },
        );
        assert!(Backend::from_service(&no_ports).is_err());

        // With several ports, each port picks up only its own annotation.
        let multi_port = service(
            foo_bar_meta(Some(annotations! {
                "junctionlabs.io/backend.lb.443" => r#"{"type":"RingHash", "min_ring_size": 1024, "hash_params": [{"type": "Header", "name": "x-user"}]}"#,
                "junctionlabs.io/backend.lb.4430" => r#"{"type":"RoundRobin"}"#,
            })),
            core_v1::ServiceSpec {
                type_: CLUSTER_IP.map(str::to_string),
                ports: Some(vec![
                    tcp_port(Some("http"), 80),
                    tcp_port(Some("https"), 443),
                    tcp_port(Some("health"), 4430),
                ]),
                ..Default::default()
            },
        );
        let expected = vec![
            Backend {
                id: Service::kube("bar", "foo").unwrap().as_backend_id(80),
                lb: LbPolicy::Unspecified,
            },
            Backend {
                id: Service::kube("bar", "foo").unwrap().as_backend_id(443),
                lb: LbPolicy::RingHash(RingHashParams {
                    min_ring_size: 1024,
                    hash_params: vec![RequestHashPolicy {
                        terminal: false,
                        hasher: RequestHasher::Header {
                            name: "x-user".to_string(),
                        },
                    }],
                }),
            },
            Backend {
                id: Service::kube("bar", "foo").unwrap().as_backend_id(4430),
                lb: LbPolicy::RoundRobin,
            },
        ];
        assert_eq!(Backend::from_service(&multi_port).unwrap(), expected);
    }

    #[test]
    fn test_from_external_name() {
        // Without explicit ports, an ExternalName Service yields backends on 80 and 443.
        let default_ports = service(
            foo_bar_meta(Some(annotations! {
                "junctionlabs.io/backend.lb.443" => r#"{"type":"RoundRobin"}"#,
            })),
            core_v1::ServiceSpec {
                type_: EXTERNAL_NAME.map(str::to_string),
                external_name: Some("www.junctionlabs.io".to_string()),
                ..Default::default()
            },
        );
        let expected = vec![
            Backend {
                id: Service::dns("www.junctionlabs.io")
                    .unwrap()
                    .as_backend_id(80),
                lb: LbPolicy::Unspecified,
            },
            Backend {
                id: Service::dns("www.junctionlabs.io")
                    .unwrap()
                    .as_backend_id(443),
                lb: LbPolicy::RoundRobin,
            },
        ];
        assert_eq!(Backend::from_service(&default_ports).unwrap(), expected);

        // With explicit ports, only those ports are used.
        let explicit_port = service(
            foo_bar_meta(Some(annotations! {
                "junctionlabs.io/backend.lb.7777" => r#"{"type":"RoundRobin"}"#,
            })),
            core_v1::ServiceSpec {
                type_: EXTERNAL_NAME.map(str::to_string),
                external_name: Some("www.junctionlabs.io".to_string()),
                ports: Some(vec![tcp_port(None, 7777)]),
                ..Default::default()
            },
        );
        let expected = vec![Backend {
            id: Service::dns("www.junctionlabs.io")
                .unwrap()
                .as_backend_id(7777),
            lb: LbPolicy::RoundRobin,
        }];
        assert_eq!(Backend::from_service(&explicit_port).unwrap(), expected);
    }

    #[test]
    fn test_svc_patch_roundtrip() {
        // A kube backend survives a trip through `to_service_patch` and `from_service`.
        let backend = Backend {
            id: Service::kube("bar", "foo").unwrap().as_backend_id(8888),
            lb: LbPolicy::RoundRobin,
        };

        let roundtripped = Backend::from_service(&backend.to_service_patch()).unwrap();
        assert_eq!(roundtripped, vec![backend.clone()]);
    }
}