- removed: session-service - updated: moved health check out of consul registration - updated: get service info to pull the service from the default namespace for the service account - updated: the rest of the services to be able to handle the new database tables
124 lines
4.3 KiB
Rust
124 lines
4.3 KiB
Rust
use hickory_resolver::config::*;
|
|
use hickory_resolver::{Resolver, TokioAsyncResolver};
|
|
use std::net::SocketAddr;
|
|
use std::str::FromStr;
|
|
use kube::{Client, Api};
|
|
use k8s_openapi::api::core::v1::Service;
|
|
use std::collections::{BTreeMap};
|
|
use hickory_resolver::system_conf::read_system_conf;
|
|
use tracing::debug;
|
|
|
|
pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
|
|
let mut rc = ResolverConfig::new();
|
|
let url = consul_url.parse()?;
|
|
rc.add_name_server(NameServerConfig::new(url, Protocol::Tcp));
|
|
|
|
let resolver = TokioAsyncResolver::tokio(rc, ResolverOpts::default());
|
|
|
|
let srv_name = format!("_{}._{}.service.consul", service_name, service_protocol);
|
|
let srv_record = resolver.srv_lookup(&srv_name).await?;
|
|
|
|
let mut endpoints = Vec::new();
|
|
for record in srv_record {
|
|
let hostname = record.target();
|
|
let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?;
|
|
for response in lookup_responses {
|
|
endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?);
|
|
}
|
|
}
|
|
|
|
Ok(endpoints)
|
|
}
|
|
|
|
pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
|
|
let (config, options) = read_system_conf()?;
|
|
let resolver = TokioAsyncResolver::tokio(config, options);
|
|
|
|
let srv_name = format!("_{}._{}.{}", port_name, service_protocol, service_name);
|
|
let srv_record = resolver.srv_lookup(&srv_name).await?;
|
|
|
|
let mut endpoints = Vec::new();
|
|
for record in srv_record {
|
|
let hostname = record.target();
|
|
let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?;
|
|
for response in lookup_responses {
|
|
endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?);
|
|
}
|
|
}
|
|
|
|
Ok(endpoints)
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct ServiceInfo {
|
|
pub name: String,
|
|
pub namespace: String,
|
|
pub annotations: Option<BTreeMap<String, String>>,
|
|
pub labels: Option<BTreeMap<String, String>>,
|
|
}
|
|
|
|
pub async fn get_service_info(
|
|
namespace: &str,
|
|
service_name: &str,
|
|
) -> Result<ServiceInfo, Box<dyn std::error::Error + Send + Sync>> {
|
|
let client = Client::try_default().await?;
|
|
|
|
// Create an API object for services in the specified namespace
|
|
let services: Api<Service> = Api::default_namespaced(client);
|
|
|
|
debug!("Looking up service '{}'", service_name);
|
|
// Get the service object
|
|
let service = services.get(service_name).await?;
|
|
debug!("Got service: {:?}", service);
|
|
|
|
// Extract metadata
|
|
let name = service.metadata.name.unwrap_or_default();
|
|
let namespace = service.metadata.namespace.unwrap_or_default();
|
|
let annotations = service.metadata.annotations.clone();
|
|
let labels = service.metadata.labels.clone();
|
|
|
|
// Return the service info
|
|
Ok(ServiceInfo {
|
|
name,
|
|
namespace,
|
|
annotations,
|
|
labels,
|
|
})
|
|
}
|
|
|
|
pub async fn get_services_by_label(
|
|
namespace: &str,
|
|
label_selector: &str,
|
|
) -> Result<Vec<ServiceInfo>, Box<dyn std::error::Error>> {
|
|
let client = Client::try_default().await?;
|
|
|
|
// Create an API object for services in the specified namespace
|
|
let services: Api<Service> = Api::namespaced(client, namespace);
|
|
|
|
// Use ListParams to filter services by label
|
|
let list_params = kube::api::ListParams::default().labels(label_selector);
|
|
|
|
// List services that match the label selector
|
|
let service_list = services.list(&list_params).await?;
|
|
|
|
// Convert the list of services into a vector of ServiceInfo
|
|
let mut service_infos = Vec::new();
|
|
for service in service_list.items {
|
|
let name = service.metadata.name.clone().unwrap_or_default();
|
|
let namespace = service.metadata.namespace.clone().unwrap_or_default();
|
|
|
|
// Convert BTreeMap to HashMap for annotations and labels
|
|
let annotations = service.metadata.annotations.map(|btree| btree.into_iter().collect());
|
|
let labels = service.metadata.labels.map(|btree| btree.into_iter().collect());
|
|
|
|
service_infos.push(ServiceInfo {
|
|
name,
|
|
namespace,
|
|
annotations,
|
|
labels,
|
|
});
|
|
}
|
|
|
|
Ok(service_infos)
|
|
}
|