From f4bc414ebd787474a95fd2c34ed3bb31a483708538254e2fd7b260038b12c83b Mon Sep 17 00:00:00 2001 From: raven <7156279+RavenX8@users.noreply.github.com> Date: Tue, 18 Mar 2025 02:00:11 -0400 Subject: [PATCH] - update: code update to use kube api instead of consul --- api-service/src/main.rs | 39 +---- auth-service/src/main.rs | 58 +------ character-service/src/main.rs | 37 +---- charts/osirose-new/values.yaml | 20 +-- database-service/src/main.rs | 35 +---- packet-service/src/handlers/auth.rs | 189 +++++++++++------------ packet-service/src/handlers/character.rs | 4 +- packet-service/src/main.rs | 48 +----- scripts/build_and_push.py | 2 +- session-service/src/main.rs | 30 +--- utils/Cargo.toml | 2 + utils/src/service_discovery.rs | 136 +++++++++------- world-service/src/main.rs | 50 +----- 13 files changed, 197 insertions(+), 453 deletions(-) diff --git a/api-service/src/main.rs b/api-service/src/main.rs index 3d06bff..54e8b29 100644 --- a/api-service/src/main.rs +++ b/api-service/src/main.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{info, Level}; use utils::consul_registration; -use utils::service_discovery::get_service_address; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; mod axum_gateway; @@ -24,41 +24,13 @@ async fn main() -> Result<(), Box> { // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("API_SERVICE_PORT").unwrap_or_else(|_| "8080".to_string()); - let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8079".to_string()); - - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "api-service".to_string()); - let service_address = env::var("API_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - let health_check_url = format!("http://{}:{}/health", service_address, health_port); - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(8080), - tags, - meta, - Some("http"), - Some(&health_check_url), - ) - .await?; // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; - let auth_node = get_service_address(&consul_url, "auth-service").await?; + let auth_node = get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?; let auth_address = auth_node.get(0).unwrap(); - let auth_service_address = format!( - "http://{}:{}", - auth_address.ServiceAddress, auth_address.ServicePort - ); + let auth_service_address = format!("http://{}", auth_address); // Connect to the gRPC auth-service let grpc_client = AuthServiceClient::connect(auth_service_address.to_string()).await?; @@ -69,10 +41,5 @@ async fn main() -> Result<(), Box> { tokio::spawn(axum_gateway::serve_rest_api(grpc_client)); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index 16cbb5e..b8f0f35 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -12,7 +12,7 @@ use tonic::transport::Server; use tracing::{debug, info, Level}; use utils::consul_registration; use utils::multi_service_load_balancer::{LoadBalancingStrategy, MultiServiceLoadBalancer}; -use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns}; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -29,59 +29,12 @@ async fn main() -> Result<(), Box> { // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string()); - - let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string()); - let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string()); - let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string()); - let consul_url = format!("http://{}:{}", consul_address, consul_port); - let consul_dns_url = format!("{}:{}", consul_address, consul_dns_port); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string()); - let service_address = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - - let lb = MultiServiceLoadBalancer::new(&consul_dns_url, LoadBalancingStrategy::RoundRobin); - - let mut db_url = "".to_string(); - match lb.get_endpoint("database-service", "grpc").await? { - Some(endpoint) => { - db_url = format!("http://{}", endpoint); - }, - None => { - println!("No endpoints available for database-service"); - } - } - - let mut session_service_address = "".to_string(); - match lb.get_endpoint("session-service", "grpc").await? { - Some(endpoint) => { - session_service_address = format!("http://{}", endpoint); - }, - None => { - println!("No endpoints available for session-service"); - } - } + let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap()); + let session_service_address = format!("http://{}",get_kube_service_endpoints_by_dns("session-service","tcp","session-service").await?.get(0).unwrap()); let db_client = Arc::new(DatabaseClient::connect(&db_url).await?); let session_client = Arc::new(SessionServiceClient::connect(session_service_address).await?); - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - None, - None, - ) - .await?; - let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); let auth_service = MyAuthService { @@ -103,10 +56,5 @@ async fn main() -> Result<(), Box> { ); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/character-service/src/main.rs b/character-service/src/main.rs index cac965e..e8dd7b1 100644 --- a/character-service/src/main.rs +++ b/character-service/src/main.rs @@ -14,7 +14,7 @@ use std::str::FromStr; use std::sync::Arc; use tracing::Level; use utils::consul_registration; -use utils::service_discovery::get_service_address; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -29,40 +29,11 @@ async fn main() -> Result<(), Box> { // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("CHARACTER_SERVICE_PORT").unwrap_or_else(|_| "50053".to_string()); + let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap()); - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "character-service".to_string()); - let service_address = - env::var("CHARACTER_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - let db_nodes = get_service_address(&consul_url, "database-service").await?; - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let mut meta = HashMap::new(); - meta.insert("name".to_string(), "Rose".to_string()); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - None, - None, - ) - .await?; let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); - let db_address = db_nodes.get(0).unwrap(); - let db_url = format!( - "http://{}:{}", - db_address.ServiceAddress, db_address.ServicePort - ); let character_db_client = Arc::new(CharacterDbClient::connect(&db_url).await?); let character_service = MyCharacterService { character_db_client, @@ -77,9 +48,5 @@ async fn main() -> Result<(), Box> { .await?; utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); Ok(()) } diff --git a/charts/osirose-new/values.yaml b/charts/osirose-new/values.yaml index c7ba3ba..73a33b7 100644 --- a/charts/osirose-new/values.yaml +++ b/charts/osirose-new/values.yaml @@ -6,17 +6,17 @@ autoscaling: global: env: - LOG_LEVEL: "info" + LOG_LEVEL: "debug" APP_ENV: "dev" DATABASE_URL: "" # This is a placeholder. Will be dynamically constructed + REDIS_URL: "redis://valkey:6379/0" + LISTEN_ADDR: "0.0.0.0" services: - name: api-service replicas: 1 image: api-service:latest port: 8080 - env: - SERVICE_NAME: "api-service" tcp: enabled: true portName: api-service @@ -33,8 +33,6 @@ services: replicas: 1 image: auth-service:latest port: 50051 - env: - SERVICE_NAME: "auth-service" tcp: enabled: true portName: auth-service @@ -48,8 +46,6 @@ services: replicas: 1 image: character-service:latest port: 50053 - env: - SERVICE_NAME: "character-service" tcp: enabled: true portName: character-service @@ -63,8 +59,6 @@ services: replicas: 1 image: database-service:latest port: 50052 - env: - SERVICE_NAME: "database-service" tcp: enabled: true portName: database-service @@ -78,8 +72,6 @@ services: replicas: 1 image: packet-service:latest port: 29000 - env: - SERVICE_NAME: "packet-service" tcp: enabled: true portName: game-packet-service @@ -93,8 +85,6 @@ services: replicas: 1 image: session-service:latest port: 50055 - env: - SERVICE_NAME: "session-service" tcp: enabled: true portName: session-service @@ -108,8 +98,6 @@ services: replicas: 1 image: world-service:latest port: 50054 - env: - SERVICE_NAME: "world-service" tcp: enabled: true portName: world-service @@ -120,7 +108,7 @@ services: enabled: false tests: - enabled: true + enabled: false services: - name: api-service testCommand: ["curl", "-f", "http://api-service:8080/health"] diff --git a/database-service/src/main.rs b/database-service/src/main.rs index 3668113..1cb1d02 100644 --- a/database-service/src/main.rs +++ b/database-service/src/main.rs @@ -27,35 +27,8 @@ async fn main() -> Result<(), Box> { let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string()); - let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let redis_url = - std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string()); - let service_address = - env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - None, - None, - ) - .await?; - - consul_registration::start_health_check(addr.as_str()).await?; + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); @@ -83,9 +56,5 @@ async fn main() -> Result<(), Box> { ); utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/packet-service/src/handlers/auth.rs b/packet-service/src/handlers/auth.rs index 9394690..870cd05 100644 --- a/packet-service/src/handlers/auth.rs +++ b/packet-service/src/handlers/auth.rs @@ -25,6 +25,7 @@ use tonic::{Code, Status}; use tracing::{debug, error, info, warn}; use utils::null_string::NullTerminatedString; use utils::service_discovery; +use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_info}; pub(crate) async fn handle_alive_req( stream: &mut TcpStream, @@ -161,60 +162,59 @@ pub(crate) async fn handle_login_req( state.session_id = Some(response.session_id.parse().unwrap()); } - let consul_url = - env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let servers = - service_discovery::get_service_address(&consul_url, "character-service") - .await - .unwrap_or_else(|err| { - warn!(err); - Vec::new() - }); - - if servers.len() == 0 { - let data = SrvLoginReply { - result: srv_login_reply::Result::Failed, - right: 0, - type_: 0, - servers_info: Vec::new(), - }; - let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; - send_packet(stream, &response_packet).await?; - return Ok(()); - } - - let mut server_info: Vec = Vec::new(); let mut id = 0; - for server in servers { - let mut name = server - .ServiceMeta - .get("name") - .unwrap_or(&"".to_string()) - .clone(); - let is_test = server.ServiceTags.contains(&"test".to_string()) - || server.ServiceTags.contains(&"staging".to_string()); - if is_test { - name = format!("@{}", name); - } else { - name = format!(" {}", name); - } - server_info.push(ServerInfo { - test: u8::from(is_test), - name: NullTerminatedString::new(&name), - id, - }); - id = id + 1; - } - debug!("Server info: {:?}", server_info); + let mut server_info: Vec = Vec::new(); + match get_service_info("default", "character-service").await { + Ok(service_info) => { + if let Some(annotations) = service_info.annotations { + let mut server_name = "".to_string(); + let mut is_test = false; + for (key, value) in annotations { + match key.as_str() { + "name" => { + server_name = value; + } + "tags" => { - let data = SrvLoginReply { - result: srv_login_reply::Result::Ok, - right: 0, - type_: 0, - servers_info: server_info, - }; - let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; - send_packet(stream, &response_packet).await?; + } + _ => {} + } + } + + if is_test { + server_name = format!("@{}", server_name); + } else { + server_name = format!(" {}", server_name); + } + + server_info.push(ServerInfo { + test: u8::from(is_test), + name: NullTerminatedString::new(&server_name), + id, + }); + + let data = SrvLoginReply { + result: srv_login_reply::Result::Ok, + right: 0, + type_: 0, + servers_info: server_info, + }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + } + } + Err(err) => { + let data = SrvLoginReply { + result: srv_login_reply::Result::Failed, + right: 0, + type_: 0, + servers_info: Vec::new(), + }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + return Ok(()); + } + } } } Err(status) => { @@ -300,52 +300,47 @@ pub(crate) async fn handle_channel_list_req( let request = CliChannelListReq::decode(packet.payload.as_slice()); debug!("{:?}", request); - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let channels = service_discovery::get_service_address(&consul_url, "world-service") - .await - .unwrap_or_else(|err| { - warn!(err); - Vec::new() - }); - - if channels.len() == 0 { - let data = SrvChannelListReply { - id: request?.server_id, - channels: Vec::new(), - }; - let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; - send_packet(stream, &response_packet).await?; - return Ok(()); - } - - debug!("Server info: {:?}", channels); - let mut channel_info: Vec = Vec::new(); let mut id = 1; - for channel in channels { - let name = format!( - "{}", - channel - .ServiceMeta - .get("name") - .unwrap_or(&"".to_string()) - .clone() - ); - channel_info.push(ChannelInfo { - id: id, - low_age: 0, - high_age: 0, - capacity: 0, - name: NullTerminatedString::new(&name), - }); - id = id + 1; - } - debug!("Channel info: {:?}", channel_info); + let mut channel_info: Vec = Vec::new(); + match get_service_info("default", "world-service").await { + Ok(service_info) => { + if let Some(annotations) = service_info.annotations { + let mut server_name = "".to_string(); + for (key, value) in annotations { + match key.as_str() { + "name" => { + server_name = value; + } + "tags" => {} + _ => {} + } + } + channel_info.push(ChannelInfo { + id: id, + low_age: 0, + high_age: 0, + capacity: 0, + name: NullTerminatedString::new(&server_name), + }); + id = id + 1; - let data = SrvChannelListReply { - id: request?.server_id, - channels: channel_info, - }; - let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; - send_packet(stream, &response_packet).await?; + let data = SrvChannelListReply { + id: request?.server_id, + channels: channel_info, + }; + let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; + send_packet(stream, &response_packet).await?; + } + } + Err(err) => { + let data = SrvChannelListReply { + id: request?.server_id, + channels: Vec::new(), + }; + let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; + send_packet(stream, &response_packet).await?; + return Ok(()); + } + } Ok(()) } diff --git a/packet-service/src/handlers/character.rs b/packet-service/src/handlers/character.rs index 354695a..ca1462c 100644 --- a/packet-service/src/handlers/character.rs +++ b/packet-service/src/handlers/character.rs @@ -292,9 +292,9 @@ pub(crate) async fn handle_select_char_req( for item in items { if item.slot < MAX_VISIBLE_ITEMS as i32 { - let slot = convert_type_to_body_part(item.slot) as usize - 2; + let slot = convert_type_to_body_part(item.slot) as isize - 2; if slot >= 0 { - equipped_item_list[slot] = EquippedItem { + equipped_item_list[slot as usize] = EquippedItem { id: item.item_id as u16, gem_opt: item.gem_option as u16, socket: item.socket as i8, diff --git a/packet-service/src/main.rs b/packet-service/src/main.rs index f33d816..0c1b364 100644 --- a/packet-service/src/main.rs +++ b/packet-service/src/main.rs @@ -21,7 +21,7 @@ use tokio::{select, signal}; use tracing::Level; use tracing::{debug, error, info, warn}; use utils::consul_registration; -use utils::service_discovery::get_service_address; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; use warp::Filter; mod auth_client; @@ -69,50 +69,13 @@ async fn main() -> Result<(), Box> { let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string()); let metrics_port = env::var("PACKET_METRICS_PORT").unwrap_or_else(|_| "4001".to_string()); - let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string()); - - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "packet-service".to_string()); - let service_address = - env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - let health_check_url = format!("http://{}:{}/health", service_address, health_port); - let auth_node = get_service_address(&consul_url, "auth-service").await?; - let character_node = get_service_address(&consul_url, "character-service").await?; - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "tcp".to_string()]; - let mut meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - Some("http"), - Some(&health_check_url), - ) - .await?; + let auth_url = format!("http://{}",get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?.get(0).unwrap()); + let character_url = format!("http://{}",get_kube_service_endpoints_by_dns("character-service","tcp","character-service").await?.get(0).unwrap()); // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; - let auth_address = auth_node.get(0).unwrap(); - let auth_url = format!( - "http://{}:{}", - auth_address.ServiceAddress, auth_address.ServicePort - ); let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?)); - - let character_address = character_node.get(0).unwrap(); - let character_url = format!( - "http://{}:{}", - character_address.ServiceAddress, character_address.ServicePort - ); let character_client = Arc::new(Mutex::new(CharacterClient::connect(&character_url).await?)); let full_addr = format!("{}:{}", &addr, port); @@ -160,10 +123,5 @@ async fn main() -> Result<(), Box> { prometheus_exporter::start(binding.parse().unwrap()).unwrap(); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/scripts/build_and_push.py b/scripts/build_and_push.py index 2873801..df85507 100644 --- a/scripts/build_and_push.py +++ b/scripts/build_and_push.py @@ -14,7 +14,7 @@ dockerfile_paths = [ ] common_tag = "latest" -version_tag = "v0.1.0" +version_tag = "v0.1.1" image_tag_prefix = "gitea.azgstudio.com/raven/" build_context = "../" diff --git a/session-service/src/main.rs b/session-service/src/main.rs index bce8341..5d2b515 100644 --- a/session-service/src/main.rs +++ b/session-service/src/main.rs @@ -34,32 +34,8 @@ async fn main() -> Result<(), Box> { // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("SESSION_SERVICE_PORT").unwrap_or_else(|_| "50055".to_string()); + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - let redis_url = - std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "session-service".to_string()); - let service_address = - env::var("SESSION_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50055), - tags, - meta, - None, - None, - ) - .await?; let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); @@ -77,9 +53,5 @@ async fn main() -> Result<(), Box> { ); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); Ok(()) } diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 72dbab9..56acc24 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -17,3 +17,5 @@ async-trait = "0.1.87" serde_json = "1.0.140" hickory-resolver = "0.24.4" rand = "0.8.5" +kube = { version = "0.99.0", features = ["derive"] } +k8s-openapi = { version = "0.24.0", features = ["v1_32"] } diff --git a/utils/src/service_discovery.rs b/utils/src/service_discovery.rs index 82471e2..b318c4c 100644 --- a/utils/src/service_discovery.rs +++ b/utils/src/service_discovery.rs @@ -1,11 +1,11 @@ use hickory_resolver::config::*; use hickory_resolver::{Resolver, TokioAsyncResolver}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; -use tokio::runtime::Runtime; -use tracing::log::debug; +use kube::{Client, Api}; +use k8s_openapi::api::core::v1::Service; +use std::collections::{BTreeMap}; +use hickory_resolver::system_conf::read_system_conf; pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &str, service_name: &str) -> Result, Box> { let mut rc = ResolverConfig::new(); @@ -29,68 +29,92 @@ pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &s Ok(endpoints) } -#[derive(Debug, Deserialize)] -pub struct ServiceNode { - pub ServiceAddress: String, - pub ServicePort: u16, - pub ServiceTags: Vec, - pub ServiceMeta: HashMap, +pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol: &str, service_name: &str) -> Result, Box> { + let (config, options) = read_system_conf()?; + let resolver = TokioAsyncResolver::tokio(config, options); + + let srv_name = format!("_{}._{}._{}", port_name, service_protocol, service_name); + let srv_record = resolver.srv_lookup(&srv_name).await?; + + let mut endpoints = Vec::new(); + for record in srv_record { + let hostname = record.target(); + let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?; + for response in lookup_responses { + endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?); + } + } + + Ok(endpoints) } -pub async fn get_service_address( - consul_url: &str, +#[derive(Debug)] +pub struct ServiceInfo { + pub name: String, + pub namespace: String, + pub annotations: Option>, + pub labels: Option>, +} + +pub async fn get_service_info( + namespace: &str, service_name: &str, -) -> Result, Box> { - let client = reqwest::Client::new(); - let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name); +) -> Result> { + let client = Client::try_default().await?; - let response = client.get(&consul_service_url).send().await?; + // Create an API object for services in the specified namespace + let services: Api = Api::namespaced(client, namespace); - if !response.status().is_success() { - return Err(format!( - "Failed to fetch service nodes for '{}': {}", - service_name, - response.status() - ) - .into()); - } + // Get the service object + let service = services.get(service_name).await?; - // Deserialize the response into a Vec - let nodes: Vec = response.json().await?; + // Extract metadata + let name = service.metadata.name.unwrap_or_default(); + let namespace = service.metadata.namespace.unwrap_or_default(); + let annotations = service.metadata.annotations.clone(); + let labels = service.metadata.labels.clone(); - if nodes.is_empty() { - Err(format!("No nodes found for service '{}'", service_name).into()) - } else { - Ok(nodes) - } + // Return the service info + Ok(ServiceInfo { + name, + namespace, + annotations, + labels, + }) } -async fn get_services_with_tag( - service_name: &str, - tag: &str, - consul_url: &str, -) -> Result, Box> { - let url = format!("{}/v1/catalog/service/{}", consul_url, service_name); - let client = reqwest::Client::new(); - let response = client.get(&url).send().await?; +pub async fn get_services_by_label( + namespace: &str, + label_selector: &str, +) -> Result, Box> { + let client = Client::try_default().await?; - if !response.status().is_success() { - return Err(format!( - "Failed to fetch service nodes for '{}': {}", - service_name, - response.status() - ) - .into()); + // Create an API object for services in the specified namespace + let services: Api = Api::namespaced(client, namespace); + + // Use ListParams to filter services by label + let list_params = kube::api::ListParams::default().labels(label_selector); + + // List services that match the label selector + let service_list = services.list(&list_params).await?; + + // Convert the list of services into a vector of ServiceInfo + let mut service_infos = Vec::new(); + for service in service_list.items { + let name = service.metadata.name.clone().unwrap_or_default(); + let namespace = service.metadata.namespace.clone().unwrap_or_default(); + + // Convert BTreeMap to HashMap for annotations and labels + let annotations = service.metadata.annotations.map(|btree| btree.into_iter().collect()); + let labels = service.metadata.labels.map(|btree| btree.into_iter().collect()); + + service_infos.push(ServiceInfo { + name, + namespace, + annotations, + labels, + }); } - // Deserialize the response into a Vec - let nodes: Vec = response.json().await?; - - // Filter nodes that include the specified tag - let filtered_nodes = nodes - .into_iter() - .filter(|node| node.ServiceTags.contains(&tag.to_string())) - .collect(); - - Ok(filtered_nodes) + Ok(service_infos) } diff --git a/world-service/src/main.rs b/world-service/src/main.rs index 14369c6..a342d7f 100644 --- a/world-service/src/main.rs +++ b/world-service/src/main.rs @@ -4,7 +4,7 @@ use std::env; use std::str::FromStr; use tracing::{debug, Level}; use utils::consul_registration; -use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns}; +use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -19,56 +19,10 @@ async fn main() -> Result<(), Box> { // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("WORLD_SERVICE_PORT").unwrap_or_else(|_| "50054".to_string()); - let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8084".to_string()); - - // let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string()); - let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string()); - let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string()); - let consul_url = format!("http://{}:{}", consul_address, consul_port); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "world-service".to_string()); - let service_address = - env::var("WORLD_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone().parse().unwrap_or(50054); - let health_check_url = format!("http://{}:{}/health", service_address, health_port); - let db_nodes = get_service_address(&consul_url, "database-service").await?; - - let temp_db_nodes = get_service_endpoints_by_dns(&format!("{}:{}", consul_address, consul_dns_port), "grpc", "database-service").await?; - debug!("Temp DB Nodes: {:?}", temp_db_nodes); + let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap()); // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let mut meta = HashMap::new(); - meta.insert("name".to_string(), "Athena".to_string()); - consul_registration::start_health_check(addr.as_str()).await?; - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port, - tags, - meta, - Some("http"), - Some(&health_check_url), - ) - .await?; - - // Start health-check endpoint - - let db_address = temp_db_nodes.get(0).unwrap(); - let db_url = format!( - "http://{}", - db_address - ); - debug!("DB URL: {:?}", db_url); - utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); Ok(()) }