Add comprehensive documentation and unit tests

Documentation:
- Add detailed README files for all services (auth, character, database, launcher, packet, utils, world)
- Create API documentation for the database service with detailed endpoint specifications
- Document database schema and relationships
- Add service architecture overviews and configuration instructions

Unit Tests:
- Implement comprehensive test suite for database repositories (user, character, session)
- Add gRPC service tests for database interactions
- Create tests for packet service components (bufferpool, connection, packets)
- Add utility service tests (health check, logging, load balancer, redis cache, service discovery)
- Implement auth service user tests
- Add character service tests

Code Structure:
- Reorganize test files into a more consistent structure
- Create a dedicated tests crate for integration testing
- Add test helpers and mock implementations for easier testing
This commit is contained in:
2025-04-09 13:29:38 -04:00
parent d47d5f44b1
commit a8755bd3de
85 changed files with 4218 additions and 764 deletions

81
utils/README.md Normal file
View File

@@ -0,0 +1,81 @@
# Utils Module
This module provides shared utilities used by all microservices in the MMORPG server architecture.
## Components
### Service Discovery
The `service_discovery.rs` module provides functionality for discovering services in both Kubernetes and Consul environments:
- `get_service_endpoints_by_dns`: Discovers service endpoints using Consul DNS
- `get_kube_service_endpoints_by_dns`: Discovers service endpoints using Kubernetes DNS
- `get_service_info`: Retrieves detailed information about a Kubernetes service
- `get_services_by_label`: Finds Kubernetes services matching specific labels
### Redis Cache
The `redis_cache.rs` module provides a caching layer using Redis:
- Implements the `Cache` trait for standardized cache operations
- Provides methods for getting, setting, and deleting cached values
- Supports TTL (Time To Live) for cached entries
### Multi-Service Load Balancer
The `multi_service_load_balancer.rs` module provides load balancing across multiple service instances:
- Supports Random and Round-Robin load balancing strategies
- Refreshes service endpoints on demand when no cached endpoint is available, and can refresh all known services explicitly
- Provides failover capabilities
### Signal Handler
The `signal_handler.rs` module provides graceful shutdown capabilities:
- `wait_for_signal`: Waits for termination signals (SIGINT, SIGTERM)
- Cross-platform support for Unix and Windows signals
### Consul Registration
The `consul_registration.rs` module provides service registration with Consul:
- `register_service`: Registers a service with Consul
- `generate_service_id`: Generates unique service IDs
- `get_or_generate_service_id`: Retrieves or creates service IDs
### Health Check
The `health_check.rs` module provides HTTP health check endpoints:
- `start_health_check`: Starts an HTTP health check endpoint on the given service address (host:port)
### Logging
The `logging.rs` module provides standardized logging setup:
- `setup_logging`: Configures the tracing subscriber with per-crate log levels, read from the `LOG_LEVEL` environment variable (defaulting to `info`)
## Usage
Import the required utilities in your service:
```rust
use utils::logging;
use utils::service_discovery::get_kube_service_endpoints_by_dns;
use utils::signal_handler::wait_for_signal;
// Setup logging
logging::setup_logging("my-service", &["my_service"]);
// Discover services
let db_url = format!("http://{}",
get_kube_service_endpoints_by_dns("database-service", "tcp", "database-service")
.await?
.get(0)
.unwrap()
);
// Wait for termination signal
wait_for_signal().await;
```

View File

@@ -88,19 +88,11 @@ pub async fn register_service(
Ok(())
}
pub async fn deregister_service(
consul_url: &str,
service_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn deregister_service(consul_url: &str, service_id: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let consul_deregister_url =
format!("{}/v1/agent/service/deregister/{}", consul_url, service_id);
let consul_deregister_url = format!("{}/v1/agent/service/deregister/{}", consul_url, service_id);
client
.put(&consul_deregister_url)
.send()
.await?
.error_for_status()?; // Ensure response is successful
client.put(&consul_deregister_url).send().await?.error_for_status()?; // Ensure response is successful
Ok(())
}

View File

@@ -12,14 +12,7 @@ pub async fn start_health_check(service_address: &str) -> Result<(), Box<dyn std
.map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK))
.with(log);
tokio::spawn(
warp::serve(health_route).run(
health_check_endpoint_addr
.to_socket_addrs()?
.next()
.unwrap(),
),
);
tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap()));
Ok(())
}

View File

@@ -1,8 +1,8 @@
pub mod consul_registration;
pub mod health_check;
pub mod logging;
pub mod multi_service_load_balancer;
pub mod null_string;
pub mod redis_cache;
pub mod service_discovery;
pub mod signal_handler;
pub mod multi_service_load_balancer;
pub mod logging;
pub mod health_check;

View File

