From b6f2d3f45657354a8f76a950aafa5e5b19c66eeaa57805ca61e2917b613d2599 Mon Sep 17 00:00:00 2001 From: raven <7156279+RavenX8@users.noreply.github.com> Date: Fri, 7 Mar 2025 21:03:15 -0500 Subject: [PATCH] - chore: ran cargo fix on the codebase --- api-service/build.rs | 5 +- api-service/src/axum_gateway.rs | 19 +- api-service/src/main.rs | 1 - auth-service/build.rs | 10 +- auth-service/src/database_client.rs | 62 ++++-- auth-service/src/grpc.rs | 102 ++++++--- auth-service/src/jwt.rs | 16 +- auth-service/src/lib.rs | 4 +- auth-service/src/main.rs | 38 ++-- .../src/mocks/database_client_mock.rs | 2 +- auth-service/src/users.rs | 25 ++- auth-service/tests/integration.rs | 25 ++- character-service/build.rs | 15 +- character-service/src/character_db_client.rs | 59 ++++-- character-service/src/character_service.rs | 112 +++++++--- character-service/src/main.rs | 33 +-- database-service/build.rs | 8 +- database-service/src/characters.rs | 84 ++++++-- database-service/src/db.rs | 4 +- .../src/grpc/character_service.rs | 21 +- database-service/src/grpc/database_service.rs | 2 +- database-service/src/grpc/mod.rs | 4 +- database-service/src/grpc/user_service.rs | 29 ++- database-service/src/lib.rs | 4 +- database-service/src/main.rs | 37 ++-- database-service/src/users.rs | 94 +++++++-- database-service/tests/get_user.rs | 4 +- database-service/tests/integration.rs | 2 +- database-service/tests/redis_cache.rs | 6 +- launcher/src/launcher.rs | 15 +- launcher/src/main.rs | 6 +- packet-service/Cargo.toml | 2 +- packet-service/build.rs | 9 +- packet-service/src/auth_client.rs | 40 +++- packet-service/src/bufferpool.rs | 2 +- packet-service/src/character_client.rs | 36 +++- packet-service/src/connection_service.rs | 14 +- packet-service/src/dataconsts.rs | 2 +- packet-service/src/enums.rs | 1 - packet-service/src/handlers/auth.rs | 196 ++++++++++++++---- packet-service/src/handlers/character.rs | 120 ++++++++--- packet-service/src/handlers/world.rs | 53 ++++- packet-service/src/main.rs | 61 ++++-- packet-service/src/metrics.rs | 5 +- packet-service/src/packet.rs | 13 +- packet-service/src/router.rs | 137 ++++++++++-- packet-service/src/types.rs | 6 +- session-service/build.rs | 15 +- session-service/src/main.rs | 39 ++-- session-service/src/session_service.rs | 39 ++-- utils/Cargo.toml | 17 +- utils/src/consul_registration.rs | 26 ++- utils/src/lib.rs | 2 +- utils/src/null_string.rs | 11 +- utils/src/redis_cache.rs | 92 ++++---- utils/src/service_discovery.rs | 24 ++- utils/src/signal_handler.rs | 8 +- world-service/build.rs | 7 +- world-service/src/main.rs | 22 +- 59 files changed, 1324 insertions(+), 523 deletions(-) diff --git a/api-service/build.rs b/api-service/build.rs index 728974a..2497004 100644 --- a/api-service/build.rs +++ b/api-service/build.rs @@ -1,6 +1,9 @@ fn main() -> Result<(), Box> { tonic_build::configure() .compile_well_known_types(true) - .compile_protos(&["../proto/common.proto", "../proto/auth.proto"], &["../proto"])?; + .compile_protos( + &["../proto/common.proto", "../proto/auth.proto"], + &["../proto"], + )?; Ok(()) } diff --git a/api-service/src/axum_gateway.rs b/api-service/src/axum_gateway.rs index cb6f859..2d74c48 100644 --- a/api-service/src/axum_gateway.rs +++ b/api-service/src/axum_gateway.rs @@ -1,15 +1,12 @@ use axum::extract::{ConnectInfo, State}; +use axum::http::Method; use axum::{routing::post, Json, Router}; use serde::{Deserialize, Serialize}; use std::env; use std::net::SocketAddr; use std::sync::Arc; -use axum::http::Method; use tonic::transport::Channel; use tower_http::cors::{Any, CorsLayer}; -use tower_http::trace::TraceLayer; -use tower::ServiceBuilder; - use auth::auth_service_client::AuthServiceClient; use auth::{LoginRequest, RegisterRequest}; @@ -60,7 +57,7 @@ async fn login_handler( let request = tonic::Request::new(LoginRequest { username: payload.username.clone(), password: payload.password.clone(), - ip_address + ip_address, }); let mut client = grpc_client.lock().await; // Lock the mutex to get mutable access @@ -99,11 +96,10 @@ async fn register_handler( Err(e) => { error!("gRPC Login call failed: {}", e); Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR) - }, + } } } - pub async fn serve_rest_api( grpc_client: Arc>>, ) -> Result<(), Box> { @@ -123,9 +119,12 @@ pub async fn serve_rest_api( let listener = tokio::net::TcpListener::bind(format!("{}:{}", addr, port)) .await .unwrap(); - axum::serve(listener, app.into_make_service_with_connect_info::()) - .await - .unwrap(); + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await + .unwrap(); Ok(()) } diff --git a/api-service/src/main.rs b/api-service/src/main.rs index 0fc72a8..5d6b5e4 100644 --- a/api-service/src/main.rs +++ b/api-service/src/main.rs @@ -5,7 +5,6 @@ use std::env; use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; -use tokio::{select, signal}; use tracing::{info, Level}; use utils::consul_registration; use utils::service_discovery::get_service_address; diff --git a/auth-service/build.rs b/auth-service/build.rs index e000692..a653b13 100644 --- a/auth-service/build.rs +++ b/auth-service/build.rs @@ -6,11 +6,17 @@ fn main() { .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") .compile_protos(&["../proto/auth.proto"], &["../proto"]) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); - + // gRPC Client code tonic_build::configure() .build_server(false) // Generate gRPC client code .compile_well_known_types(true) - .compile_protos(&["../proto/user_db_api.proto", "../proto/session_service_api.proto"], &["../proto"]) + .compile_protos( + &[ + "../proto/user_db_api.proto", + "../proto/session_service_api.proto", + ], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/auth-service/src/database_client.rs b/auth-service/src/database_client.rs index 51e2ff5..393532d 100644 --- a/auth-service/src/database_client.rs +++ b/auth-service/src/database_client.rs @@ -1,4 +1,7 @@ -use crate::database::{user_service_client::UserServiceClient, CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse}; +use crate::database::{ + user_service_client::UserServiceClient, CreateUserRequest, CreateUserResponse, + GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse, +}; use async_trait::async_trait; use chrono::{DateTime, Utc}; use std::error::Error; @@ -7,14 +10,43 @@ use tonic::transport::Channel; #[async_trait] pub trait DatabaseClientTrait: Sized { async fn connect(endpoint: &str) -> Result>; - async fn get_user_by_userid(&mut self, user_id: i32) -> Result>; - async fn get_user_by_username(&mut self, user_id: &str) -> Result>; - async fn get_user_by_email(&mut self, email: &str) -> Result>; - async fn create_user(&mut self, username: &str, email: &str, password: &str) -> Result>; - async fn store_password_reset(&mut self, email: &str, reset_token: &str, expires_at: DateTime) -> Result<(), Box>; - async fn get_password_reset(&self, reset_token: &str) -> Result, Box>; - async fn delete_password_reset(&self, reset_token: &str) -> Result<(), Box>; - async fn update_user_password(&self, email: &str, hashed_password: &str) -> Result<(), Box>; + async fn get_user_by_userid( + &mut self, + user_id: i32, + ) -> Result>; + async fn get_user_by_username( + &mut self, + user_id: &str, + ) -> Result>; + async fn get_user_by_email( + &mut self, + email: &str, + ) -> Result>; + async fn create_user( + &mut self, + username: &str, + email: &str, + password: &str, + ) -> Result>; + async fn store_password_reset( + &mut self, + email: &str, + reset_token: &str, + expires_at: DateTime, + ) -> Result<(), Box>; + async fn get_password_reset( + &self, + reset_token: &str, + ) -> Result, Box>; + async fn delete_password_reset( + &self, + reset_token: &str, + ) -> Result<(), Box>; + async fn update_user_password( + &self, + email: &str, + hashed_password: &str, + ) -> Result<(), Box>; } #[derive(Clone)] pub struct DatabaseClient { @@ -39,9 +71,7 @@ impl DatabaseClientTrait for DatabaseClient { &mut self, user_id: i32, ) -> Result> { - let request = tonic::Request::new(GetUserRequest { - user_id, - }); + let request = tonic::Request::new(GetUserRequest { user_id }); let response = self.client.get_user(request).await?; Ok(response.into_inner()) } @@ -65,7 +95,12 @@ impl DatabaseClientTrait for DatabaseClient { Ok(response.into_inner()) } - async fn create_user(&mut self, username: &str, email: &str, password: &str) -> Result> { + async fn create_user( + &mut self, + username: &str, + email: &str, + password: &str, + ) -> Result> { let request = tonic::Request::new(CreateUserRequest { username: username.to_string(), email: email.to_string(), @@ -105,5 +140,4 @@ impl DatabaseClientTrait for DatabaseClient { ) -> Result<(), Box> { Ok(()) } - } diff --git a/auth-service/src/grpc.rs b/auth-service/src/grpc.rs index cf2b0bd..4b9e627 100644 --- a/auth-service/src/grpc.rs +++ b/auth-service/src/grpc.rs @@ -1,14 +1,18 @@ -use std::sync::Arc; use crate::auth::auth_service_server::AuthService; -use crate::auth::{LoginRequest, LoginResponse, PasswordResetRequest, PasswordResetResponse, RegisterRequest, RegisterResponse, ResetPasswordRequest, ResetPasswordResponse, ValidateTokenRequest, ValidateTokenResponse, ValidateSessionRequest, ValidateSessionResponse, LogoutRequest}; -use crate::common::{Empty}; +use crate::auth::{ + LoginRequest, LoginResponse, LogoutRequest, PasswordResetRequest, PasswordResetResponse, + RegisterRequest, RegisterResponse, ResetPasswordRequest, ResetPasswordResponse, + ValidateSessionRequest, ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse, +}; +use crate::common::Empty; use crate::database_client::{DatabaseClient, DatabaseClientTrait}; -use crate::session::session_service_client::SessionServiceClient; -use crate::session::{CreateSessionRequest, GetSessionRequest, DeleteSessionRequest}; use crate::jwt::{generate_token, validate_token}; +use crate::session::session_service_client::SessionServiceClient; +use crate::session::{CreateSessionRequest, DeleteSessionRequest, GetSessionRequest}; use crate::users::{hash_password, verify_user}; use chrono::{Duration, Utc}; use rand::Rng; +use std::sync::Arc; use tonic::{Request, Response, Status}; use tracing::{debug, error, info, warn}; @@ -27,11 +31,19 @@ impl AuthService for MyAuthService { info!("Login attempt for username: {}", req.username); - if let Some(user) = verify_user(self.db_client.as_ref().clone(), &req.username, &req.password).await { + if let Some(user) = verify_user( + self.db_client.as_ref().clone(), + &req.username, + &req.password, + ) + .await + { let user_id = user.user_id.to_string(); let session_id = uuid::Uuid::new_v4().to_string(); let response = self - .session_client.as_ref().clone() + .session_client + .as_ref() + .clone() .create_session(CreateSessionRequest { session_id: session_id.clone(), user_id: user.user_id, @@ -40,7 +52,7 @@ impl AuthService for MyAuthService { ip_address: req.ip_address.to_string(), }) .await; - + let session = match response { Ok(session) => session, Err(_) => return Err(Status::internal("Session creation failed")), @@ -49,26 +61,29 @@ impl AuthService for MyAuthService { let token = generate_token(&user_id, &&session_id.clone(), user.roles) .map_err(|_| Status::internal("Token generation failed"))?; - + info!("Login successful for username: {}", req.username); - Ok(Response::new(LoginResponse { token, user_id, session_id })) + Ok(Response::new(LoginResponse { + token, + user_id, + session_id, + })) } else { warn!("Invalid login attempt for username: {}", req.username); Err(Status::unauthenticated("Invalid credentials")) } } - - async fn logout( - &self, - request: Request, - ) -> Result, Status> { + + async fn logout(&self, request: Request) -> Result, Status> { let req = request.into_inner(); - self.session_client.as_ref().clone() + self.session_client + .as_ref() + .clone() .delete_session(DeleteSessionRequest { session_id: req.session_id.clone(), }) .await?; - + Ok(Response::new(Empty {})) } @@ -81,7 +96,9 @@ impl AuthService for MyAuthService { match validate_token(&req.token) { Ok(user_data) => { let response = self - .session_client.as_ref().clone() + .session_client + .as_ref() + .clone() .get_session(GetSessionRequest { session_id: user_data.1.clone(), }) @@ -104,8 +121,7 @@ impl AuthService for MyAuthService { })) } } - - }, + } Err(_) => Ok(Response::new(ValidateTokenResponse { valid: false, user_id: "".to_string(), @@ -120,7 +136,9 @@ impl AuthService for MyAuthService { ) -> Result, Status> { let req = request.into_inner(); let response = self - .session_client.as_ref().clone() + .session_client + .as_ref() + .clone() .get_session(GetSessionRequest { session_id: req.session_id, }) @@ -144,7 +162,9 @@ impl AuthService for MyAuthService { ) -> Result, Status> { let req = request.into_inner(); let response = self - .session_client.as_ref().clone() + .session_client + .as_ref() + .clone() .refresh_session(GetSessionRequest { session_id: req.session_id, }) @@ -172,7 +192,11 @@ impl AuthService for MyAuthService { let hashed_password = hash_password(&req.password); // Create user in the database - let result = self.db_client.as_ref().clone().create_user(&req.username, &req.email, &hashed_password) + let result = self + .db_client + .as_ref() + .clone() + .create_user(&req.username, &req.email, &hashed_password) .await; match result { @@ -193,7 +217,12 @@ impl AuthService for MyAuthService { ) -> Result, Status> { let email = request.into_inner().email; - let user = self.db_client.as_ref().clone().get_user_by_email(&email).await; + let user = self + .db_client + .as_ref() + .clone() + .get_user_by_email(&email) + .await; // Check if the email exists if user.ok().is_some() { @@ -203,12 +232,14 @@ impl AuthService for MyAuthService { .take(32) .map(char::from) .collect(); - + // Set token expiration (e.g., 1 hour) let expires_at = Utc::now() + Duration::hours(1); - + // Store the reset token in the database - self.db_client.as_ref().clone() + self.db_client + .as_ref() + .clone() .store_password_reset(&email, &reset_token, expires_at) .await .map_err(|e| Status::internal(format!("Database error: {}", e)))?; @@ -238,27 +269,32 @@ impl AuthService for MyAuthService { let req = request.into_inner(); // Validate the reset token - if let Some(password_reset) = self.db_client.clone().get_password_reset(&req.reset_token).await - .map_err(|e| Status::internal(format!("Database error: {}", e)))? { + if let Some(password_reset) = self + .db_client + .clone() + .get_password_reset(&req.reset_token) + .await + .map_err(|e| Status::internal(format!("Database error: {}", e)))? + { if password_reset.expires_at < Utc::now() { return Err(Status::unauthenticated("Token expired")); } - + // Hash the new password let hashed_password = hash_password(&req.new_password); - + // Update the user's password self.db_client .update_user_password(&password_reset.email, &hashed_password) .await .map_err(|e| Status::internal(format!("Database error: {}", e)))?; - + // Delete the reset token self.db_client .delete_password_reset(&req.reset_token) .await .map_err(|e| Status::internal(format!("Database error: {}", e)))?; - + Ok(Response::new(ResetPasswordResponse { message: "Password successfully reset".to_string(), })) diff --git a/auth-service/src/jwt.rs b/auth-service/src/jwt.rs index d217dde..5df36ff 100644 --- a/auth-service/src/jwt.rs +++ b/auth-service/src/jwt.rs @@ -4,13 +4,17 @@ use std::env; #[derive(Debug, Serialize, Deserialize)] struct Claims { - sub: String, // Subject (user ID) + sub: String, // Subject (user ID) session_id: String, // Session ID roles: Vec, // Roles/permissions - exp: usize, // Expiration time + exp: usize, // Expiration time } -pub fn generate_token(user_id: &str, session_id: &str, roles: Vec) -> Result { +pub fn generate_token( + user_id: &str, + session_id: &str, + roles: Vec, +) -> Result { let secret = env::var("JWT_SECRET").expect("JWT_SECRET must be set"); let expiration = chrono::Utc::now() .checked_add_signed(chrono::Duration::days(1)) @@ -24,7 +28,11 @@ pub fn generate_token(user_id: &str, session_id: &str, roles: Vec) -> Re exp: expiration, }; - encode(&Header::default(), &claims, &EncodingKey::from_secret(secret.as_ref())) + encode( + &Header::default(), + &claims, + &EncodingKey::from_secret(secret.as_ref()), + ) } pub fn validate_token(token: &str) -> Result<(String, String), jsonwebtoken::errors::Error> { diff --git a/auth-service/src/lib.rs b/auth-service/src/lib.rs index 5e76fa2..a1524d4 100644 --- a/auth-service/src/lib.rs +++ b/auth-service/src/lib.rs @@ -1,6 +1,6 @@ +pub mod database_client; pub mod grpc; pub mod jwt; -pub mod database_client; pub mod users; @@ -20,4 +20,4 @@ pub mod session { } #[cfg(test)] -pub mod mocks; \ No newline at end of file +pub mod mocks; diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index 3d2fcd8..c9cffa9 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -1,14 +1,13 @@ use auth_service::auth::auth_service_server::AuthServiceServer; use auth_service::database_client::DatabaseClient; use auth_service::database_client::DatabaseClientTrait; -use auth_service::session::session_service_client::SessionServiceClient; use auth_service::grpc::MyAuthService; +use auth_service::session::session_service_client::SessionServiceClient; use dotenv::dotenv; use std::collections::HashMap; use std::env; use std::str::FromStr; use std::sync::Arc; -use tokio::{select, signal}; use tonic::transport::Server; use tracing::{info, Level}; use utils::consul_registration; @@ -20,7 +19,10 @@ async fn main() -> Result<(), Box> { dotenv().ok(); tracing_subscriber::fmt() - .with_max_level(Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())).unwrap_or_else(|_| Level::INFO)) + .with_max_level( + Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) + .unwrap_or_else(|_| Level::INFO), + ) .init(); // Set the gRPC server address @@ -41,7 +43,7 @@ async fn main() -> Result<(), Box> { let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); let version = env!("CARGO_PKG_VERSION").to_string(); let tags = vec![version]; - let mut meta = HashMap::new(); + let meta = HashMap::new(); consul_registration::register_service( &consul_url, service_id.as_str(), @@ -52,36 +54,46 @@ async fn main() -> Result<(), Box> { meta, &health_check_url, ) - .await?; + .await?; // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; let db_address = db_nodes.get(0).unwrap(); - let db_url = format!("http://{}:{}", db_address.ServiceAddress, db_address.ServicePort); + let db_url = format!( + "http://{}:{}", + db_address.ServiceAddress, db_address.ServicePort + ); let db_client = Arc::new(DatabaseClient::connect(&db_url).await?); - + let session_address = session_nodes.get(0).unwrap(); - let session_address = format!("http://{}:{}", session_address.ServiceAddress, session_address.ServicePort); + let session_address = format!( + "http://{}:{}", + session_address.ServiceAddress, session_address.ServicePort + ); let session_client = Arc::new(SessionServiceClient::connect(session_address).await?); let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); let auth_service = MyAuthService { db_client, - session_client + session_client, }; println!("Authentication Service running on {}", addr); // Start the gRPC server - tokio::spawn(Server::builder() - .add_service(AuthServiceServer::new(auth_service)) - .serve(address)); + tokio::spawn( + Server::builder() + .add_service(AuthServiceServer::new(auth_service)) + .serve(address), + ); utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()).await.expect(""); + consul_registration::deregister_service(&consul_url, service_id.as_str()) + .await + .expect(""); info!("service {} deregistered", service_name); Ok(()) } diff --git a/auth-service/src/mocks/database_client_mock.rs b/auth-service/src/mocks/database_client_mock.rs index 9057215..466a199 100644 --- a/auth-service/src/mocks/database_client_mock.rs +++ b/auth-service/src/mocks/database_client_mock.rs @@ -27,4 +27,4 @@ impl Clone for MockDatabaseClient { fn clone(&self) -> Self { MockDatabaseClient::new() // Create a new mock instance } -} \ No newline at end of file +} diff --git a/auth-service/src/users.rs b/auth-service/src/users.rs index 309c334..2efde31 100644 --- a/auth-service/src/users.rs +++ b/auth-service/src/users.rs @@ -1,27 +1,32 @@ -use crate::database_client::DatabaseClientTrait; use crate::database::GetUserResponse; +use crate::database_client::DatabaseClientTrait; use argon2::{ - password_hash::{ - rand_core::OsRng, - PasswordHash, PasswordHasher, PasswordVerifier, SaltString - }, - Argon2 + password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString}, + Argon2, }; pub fn hash_password(password: &str) -> String { let salt = SaltString::generate(&mut OsRng); let argon2 = Argon2::default(); - argon2.hash_password(password.as_ref(), &salt).unwrap().to_string() + argon2 + .hash_password(password.as_ref(), &salt) + .unwrap() + .to_string() } pub fn verify_password(password: &str, hash: &str) -> bool { let parsed_hash = PasswordHash::new(&hash).unwrap(); - Argon2::default().verify_password(password.as_bytes(), &parsed_hash).is_ok() + Argon2::default() + .verify_password(password.as_bytes(), &parsed_hash) + .is_ok() } -pub async fn verify_user(mut db_client: T, - username: &str, password: &str) -> Option { +pub async fn verify_user( + mut db_client: T, + username: &str, + password: &str, +) -> Option { let user = db_client.get_user_by_username(username).await.ok()?; if verify_password(password, &user.hashed_password) { diff --git a/auth-service/tests/integration.rs b/auth-service/tests/integration.rs index a999fc3..909499a 100644 --- a/auth-service/tests/integration.rs +++ b/auth-service/tests/integration.rs @@ -3,12 +3,11 @@ mod tests { use dotenv::dotenv; // use auth_service::mocks::database_client_mock::MockDatabaseClient; - #[tokio::test] async fn test_login() { // dotenv().ok(); // let mut db_client = MockDatabaseClient::new(); - // + // // db_client // .expect_get_user_by_username() // .with(mockall::predicate::eq("test")) @@ -20,21 +19,21 @@ mod tests { // hashed_password: "test".to_string(), // }) // }); - // - // + // + // // let auth_service = MyAuthService { // db_client, // }; - // + // // // Create a test LoginRequest // let request = Request::new(LoginRequest { // username: "test".into(), // password: "test".into(), // }); - // + // // // Call the login method // let response = auth_service.login(request).await.unwrap().into_inner(); - // + // // // Verify the response // assert!(!response.token.is_empty()); // assert_eq!(response.user_id, "1"); // Replace with the expected user ID @@ -45,22 +44,22 @@ mod tests { dotenv().ok(); // let addr = std::env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1:50052".to_string()); // let db_client = DatabaseClient::connect(&addr).await.unwrap(); - // + // // let auth_service = MyAuthService { // db_client, // }; - // + // // // Generate a token for testing // let token = jwt::generate_token("123", Vec::from(["".to_string()])).unwrap(); - // + // // // Create a ValidateTokenRequest // let request = Request::new(ValidateTokenRequest { token }); - // + // // // Call the validate_token method // let response = auth_service.validate_token(request).await.unwrap().into_inner(); - // + // // // Verify the response // assert!(response.valid); // assert_eq!(response.user_id, "123"); } -} \ No newline at end of file +} diff --git a/character-service/build.rs b/character-service/build.rs index ef399ad..8d30228 100644 --- a/character-service/build.rs +++ b/character-service/build.rs @@ -4,13 +4,22 @@ fn main() { .build_server(true) // Generate gRPC server code .compile_well_known_types(true) .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") - .compile_protos(&["../proto/character_common.proto", "../proto/character.proto"], &["../proto"]) + .compile_protos( + &[ + "../proto/character_common.proto", + "../proto/character.proto", + ], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); - + // gRPC Client code tonic_build::configure() .build_server(false) // Generate gRPC client code .compile_well_known_types(true) - .compile_protos(&["../proto/character_db_api.proto", "../proto/auth.proto"], &["../proto"]) + .compile_protos( + &["../proto/character_db_api.proto", "../proto/auth.proto"], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/character-service/src/character_db_client.rs b/character-service/src/character_db_client.rs index 7a1b66f..bb4e2bc 100644 --- a/character-service/src/character_db_client.rs +++ b/character-service/src/character_db_client.rs @@ -1,7 +1,11 @@ -use tonic::transport::Channel; -use serde::{Deserialize, Serialize}; use crate::database::character_db_service_client::CharacterDbServiceClient; -use crate::database::{CharacterRequest, Character, CharacterListRequest, CharacterListResponse, CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, DeleteCharacterResponse}; +use crate::database::{ + Character, CharacterListRequest, CharacterListResponse, CharacterRequest, + CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, + DeleteCharacterResponse, +}; +use serde::{Deserialize, Serialize}; +use tonic::transport::Channel; #[derive(Clone)] pub struct CharacterDbClient { @@ -71,7 +75,11 @@ impl CharacterDbClient { Ok(Self { client }) } - pub async fn get_character(&mut self, user_id: &str, char_id: &str) -> Result> { + pub async fn get_character( + &mut self, + user_id: &str, + char_id: &str, + ) -> Result> { let request = tonic::Request::new(CharacterRequest { user_id: user_id.parse().unwrap(), character_id: char_id.parse().unwrap(), @@ -80,7 +88,10 @@ impl CharacterDbClient { Ok(response.into_inner()) } - pub async fn get_character_list(&mut self, user_id: &str) -> Result> { + pub async fn get_character_list( + &mut self, + user_id: &str, + ) -> Result> { let request = tonic::Request::new(CharacterListRequest { user_id: user_id.parse().unwrap(), }); @@ -88,12 +99,20 @@ impl CharacterDbClient { Ok(response.into_inner()) } - pub async fn create_character(&mut self, user_id: &str, name: &str, race: i32, face: i32, hair: i32, stone: i32) -> Result> { + pub async fn create_character( + &mut self, + user_id: &str, + name: &str, + race: i32, + face: i32, + hair: i32, + stone: i32, + ) -> Result> { let mut hatid = 221; if 0 == race { hatid = 222; } - + let inventory = vec![ Item { item_id: 30, @@ -135,7 +154,7 @@ impl CharacterDbClient { slot: 12, }, ]; - + let stats = Stats { job: 0, str: 10, @@ -160,8 +179,19 @@ impl CharacterDbClient { pat_cooldown_time: 0, }; - let looks = Looks {race, face, hair, stone}; - let position = Position {map_id: 20, x: 5200.00, y: 5200.00, z: 1.0, spawn_id: 1}; + let looks = Looks { + race, + face, + hair, + stone, + }; + let position = Position { + map_id: 20, + x: 5200.00, + y: 5200.00, + z: 1.0, + spawn_id: 1, + }; let request = tonic::Request::new(CreateCharacterRequest { user_id: user_id.parse().unwrap(), @@ -175,7 +205,12 @@ impl CharacterDbClient { Ok(response.into_inner()) } - pub async fn delete_character(&mut self, user_id: &str, char_id: &str, delete_type: i32) -> Result> { + pub async fn delete_character( + &mut self, + user_id: &str, + char_id: &str, + delete_type: i32, + ) -> Result> { let request = tonic::Request::new(DeleteCharacterRequest { user_id: user_id.parse().unwrap(), character_id: char_id.parse().unwrap(), @@ -184,4 +219,4 @@ impl CharacterDbClient { let response = self.client.delete_character(request).await?; Ok(response.into_inner()) } -} \ No newline at end of file +} diff --git a/character-service/src/character_service.rs b/character-service/src/character_service.rs index 800c28d..90ff37d 100644 --- a/character-service/src/character_service.rs +++ b/character-service/src/character_service.rs @@ -1,11 +1,14 @@ -use std::sync::Arc; -use tracing::debug; -use tonic::{Request, Response, Status}; -use utils::null_string::NullTerminatedString; use crate::character_db_client::CharacterDbClient; use crate::character_service::character::character_service_server::CharacterService; -use crate::character_service::character::{CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, DeleteCharacterResponse, GetCharacterListRequest, GetCharacterListResponse, GetCharacterRequest, GetCharacterResponse}; -use crate::character_service::character_common::{Character, CharacterFull, Looks, Stats}; +use crate::character_service::character::{ + CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, + DeleteCharacterResponse, GetCharacterListRequest, GetCharacterListResponse, + GetCharacterRequest, GetCharacterResponse, +}; +use crate::character_service::character_common::{Character, CharacterFull}; +use std::sync::Arc; +use tonic::{Request, Response, Status}; +use tracing::debug; pub mod character_common { tonic::include_proto!("character_common"); @@ -20,60 +23,103 @@ pub struct MyCharacterService { #[tonic::async_trait] impl CharacterService for MyCharacterService { - async fn get_character_list(&self, request: Request) -> Result, Status> { + async fn get_character_list( + &self, + request: Request, + ) -> Result, Status> { let req = request.into_inner(); let user_id = req.user_id; debug!("Character list for User ID: {}", user_id); - let character_list = self.character_db_client.as_ref().clone().get_character_list(&user_id).await + let character_list = self + .character_db_client + .as_ref() + .clone() + .get_character_list(&user_id) + .await .map_err(|_| Status::aborted("Unable to get character list"))?; debug!("{:?}", character_list.characters); let mut characters: Vec = vec![]; for character in character_list.characters { - characters.push( - Character { - character_id: character.id.to_string(), - name: character.name, - last_played: 0, - delete_time: character.deleted_at.parse().unwrap_or_default(), - stats: serde_json::from_str(&character.stats).unwrap(), - looks: serde_json::from_str(&character.looks).unwrap(), - items: serde_json::from_str(&character.inventory).unwrap(), - } - ) + characters.push(Character { + character_id: character.id.to_string(), + name: character.name, + last_played: 0, + delete_time: character.deleted_at.parse().unwrap_or_default(), + stats: serde_json::from_str(&character.stats).unwrap(), + looks: serde_json::from_str(&character.looks).unwrap(), + items: serde_json::from_str(&character.inventory).unwrap(), + }) } let response = GetCharacterListResponse { characters }; Ok(Response::new(response)) } - async fn create_character(&self, request: Request) -> Result, Status> { + async fn create_character( + &self, + request: Request, + ) -> Result, Status> { let req = request.into_inner(); debug!("{:?}", req); - - let create_character_response = self.character_db_client.as_ref().clone().create_character(&req.user_id, &req.name, req.race, req.face, req.hair, req.stone) + + let create_character_response = self + .character_db_client + .as_ref() + .clone() + .create_character( + &req.user_id, + &req.name, + req.race, + req.face, + req.hair, + req.stone, + ) .await .map_err(|_| Status::aborted("Unable to create character"))?; - - let response = CreateCharacterResponse { result: create_character_response.result }; + + let response = CreateCharacterResponse { + result: create_character_response.result, + }; Ok(Response::new(response)) } - async fn delete_character(&self, request: Request) -> Result, Status> { + async fn delete_character( + &self, + request: Request, + ) -> Result, Status> { let req = request.into_inner(); debug!("{:?}", req); - let delete_character_response = self.character_db_client.as_ref().clone().delete_character(&req.user_id, &req.char_id, req.delete_type).await.map_err(|_| Status::not_found("Character not found"))?; - let response = DeleteCharacterResponse { remaining_time: delete_character_response.remaining_time, name: delete_character_response.name }; + let delete_character_response = self + .character_db_client + .as_ref() + .clone() + .delete_character(&req.user_id, &req.char_id, req.delete_type) + .await + .map_err(|_| Status::not_found("Character not found"))?; + let response = DeleteCharacterResponse { + remaining_time: delete_character_response.remaining_time, + name: delete_character_response.name, + }; Ok(Response::new(response)) } - async fn get_character(&self, request: Request) -> Result, Status> { + async fn get_character( + &self, + request: Request, + ) -> Result, Status> { let req = request.into_inner(); debug!("{:?}", req); - let get_character_response = self.character_db_client.as_ref().clone().get_character(&req.user_id, &req.char_id).await.map_err(|_| Status::not_found("Character not found"))?; + let get_character_response = self + .character_db_client + .as_ref() + .clone() + .get_character(&req.user_id, &req.char_id) + .await + .map_err(|_| Status::not_found("Character not found"))?; let character = CharacterFull { character_id: get_character_response.id.to_string(), @@ -84,8 +130,10 @@ impl CharacterService for MyCharacterService { skills: serde_json::from_str(&get_character_response.skills).unwrap(), items: serde_json::from_str(&get_character_response.inventory).unwrap(), }; - - let response = GetCharacterResponse { character: Some(character) }; + + let response = GetCharacterResponse { + character: Some(character), + }; Ok(Response::new(response)) } -} \ No newline at end of file +} diff --git a/character-service/src/main.rs b/character-service/src/main.rs index 1e111ae..b711e46 100644 --- a/character-service/src/main.rs +++ b/character-service/src/main.rs @@ -1,27 +1,29 @@ -mod character_service; mod character_db_client; +mod character_service; pub mod database { tonic::include_proto!("character_db_api"); } +use crate::character_db_client::CharacterDbClient; +use crate::character_service::character::character_service_server::CharacterServiceServer; +use crate::character_service::MyCharacterService; use dotenv::dotenv; use std::collections::HashMap; use std::env; use std::str::FromStr; use std::sync::Arc; -use tokio::{select, signal}; -use tracing::{info, Level}; +use tracing::Level; use utils::consul_registration; use utils::service_discovery::get_service_address; -use crate::character_db_client::CharacterDbClient; -use crate::character_service::character::character_service_server::CharacterServiceServer; -use crate::character_service::MyCharacterService; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); tracing_subscriber::fmt() - .with_max_level(Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())).unwrap_or_else(|_| Level::INFO)) + .with_max_level( + Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) + .unwrap_or_else(|_| Level::INFO), + ) .init(); // Set the gRPC server address @@ -31,7 +33,8 @@ async fn main() -> Result<(), Box> { let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "character-service".to_string()); - let service_address = env::var("CHARACTER_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let service_address = + env::var("CHARACTER_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); let service_port = port.clone(); let health_check_url = format!("http://{}:{}/health", service_address, health_port); let health_check_endpoint_addr = format!("{}:{}", service_address, health_port); @@ -53,7 +56,7 @@ async fn main() -> Result<(), Box> { meta, &health_check_url, ) - .await?; + .await?; // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; @@ -61,10 +64,13 @@ async fn main() -> Result<(), Box> { let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); let db_address = db_nodes.get(0).unwrap(); - let db_url = format!("http://{}:{}", db_address.ServiceAddress, db_address.ServicePort); + let db_url = format!( + "http://{}:{}", + db_address.ServiceAddress, db_address.ServicePort + ); let character_db_client = Arc::new(CharacterDbClient::connect(&db_url).await?); let character_service = MyCharacterService { - character_db_client + character_db_client, }; tonic::transport::Server::builder() @@ -72,9 +78,10 @@ async fn main() -> Result<(), Box> { .serve(address) .await?; - utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()).await.expect(""); + consul_registration::deregister_service(&consul_url, service_id.as_str()) + .await + .expect(""); Ok(()) } diff --git a/database-service/build.rs b/database-service/build.rs index 0a7196c..bdaa362 100644 --- a/database-service/build.rs +++ b/database-service/build.rs @@ -3,6 +3,12 @@ fn main() { .build_server(true) .compile_well_known_types(true) .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") - .compile_protos(&["../proto/user_db_api.proto", "../proto/character_db_api.proto"], &["../proto"]) + .compile_protos( + &[ + "../proto/user_db_api.proto", + "../proto/character_db_api.proto", + ], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/database-service/src/characters.rs b/database-service/src/characters.rs index 7950e11..9d1eec2 100644 --- a/database-service/src/characters.rs +++ b/database-service/src/characters.rs @@ -1,8 +1,8 @@ +use serde::{Deserialize, Serialize}; use sqlx::{FromRow, Row}; -use utils::redis_cache::{Cache, RedisCache}; // Import RedisCache -use serde::{Serialize, Deserialize}; use std::sync::Arc; use tokio::sync::Mutex; +use utils::redis_cache::{Cache, RedisCache}; // Import RedisCache #[derive(Debug, FromRow, Serialize, Deserialize)] pub struct Character { @@ -34,7 +34,14 @@ impl CharacterRepository { let cache_key = format!("character:{}", character_id); // Try fetching from Redis cache - if let Some(character) = self.cache.lock().await.get::(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? { + if let Some(character) = self + .cache + .lock() + .await + .get::(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)? + { return Ok(character); } @@ -49,11 +56,24 @@ impl CharacterRepository { .await?; // Cache result - self.cache.lock().await.set(&cache_key, &character, 300).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .set(&cache_key, &character, 300) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(character) } - pub async fn create_character(&self, user_id: i32, name: &str, inventory: serde_json::Value, stats: serde_json::Value, looks: serde_json::Value, position: serde_json::Value) -> Result { + pub async fn create_character( + &self, + user_id: i32, + name: &str, + inventory: serde_json::Value, + stats: serde_json::Value, + looks: serde_json::Value, + position: serde_json::Value, + ) -> Result { let result = sqlx::query( "INSERT INTO characters (user_id, name, inventory, stats, looks, position, created_at, updated_at, is_active) \ VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW(), true) RETURNING id", @@ -69,35 +89,62 @@ impl CharacterRepository { // Invalidate cache let cache_key = format!("character:user:{}", user_id); - self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .delete(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(result.get("id")) } - pub async fn delete_character(&self, character_id: i32, delete_type: i32) -> Result { - let mut query = "UPDATE characters SET updated_at = NOW(), deleted_at = NOW() + '24 hours' WHERE id = $1 RETURNING user_id, extract(epoch from (deleted_at - now()))::BIGINT as deleted_at"; + pub async fn delete_character( + &self, + character_id: i32, + delete_type: i32, + ) -> Result { + let mut query = "UPDATE characters SET updated_at = NOW(), deleted_at = NOW() + '24 hours' WHERE id = $1 RETURNING user_id, extract(epoch from (deleted_at - now()))::BIGINT as deleted_at"; if 0 == delete_type { query = "UPDATE characters SET updated_at = NOW(), deleted_at = null WHERE id = $1 RETURNING user_id, 0::BIGINT as deleted_at"; } - let result = sqlx::query( - query, - ) + let result = sqlx::query(query) .bind(character_id) .fetch_one(&self.pool) .await?; // Invalidate cache let cache_key = format!("character:user:{}", result.get::("user_id")); - self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .delete(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; let cache_key = format!("character:{}", character_id); - self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .delete(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(result.get::("deleted_at")) } - pub async fn get_characters_by_user(&self, user_id: i32) -> Result, sqlx::Error> { + pub async fn get_characters_by_user( + &self, + user_id: i32, + ) -> Result, sqlx::Error> { let cache_key = format!("character:user:{}", user_id); // Try fetching from Redis cache - if let Some(characters) = self.cache.lock().await.get::>(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? { + if let Some(characters) = self + .cache + .lock() + .await + .get::>(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)? + { return Ok(characters); } @@ -110,7 +157,12 @@ impl CharacterRepository { .await?; // Cache result - self.cache.lock().await.set(&cache_key, &characters, 300).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .set(&cache_key, &characters, 300) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(characters) } } diff --git a/database-service/src/db.rs b/database-service/src/db.rs index 9686201..2f81af6 100644 --- a/database-service/src/db.rs +++ b/database-service/src/db.rs @@ -1,9 +1,9 @@ -use crate::users::UserRepository; use crate::characters::CharacterRepository; -use utils::redis_cache::RedisCache; +use crate::users::UserRepository; use sqlx::PgPool; use std::sync::Arc; use tokio::sync::Mutex; +use utils::redis_cache::RedisCache; pub struct Database { pub user_repo: Arc, diff --git a/database-service/src/grpc/character_service.rs b/database-service/src/grpc/character_service.rs index ebd6a56..32ad762 100644 --- a/database-service/src/grpc/character_service.rs +++ b/database-service/src/grpc/character_service.rs @@ -1,7 +1,10 @@ -use serde_json::Value::Null; -use crate::grpc::{Character, CharacterRequest, CharacterListRequest, CharacterListResponse, CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, DeleteCharacterResponse}; use crate::grpc::character_db_service_server::CharacterDbService; use crate::grpc::database_service::MyDatabaseService; +use crate::grpc::{ + Character, CharacterListRequest, CharacterListResponse, CharacterRequest, + CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, + DeleteCharacterResponse, +}; use tonic::{Request, Response, Status}; #[tonic::async_trait] @@ -18,11 +21,11 @@ impl CharacterDbService for MyDatabaseService { .await .map_err(|_| Status::not_found("Character not found"))?; - let mut deleted_at= "".to_string(); + let mut deleted_at = "".to_string(); if character.deleted_at.is_some() { deleted_at = character.deleted_at.unwrap().to_string(); } - + let response = Character { id: character.id, user_id: character.user_id, @@ -52,12 +55,11 @@ impl CharacterDbService for MyDatabaseService { .get_characters_by_user(req.user_id) .await .map_err(|_| Status::not_found("Character not found"))?; - + let mut character_list: Vec = Vec::new(); - for character in characters { - let mut deleted_at= "".to_string(); + let mut deleted_at = "".to_string(); if character.deleted_at.is_some() { deleted_at = character.deleted_at.unwrap_or_default().to_string(); } @@ -91,7 +93,7 @@ impl CharacterDbService for MyDatabaseService { ) -> Result, Status> { let req = request.into_inner(); let repo = &self.db.character_repo; - + //todo: we need to check if the character name exists already let character_id = repo @@ -126,7 +128,8 @@ impl CharacterDbService for MyDatabaseService { let req = request.into_inner(); let repo = &self.db.character_repo; - let time_left_in_seconds = repo.delete_character(req.character_id, req.delete_type) + let time_left_in_seconds = repo + .delete_character(req.character_id, req.delete_type) .await .map_err(|_| Status::internal("Failed to delete character"))?; diff --git a/database-service/src/grpc/database_service.rs b/database-service/src/grpc/database_service.rs index e12d691..1f49c7d 100644 --- a/database-service/src/grpc/database_service.rs +++ b/database-service/src/grpc/database_service.rs @@ -4,4 +4,4 @@ use std::sync::Arc; #[derive(Clone)] pub struct MyDatabaseService { pub db: Arc, // Use the Database struct from users.rs -} \ No newline at end of file +} diff --git a/database-service/src/grpc/mod.rs b/database-service/src/grpc/mod.rs index e43987d..09e3ca8 100644 --- a/database-service/src/grpc/mod.rs +++ b/database-service/src/grpc/mod.rs @@ -1,6 +1,6 @@ +mod character_service; pub mod database_service; pub mod user_service; -mod character_service; tonic::include_proto!("user_db_api"); -tonic::include_proto!("character_db_api"); \ No newline at end of file +tonic::include_proto!("character_db_api"); diff --git a/database-service/src/grpc/user_service.rs b/database-service/src/grpc/user_service.rs index 01ef0d9..28c522f 100644 --- a/database-service/src/grpc/user_service.rs +++ b/database-service/src/grpc/user_service.rs @@ -1,6 +1,9 @@ -use crate::grpc::{CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse}; -use crate::grpc::user_service_server::UserService; use crate::grpc::database_service::MyDatabaseService; +use crate::grpc::user_service_server::UserService; +use crate::grpc::{ + CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, + GetUserRequest, GetUserResponse, +}; use tonic::{Request, Response, Status}; #[tonic::async_trait] @@ -11,7 +14,10 @@ impl UserService for MyDatabaseService { ) -> Result, Status> { let req = request.into_inner(); - let user = self.db.user_repo.get_user_by_id(req.user_id) + let user = self + .db + .user_repo + .get_user_by_id(req.user_id) .await .map_err(|_| Status::not_found("User not found"))?; @@ -30,7 +36,10 @@ impl UserService for MyDatabaseService { ) -> Result, Status> { let req = request.into_inner(); - let user_id = self.db.user_repo.create_user(&req.username, &req.email, &req.hashed_password) + let user_id = self + .db + .user_repo + .create_user(&req.username, &req.email, &req.hashed_password) .await .map_err(|_| Status::internal("Failed to create user"))?; @@ -44,7 +53,10 @@ impl UserService for MyDatabaseService { ) -> Result, Status> { let req = request.into_inner(); - let user = self.db.user_repo.get_user_by_username(&req.username) + let user = self + .db + .user_repo + .get_user_by_username(&req.username) .await .map_err(|_| Status::not_found("User not found"))?; @@ -63,7 +75,10 @@ impl UserService for MyDatabaseService { ) -> Result, Status> { let req = request.into_inner(); - let user = self.db.user_repo.get_user_by_email(&req.email) + let user = self + .db + .user_repo + .get_user_by_email(&req.email) .await .map_err(|_| Status::not_found("User not found"))?; @@ -75,4 +90,4 @@ impl UserService for MyDatabaseService { roles: user.roles.unwrap_or_else(Vec::new), })) } -} \ No newline at end of file +} diff --git a/database-service/src/lib.rs b/database-service/src/lib.rs index 2b328f7..0f8b22c 100644 --- a/database-service/src/lib.rs +++ b/database-service/src/lib.rs @@ -1,4 +1,4 @@ -pub mod users; pub mod characters; pub mod db; -pub mod grpc; \ No newline at end of file +pub mod grpc; +pub mod users; diff --git a/database-service/src/main.rs b/database-service/src/main.rs index b28629a..ce4b73b 100644 --- a/database-service/src/main.rs +++ b/database-service/src/main.rs @@ -1,25 +1,27 @@ -use std::collections::HashMap; use database_service::db::Database; +use database_service::grpc::character_db_service_server::CharacterDbServiceServer; use database_service::grpc::database_service::MyDatabaseService; use database_service::grpc::user_service_server::UserServiceServer; -use database_service::grpc::character_db_service_server::CharacterDbServiceServer; -use utils::redis_cache::RedisCache; use dotenv::dotenv; use sqlx::postgres::PgPoolOptions; +use std::collections::HashMap; use std::env; use std::str::FromStr; use std::sync::Arc; -use tokio::{select, signal}; use tokio::sync::Mutex; use tonic::transport::Server; use tracing::{info, Level}; use utils::consul_registration; +use utils::redis_cache::RedisCache; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); tracing_subscriber::fmt() - .with_max_level(Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())).unwrap_or_else(|_| Level::INFO)) + .with_max_level( + Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) + .unwrap_or_else(|_| Level::INFO), + ) .init(); let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); @@ -27,14 +29,16 @@ async fn main() -> Result<(), Box> { let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8080".to_string()); let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); + let redis_url = + std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string()); - let service_address = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let service_address = + env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); let service_port = port.clone(); let health_check_url = format!("http://{}:{}/health", service_address, health_port); - + // Register service with Consul let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); let version = env!("CARGO_PKG_VERSION").to_string(); @@ -50,7 +54,7 @@ async fn main() -> Result<(), Box> { meta, &health_check_url, ) - .await?; + .await?; consul_registration::start_health_check(addr.as_str()).await?; @@ -62,20 +66,23 @@ async fn main() -> Result<(), Box> { .await .expect("Failed to create PostgreSQL connection pool"); - let redis_cache = Arc::new(Mutex::new(RedisCache::new(&redis_url))); let db = Arc::new(Database::new(pool, redis_cache)); let my_service = MyDatabaseService { db }; // Pass `shared_cache` into services as needed info!("Database Service running on {}", address); - tokio::spawn(Server::builder() - .add_service(UserServiceServer::new(my_service.clone())) - .add_service(CharacterDbServiceServer::new(my_service)) - .serve(address)); + tokio::spawn( + Server::builder() + .add_service(UserServiceServer::new(my_service.clone())) + .add_service(CharacterDbServiceServer::new(my_service)) + .serve(address), + ); utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()).await.expect(""); + consul_registration::deregister_service(&consul_url, service_id.as_str()) + .await + .expect(""); info!("service {} deregistered", service_name); Ok(()) } diff --git a/database-service/src/users.rs b/database-service/src/users.rs index 5125d4c..4fa12bb 100644 --- a/database-service/src/users.rs +++ b/database-service/src/users.rs @@ -1,9 +1,8 @@ +use serde::{Deserialize, Serialize}; use sqlx::{FromRow, Row}; -use utils::redis_cache::{RedisCache, Cache}; // Import RedisCache and Cache Trait -use serde::{Serialize, Deserialize}; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{debug}; +use utils::redis_cache::{Cache, RedisCache}; // Import RedisCache and Cache Trait #[derive(Debug, FromRow, Serialize, Deserialize)] pub struct User { @@ -29,7 +28,14 @@ impl UserRepository { pub async fn get_user_by_id(&self, user_id: i32) -> Result { let cache_key = format!("user:{}", user_id); - if let Some(user) = self.cache.lock().await.get::(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? { + if let Some(user) = self + .cache + .lock() + .await + .get::(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)? + { return Ok(user); } @@ -40,14 +46,26 @@ impl UserRepository { .fetch_one(&self.pool) .await?; - self.cache.lock().await.set(&cache_key, &user, 300).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .set(&cache_key, &user, 300) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(user) } pub async fn get_user_by_username(&self, username: &str) -> Result { let cache_key = format!("user:username:{}", username); - if let Some(user) = self.cache.lock().await.get::(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? { + if let Some(user) = self + .cache + .lock() + .await + .get::(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)? + { return Ok(user); } @@ -58,14 +76,26 @@ impl UserRepository { .fetch_one(&self.pool) .await?; - self.cache.lock().await.set(&cache_key, &user, 300).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .set(&cache_key, &user, 300) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(user) } pub async fn get_user_by_email(&self, email: &str) -> Result { let cache_key = format!("user:email:{}", email); - if let Some(user) = self.cache.lock().await.get::(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? { + if let Some(user) = self + .cache + .lock() + .await + .get::(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)? + { return Ok(user); } @@ -76,11 +106,21 @@ impl UserRepository { .fetch_one(&self.pool) .await?; - self.cache.lock().await.set(&cache_key, &user, 300).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .set(&cache_key, &user, 300) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(user) } - pub async fn create_user(&self, username: &str, email: &str, hashed_password: &str) -> Result { + pub async fn create_user( + &self, + username: &str, + email: &str, + hashed_password: &str, + ) -> Result { let row = sqlx::query( r#" INSERT INTO users (username, email, hashed_password) @@ -88,26 +128,33 @@ impl UserRepository { RETURNING id "#, ) - .bind(username) - .bind(email) - .bind(hashed_password) - .fetch_one(&self.pool) - .await?; + .bind(username) + .bind(email) + .bind(hashed_password) + .fetch_one(&self.pool) + .await?; Ok(row.get(0)) } - pub async fn update_user_email(&self, user_id: i32, new_email: &str) -> Result<(), sqlx::Error> { - sqlx::query( - "UPDATE users SET email = $1, updated_at = NOW() WHERE id = $2", - ) + pub async fn update_user_email( + &self, + user_id: i32, + new_email: &str, + ) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE users SET email = $1, updated_at = NOW() WHERE id = $2") .bind(new_email) .bind(user_id) .execute(&self.pool) .await?; let cache_key = format!("user:{}", user_id); - self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .delete(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(()) } @@ -118,7 +165,12 @@ impl UserRepository { .await?; let cache_key = format!("user:{}", user_id); - self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?; + self.cache + .lock() + .await + .delete(&cache_key) + .await + .map_err(|_| sqlx::Error::RowNotFound)?; Ok(()) } } diff --git a/database-service/tests/get_user.rs b/database-service/tests/get_user.rs index be93613..fa41325 100644 --- a/database-service/tests/get_user.rs +++ b/database-service/tests/get_user.rs @@ -4,7 +4,7 @@ use tokio; async fn test_get_user() { // // Set up a temporary in-memory PostgreSQL database // let pool = PgPool::connect("postgres://user:password@localhost/test_database").await.unwrap(); - // + // // // Create the test table // pool.execute( // r#" @@ -20,7 +20,7 @@ async fn test_get_user() { // ) // .await // .unwrap(); - // + // // // Test the `get_user` function // let user = get_user(&pool, "123").await.unwrap(); // assert_eq!(user.user_id, "123"); diff --git a/database-service/tests/integration.rs b/database-service/tests/integration.rs index aa554c1..b6b2c22 100644 --- a/database-service/tests/integration.rs +++ b/database-service/tests/integration.rs @@ -9,4 +9,4 @@ mod tests { // let db = Database::new(&database_url).await; // assert!(db.health_check().await); } -} \ No newline at end of file +} diff --git a/database-service/tests/redis_cache.rs b/database-service/tests/redis_cache.rs index 077490c..074f4b0 100644 --- a/database-service/tests/redis_cache.rs +++ b/database-service/tests/redis_cache.rs @@ -3,13 +3,13 @@ async fn test_redis_cache() { // dotenv().ok(); // let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); // let cache = RedisCache::new(&redis_url); - // + // // let key = &"test_key".to_string(); // let value = "test_value"; - // + // // // Test setting a value // cache.set(key, &value, 10).await.unwrap(); - // + // // // Test getting the value // let cached_value: Option = cache.get(key).await.unwrap(); // assert_eq!(cached_value, Some("test_value".to_string())); diff --git a/launcher/src/launcher.rs b/launcher/src/launcher.rs index 470f7ea..80b7239 100644 --- a/launcher/src/launcher.rs +++ b/launcher/src/launcher.rs @@ -1,7 +1,6 @@ -use crate::{format_shell_command, wait_for_keypress}; +use crate::format_shell_command; use std::borrow::Cow; -use std::env; -use std::process::{exit, Command, Stdio}; +use std::process::{Command, Stdio}; use tracing::{debug, error, info, warn}; use url::Url; @@ -15,7 +14,7 @@ fn create_command() -> Command { wait_for_keypress(); exit(1); } - + let mut command = Command::new("./TRose.exe"); command } @@ -23,7 +22,13 @@ fn create_command() -> Command { #[cfg(target_os = "linux")] fn create_command() -> Command { let mut command = Command::new("bottles-cli"); - command.arg("run").arg("-p").arg("TRose").arg("-b").arg("OsIRose").arg("--args"); + command + .arg("run") + .arg("-p") + .arg("TRose") + .arg("-b") + .arg("OsIRose") + .arg("--args"); command } diff --git a/launcher/src/main.rs b/launcher/src/main.rs index 4c4a9ab..a8758cc 100644 --- a/launcher/src/main.rs +++ b/launcher/src/main.rs @@ -1,6 +1,6 @@ mod launcher; -use std::process::{Command}; +use std::process::Command; use std::{env, io}; use tracing::{debug, error, info, Level}; use url::Url; @@ -25,7 +25,7 @@ fn main() -> Result<(), Box> { let args: Vec = env::args().collect(); if args.len() < 2 { error!("Usage: launcher "); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "url"))? + return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "url"))?; } let uri = &args[1]; @@ -44,7 +44,7 @@ fn main() -> Result<(), Box> { "update" => Commands::Update, other => return Err(format!("Unknown method: {}", other).into()), }; - + match action { Commands::Launch => { debug!("launching with URL {}", uri); diff --git a/packet-service/Cargo.toml b/packet-service/Cargo.toml index 421c545..2832317 100644 --- a/packet-service/Cargo.toml +++ b/packet-service/Cargo.toml @@ -14,7 +14,7 @@ serde = { version = "1.0", features = ["derive"] } bytes = { version = "1.8.0", features = ["std", "serde"] } tracing = "0.1" tracing-subscriber = "0.3.18" -bincode = { version = "2.0.0-rc.3", features = ["derive", "serde"] } +bincode = { version = "2.0.0", features = ["derive", "serde"] } thiserror = "2.0.3" lazy_static = "1.5.0" prometheus = "0.13.4" diff --git a/packet-service/build.rs b/packet-service/build.rs index 96ba673..feec78b 100644 --- a/packet-service/build.rs +++ b/packet-service/build.rs @@ -4,6 +4,13 @@ fn main() { .build_server(false) // Generate gRPC server code? .compile_well_known_types(true) .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") - .compile_protos(&["../proto/auth.proto", "../proto/character.proto", "../proto/character_common.proto"], &["../proto"]) + .compile_protos( + &[ + "../proto/auth.proto", + "../proto/character.proto", + "../proto/character_common.proto", + ], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/packet-service/src/auth_client.rs b/packet-service/src/auth_client.rs index ba54516..083ea8b 100644 --- a/packet-service/src/auth_client.rs +++ b/packet-service/src/auth_client.rs @@ -1,6 +1,9 @@ use crate::auth::auth_service_client::AuthServiceClient; -use crate::auth::{LoginRequest, LoginResponse, LogoutRequest, ValidateSessionRequest, ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse}; -use crate::common::{Empty}; +use crate::auth::{ + LoginRequest, LoginResponse, LogoutRequest, ValidateSessionRequest, ValidateSessionResponse, + ValidateTokenRequest, ValidateTokenResponse, +}; +use crate::common::Empty; use tonic::transport::Channel; #[derive(Clone, Debug)] @@ -14,7 +17,12 @@ impl AuthClient { Ok(AuthClient { client }) } - pub async fn login(&mut self, username: &str, password: &str, ip_address: &str) -> Result> { + pub async fn login( + &mut self, + username: &str, + password: &str, + ip_address: &str, + ) -> Result> { let request = LoginRequest { username: username.to_string(), password: password.to_string(), @@ -25,34 +33,46 @@ impl AuthClient { Ok(response.into_inner()) } - pub async fn login_token(&mut self, token: &str) -> Result> { + pub async fn login_token( + &mut self, + token: &str, + ) -> Result> { let request = ValidateTokenRequest { - token: token.to_string() + token: token.to_string(), }; let response = self.client.validate_token(request).await?; Ok(response.into_inner()) } - pub async fn validate_session(&mut self, session_id: &str) -> Result> { + pub async fn validate_session( + &mut self, + session_id: &str, + ) -> Result> { let request = ValidateSessionRequest { - session_id: session_id.to_string() + session_id: session_id.to_string(), }; let response = self.client.validate_session(request).await?; Ok(response.into_inner()) } - pub async fn refresh_session(&mut self, session_id: &str) -> Result> { + pub async fn refresh_session( + &mut self, + session_id: &str, + ) -> Result> { let request = ValidateSessionRequest { - session_id: session_id.to_string() + session_id: session_id.to_string(), }; let response = self.client.refresh_session(request).await?; Ok(response.into_inner()) } - pub async fn logout(&mut self, session_id: &str) -> Result> { + pub async fn logout( + &mut self, + session_id: &str, + ) -> Result> { let request = LogoutRequest { session_id: session_id.to_string(), }; diff --git a/packet-service/src/bufferpool.rs b/packet-service/src/bufferpool.rs index 7d6d35c..ea4b1e0 100644 --- a/packet-service/src/bufferpool.rs +++ b/packet-service/src/bufferpool.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; use std::sync::Arc; -use tokio::sync::{Semaphore, Mutex}; +use tokio::sync::{Mutex, Semaphore}; const MAX_PACKET_SIZE: usize = 0xFFF; diff --git a/packet-service/src/character_client.rs b/packet-service/src/character_client.rs index 4311110..e76727c 100644 --- a/packet-service/src/character_client.rs +++ b/packet-service/src/character_client.rs @@ -1,5 +1,9 @@ use crate::character::character_service_client::CharacterServiceClient; -use crate::character::{CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, DeleteCharacterResponse, GetCharacterListRequest, GetCharacterListResponse, GetCharacterRequest, GetCharacterResponse}; +use crate::character::{ + CreateCharacterRequest, CreateCharacterResponse, DeleteCharacterRequest, + DeleteCharacterResponse, GetCharacterListRequest, GetCharacterListResponse, + GetCharacterRequest, GetCharacterResponse, +}; use tonic::transport::Channel; use utils::null_string::NullTerminatedString; @@ -14,7 +18,10 @@ impl CharacterClient { Ok(CharacterClient { client }) } - pub async fn get_character_list(&mut self, user_id: &str) -> Result> { + pub async fn get_character_list( + &mut self, + user_id: &str, + ) -> Result> { let request = GetCharacterListRequest { user_id: user_id.to_string(), }; @@ -23,7 +30,15 @@ impl CharacterClient { Ok(response.into_inner()) } - pub async fn create_character(&mut self, user_id: &str, name: NullTerminatedString, race: u8, face: u8, hair: u8, stone: u8) -> Result> { + pub async fn create_character( + &mut self, + user_id: &str, + name: NullTerminatedString, + race: u8, + face: u8, + hair: u8, + stone: u8, + ) -> Result> { let request = CreateCharacterRequest { user_id: user_id.to_string(), name: name.0, @@ -37,18 +52,27 @@ impl CharacterClient { Ok(response.into_inner()) } - pub async fn delete_character(&mut self, user_id: &str, char_id: &str, delete_type: i32) -> Result> { + pub async fn delete_character( + &mut self, + user_id: &str, + char_id: &str, + delete_type: i32, + ) -> Result> { let request = DeleteCharacterRequest { user_id: user_id.to_string(), char_id: char_id.to_string(), - delete_type + delete_type, }; let response = self.client.delete_character(request).await?; Ok(response.into_inner()) } - pub async fn get_character(&mut self, user_id: &str, char_id: u8) -> Result> { + pub async fn get_character( + &mut self, + user_id: &str, + char_id: u8, + ) -> Result> { let request = GetCharacterRequest { user_id: user_id.to_string(), char_id: char_id.to_string(), diff --git a/packet-service/src/connection_service.rs b/packet-service/src/connection_service.rs index b4a9eab..2b29f19 100644 --- a/packet-service/src/connection_service.rs +++ b/packet-service/src/connection_service.rs @@ -1,7 +1,7 @@ +use crate::connection_state::ConnectionState; use dashmap::DashMap; use std::sync::Arc; use uuid::Uuid; -use crate::connection_state::ConnectionState; #[derive(Clone, Debug)] pub struct ConnectionService { @@ -17,15 +17,21 @@ impl ConnectionService { pub fn add_connection(&self) -> String { let connection_id = Uuid::new_v4().to_string(); - self.connections.insert(connection_id.clone(), ConnectionState::new()); + self.connections + .insert(connection_id.clone(), ConnectionState::new()); connection_id } pub fn get_connection(&self, connection_id: &str) -> Option { - self.connections.get(connection_id).map(|entry| entry.clone()) + self.connections + .get(connection_id) + .map(|entry| entry.clone()) } - pub fn get_connection_mut(&self, connection_id: &str) -> Option> { + pub fn get_connection_mut( + &self, + connection_id: &str, + ) -> Option> { self.connections.get_mut(connection_id) } diff --git a/packet-service/src/dataconsts.rs b/packet-service/src/dataconsts.rs index 2f76731..a48068f 100644 --- a/packet-service/src/dataconsts.rs +++ b/packet-service/src/dataconsts.rs @@ -1,4 +1,4 @@ -use crate::enums::{EquippedPosition, BulletType, RidingItem}; +use crate::enums::{BulletType, EquippedPosition, RidingItem}; pub(crate) const MIN_SELL_TYPE: u32 = 1; pub(crate) const MAX_SELL_TYPE: u32 = 11; diff --git a/packet-service/src/enums.rs b/packet-service/src/enums.rs index 1de6644..f85f4ad 100644 --- a/packet-service/src/enums.rs +++ b/packet-service/src/enums.rs @@ -74,7 +74,6 @@ impl EquippedPosition { } } - #[repr(u8)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum MoveMode { diff --git a/packet-service/src/handlers/auth.rs b/packet-service/src/handlers/auth.rs index d2da4bd..237ab48 100644 --- a/packet-service/src/handlers/auth.rs +++ b/packet-service/src/handlers/auth.rs @@ -26,7 +26,13 @@ use tracing::{debug, error, info, warn}; use utils::null_string::NullTerminatedString; use utils::service_discovery; -pub(crate) async fn handle_alive_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_alive_req( + stream: &mut TcpStream, + packet: Packet, + auth_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { if let Some(mut state) = connection_service.get_connection(&connection_id) { let session_id = state.session_id.clone().unwrap(); debug!("Attempting to refresh session {}", session_id); @@ -42,15 +48,27 @@ pub(crate) async fn handle_alive_req(stream: &mut TcpStream, packet: Packet, aut } } -pub(crate) async fn handle_accept_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { - let data = SrvAcceptReply { result: srv_accept_reply::Result::Accepted, rand_value: 0 }; +pub(crate) async fn handle_accept_req( + stream: &mut TcpStream, + packet: Packet, +) -> Result<(), Box> { + let data = SrvAcceptReply { + result: srv_accept_reply::Result::Accepted, + rand_value: 0, + }; let response_packet = Packet::new(PacketType::PakssAcceptReply, &data)?; send_packet(stream, &response_packet).await?; Ok(()) } -pub(crate) async fn handle_join_server_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_join_server_req( + stream: &mut TcpStream, + packet: Packet, + auth_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { let request = CliJoinServerTokenReq::decode(packet.payload.as_slice()); debug!("{:?}", request); @@ -61,13 +79,21 @@ pub(crate) async fn handle_join_server_req(stream: &mut TcpStream, packet: Packe if (!session.valid) { warn!("Invalid session ID: {}", session_id); - let data = SrvJoinServerReply { result: srv_join_server_reply::Result::Failed, id: 0, pay_flag: 0 }; + let data = SrvJoinServerReply { + result: srv_join_server_reply::Result::Failed, + id: 0, + pay_flag: 0, + }; let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?; send_packet(stream, &response_packet).await?; return Err("Session not valid".into()); } - let data = SrvJoinServerReply { result: srv_join_server_reply::Result::Ok, id: 1, pay_flag: 0 }; + let data = SrvJoinServerReply { + result: srv_join_server_reply::Result::Ok, + id: 1, + pay_flag: 0, + }; let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?; send_packet(stream, &response_packet).await?; Ok(()) @@ -76,7 +102,13 @@ pub(crate) async fn handle_join_server_req(stream: &mut TcpStream, packet: Packe } } -pub(crate) async fn handle_logout_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_logout_req( + stream: &mut TcpStream, + packet: Packet, + auth_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { if let Some(mut state) = connection_service.get_connection(&connection_id) { let session_id = state.session_id.clone().unwrap(); let mut auth_client = auth_client.lock().await; @@ -92,8 +124,18 @@ pub(crate) async fn handle_logout_req(stream: &mut TcpStream, packet: Packet, au } } -pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc>, connection_service: Arc, connection_id: String, addr: SocketAddr) -> Result<(), Box> { - debug!("decoding packet payload of size {}", packet.payload.as_slice().len()); +pub(crate) async fn handle_login_req( + stream: &mut TcpStream, + packet: Packet, + auth_client: Arc>, + connection_service: Arc, + connection_id: String, + addr: SocketAddr, +) -> Result<(), Box> { + debug!( + "decoding packet payload of size {}", + packet.payload.as_slice().len() + ); let data = CliLoginTokenReq::decode(packet.payload.as_slice())?; debug!("{:?}", data); @@ -103,7 +145,12 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut if response.valid == false { info!("Login failed: Invalid credentials"); - let data = SrvLoginReply { result: srv_login_reply::Result::UnknownAccount, right: 0, type_: 0, servers_info: Vec::new() }; + let data = SrvLoginReply { + result: srv_login_reply::Result::UnknownAccount, + right: 0, + type_: 0, + servers_info: Vec::new(), + }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; } else { @@ -114,14 +161,23 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut state.session_id = Some(response.session_id); } - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let servers = service_discovery::get_service_address(&consul_url, "character-service").await.unwrap_or_else(|err| { - warn!(err); - Vec::new() - }); + let consul_url = + env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); + let servers = + service_discovery::get_service_address(&consul_url, "character-service") + .await + .unwrap_or_else(|err| { + warn!(err); + Vec::new() + }); if servers.len() == 0 { - let data = SrvLoginReply { result: srv_login_reply::Result::Failed, right: 0, type_: 0, servers_info: Vec::new() }; + let data = SrvLoginReply { + result: srv_login_reply::Result::Failed, + right: 0, + type_: 0, + servers_info: Vec::new(), + }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; return Ok(()); @@ -130,19 +186,33 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut let mut server_info: Vec = Vec::new(); let mut id = 0; for server in servers { - let mut name = server.ServiceMeta.get("name").unwrap_or(&"".to_string()).clone(); - let is_test = server.ServiceTags.contains(&"test".to_string()) || server.ServiceTags.contains(&"staging".to_string()); + let mut name = server + .ServiceMeta + .get("name") + .unwrap_or(&"".to_string()) + .clone(); + let is_test = server.ServiceTags.contains(&"test".to_string()) + || server.ServiceTags.contains(&"staging".to_string()); if is_test { name = format!("@{}", name); } else { name = format!(" {}", name); } - server_info.push(ServerInfo { test: u8::from(is_test), name: NullTerminatedString::new(&name), id }); + server_info.push(ServerInfo { + test: u8::from(is_test), + name: NullTerminatedString::new(&name), + id, + }); id = id + 1; } debug!("Server info: {:?}", server_info); - let data = SrvLoginReply { result: srv_login_reply::Result::Ok, right: 0, type_: 0, servers_info: server_info }; + let data = SrvLoginReply { + result: srv_login_reply::Result::Ok, + right: 0, + type_: 0, + servers_info: server_info, + }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; } @@ -153,19 +223,34 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut Code::Unauthenticated => { info!("Login failed: Invalid credentials"); - let data = SrvLoginReply { result: srv_login_reply::Result::UnknownAccount, right: 0, type_: 0, servers_info: Vec::new() }; + let data = SrvLoginReply { + result: srv_login_reply::Result::UnknownAccount, + right: 0, + type_: 0, + servers_info: Vec::new(), + }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; } Code::Unavailable => { warn!("Login failed: Service is unavailable"); - let data = SrvLoginReply { result: srv_login_reply::Result::Failed, right: 0, type_: 0, servers_info: Vec::new() }; + let data = SrvLoginReply { + result: srv_login_reply::Result::Failed, + right: 0, + type_: 0, + servers_info: Vec::new(), + }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; } _ => { error!("Unexpected error: {}", tonic_status.message()); - let data = SrvLoginReply { result: srv_login_reply::Result::Failed, right: 0, type_: 0, servers_info: Vec::new() }; + let data = SrvLoginReply { + result: srv_login_reply::Result::Failed, + right: 0, + type_: 0, + servers_info: Vec::new(), + }; let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; send_packet(stream, &response_packet).await?; } @@ -177,21 +262,30 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut Ok(()) } -pub(crate) async fn handle_server_select_req(stream: &mut TcpStream, packet: Packet, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_server_select_req( + stream: &mut TcpStream, + packet: Packet, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { let request = CliSrvSelectReq::decode(packet.payload.as_slice())?; debug!("{:?}", request); if let Some(mut state) = connection_service.get_connection_mut(&connection_id) { - state.additional_data.insert("server".to_string(), request.server_id.to_string()); - state.additional_data.insert("channel".to_string(), request.channel_id.to_string()); + state + .additional_data + .insert("server".to_string(), request.server_id.to_string()); + state + .additional_data + .insert("channel".to_string(), request.channel_id.to_string()); } let data = SrvSrvSelectReply { result: srv_srv_select_reply::Result::Ok, - session_id: 0, // Client should already have this value - crypt_val: 0, // This is only for the old encryption + session_id: 0, // Client should already have this value + crypt_val: 0, // This is only for the old encryption ip: NullTerminatedString::new(""), // If this is empty, the client should stay connected (requires client change) - port: 0, // See comment about ip above + port: 0, // See comment about ip above }; let response_packet = Packet::new(PacketType::PaklcSrvSelectReply, &data)?; @@ -199,18 +293,26 @@ pub(crate) async fn handle_server_select_req(stream: &mut TcpStream, packet: Pac Ok(()) } -pub(crate) async fn handle_channel_list_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { +pub(crate) async fn handle_channel_list_req( + stream: &mut TcpStream, + packet: Packet, +) -> Result<(), Box> { let request = CliChannelListReq::decode(packet.payload.as_slice()); debug!("{:?}", request); let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let channels = service_discovery::get_service_address(&consul_url, "world-service").await.unwrap_or_else(|err| { - warn!(err); - Vec::new() - }); + let channels = service_discovery::get_service_address(&consul_url, "world-service") + .await + .unwrap_or_else(|err| { + warn!(err); + Vec::new() + }); if channels.len() == 0 { - let data = SrvChannelListReply { id: request?.server_id, channels: Vec::new() }; + let data = SrvChannelListReply { + id: request?.server_id, + channels: Vec::new(), + }; let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; send_packet(stream, &response_packet).await?; return Ok(()); @@ -220,14 +322,30 @@ pub(crate) async fn handle_channel_list_req(stream: &mut TcpStream, packet: Pack let mut channel_info: Vec = Vec::new(); let mut id = 1; for channel in channels { - let name = format!("{}", channel.ServiceMeta.get("name").unwrap_or(&"".to_string()).clone()); - channel_info.push(ChannelInfo { id: id, low_age: 0, high_age: 0, capacity: 0, name: NullTerminatedString::new(&name) }); + let name = format!( + "{}", + channel + .ServiceMeta + .get("name") + .unwrap_or(&"".to_string()) + .clone() + ); + channel_info.push(ChannelInfo { + id: id, + low_age: 0, + high_age: 0, + capacity: 0, + name: NullTerminatedString::new(&name), + }); id = id + 1; } debug!("Channel info: {:?}", channel_info); - let data = SrvChannelListReply { id: request?.server_id, channels: channel_info }; + let data = SrvChannelListReply { + id: request?.server_id, + channels: channel_info, + }; let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; send_packet(stream, &response_packet).await?; Ok(()) -} \ No newline at end of file +} diff --git a/packet-service/src/handlers/character.rs b/packet-service/src/handlers/character.rs index 2d7aca8..b02b7dc 100644 --- a/packet-service/src/handlers/character.rs +++ b/packet-service/src/handlers/character.rs @@ -30,9 +30,15 @@ pub(crate) fn convert_slot(slot: i32) -> srv_char_list_reply::EquippedPosition { } } -pub(crate) async fn handle_char_list_req(stream: &mut TcpStream, packet: Packet, character_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { - use crate::packets::srv_char_list_reply::*; +pub(crate) async fn handle_char_list_req( + stream: &mut TcpStream, + packet: Packet, + character_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { use crate::packets::cli_char_list_req::*; + use crate::packets::srv_char_list_reply::*; let request = CliCharListReq::decode(packet.payload.as_slice()); debug!("{:?}", request); @@ -40,16 +46,21 @@ pub(crate) async fn handle_char_list_req(stream: &mut TcpStream, packet: Packet, let session_id; if let Some(mut state) = connection_service.get_connection(&connection_id) { user_id = state.user_id.expect("Missing user id in connection state"); - session_id = state.session_id.expect("Missing session id in connection state"); + session_id = state + .session_id + .expect("Missing session id in connection state"); } // query the character service for the character list for this user let mut character_client = character_client.lock().await; - let character_list = character_client.get_character_list(&user_id.to_string()).await?; + let character_list = character_client + .get_character_list(&user_id.to_string()) + .await?; let mut characters = vec![]; let mut character_id_list: Vec = Vec::new(); for character in character_list.characters { - let mut item_list: [EquippedItem; (MAX_VISIBLE_ITEMS as usize)] = core::array::from_fn(|i| EquippedItem::default()); + let mut item_list: [EquippedItem; (MAX_VISIBLE_ITEMS as usize)] = + core::array::from_fn(|i| EquippedItem::default()); for item in character.items { if item.slot < MAX_VISIBLE_ITEMS as i32 { @@ -89,7 +100,13 @@ pub(crate) async fn handle_char_list_req(stream: &mut TcpStream, packet: Packet, Ok(()) } -pub(crate) async fn handle_create_char_req(stream: &mut TcpStream, packet: Packet, character_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_create_char_req( + stream: &mut TcpStream, + packet: Packet, + character_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { use crate::packets::cli_create_char_req::*; use crate::packets::srv_create_char_reply::*; let request = CliCreateCharReq::decode(packet.payload.as_slice())?; @@ -99,14 +116,24 @@ pub(crate) async fn handle_create_char_req(stream: &mut TcpStream, packet: Packe let session_id; if let Some(mut state) = connection_service.get_connection(&connection_id) { user_id = state.user_id.expect("Missing user id in connection state"); - session_id = state.session_id.expect("Missing session id in connection state"); + session_id = state + .session_id + .expect("Missing session id in connection state"); } // send the data to the character service to create the character let mut character_client = character_client.lock().await; - let create_character_response = character_client.create_character(&user_id.to_string(), request.name, request.race, request.face, request.hair, request.stone).await?; - let result = match create_character_response.result - { + let create_character_response = character_client + .create_character( + &user_id.to_string(), + request.name, + request.race, + request.face, + request.hair, + request.stone, + ) + .await?; + let result = match create_character_response.result { 0 => srv_create_char_reply::Result::Ok, 1 => srv_create_char_reply::Result::Failed, 2 => srv_create_char_reply::Result::NameTaken, @@ -116,14 +143,23 @@ pub(crate) async fn handle_create_char_req(stream: &mut TcpStream, packet: Packe _ => srv_create_char_reply::Result::Failed, }; - let data = SrvCreateCharReply { result, platininum: 0 }; + let data = SrvCreateCharReply { + result, + platininum: 0, + }; let response_packet = Packet::new(PacketType::PakccCreateCharReply, &data)?; send_packet(stream, &response_packet).await?; Ok(()) } -pub(crate) async fn handle_delete_char_req(stream: &mut TcpStream, packet: Packet, character_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_delete_char_req( + stream: &mut TcpStream, + packet: Packet, + character_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { use crate::packets::cli_delete_char_req::*; use crate::packets::srv_delete_char_reply::*; let request = CliDeleteCharReq::decode(packet.payload.as_slice())?; @@ -135,12 +171,20 @@ pub(crate) async fn handle_delete_char_req(stream: &mut TcpStream, packet: Packe if let Some(mut state) = connection_service.get_connection(&connection_id) { user_id = state.user_id.expect("Missing user id in connection state"); - session_id = state.session_id.expect("Missing session id in connection state"); + session_id = state + .session_id + .expect("Missing session id in connection state"); character_id_list = state.character_list.expect("Missing character id list"); } let mut character_client = character_client.lock().await; - let delete_response = character_client.delete_character(&user_id.to_string(), &character_id_list[request.char_id as usize].to_string(), request.is_delete as i32).await?; + let delete_response = character_client + .delete_character( + &user_id.to_string(), + &character_id_list[request.char_id as usize].to_string(), + request.is_delete as i32, + ) + .await?; let character_name = request.name; let data = SrvDeleteCharReply { @@ -153,13 +197,19 @@ pub(crate) async fn handle_delete_char_req(stream: &mut TcpStream, packet: Packe Ok(()) } -pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packet, character_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_select_char_req( + stream: &mut TcpStream, + packet: Packet, + character_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { use crate::packets::cli_select_char_req::*; - use crate::packets::srv_switch_server::*; - use crate::packets::srv_select_char_reply::*; + use crate::packets::srv_billing_message::*; use crate::packets::srv_inventory_data::*; use crate::packets::srv_quest_data::*; - use crate::packets::srv_billing_message::*; + use crate::packets::srv_select_char_reply::*; + use crate::packets::srv_switch_server::*; use crate::types::{HotbarItem, StatusEffect}; let request = CliSelectCharReq::decode(packet.payload.as_slice())?; debug!("{:?}", request); @@ -168,7 +218,10 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe let mut character_id_list: Vec = Vec::new(); if let Some(mut state) = connection_service.get_connection_mut(&connection_id) { user_id = state.user_id.expect("Missing user id in connection state"); - character_id_list = state.character_list.clone().expect("Missing character id list"); + character_id_list = state + .character_list + .clone() + .expect("Missing character id list"); state.character_id = Some(request.char_id as i8); } @@ -182,7 +235,12 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe send_packet(stream, &response_packet).await?; let mut character_client = character_client.lock().await; - let character_data = character_client.get_character(&user_id.to_string(), character_id_list[request.char_id as usize] as u8).await?; + let character_data = character_client + .get_character( + &user_id.to_string(), + character_id_list[request.char_id as usize] as u8, + ) + .await?; let character = character_data.character.unwrap_or_default(); @@ -191,15 +249,17 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe let stats = character.stats.unwrap(); let skills = character.skills; let items = character.items; - let mut equipped_item_list: [EquippedItem; (MAX_VISIBLE_ITEMS as usize)] = core::array::from_fn(|i| EquippedItem::default()); - let mut inventory: [srv_inventory_data::Item; (MAX_ITEMS as usize)] = core::array::from_fn(|i| srv_inventory_data::Item::default()); + let mut equipped_item_list: [EquippedItem; (MAX_VISIBLE_ITEMS as usize)] = + core::array::from_fn(|i| EquippedItem::default()); + let mut inventory: [srv_inventory_data::Item; (MAX_ITEMS as usize)] = + core::array::from_fn(|i| srv_inventory_data::Item::default()); let mut skill_list: [u16; (MAX_SKILL_COUNT as usize)] = core::array::from_fn(|i| { if i < skills.len() { return skills[i] as u16; } - 0 + 0 }); - + for item in items { if item.slot < MAX_VISIBLE_ITEMS as i32 { let slot = convert_slot(item.slot) as usize; @@ -229,8 +289,10 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe }; } - let mut effect_list: [StatusEffect; (MAX_STATUS_EFFECTS as usize)] = core::array::from_fn(|i| StatusEffect::default()); - let mut hotbar_list: [HotbarItem; (MAX_HOTBAR_ITEMS as usize)] = core::array::from_fn(|i| HotbarItem::default()); + let mut effect_list: [StatusEffect; (MAX_STATUS_EFFECTS as usize)] = + core::array::from_fn(|i| StatusEffect::default()); + let mut hotbar_list: [HotbarItem; (MAX_HOTBAR_ITEMS as usize)] = + core::array::from_fn(|i| HotbarItem::default()); let data = SrvSelectCharReply { race: looks.race as u8, map: position.map_id as u16, @@ -289,8 +351,10 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe send_packet(stream, &response_packet).await?; // Now we need to build the Quest data - let mut quests: [srv_quest_data::Quest; (MAX_QUESTS as usize)] = core::array::from_fn(|i| srv_quest_data::Quest::default()); - let mut wishlist: [srv_quest_data::Item; (MAX_WISHLIST as usize)] = core::array::from_fn(|i| srv_quest_data::Item::default()); + let mut quests: [srv_quest_data::Quest; (MAX_QUESTS as usize)] = + core::array::from_fn(|i| srv_quest_data::Quest::default()); + let mut wishlist: [srv_quest_data::Item; (MAX_WISHLIST as usize)] = + core::array::from_fn(|i| srv_quest_data::Item::default()); let data = SrvQuestData { episodes: [0; (MAX_CONDITIONS_EPISODE as usize)], diff --git a/packet-service/src/handlers/world.rs b/packet-service/src/handlers/world.rs index 0227678..d216b53 100644 --- a/packet-service/src/handlers/world.rs +++ b/packet-service/src/handlers/world.rs @@ -14,7 +14,13 @@ fn distance(x1: f64, y1: f64, x2: f64, y2: f64) -> u16 { dist.round() as u16 } -pub(crate) async fn handle_change_map_req(stream: &mut TcpStream, packet: Packet, character_client: Arc>, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_change_map_req( + stream: &mut TcpStream, + packet: Packet, + character_client: Arc>, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { use crate::packets::cli_change_map_req::*; use crate::packets::srv_change_map_reply::*; let request = CliChangeMapReq::decode(packet.payload.as_slice())?; @@ -26,13 +32,25 @@ pub(crate) async fn handle_change_map_req(stream: &mut TcpStream, packet: Packet let session_id; if let Some(mut state) = connection_service.get_connection(&connection_id) { user_id = state.user_id.expect("Missing user id in connection state"); - session_id = state.session_id.expect("Missing session id in connection state"); - char_id = state.character_id.expect("Missing character id in connection state"); - character_id_list = state.character_list.clone().expect("Missing character id list"); + session_id = state + .session_id + .expect("Missing session id in connection state"); + char_id = state + .character_id + .expect("Missing character id in connection state"); + character_id_list = state + .character_list + .clone() + .expect("Missing character id list"); } let mut character_client = character_client.lock().await; - let character_data = character_client.get_character(&user_id.to_string(), character_id_list[char_id as usize] as u8).await?; + let character_data = character_client + .get_character( + &user_id.to_string(), + character_id_list[char_id as usize] as u8, + ) + .await?; let character = character_data.character.unwrap_or_default(); let stats = character.stats.unwrap(); @@ -61,7 +79,12 @@ pub(crate) async fn handle_change_map_req(stream: &mut TcpStream, packet: Packet Ok(()) } -pub(crate) async fn handle_mouse_cmd_req(stream: &mut TcpStream, packet: Packet, connection_service: Arc, connection_id: String) -> Result<(), Box> { +pub(crate) async fn handle_mouse_cmd_req( + stream: &mut TcpStream, + packet: Packet, + connection_service: Arc, + connection_id: String, +) -> Result<(), Box> { use crate::packets::cli_mouse_cmd::*; use crate::packets::srv_mouse_cmd::*; let request = CliMouseCmd::decode(packet.payload.as_slice())?; @@ -70,14 +93,24 @@ pub(crate) async fn handle_mouse_cmd_req(stream: &mut TcpStream, packet: Packet, let mut char_id = 0; let mut character_id_list: Vec = Vec::new(); if let Some(mut state) = connection_service.get_connection(&connection_id) { - char_id = state.character_id.expect("Missing character id in connection state"); - character_id_list = state.character_list.clone().expect("Missing character id list"); + char_id = state + .character_id + .expect("Missing character id in connection state"); + character_id_list = state + .character_list + .clone() + .expect("Missing character id list"); } let data = SrvMouseCmd { id: character_id_list[char_id as usize] as u16, target_id: request.target_id, - distance: distance(520000 as f64, 520000 as f64, request.x as f64, request.y as f64), + distance: distance( + 520000 as f64, + 520000 as f64, + request.x as f64, + request.y as f64, + ), x: request.x, y: request.y, z: request.z, @@ -85,4 +118,4 @@ pub(crate) async fn handle_mouse_cmd_req(stream: &mut TcpStream, packet: Packet, let response_packet = Packet::new(PacketType::PakwcMouseCmd, &data)?; send_packet(stream, &response_packet).await?; Ok(()) -} \ No newline at end of file +} diff --git a/packet-service/src/main.rs b/packet-service/src/main.rs index f45c16d..355cc21 100644 --- a/packet-service/src/main.rs +++ b/packet-service/src/main.rs @@ -1,7 +1,10 @@ use crate::auth_client::AuthClient; use crate::bufferpool::BufferPool; +use crate::character_client::CharacterClient; +use crate::connection_service::ConnectionService; use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED}; use crate::packet::Packet; +use crate::router::PacketRouter; use dotenv::dotenv; use std::collections::HashMap; use std::env; @@ -18,24 +21,21 @@ use tracing::{debug, error, info, warn}; use utils::consul_registration; use utils::service_discovery::get_service_address; use warp::Filter; -use crate::character_client::CharacterClient; -use crate::connection_service::ConnectionService; -use crate::router::PacketRouter; -mod packet_type; -mod packet; -mod router; -mod packets; -mod enums; -mod dataconsts; -mod types; -mod handlers; -mod bufferpool; -mod metrics; mod auth_client; +mod bufferpool; mod character_client; -mod connection_state; mod connection_service; +mod connection_state; +mod dataconsts; +mod enums; +mod handlers; +mod metrics; +mod packet; +mod packet_type; +mod packets; +mod router; +mod types; pub mod common { tonic::include_proto!("common"); @@ -57,7 +57,10 @@ const MAX_CONCURRENT_CONNECTIONS: usize = 100; async fn main() -> Result<(), Box> { dotenv().ok(); tracing_subscriber::fmt() - .with_max_level(Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())).unwrap_or_else(|_| Level::INFO)) + .with_max_level( + Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) + .unwrap_or_else(|_| Level::INFO), + ) .init(); // Set the gRPC server address @@ -67,7 +70,8 @@ async fn main() -> Result<(), Box> { let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "packet-service".to_string()); - let service_address = env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let service_address = + env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); let service_port = port.clone(); let health_check_url = format!("http://{}:{}/health", service_address, health_port); let health_check_endpoint_addr = format!("{}:{}", service_address, health_port); @@ -89,17 +93,23 @@ async fn main() -> Result<(), Box> { meta, &health_check_url, ) - .await?; + .await?; // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; let auth_address = auth_node.get(0).unwrap(); - let auth_url = format!("http://{}:{}", auth_address.ServiceAddress, auth_address.ServicePort); + let auth_url = format!( + "http://{}:{}", + auth_address.ServiceAddress, auth_address.ServicePort + ); let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?)); let character_address = character_node.get(0).unwrap(); - let character_url = format!("http://{}:{}", character_address.ServiceAddress, character_address.ServicePort); + let character_url = format!( + "http://{}:{}", + character_address.ServiceAddress, character_address.ServicePort + ); let character_client = Arc::new(Mutex::new(CharacterClient::connect(&character_url).await?)); let full_addr = format!("{}:{}", &addr, port); @@ -130,17 +140,24 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { let _permit = permit; let connection_id = packet_router.connection_service.add_connection(); - if let Err(e) = packet_router.handle_connection(&mut socket, pool, connection_id.clone()).await { + if let Err(e) = packet_router + .handle_connection(&mut socket, pool, connection_id.clone()) + .await + { error!("Error handling connection: {}", e); } - packet_router.connection_service.remove_connection(&connection_id); + packet_router + .connection_service + .remove_connection(&connection_id); }); } }); utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()).await.expect(""); + consul_registration::deregister_service(&consul_url, service_id.as_str()) + .await + .expect(""); info!("service {} deregistered", service_name); Ok(()) } diff --git a/packet-service/src/metrics.rs b/packet-service/src/metrics.rs index 5ea483c..b920656 100644 --- a/packet-service/src/metrics.rs +++ b/packet-service/src/metrics.rs @@ -1,5 +1,8 @@ -use prometheus::{Encoder, TextEncoder, Counter, Gauge, Histogram, register_counter, register_gauge, register_histogram}; use lazy_static::lazy_static; +use prometheus::{ + register_counter, register_gauge, register_histogram, Counter, Encoder, Gauge, Histogram, + TextEncoder, +}; lazy_static! { // Counter to track the number of packets received diff --git a/packet-service/src/packet.rs b/packet-service/src/packet.rs index 02070e2..a6704e3 100644 --- a/packet-service/src/packet.rs +++ b/packet-service/src/packet.rs @@ -13,15 +13,15 @@ pub struct Packet { pub payload: Vec, } -pub trait PacketPayload: Encode + Decode { - fn encode(&self) -> Result, Box> { +pub trait PacketPayload { + fn encode(&self) -> Result, Box> where Self: Encode { let config = bincode::config::standard().with_fixed_int_encoding(); Ok(bincode::encode_to_vec(self, config)?) } fn decode(data: &[u8]) -> Result> where - Self: Sized, + Self: Sized, Self: Decode<()> { let config = bincode::config::standard().with_fixed_int_encoding(); Ok(bincode::decode_from_slice(data, config)?.0) @@ -29,7 +29,10 @@ pub trait PacketPayload: Encode + Decode { } impl Packet { - pub fn new(packet_type: PacketType, payload: &T) -> Result> { + pub fn new( + packet_type: PacketType, + payload: &T, + ) -> Result> { let encoded_payload = ::encode(payload)?; let packet_size = (6 + encoded_payload.len()) as u16; // let packet_crc = crc::crc16::checksum_x25(&encoded_payload); @@ -97,7 +100,7 @@ impl Packet { raw } - pub fn parse(&self) -> Result> { + pub fn parse>(&self) -> Result> { ::decode(&self.payload) } } diff --git a/packet-service/src/router.rs b/packet-service/src/router.rs index cbedba2..99dfd3f 100644 --- a/packet-service/src/router.rs +++ b/packet-service/src/router.rs @@ -21,7 +21,12 @@ pub struct PacketRouter { } impl PacketRouter { - pub async fn handle_connection(&self, stream: &mut TcpStream, pool: Arc, connection_id: String) -> Result<(), Box> { + pub async fn handle_connection( + &self, + stream: &mut TcpStream, + pool: Arc, + connection_id: String, + ) -> Result<(), Box> { ACTIVE_CONNECTIONS.inc(); while let Some(mut buffer) = pool.acquire().await { // Read data into the buffer @@ -46,7 +51,8 @@ impl PacketRouter { Ok(packet) => { debug!("Parsed Packet: {:?}", packet); // Handle the parsed packet (route it, process it, etc.) - self.route_packet(stream, packet, connection_id.clone()).await?; + self.route_packet(stream, packet, connection_id.clone()) + .await?; } Err(e) => warn!("Failed to parse packet: {}", e), } @@ -67,28 +73,131 @@ impl PacketRouter { Ok(()) } - pub async fn route_packet(&self, stream: &mut TcpStream, packet: Packet, connection_id: String) -> Result<(), Box> { + pub async fn route_packet( + &self, + stream: &mut TcpStream, + packet: Packet, + connection_id: String, + ) -> Result<(), Box> { debug!("Routing packet: {:?}", packet); match packet.packet_type { // Generic Server Packets - PacketType::PakcsAlive => auth::handle_alive_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await, + PacketType::PakcsAlive => { + auth::handle_alive_req( + stream, + packet, + self.auth_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await, - PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await, + PacketType::PakcsJoinServerTokenReq => { + auth::handle_join_server_req( + stream, + packet, + self.auth_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } // Login Packets - PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id, stream.peer_addr()?).await, - PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await, - PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, self.connection_service.clone(), connection_id).await, + PacketType::PakcsLoginTokenReq => { + auth::handle_login_req( + stream, + packet, + self.auth_client.clone(), + self.connection_service.clone(), + connection_id, + stream.peer_addr()?, + ) + .await + } + PacketType::PakcsLogoutReq => { + auth::handle_logout_req( + stream, + packet, + self.auth_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } + PacketType::PakcsSrvSelectReq => { + auth::handle_server_select_req( + stream, + packet, + self.connection_service.clone(), + connection_id, + ) + .await + } PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await, // Character Packets - PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, - PacketType::PakcsCreateCharReq => character::handle_create_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, - PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, - PacketType::PakcsSelectCharReq => character::handle_select_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, + PacketType::PakcsCharListReq => { + character::handle_char_list_req( + stream, + packet, + self.character_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } + PacketType::PakcsCreateCharReq => { + character::handle_create_char_req( + stream, + packet, + self.character_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } + PacketType::PakcsDeleteCharReq => { + character::handle_delete_char_req( + stream, + packet, + self.character_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } + PacketType::PakcsSelectCharReq => { + character::handle_select_char_req( + stream, + packet, + self.character_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } // World Packets - PacketType::PakcsChangeMapReq => world::handle_change_map_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, - PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(stream, packet, self.connection_service.clone(), connection_id).await, + PacketType::PakcsChangeMapReq => { + world::handle_change_map_req( + stream, + packet, + self.character_client.clone(), + self.connection_service.clone(), + connection_id, + ) + .await + } + PacketType::PakcsMouseCmd => { + world::handle_mouse_cmd_req( + stream, + packet, + self.connection_service.clone(), + connection_id, + ) + .await + } // 1 => chat::handle_chat(packet).await?, // 2 => movement::handle_movement(packet).await?, diff --git a/packet-service/src/types.rs b/packet-service/src/types.rs index fa23e82..7fe0a88 100644 --- a/packet-service/src/types.rs +++ b/packet-service/src/types.rs @@ -1,5 +1,5 @@ -use std::time::{Duration}; -use bincode::{Encode, Decode}; +use bincode::{Decode, Encode}; +use std::time::Duration; // `HotbarItem` structure converted to Rust. #[derive(Debug, Clone, Copy, Encode, Decode, Default)] @@ -10,7 +10,7 @@ pub(crate) struct HotbarItem { // `Skill` structure converted to Rust. #[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode, Default)] -pub(crate) struct Skill { +pub(crate) struct Skill { id: u16, level: u8, } diff --git a/session-service/build.rs b/session-service/build.rs index ca2df77..499ce87 100644 --- a/session-service/build.rs +++ b/session-service/build.rs @@ -4,13 +4,22 @@ fn main() { .build_server(true) // Generate gRPC server code .compile_well_known_types(true) .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") - .compile_protos(&["../proto/common.proto", "../proto/session_service_api.proto"], &["../proto"]) + .compile_protos( + &[ + "../proto/common.proto", + "../proto/session_service_api.proto", + ], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); - + // gRPC Client code tonic_build::configure() .build_server(false) .compile_well_known_types(true) - .compile_protos(&["../proto/user_db_api.proto", "../proto/auth.proto"], &["../proto"]) + .compile_protos( + &["../proto/user_db_api.proto", "../proto/auth.proto"], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/session-service/src/main.rs b/session-service/src/main.rs index 7bc06be..d8148ab 100644 --- a/session-service/src/main.rs +++ b/session-service/src/main.rs @@ -1,19 +1,17 @@ mod session_service; +use crate::api::session_service_server::SessionServiceServer; +use crate::session_service::SessionServiceImpl; use dotenv::dotenv; use std::collections::HashMap; use std::env; use std::str::FromStr; use std::sync::Arc; -use tokio::{select, signal}; use tokio::sync::Mutex; use tonic::transport::Server; -use tracing::{info, Level}; +use tracing::Level; use utils::consul_registration; use utils::redis_cache::RedisCache; -use utils::service_discovery::get_service_address; -use crate::api::session_service_server::SessionServiceServer; -use crate::session_service::SessionServiceImpl; pub mod common { tonic::include_proto!("common"); @@ -27,18 +25,23 @@ pub mod api { async fn main() -> Result<(), Box> { dotenv().ok(); tracing_subscriber::fmt() - .with_max_level(Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())).unwrap_or_else(|_| Level::INFO)) + .with_max_level( + Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) + .unwrap_or_else(|_| Level::INFO), + ) .init(); // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let port = env::var("SESSION_SERVICE_PORT").unwrap_or_else(|_| "50055".to_string()); let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8084".to_string()); - - let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); + + let redis_url = + std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "session-service".to_string()); - let service_address = env::var("SESSION_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let service_address = + env::var("SESSION_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); let service_port = port.clone(); let health_check_url = format!("http://{}:{}/health", service_address, health_port); @@ -57,7 +60,7 @@ async fn main() -> Result<(), Box> { meta, &health_check_url, ) - .await?; + .await?; // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; @@ -65,16 +68,18 @@ async fn main() -> Result<(), Box> { let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); let redis_cache = Arc::new(Mutex::new(RedisCache::new(&redis_url))); - let session_service = SessionServiceImpl { - redis: redis_cache - }; + let session_service = SessionServiceImpl { redis: redis_cache }; - tokio::spawn(Server::builder() - .add_service(SessionServiceServer::new(session_service)) - .serve(address)); + tokio::spawn( + Server::builder() + .add_service(SessionServiceServer::new(session_service)) + .serve(address), + ); utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()).await.expect(""); + consul_registration::deregister_service(&consul_url, service_id.as_str()) + .await + .expect(""); Ok(()) } diff --git a/session-service/src/session_service.rs b/session-service/src/session_service.rs index 12836e7..6925286 100644 --- a/session-service/src/session_service.rs +++ b/session-service/src/session_service.rs @@ -1,12 +1,10 @@ +use crate::api::session_service_server::SessionService; +use crate::api::{CreateSessionRequest, DeleteSessionRequest, GetSessionRequest, SessionResponse}; +use crate::common::Empty; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::Mutex; use tonic::{Request, Response, Status}; -use serde::{Serialize, Deserialize}; -use crate::api::{ - CreateSessionRequest, SessionResponse, GetSessionRequest, DeleteSessionRequest, -}; -use crate::common::{Empty}; -use crate::api::session_service_server::SessionService; use utils::redis_cache::{Cache, RedisCache}; #[derive(Serialize, Deserialize, Debug)] @@ -39,10 +37,13 @@ impl SessionService for SessionServiceImpl { }; let session_id = req.session_id; - let session_data = serde_json::to_string(&session).map_err(|_| Status::internal("Failed to serialize session"))?; + let session_data = serde_json::to_string(&session) + .map_err(|_| Status::internal("Failed to serialize session"))?; let conn = self.redis.lock().await; - conn.set(&session_id.clone(), &session_data, 300).await.map_err(|_| Status::internal("Failed to store session in Redis"))?; + conn.set(&session_id.clone(), &session_data, 300) + .await + .map_err(|_| Status::internal("Failed to store session in Redis"))?; let response = SessionResponse { session_id, @@ -63,8 +64,13 @@ impl SessionService for SessionServiceImpl { let req = request.into_inner(); let conn = self.redis.lock().await; - if let Some(session_data) = conn.get::(&req.session_id).await.map_err(|_| Status::internal("Failed to fetch session from Redis"))? { - let session: Session = serde_json::from_str(&session_data).map_err(|_| Status::internal("Failed to deserialize session"))?; + if let Some(session_data) = conn + .get::(&req.session_id) + .await + .map_err(|_| Status::internal("Failed to fetch session from Redis"))? + { + let session: Session = serde_json::from_str(&session_data) + .map_err(|_| Status::internal("Failed to deserialize session"))?; let response = SessionResponse { session_id: req.session_id, @@ -88,7 +94,9 @@ impl SessionService for SessionServiceImpl { let req = request.into_inner(); let mut conn = self.redis.lock().await; - conn.delete(&req.session_id).await.map_err(|_| Status::internal("Failed to delete session from Redis"))?; + conn.delete(&req.session_id) + .await + .map_err(|_| Status::internal("Failed to delete session from Redis"))?; Ok(Response::new(Empty {})) } @@ -99,9 +107,14 @@ impl SessionService for SessionServiceImpl { let req = request.into_inner(); let conn = self.redis.lock().await; - if let Some(session_data) = conn.get::(&req.session_id).await.map_err(|_| Status::internal("Failed to fetch session from Redis"))? { + if let Some(session_data) = conn + .get::(&req.session_id) + .await + .map_err(|_| Status::internal("Failed to fetch session from Redis"))? + { let _ = conn.update(&req.session_id, Some(&session_data), Some(300)); - let session: Session = serde_json::from_str(&session_data).map_err(|_| Status::internal("Failed to deserialize session"))?; + let session: Session = serde_json::from_str(&session_data) + .map_err(|_| Status::internal("Failed to deserialize session"))?; let response = SessionResponse { session_id: req.session_id, diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 2a32ba9..a72ab04 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -5,14 +5,13 @@ edition = "2021" [dependencies] serde = { version = "1.0", features = ["derive"] } -reqwest = { version = "0.12.9", features = ["json"] } +reqwest = { version = "0.12.12", features = ["json"] } tracing = "0.1" -rand = "0.9.0-beta.1" -uuid = { version = "1.11.0", features = ["v4"] } +uuid = { version = "1.15.1", features = ["v4"] } warp = "0.3.7" -tokio = { version = "1.41.1", features = ["full"] } -bincode = { version = "2.0.0-rc.3", features = ["derive", "serde"] } -redis = "0.27.5" -deadpool-redis = "0.18.0" -async-trait = "0.1.83" -serde_json = "1.0.133" +tokio = { version = "1.43.0", features = ["full"] } +bincode = { version = "2.0.0", features = ["derive", "serde"] } +redis = "0.29.1" +deadpool-redis = "0.20.0" +async-trait = "0.1.87" +serde_json = "1.0.140" diff --git a/utils/src/consul_registration.rs b/utils/src/consul_registration.rs index 3871d8b..7de1ef5 100644 --- a/utils/src/consul_registration.rs +++ b/utils/src/consul_registration.rs @@ -2,8 +2,8 @@ use reqwest::Client; use serde::Serialize; use std::collections::HashMap; use std::env; -use std::net::ToSocketAddrs; use std::fs; +use std::net::ToSocketAddrs; use std::path::Path; use uuid::Uuid; use warp::Filter; @@ -52,8 +52,6 @@ pub fn get_or_generate_service_id(package_name: &str) -> String { service_id } - - pub async fn register_service( consul_url: &str, service_id: &str, @@ -64,7 +62,6 @@ pub async fn register_service( mut meta: HashMap, health_check_url: &str, ) -> Result<(), Box> { - meta.insert("version".to_string(), VERSION.to_string()); let registration = ConsulRegistration { id: service_id.to_string(), @@ -92,9 +89,13 @@ pub async fn register_service( Ok(()) } -pub async fn deregister_service(consul_url: &str, service_id: &str) -> Result<(), Box> { +pub async fn deregister_service( + consul_url: &str, + service_id: &str, +) -> Result<(), Box> { let client = Client::new(); - let consul_deregister_url = format!("{}/v1/agent/service/deregister/{}", consul_url, service_id); + let consul_deregister_url = + format!("{}/v1/agent/service/deregister/{}", consul_url, service_id); client .put(&consul_deregister_url) @@ -115,7 +116,14 @@ pub async fn start_health_check(service_address: &str) -> Result<(), Box Decode for NullTerminatedString { fn decode(decoder: &mut D) -> Result { let mut buffer = Vec::new(); let mut byte = [0u8; 1]; diff --git a/utils/src/redis_cache.rs b/utils/src/redis_cache.rs index 29950c9..142846c 100644 --- a/utils/src/redis_cache.rs +++ b/utils/src/redis_cache.rs @@ -23,7 +23,7 @@ pub trait Cache { &self, key: &String, ) -> Result, redis::RedisError>; - + async fn delete(&mut self, key: &str) -> redis::RedisResult<()>; } @@ -34,7 +34,9 @@ pub struct RedisCache { impl RedisCache { pub fn new(redis_url: &str) -> Self { let cfg = Config::from_url(redis_url); - let pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool"); + let pool = cfg + .create_pool(Some(Runtime::Tokio1)) + .expect("Failed to create Redis pool"); RedisCache { pool } } } @@ -47,20 +49,20 @@ impl Cache for RedisCache { value: &T, ttl: u64, ) -> Result<(), redis::RedisError> { - let mut conn = self.pool.get().await - .map_err(|err| { - redis::RedisError::from(( - redis::ErrorKind::IoError, - "Failed to get Redis connection", - format!("{:?}", err), - )) - })?; - let serialized_value = serde_json::to_string(value) - .map_err(|err| RedisError::from(( + let mut conn = self.pool.get().await.map_err(|err| { + redis::RedisError::from(( + redis::ErrorKind::IoError, + "Failed to get Redis connection", + format!("{:?}", err), + )) + })?; + let serialized_value = serde_json::to_string(value).map_err(|err| { + RedisError::from(( redis::ErrorKind::IoError, "Serialization error", format!("Serialization error: {}", err), - )))?; + )) + })?; if ttl > 0 { conn.set_ex(key, serialized_value, ttl).await } else { @@ -68,22 +70,25 @@ impl Cache for RedisCache { } } - async fn get Deserialize<'de> + Send + Sync>(&self, key: &String) -> Result, redis::RedisError> { - let mut conn = self.pool.get().await - .map_err(|err| { - redis::RedisError::from(( - redis::ErrorKind::IoError, - "Failed to get Redis connection", - format!("{:?}", err), - )) - })?; + async fn get Deserialize<'de> + Send + Sync>( + &self, + key: &String, + ) -> Result, redis::RedisError> { + let mut conn = self.pool.get().await.map_err(|err| { + redis::RedisError::from(( + redis::ErrorKind::IoError, + "Failed to get Redis connection", + format!("{:?}", err), + )) + })?; if let Some(serialized_value) = conn.get::<_, Option>(key).await? { - let deserialized_value = serde_json::from_str(&serialized_value) - .map_err(|err| RedisError::from(( + let deserialized_value = serde_json::from_str(&serialized_value).map_err(|err| { + RedisError::from(( redis::ErrorKind::IoError, "Deserialization error", format!("Deserialization error: {}", err), - )))?; + )) + })?; Ok(Some(deserialized_value)) } else { Ok(None) @@ -96,22 +101,22 @@ impl Cache for RedisCache { value: Option<&T>, ttl: Option, ) -> Result<(), redis::RedisError> { - let mut conn = self.pool.get().await - .map_err(|err| { - redis::RedisError::from(( - redis::ErrorKind::IoError, - "Failed to get Redis connection", - format!("{:?}", err), - )) - })?; + let mut conn = self.pool.get().await.map_err(|err| { + redis::RedisError::from(( + redis::ErrorKind::IoError, + "Failed to get Redis connection", + format!("{:?}", err), + )) + })?; let serialized_value; if value.is_some() { - serialized_value = serde_json::to_string(&value) - .map_err(|err| RedisError::from(( + serialized_value = serde_json::to_string(&value).map_err(|err| { + RedisError::from(( redis::ErrorKind::IoError, "Serialization error", format!("Serialization error: {}", err), - )))?; + )) + })?; } else { serialized_value = conn.get(key).await?; } @@ -123,14 +128,13 @@ impl Cache for RedisCache { } async fn delete(&mut self, key: &str) -> redis::RedisResult<()> { - let mut conn = self.pool.get().await - .map_err(|err| { - redis::RedisError::from(( - redis::ErrorKind::IoError, - "Failed to get Redis connection", - format!("{:?}", err), - )) - })?; + let mut conn = self.pool.get().await.map_err(|err| { + redis::RedisError::from(( + redis::ErrorKind::IoError, + "Failed to get Redis connection", + format!("{:?}", err), + )) + })?; conn.del(key).await } } diff --git a/utils/src/service_discovery.rs b/utils/src/service_discovery.rs index 4dfdb97..308a514 100644 --- a/utils/src/service_discovery.rs +++ b/utils/src/service_discovery.rs @@ -1,5 +1,5 @@ -use std::collections::HashMap; use serde::Deserialize; +use std::collections::HashMap; #[derive(Debug, Deserialize)] pub struct ServiceNode { @@ -9,23 +9,24 @@ pub struct ServiceNode { pub ServiceMeta: HashMap, } -pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result, Box> { +pub async fn get_service_address( + consul_url: &str, + service_name: &str, +) -> Result, Box> { let client = reqwest::Client::new(); let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name); - let response = client - .get(&consul_service_url) - .send() - .await?; + let response = client.get(&consul_service_url).send().await?; if !response.status().is_success() { return Err(format!( "Failed to fetch service nodes for '{}': {}", - service_name, response.status() + service_name, + response.status() ) - .into()); + .into()); } - + // Deserialize the response into a Vec let nodes: Vec = response.json().await?; @@ -48,9 +49,10 @@ async fn get_services_with_tag( if !response.status().is_success() { return Err(format!( "Failed to fetch service nodes for '{}': {}", - service_name, response.status() + service_name, + response.status() ) - .into()); + .into()); } // Deserialize the response into a Vec diff --git a/utils/src/signal_handler.rs b/utils/src/signal_handler.rs index 81fee40..121961a 100644 --- a/utils/src/signal_handler.rs +++ b/utils/src/signal_handler.rs @@ -16,13 +16,15 @@ async fn terminate_signal() { #[cfg(unix)] { use tokio::signal::unix::{signal, SignalKind}; - let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler"); + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler"); sigterm.recv().await; } #[cfg(windows)] { - let mut ctrlbreak = signal::windows::ctrl_break().expect("Failed to set up CTRL_BREAK handler"); + let mut ctrlbreak = + signal::windows::ctrl_break().expect("Failed to set up CTRL_BREAK handler"); ctrlbreak.recv().await; } -} \ No newline at end of file +} diff --git a/world-service/build.rs b/world-service/build.rs index 5bec92a..1ca8db6 100644 --- a/world-service/build.rs +++ b/world-service/build.rs @@ -6,11 +6,14 @@ fn main() { .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") .compile_protos(&["../proto/world.proto"], &["../proto"]) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); - + // gRPC Client code tonic_build::configure() .build_server(false) // Generate gRPC client code .compile_well_known_types(true) - .compile_protos(&["../proto/user_db_api.proto", "../proto/auth.proto"], &["../proto"]) + .compile_protos( + &["../proto/user_db_api.proto", "../proto/auth.proto"], + &["../proto"], + ) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/world-service/src/main.rs b/world-service/src/main.rs index 5985d99..59bc91c 100644 --- a/world-service/src/main.rs +++ b/world-service/src/main.rs @@ -2,8 +2,7 @@ use dotenv::dotenv; use std::collections::HashMap; use std::env; use std::str::FromStr; -use tokio::{select, signal}; -use tracing::{info, Level}; +use tracing::Level; use utils::consul_registration; use utils::service_discovery::get_service_address; @@ -11,7 +10,10 @@ use utils::service_discovery::get_service_address; async fn main() -> Result<(), Box> { dotenv().ok(); tracing_subscriber::fmt() - .with_max_level(Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())).unwrap_or_else(|_| Level::INFO)) + .with_max_level( + Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) + .unwrap_or_else(|_| Level::INFO), + ) .init(); // Set the gRPC server address @@ -21,7 +23,8 @@ async fn main() -> Result<(), Box> { let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "world-service".to_string()); - let service_address = env::var("WORLD_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let service_address = + env::var("WORLD_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); let service_port = port.clone(); let health_check_url = format!("http://{}:{}/health", service_address, health_port); let health_check_endpoint_addr = format!("{}:{}", service_address, health_port); @@ -43,15 +46,20 @@ async fn main() -> Result<(), Box> { meta, &health_check_url, ) - .await?; + .await?; // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; let db_address = db_nodes.get(0).unwrap(); - let db_url = format!("http://{}:{}", db_address.ServiceAddress, db_address.ServicePort); + let db_url = format!( + "http://{}:{}", + db_address.ServiceAddress, db_address.ServicePort + ); utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()).await.expect(""); + consul_registration::deregister_service(&consul_url, service_id.as_str()) + .await + .expect(""); Ok(()) }