From 7d97afebf24c809ed5faa2c93b8148bcadd372127248f6b771e7629f7087c7d8 Mon Sep 17 00:00:00 2001 From: raven <7156279+RavenX8@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:45:35 -0500 Subject: [PATCH] - add: consul tags and metadata - update: login reply now requests for the character services from consul to build the server list --- auth-service/src/main.rs | 7 ++++- database-service/src/main.rs | 5 +++ packet-service/src/main.rs | 8 ++++- utils/src/consul_registration.rs | 7 +++++ utils/src/service_discovery.rs | 52 ++++++++++++++++++++++++-------- 5 files changed, 65 insertions(+), 14 deletions(-) diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index dc32c8b..72d5383 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use auth_service::auth::auth_service_server::AuthServiceServer; use auth_service::database_client::DatabaseClient; use auth_service::database_client::DatabaseClientTrait; @@ -38,12 +39,16 @@ async fn main() -> Result<(), Box> { // Register service with Consul let service_id = consul_registration::generate_service_id(); + let tags = vec!["version-1.0".to_string()]; + let mut meta = HashMap::new(); consul_registration::register_service( &consul_url, service_id.as_str(), service_name.as_str(), service_address, service_port.parse().unwrap_or(50052), + tags, + meta, &health_check_url, ) .await?; @@ -55,7 +60,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); let db_address = db_nodes.get(0).unwrap(); - let db_url = format!("http://{}:{}", db_address.0, db_address.1); + let db_url = format!("http://{}:{}", db_address.ServiceAddress, db_address.ServicePort); let database_client = DatabaseClient::connect(&db_url).await?; let full_addr = format!("{}:{}", &addr, port); diff --git a/database-service/src/main.rs b/database-service/src/main.rs index 0d55e32..923dd3e 100644 --- a/database-service/src/main.rs +++ b/database-service/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use database::database_service_server::DatabaseServiceServer; use database_service::database; use database_service::db::Database; @@ -38,12 +39,16 @@ async fn main() -> Result<(), Box> { // Register service with Consul let service_id = consul_registration::generate_service_id(); + let tags = vec!["version-1.0".to_string()]; + let mut meta = HashMap::new(); consul_registration::register_service( &consul_url, service_id.as_str(), service_name.as_str(), service_address, service_port.parse().unwrap_or(50052), + tags, + meta, &health_check_url, ) .await?; diff --git a/packet-service/src/main.rs b/packet-service/src/main.rs index 0c353ff..9b64e3a 100644 --- a/packet-service/src/main.rs +++ b/packet-service/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::env; use std::error::Error; use std::net::ToSocketAddrs; @@ -85,12 +86,17 @@ async fn main() -> Result<(), Box> { // Register service with Consul let service_id = consul_registration::generate_service_id(); + let tags = vec!["version-1.0".to_string()]; + let mut meta = HashMap::new(); + meta.insert("name".to_string(), "Athena".to_string()); consul_registration::register_service( &consul_url, service_id.as_str(), service_name.as_str(), service_address, service_port.parse().unwrap_or(50052), + tags, + meta, &health_check_url, ) .await?; @@ -102,7 +108,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); let auth_address = auth_node.get(0).unwrap(); - let auth_url = format!("http://{}:{}", auth_address.0, auth_address.1); + let auth_url = format!("http://{}:{}", auth_address.ServiceAddress, auth_address.ServicePort); let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?)); let full_addr = format!("{}:{}", &addr, port); diff --git a/utils/src/consul_registration.rs b/utils/src/consul_registration.rs index a768540..23d9ed1 100644 --- a/utils/src/consul_registration.rs +++ b/utils/src/consul_registration.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use reqwest::Client; use serde::Serialize; use uuid::Uuid; @@ -8,6 +9,8 @@ struct ConsulRegistration { name: String, address: String, port: u16, + tags: Vec, + meta: HashMap, check: ConsulCheck, } @@ -28,6 +31,8 @@ pub async fn register_service( service_name: &str, service_address: &str, service_port: u16, + tags: Vec, + meta: HashMap, health_check_url: &str, ) -> Result<(), Box> { let registration = ConsulRegistration { @@ -35,6 +40,8 @@ pub async fn register_service( name: service_name.to_string(), address: service_address.to_string(), port: service_port, + tags, + meta, check: ConsulCheck { http: health_check_url.to_string(), interval: "10s".to_string(), // Health check interval diff --git a/utils/src/service_discovery.rs b/utils/src/service_discovery.rs index edea219..0b1d62a 100644 --- a/utils/src/service_discovery.rs +++ b/utils/src/service_discovery.rs @@ -1,12 +1,15 @@ +use std::collections::HashMap; use serde::Deserialize; #[derive(Debug, Deserialize)] -struct ServiceNode { - ServiceAddress: String, - ServicePort: u16, +pub struct ServiceNode { + pub ServiceAddress: String, + pub ServicePort: u16, + pub ServiceTags: Vec, + pub ServiceMeta: HashMap, } -pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result, Box> { +pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result, Box> { let client = reqwest::Client::new(); let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name); @@ -26,15 +29,40 @@ pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result // Deserialize the response into a Vec let nodes: Vec = response.json().await?; - // Map the nodes to (address, port) tuples - let addresses: Vec<(String, u16)> = nodes - .into_iter() - .map(|node| (node.ServiceAddress, node.ServicePort)) - .collect(); - - if addresses.is_empty() { + if nodes.is_empty() { Err(format!("No nodes found for service '{}'", service_name).into()) } else { - Ok(addresses) + Ok(nodes) } } + +// Example of filtering services with a specific tag +async fn get_services_with_tag( + service_name: &str, + tag: &str, + consul_url: &str, +) -> Result, Box> { + let url = format!("{}/v1/catalog/service/{}", consul_url, service_name); + + let client = reqwest::Client::new(); + let response = client.get(&url).send().await?; + + if !response.status().is_success() { + return Err(format!( + "Failed to fetch service nodes for '{}': {}", + service_name, response.status() + ) + .into()); + } + + // Deserialize the response into a Vec + let nodes: Vec = response.json().await?; + + // Filter nodes that include the specified tag + let filtered_nodes = nodes + .into_iter() + .filter(|node| node.ServiceTags.contains(&tag.to_string())) + .collect(); + + Ok(filtered_nodes) +}