@@ -4,23 +4,21 @@ use tracing_subscriber::EnvFilter;
pub fn setup_logging(app_name: &str, additional_crates: &[&str]) {
let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
let mut filter_string = format!("{app_name}={log_level},utils={log_level}");
for (crate_name) in additional_crates {
for &crate_name in additional_crates {
filter_string.push(',');
filter_string.push_str(&format!("{crate_name}={log_level}"));
}
let filter = EnvFilter::try_new(filter_string)
.unwrap_or_else(|_| EnvFilter::new(format!("{app_name}=info,utils=info")));
tracing_subscriber::fmt()
.with_env_filter(filter)
.init();
let filter =
EnvFilter::try_new(filter_string).unwrap_or_else(|_| EnvFilter::new(format!("{app_name}=info,utils=info")));
tracing_subscriber::fmt().with_env_filter(filter).init();
error!("Error messages enabled");
warn!("Warning messages enabled");
info!("Info messages enabled");
debug!("Debug messages enabled");
trace!("Trace messages enabled");
}
}

View File

@@ -1,135 +1,132 @@
use crate::service_discovery::get_service_endpoints_by_dns;
use rand::seq::SliceRandom;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use rand::seq::SliceRandom;
use crate::service_discovery::get_service_endpoints_by_dns;
pub enum LoadBalancingStrategy {
Random,
RoundRobin,
Random,
RoundRobin,
}
// Service identifier
#[derive(Clone, PartialEq, Eq, Hash)]
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ServiceId {
pub name: String,
pub protocol: String,
pub name: String,
pub protocol: String,
}
impl ServiceId {
pub fn new(name: &str, protocol: &str) -> Self {
ServiceId {
name: name.to_string(),
protocol: protocol.to_string(),
pub fn new(name: &str, protocol: &str) -> Self {
ServiceId {
name: name.to_string(),
protocol: protocol.to_string(),
}
}
}
}
// Per-service state
struct ServiceState {
endpoints: Vec<SocketAddr>,
current_index: usize,
endpoints: Vec<SocketAddr>,
current_index: usize,
}
impl ServiceState {
fn new(endpoints: Vec<SocketAddr>) -> Self {
ServiceState {
endpoints,
current_index: 0,
}
}
fn get_endpoint(&mut self, strategy: &LoadBalancingStrategy) -> Option<SocketAddr> {
if self.endpoints.is_empty() {
return None;
fn new(endpoints: Vec<SocketAddr>) -> Self {
ServiceState {
endpoints,
current_index: 0,
}
}
match strategy {
LoadBalancingStrategy::Random => {
let mut rng = rand::thread_rng();
self.endpoints.choose(&mut rng).copied()
}
LoadBalancingStrategy::RoundRobin => {
let endpoint = self.endpoints[self.current_index].clone();
self.current_index = (self.current_index + 1) % self.endpoints.len();
Some(endpoint)
}
fn get_endpoint(&mut self, strategy: &LoadBalancingStrategy) -> Option<SocketAddr> {
if self.endpoints.is_empty() {
return None;
}
match strategy {
LoadBalancingStrategy::Random => {
let mut rng = rand::thread_rng();
self.endpoints.choose(&mut rng).copied()
}
LoadBalancingStrategy::RoundRobin => {
let endpoint = self.endpoints[self.current_index].clone();
self.current_index = (self.current_index + 1) % self.endpoints.len();
Some(endpoint)
}
}
}
}
}
pub struct MultiServiceLoadBalancer {
consul_url: String,
strategy: LoadBalancingStrategy,
services: Arc<Mutex<HashMap<ServiceId, ServiceState>>>,
consul_url: String,
strategy: LoadBalancingStrategy,
services: Arc<Mutex<HashMap<ServiceId, ServiceState>>>,
}
impl MultiServiceLoadBalancer {
pub fn new(consul_url: &str, strategy: LoadBalancingStrategy) -> Self {
MultiServiceLoadBalancer {
consul_url: consul_url.to_string(),
strategy,
services: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn get_endpoint(
&self,
service_name: &str,
service_protocol: &str,
) -> Result<Option<SocketAddr>, Box<dyn std::error::Error>> {
let service_id = ServiceId::new(service_name, service_protocol);
// Try to get an endpoint from the cache first
{
let mut services = self.services.lock().unwrap();
if let Some(service_state) = services.get_mut(&service_id) {
if let Some(endpoint) = service_state.get_endpoint(&self.strategy) {
return Ok(Some(endpoint));
pub fn new(consul_url: &str, strategy: LoadBalancingStrategy) -> Self {
MultiServiceLoadBalancer {
consul_url: consul_url.to_string(),
strategy,
services: Arc::new(Mutex::new(HashMap::new())),
}
}
}
// If we don't have endpoints or they're all unavailable, refresh them
self.refresh_service_endpoints(service_name, service_protocol).await?;
pub async fn get_endpoint(
&self,
service_name: &str,
service_protocol: &str,
) -> Result<Option<SocketAddr>, Box<dyn std::error::Error>> {
let service_id = ServiceId::new(service_name, service_protocol);
// Try again after refresh
let mut services = self.services.lock().unwrap();
if let Some(service_state) = services.get_mut(&service_id) {
return Ok(service_state.get_endpoint(&self.strategy));
// Try to get an endpoint from the cache first
{
let mut services = self.services.lock().unwrap();
if let Some(service_state) = services.get_mut(&service_id) {
if let Some(endpoint) = service_state.get_endpoint(&self.strategy) {
return Ok(Some(endpoint));
}
}
}
// If we don't have endpoints or they're all unavailable, refresh them
self.refresh_service_endpoints(service_name, service_protocol).await?;
// Try again after refresh
let mut services = self.services.lock().unwrap();
if let Some(service_state) = services.get_mut(&service_id) {
return Ok(service_state.get_endpoint(&self.strategy));
}
Ok(None)
}
Ok(None)
}
pub async fn refresh_service_endpoints(
&self,
service_name: &str,
service_protocol: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let endpoints = get_service_endpoints_by_dns(&self.consul_url, service_protocol, service_name).await?;
pub async fn refresh_service_endpoints(
&self,
service_name: &str,
service_protocol: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let endpoints = get_service_endpoints_by_dns(
&self.consul_url,
service_protocol,
service_name,
).await?;
let service_id = ServiceId::new(service_name, service_protocol);
let mut services = self.services.lock().unwrap();
let service_id = ServiceId::new(service_name, service_protocol);
let mut services = self.services.lock().unwrap();
services.insert(service_id, ServiceState::new(endpoints));
Ok(())
}
pub async fn refresh_all_services(&self) -> Result<(), Box<dyn std::error::Error>> {
let service_ids: Vec<ServiceId> = {
let services = self.services.lock().unwrap();
services.keys().cloned().collect()
};
for service_id in service_ids {
self.refresh_service_endpoints(&service_id.name, &service_id.protocol).await?;
services.insert(service_id, ServiceState::new(endpoints));
Ok(())
}
Ok(())
}
}
pub async fn refresh_all_services(&self) -> Result<(), Box<dyn std::error::Error>> {
let service_ids: Vec<ServiceId> = {
let services = self.services.lock().unwrap();
services.keys().cloned().collect()
};
for service_id in service_ids {
self.refresh_service_endpoints(&service_id.name, &service_id.protocol)
.await?;
}
Ok(())
}
}

View File

@@ -1,16 +1,12 @@
use async_trait::async_trait;
use deadpool_redis::{Config, Pool, Runtime};
use redis::{AsyncCommands, Commands, RedisError};
use redis::{AsyncCommands, RedisError};
use serde::{Deserialize, Serialize};
#[async_trait]
pub trait Cache {
async fn set<T: Serialize + Send + Sync>(
&self,
key: &String,
value: &T,
ttl: u64,
) -> Result<(), redis::RedisError>;
async fn set<T: Serialize + Send + Sync>(&self, key: &String, value: &T, ttl: u64)
-> Result<(), redis::RedisError>;
async fn update<T: Serialize + Send + Sync>(
&self,

View File

@@ -1,14 +1,18 @@
use hickory_resolver::config::*;
use hickory_resolver::{Resolver, TokioAsyncResolver};
use hickory_resolver::system_conf::read_system_conf;
use hickory_resolver::{TokioAsyncResolver};
use k8s_openapi::api::core::v1::Service;
use kube::{Api, Client};
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::str::FromStr;
use kube::{Client, Api};
use k8s_openapi::api::core::v1::Service;
use std::collections::{BTreeMap};
use hickory_resolver::system_conf::read_system_conf;
use tracing::debug;
pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
pub async fn get_service_endpoints_by_dns(
consul_url: &str,
service_protocol: &str,
service_name: &str,
) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
let mut rc = ResolverConfig::new();
let url = consul_url.parse()?;
rc.add_name_server(NameServerConfig::new(url, Protocol::Tcp));
@@ -23,14 +27,22 @@ pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &s
let hostname = record.target();
let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?;
for response in lookup_responses {
endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?);
endpoints.push(SocketAddr::from_str(&format!(
"{}:{}",
&response.to_string(),
record.port()
))?);
}
}
Ok(endpoints)
}
pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
pub async fn get_kube_service_endpoints_by_dns(
port_name: &str,
service_protocol: &str,
service_name: &str,
) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
let (config, options) = read_system_conf()?;
let resolver = TokioAsyncResolver::tokio(config, options);
@@ -42,7 +54,11 @@ pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol
let hostname = record.target();
let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?;
for response in lookup_responses {
endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?);
endpoints.push(SocketAddr::from_str(&format!(
"{}:{}",
&response.to_string(),
record.port()
))?);
}
}
@@ -58,7 +74,7 @@ pub struct ServiceInfo {
}
pub async fn get_service_info(
namespace: &str,
_namespace: &str,
service_name: &str,
) -> Result<ServiceInfo, Box<dyn std::error::Error + Send + Sync>> {
let client = Client::try_default().await?;

View File

@@ -16,15 +16,13 @@ async fn terminate_signal() {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler");
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler");
sigterm.recv().await;
}
#[cfg(windows)]
{
let mut ctrlbreak =
signal::windows::ctrl_break().expect("Failed to set up CTRL_BREAK handler");
let mut ctrlbreak = signal::windows::ctrl_break().expect("Failed to set up CTRL_BREAK handler");
ctrlbreak.recv().await;
}
}