diff --git a/.gitignore b/.gitignore index f027000..8f63e68 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ debug/ target/ +*/.env auth-service/.env database-service/.env diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index 72d5383..f9010d2 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -1,19 +1,19 @@ -use std::collections::HashMap; use auth_service::auth::auth_service_server::AuthServiceServer; use auth_service::database_client::DatabaseClient; use auth_service::database_client::DatabaseClientTrait; use auth_service::grpc::MyAuthService; use dotenv::dotenv; +use std::collections::HashMap; use std::env; use std::net::ToSocketAddrs; use std::str::FromStr; use tokio::{select, signal}; use tonic::transport::Server; -use tracing::{info, Level}; use tracing::log::debug; -use warp::Filter; +use tracing::{info, Level}; use utils::consul_registration; use utils::service_discovery::get_service_address; +use warp::Filter; #[tokio::main] async fn main() -> Result<(), Box> { @@ -54,10 +54,7 @@ async fn main() -> Result<(), Box> { .await?; // Start health-check endpoint - let health_route = warp::path!("health") - .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); - - tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); + consul_registration::start_health_check(addr.as_str()).await?; let db_address = db_nodes.get(0).unwrap(); let db_url = format!("http://{}:{}", db_address.ServiceAddress, db_address.ServicePort); diff --git a/packet-service/src/handlers/auth.rs b/packet-service/src/handlers/auth.rs index ded287c..2261e18 100644 --- a/packet-service/src/handlers/auth.rs +++ b/packet-service/src/handlers/auth.rs @@ -1,26 +1,30 @@ -use std::collections::HashMap; -use std::env; -use tonic::{Code, Status}; -use std::error::Error; -use std::sync::Arc; -use tokio::net::TcpStream; -use tokio::sync::Mutex; -use tracing::{debug, error, info, warn}; -use utils::service_discovery; use crate::auth_client::AuthClient; use crate::handlers::null_string::NullTerminatedString; use crate::packet::{send_packet, Packet, PacketPayload}; use crate::packet_type::PacketType; use crate::packets::cli_accept_req::CliAcceptReq; +use crate::packets::cli_channel_list_req::CliChannelListReq; use crate::packets::cli_join_server_req::CliJoinServerReq; use crate::packets::cli_login_req::CliLoginReq; -use crate::packets::{srv_accept_reply, srv_login_reply}; +use crate::packets::cli_srv_select_req::CliSrvSelectReq; use crate::packets::srv_accept_reply::SrvAcceptReply; +use crate::packets::srv_channel_list_reply::{ChannelInfo, SrvChannelListReply}; use crate::packets::srv_login_reply::{ServerInfo, SrvLoginReply}; +use crate::packets::srv_srv_select_reply::SrvSrvSelectReply; +use crate::packets::*; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio::sync::Mutex; +use tonic::{Code, Status}; +use tracing::{debug, error, info, warn}; +use utils::service_discovery; pub(crate) async fn handle_accept_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { - let data = CliAcceptReq::decode(packet.payload.as_slice()); - debug!("{:?}", data); + let request = CliAcceptReq::decode(packet.payload.as_slice()); + debug!("{:?}", request); // We need to do reply to this packet let data = SrvAcceptReply { result: srv_accept_reply::Result::Accepted, rand_value: 0 }; @@ -32,8 +36,8 @@ pub(crate) async fn handle_accept_req(stream: &mut TcpStream, packet: Packet) -> } pub(crate) async fn handle_join_server_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { - let data = CliJoinServerReq::decode(packet.payload.as_slice()); - debug!("{:?}", data); + let request = CliJoinServerReq::decode(packet.payload.as_slice()); + debug!("{:?}", request); Ok(()) } @@ -53,29 +57,35 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut send_packet(stream, &response_packet).await?; } else { debug!("Successfully logged in"); - + let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let servers = service_discovery::get_service_address(&consul_url, "character-service").await.unwrap_or_else(|err| { warn!(err); Vec::new() }); - + if servers.len() == 0 { let data = SrvLoginReply { result: srv_login_reply::Result::Failed, right: 0, type_: 0, servers_info: Vec::new() }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; return Ok(()); } - + let mut server_info: Vec = Vec::new(); let mut id = 0; for server in servers { - let name = server.ServiceMeta.get("name").unwrap_or(&"".to_string()).clone(); + let mut name = server.ServiceMeta.get("name").unwrap_or(&"".to_string()).clone(); let is_test = server.ServiceTags.contains(&"test".to_string()) || server.ServiceTags.contains(&"staging".to_string()); + if is_test { + name = format!("@{}", name); + } else { + name = format!(" {}", name); + } server_info.push(ServerInfo { test: u8::from(is_test), name: NullTerminatedString::new(&name), id}); id = id + 1; } - + debug!("Server info: {:?}", server_info); + let data = SrvLoginReply { result: srv_login_reply::Result::Ok, right: 0, type_: 0, servers_info: server_info }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; @@ -112,13 +122,51 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut } pub(crate) async fn handle_server_select_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { - let data = CliJoinServerReq::decode(packet.payload.as_slice()); - debug!("{:?}", data); + let request = CliSrvSelectReq::decode(packet.payload.as_slice()); + debug!("{:?}", request); + + let data = SrvSrvSelectReply { + result: srv_srv_select_reply::Result::Failed, + session_id: 0, + crypt_val: 0, + ip: NullTerminatedString::new(""), + port: 0, + }; + + let response_packet = Packet::new(PacketType::PaklcSrvSelectReply, &data)?; + send_packet(stream, &response_packet).await?; Ok(()) } pub(crate) async fn handle_channel_list_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { - let data = CliJoinServerReq::decode(packet.payload.as_slice()); - debug!("{:?}", data); + let request = CliChannelListReq::decode(packet.payload.as_slice()); + debug!("{:?}", request); + + let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); + let channels = service_discovery::get_service_address(&consul_url, "world-service").await.unwrap_or_else(|err| { + warn!(err); + Vec::new() + }); + + if channels.len() == 0 { + let data = SrvChannelListReply { id: request?.server_id, channels: Vec::new() }; + let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; + send_packet(stream, &response_packet).await?; + return Ok(()); + } + + debug!("Server info: {:?}", channels); + let mut channel_info: Vec = Vec::new(); + let mut id = 0; + for channel in channels { + let name = format!("{}", channel.ServiceMeta.get("name").unwrap_or(&"".to_string()).clone()); + channel_info.push(ChannelInfo { id: id, low_age: 0, high_age: 0, capacity: 0, name: NullTerminatedString::new(&name) }); + id = id + 1; + } + debug!("Channel info: {:?}", channel_info); + + let data = SrvChannelListReply { id: request?.server_id, channels: channel_info }; + let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; + send_packet(stream, &response_packet).await?; Ok(()) } \ No newline at end of file diff --git a/packet-service/src/handlers/character.rs b/packet-service/src/handlers/character.rs new file mode 100644 index 0000000..e5739ee --- /dev/null +++ b/packet-service/src/handlers/character.rs @@ -0,0 +1,21 @@ +use crate::handlers::null_string::NullTerminatedString; +use crate::packet::{send_packet, Packet, PacketPayload}; +use crate::packet_type::PacketType; +use crate::packets::cli_char_list_req::CliCharListReq; +use crate::packets::*; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio::sync::Mutex; +use tonic::{Code, Status}; +use tracing::{debug, error, info, warn}; +use utils::service_discovery; + +pub(crate) async fn handle_char_list_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { + let request = CliCharListReq::decode(packet.payload.as_slice()); + debug!("{:?}", request); + + Ok(()) +} \ No newline at end of file diff --git a/packet-service/src/handlers/mod.rs b/packet-service/src/handlers/mod.rs index 3d88be8..766bab5 100644 --- a/packet-service/src/handlers/mod.rs +++ b/packet-service/src/handlers/mod.rs @@ -1,2 +1,3 @@ +pub mod null_string; pub mod auth; -pub mod null_string; \ No newline at end of file +pub mod character; diff --git a/packet-service/src/main.rs b/packet-service/src/main.rs index 9b64e3a..c1d28e1 100644 --- a/packet-service/src/main.rs +++ b/packet-service/src/main.rs @@ -1,23 +1,23 @@ +use crate::auth_client::AuthClient; +use crate::bufferpool::BufferPool; +use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED}; +use crate::packet::Packet; +use dotenv::dotenv; use std::collections::HashMap; use std::env; use std::error::Error; use std::net::ToSocketAddrs; use std::str::FromStr; use std::sync::Arc; -use dotenv::dotenv; +use tokio::io::AsyncReadExt; use tokio::net::{TcpListener, TcpStream}; -use tokio::io::{AsyncReadExt}; -use tokio::{select, signal}; use tokio::sync::{Mutex, Semaphore}; -use tracing::{info, error, debug, warn}; +use tokio::{select, signal}; use tracing::Level; +use tracing::{debug, error, info, warn}; use utils::consul_registration; use utils::service_discovery::get_service_address; use warp::Filter; -use crate::auth_client::AuthClient; -use crate::bufferpool::BufferPool; -use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED}; -use crate::packet::Packet; mod packet_type; mod packet; @@ -88,7 +88,6 @@ async fn main() -> Result<(), Box> { let service_id = consul_registration::generate_service_id(); let tags = vec!["version-1.0".to_string()]; let mut meta = HashMap::new(); - meta.insert("name".to_string(), "Athena".to_string()); consul_registration::register_service( &consul_url, service_id.as_str(), @@ -102,17 +101,13 @@ async fn main() -> Result<(), Box> { .await?; // Start health-check endpoint - let health_route = warp::path!("health") - .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); - - tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); + consul_registration::start_health_check(addr.as_str()).await?; let auth_address = auth_node.get(0).unwrap(); let auth_url = format!("http://{}:{}", auth_address.ServiceAddress, auth_address.ServicePort); let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?)); let full_addr = format!("{}:{}", &addr, port); - // let address = full_addr.parse().expect("Invalid address"); tokio::spawn(async move { let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CONNECTIONS)); diff --git a/packet-service/src/router.rs b/packet-service/src/router.rs index f7cd7ec..d378d2a 100644 --- a/packet-service/src/router.rs +++ b/packet-service/src/router.rs @@ -1,12 +1,12 @@ -use crate::packet::{Packet}; -use crate::handlers::{auth}; +use crate::auth_client::AuthClient; +use crate::handlers::*; +use crate::packet::Packet; use crate::packet_type::PacketType; use std::error::Error; use std::sync::Arc; use tokio::net::TcpStream; use tokio::sync::Mutex; use tracing::{debug, warn}; -use crate::auth_client::AuthClient; pub async fn route_packet(stream: &mut TcpStream, packet: Packet, auth_client: Arc>) -> Result<(), Box> { debug!("Routing packet: {:?}", packet); @@ -19,6 +19,9 @@ pub async fn route_packet(stream: &mut TcpStream, packet: Packet, auth_client: A PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet).await, PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await, + // Character Stuff + PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet).await, + // 1 => chat::handle_chat(packet).await?, // 2 => movement::handle_movement(packet).await?, _ => { diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 8daf111..3869f9b 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -9,3 +9,5 @@ reqwest = { version = "0.12.9", features = ["json"] } tracing = "0.1" rand = "0.9.0-beta.1" uuid = { version = "1.11.0", features = ["v4"] } +warp = "0.3.7" +tokio = "1.41.1" diff --git a/utils/src/consul_registration.rs b/utils/src/consul_registration.rs index 23d9ed1..ca8a66a 100644 --- a/utils/src/consul_registration.rs +++ b/utils/src/consul_registration.rs @@ -1,7 +1,10 @@ -use std::collections::HashMap; use reqwest::Client; use serde::Serialize; +use std::collections::HashMap; +use std::env; +use std::net::ToSocketAddrs; use uuid::Uuid; +use warp::Filter; #[derive(Serialize)] struct ConsulRegistration { @@ -71,5 +74,18 @@ pub async fn deregister_service(consul_url: &str, service_id: &str) -> Result<() .await? .error_for_status()?; // Ensure response is successful + Ok(()) +} + +pub async fn start_health_check(service_address: &str) -> Result<(), Box> { + let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string()); + let health_check_endpoint_addr = format!("{}:{}", service_address, health_port); + + // Start health-check endpoint + let health_route = warp::path!("health") + .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); + + tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); + Ok(()) } \ No newline at end of file