From 9121b7f88b70fea71cf42ea9dda4e23f16ac7bc97e6c4258fc7373893ff2883a Mon Sep 17 00:00:00 2001 From: raven <7156279+RavenX8@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:45:35 -0500 Subject: [PATCH] - add: consul tags and metadata - update: login reply now requests for the character services from consul to build the server list --- auth-service/src/main.rs | 7 +++- database-service/src/main.rs | 5 +++ packet-service/src/handlers/auth.rs | 30 +++++++++++++++-- packet-service/src/main.rs | 8 ++++- utils/src/consul_registration.rs | 7 ++++ utils/src/service_discovery.rs | 52 ++++++++++++++++++++++------- 6 files changed, 93 insertions(+), 16 deletions(-) diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index dc32c8b..72d5383 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use auth_service::auth::auth_service_server::AuthServiceServer; use auth_service::database_client::DatabaseClient; use auth_service::database_client::DatabaseClientTrait; @@ -38,12 +39,16 @@ async fn main() -> Result<(), Box> { // Register service with Consul let service_id = consul_registration::generate_service_id(); + let tags = vec!["version-1.0".to_string()]; + let mut meta = HashMap::new(); consul_registration::register_service( &consul_url, service_id.as_str(), service_name.as_str(), service_address, service_port.parse().unwrap_or(50052), + tags, + meta, &health_check_url, ) .await?; @@ -55,7 +60,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); let db_address = db_nodes.get(0).unwrap(); - let db_url = format!("http://{}:{}", db_address.0, db_address.1); + let db_url = format!("http://{}:{}", db_address.ServiceAddress, db_address.ServicePort); let database_client = DatabaseClient::connect(&db_url).await?; let full_addr = format!("{}:{}", &addr, port); diff --git a/database-service/src/main.rs b/database-service/src/main.rs index 0d55e32..923dd3e 100644 --- a/database-service/src/main.rs +++ b/database-service/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use database::database_service_server::DatabaseServiceServer; use database_service::database; use database_service::db::Database; @@ -38,12 +39,16 @@ async fn main() -> Result<(), Box> { // Register service with Consul let service_id = consul_registration::generate_service_id(); + let tags = vec!["version-1.0".to_string()]; + let mut meta = HashMap::new(); consul_registration::register_service( &consul_url, service_id.as_str(), service_name.as_str(), service_address, service_port.parse().unwrap_or(50052), + tags, + meta, &health_check_url, ) .await?; diff --git a/packet-service/src/handlers/auth.rs b/packet-service/src/handlers/auth.rs index 2907cf0..ded287c 100644 --- a/packet-service/src/handlers/auth.rs +++ b/packet-service/src/handlers/auth.rs @@ -1,11 +1,14 @@ use std::collections::HashMap; +use std::env; use tonic::{Code, Status}; use std::error::Error; use std::sync::Arc; use tokio::net::TcpStream; use tokio::sync::Mutex; use tracing::{debug, error, info, warn}; +use utils::service_discovery; use crate::auth_client::AuthClient; +use crate::handlers::null_string::NullTerminatedString; use crate::packet::{send_packet, Packet, PacketPayload}; use crate::packet_type::PacketType; use crate::packets::cli_accept_req::CliAcceptReq; @@ -13,7 +16,7 @@ use crate::packets::cli_join_server_req::CliJoinServerReq; use crate::packets::cli_login_req::CliLoginReq; use crate::packets::{srv_accept_reply, srv_login_reply}; use crate::packets::srv_accept_reply::SrvAcceptReply; -use crate::packets::srv_login_reply::SrvLoginReply; +use crate::packets::srv_login_reply::{ServerInfo, SrvLoginReply}; pub(crate) async fn handle_accept_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { let data = CliAcceptReq::decode(packet.payload.as_slice()); @@ -50,7 +53,30 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut send_packet(stream, &response_packet).await?; } else { debug!("Successfully logged in"); - let data = SrvLoginReply { result: srv_login_reply::Result::Ok, right: 0, type_: 0, servers_info: Vec::new() }; + + let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); + let servers = service_discovery::get_service_address(&consul_url, "character-service").await.unwrap_or_else(|err| { + warn!(err); + Vec::new() + }); + + if servers.len() == 0 { + let data = SrvLoginReply { result: srv_login_reply::Result::Failed, right: 0, type_: 0, servers_info: Vec::new() }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + return Ok(()); + } + + let mut server_info: Vec = Vec::new(); + let mut id = 0; + for server in servers { + let name = server.ServiceMeta.get("name").unwrap_or(&"".to_string()).clone(); + let is_test = server.ServiceTags.contains(&"test".to_string()) || server.ServiceTags.contains(&"staging".to_string()); + server_info.push(ServerInfo { test: u8::from(is_test), name: NullTerminatedString::new(&name), id}); + id = id + 1; + } + + let data = SrvLoginReply { result: srv_login_reply::Result::Ok, right: 0, type_: 0, servers_info: server_info }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; } diff --git a/packet-service/src/main.rs b/packet-service/src/main.rs index 0c353ff..9b64e3a 100644 --- a/packet-service/src/main.rs +++ b/packet-service/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::env; use std::error::Error; use std::net::ToSocketAddrs; @@ -85,12 +86,17 @@ async fn main() -> Result<(), Box> { // Register service with Consul let service_id = consul_registration::generate_service_id(); + let tags = vec!["version-1.0".to_string()]; + let mut meta = HashMap::new(); + meta.insert("name".to_string(), "Athena".to_string()); consul_registration::register_service( &consul_url, service_id.as_str(), service_name.as_str(), service_address, service_port.parse().unwrap_or(50052), + tags, + meta, &health_check_url, ) .await?; @@ -102,7 +108,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); let auth_address = auth_node.get(0).unwrap(); - let auth_url = format!("http://{}:{}", auth_address.0, auth_address.1); + let auth_url = format!("http://{}:{}", auth_address.ServiceAddress, auth_address.ServicePort); let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?)); let full_addr = format!("{}:{}", &addr, port); diff --git a/utils/src/consul_registration.rs b/utils/src/consul_registration.rs index a768540..23d9ed1 100644 --- a/utils/src/consul_registration.rs +++ b/utils/src/consul_registration.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use reqwest::Client; use serde::Serialize; use uuid::Uuid; @@ -8,6 +9,8 @@ struct ConsulRegistration { name: String, address: String, port: u16, + tags: Vec, + meta: HashMap, check: ConsulCheck, } @@ -28,6 +31,8 @@ pub async fn register_service( service_name: &str, service_address: &str, service_port: u16, + tags: Vec, + meta: HashMap, health_check_url: &str, ) -> Result<(), Box> { let registration = ConsulRegistration { @@ -35,6 +40,8 @@ pub async fn register_service( name: service_name.to_string(), address: service_address.to_string(), port: service_port, + tags, + meta, check: ConsulCheck { http: health_check_url.to_string(), interval: "10s".to_string(), // Health check interval diff --git a/utils/src/service_discovery.rs b/utils/src/service_discovery.rs index edea219..0b1d62a 100644 --- a/utils/src/service_discovery.rs +++ b/utils/src/service_discovery.rs @@ -1,12 +1,15 @@ +use std::collections::HashMap; use serde::Deserialize; #[derive(Debug, Deserialize)] -struct ServiceNode { - ServiceAddress: String, - ServicePort: u16, +pub struct ServiceNode { + pub ServiceAddress: String, + pub ServicePort: u16, + pub ServiceTags: Vec, + pub ServiceMeta: HashMap, } -pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result, Box> { +pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result, Box> { let client = reqwest::Client::new(); let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name); @@ -26,15 +29,40 @@ pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result // Deserialize the response into a Vec let nodes: Vec = response.json().await?; - // Map the nodes to (address, port) tuples - let addresses: Vec<(String, u16)> = nodes - .into_iter() - .map(|node| (node.ServiceAddress, node.ServicePort)) - .collect(); - - if addresses.is_empty() { + if nodes.is_empty() { Err(format!("No nodes found for service '{}'", service_name).into()) } else { - Ok(addresses) + Ok(nodes) } } + +// Example of filtering services with a specific tag +async fn get_services_with_tag( + service_name: &str, + tag: &str, + consul_url: &str, +) -> Result, Box> { + let url = format!("{}/v1/catalog/service/{}", consul_url, service_name); + + let client = reqwest::Client::new(); + let response = client.get(&url).send().await?; + + if !response.status().is_success() { + return Err(format!( + "Failed to fetch service nodes for '{}': {}", + service_name, response.status() + ) + .into()); + } + + // Deserialize the response into a Vec + let nodes: Vec = response.json().await?; + + // Filter nodes that include the specified tag + let filtered_nodes = nodes + .into_iter() + .filter(|node| node.ServiceTags.contains(&tag.to_string())) + .collect(); + + Ok(filtered_nodes) +}