- update: major refactor of the database-service to make it easy to add newer api services
- add: character database api
This commit is contained in:
@@ -11,6 +11,6 @@ fn main() {
|
|||||||
tonic_build::configure()
|
tonic_build::configure()
|
||||||
.build_server(false) // Generate gRPC client code
|
.build_server(false) // Generate gRPC client code
|
||||||
.compile_well_known_types(true)
|
.compile_well_known_types(true)
|
||||||
.compile_protos(&["../proto/database.proto"], &["../proto"])
|
.compile_protos(&["../proto/user_db_api.proto"], &["../proto"])
|
||||||
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use crate::database::{database_service_client::DatabaseServiceClient, CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse};
|
use crate::database::{user_service_client::UserServiceClient, CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
@@ -18,7 +18,7 @@ pub trait DatabaseClientTrait: Sized {
|
|||||||
}
|
}
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct DatabaseClient {
|
pub struct DatabaseClient {
|
||||||
client: DatabaseServiceClient<Channel>,
|
client: UserServiceClient<Channel>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -31,7 +31,7 @@ pub struct PasswordReset {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl DatabaseClientTrait for DatabaseClient {
|
impl DatabaseClientTrait for DatabaseClient {
|
||||||
async fn connect(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
|
async fn connect(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
let client = DatabaseServiceClient::connect(endpoint.to_string()).await?;
|
let client = UserServiceClient::connect(endpoint.to_string()).await?;
|
||||||
Ok(Self { client })
|
Ok(Self { client })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ pub mod auth {
|
|||||||
tonic::include_proto!("auth"); // Path matches the package name in auth.proto
|
tonic::include_proto!("auth"); // Path matches the package name in auth.proto
|
||||||
}
|
}
|
||||||
pub mod database {
|
pub mod database {
|
||||||
tonic::include_proto!("database"); // Matches package name in database.proto
|
tonic::include_proto!("user_db_api"); // Matches package name in user_db_api.proto
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -11,6 +11,6 @@ fn main() {
|
|||||||
tonic_build::configure()
|
tonic_build::configure()
|
||||||
.build_server(false) // Generate gRPC client code
|
.build_server(false) // Generate gRPC client code
|
||||||
.compile_well_known_types(true)
|
.compile_well_known_types(true)
|
||||||
.compile_protos(&["../proto/database.proto", "../proto/auth.proto"], &["../proto"])
|
.compile_protos(&["../proto/user_db_api.proto", "../proto/auth.proto"], &["../proto"])
|
||||||
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,6 @@ fn main() {
|
|||||||
.build_server(true)
|
.build_server(true)
|
||||||
.compile_well_known_types(true)
|
.compile_well_known_types(true)
|
||||||
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
|
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
|
||||||
.compile_protos(&["../proto/database.proto"], &["../proto"])
|
.compile_protos(&["../proto/user_db_api.proto", "../proto/character_db_api.proto"], &["../proto"])
|
||||||
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
}
|
}
|
||||||
|
|||||||
110
database-service/src/characters.rs
Normal file
110
database-service/src/characters.rs
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
use sqlx::{FromRow, Row};
|
||||||
|
use crate::redis_cache::{Cache, RedisCache}; // Import RedisCache
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
#[derive(Debug, FromRow, Serialize, Deserialize)]
|
||||||
|
pub struct Character {
|
||||||
|
pub id: i32,
|
||||||
|
pub user_id: i32,
|
||||||
|
pub name: String,
|
||||||
|
pub level: i16,
|
||||||
|
pub experience: i64,
|
||||||
|
pub inventory: serde_json::Value,
|
||||||
|
pub stats: serde_json::Value,
|
||||||
|
pub looks: serde_json::Value,
|
||||||
|
pub position: serde_json::Value,
|
||||||
|
pub created_at: chrono::NaiveDateTime,
|
||||||
|
pub updated_at: chrono::NaiveDateTime,
|
||||||
|
pub deleted_at: Option<chrono::NaiveDateTime>,
|
||||||
|
pub is_active: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CharacterRepository {
|
||||||
|
pool: sqlx::PgPool,
|
||||||
|
cache: Arc<Mutex<RedisCache>>, // Thread-safe RedisCache
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CharacterRepository {
|
||||||
|
pub fn new(pool: sqlx::PgPool, cache: Arc<Mutex<RedisCache>>) -> Self {
|
||||||
|
Self { pool, cache }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_character_by_id(&self, character_id: i32) -> Result<Character, sqlx::Error> {
|
||||||
|
let cache_key = format!("character:{}", character_id);
|
||||||
|
|
||||||
|
// Try fetching from Redis cache
|
||||||
|
if let Some(character) = self.cache.lock().await.get::<Character>(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? {
|
||||||
|
return Ok(character);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch from database
|
||||||
|
let character = sqlx::query_as::<_, Character>(
|
||||||
|
"SELECT id, user_id, name, level, experience, inventory, stats, looks, position, \
|
||||||
|
created_at, updated_at, deleted_at, is_active \
|
||||||
|
FROM characters WHERE id = $1 AND is_active = true",
|
||||||
|
)
|
||||||
|
.bind(character_id)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Cache result
|
||||||
|
self.cache.lock().await.set(&cache_key, &character, 300).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
|
Ok(character)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_character(&self, user_id: i32, name: &str, inventory: serde_json::Value, stats: serde_json::Value, looks: serde_json::Value, position: serde_json::Value) -> Result<i32, sqlx::Error> {
|
||||||
|
let result = sqlx::query(
|
||||||
|
"INSERT INTO characters (user_id, name, level, experience, inventory, stats, looks, position, created_at, updated_at, is_active) \
|
||||||
|
VALUES ($1, $2, 1, 0, $3, $4, $5, $6, NOW(), NOW(), true) RETURNING id",
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.bind(name)
|
||||||
|
.bind(inventory)
|
||||||
|
.bind(stats)
|
||||||
|
.bind(looks)
|
||||||
|
.bind(position)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(result.get("id"))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete_character(&self, character_id: i32) -> Result<(), sqlx::Error> {
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE characters SET deleted_at = NOW(), is_active = false WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(character_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Invalidate cache
|
||||||
|
let cache_key = format!("character:{}", character_id);
|
||||||
|
self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_characters_by_user(&self, user_id: i32) -> Result<Vec<Character>, sqlx::Error> {
|
||||||
|
let cache_key = format!("character:user:{}", user_id);
|
||||||
|
|
||||||
|
// Try fetching from Redis cache
|
||||||
|
if let Some(characters) = self.cache.lock().await.get::<Vec<Character>>(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? {
|
||||||
|
return Ok(characters);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch from database
|
||||||
|
let characters = sqlx::query_as::<_, Character>(
|
||||||
|
"SELECT id, user_id, name, level, experience, inventory, stats, looks, position, \
|
||||||
|
created_at, updated_at, deleted_at, is_active \
|
||||||
|
FROM characters WHERE user_id = $1 AND is_active = true",
|
||||||
|
)
|
||||||
|
.bind(user_id)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Cache result
|
||||||
|
self.cache.lock().await.set(&cache_key, &characters, 300).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
|
Ok(characters)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,24 +1,23 @@
|
|||||||
|
use crate::users::UserRepository;
|
||||||
|
use crate::characters::CharacterRepository;
|
||||||
use crate::redis_cache::RedisCache;
|
use crate::redis_cache::RedisCache;
|
||||||
use crate::users::UsersService;
|
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
pub users_service: UsersService, // User-specific functionality
|
pub user_repo: Arc<UserRepository>,
|
||||||
|
pub character_repo: Arc<CharacterRepository>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
pub async fn new(pool: PgPool, cache: Arc<RedisCache>) -> Self {
|
pub fn new(pool: PgPool, redis_cache: Arc<Mutex<RedisCache>>) -> Self {
|
||||||
let users_service = UsersService { pool, cache };
|
let user_repo = Arc::new(UserRepository::new(pool.clone(), redis_cache.clone()));
|
||||||
|
let character_repo = Arc::new(CharacterRepository::new(pool.clone(), redis_cache));
|
||||||
|
|
||||||
Self { users_service }
|
Self {
|
||||||
}
|
user_repo,
|
||||||
|
character_repo,
|
||||||
pub async fn health_check(&self) -> bool {
|
}
|
||||||
// Simple query to check database health
|
|
||||||
sqlx::query("SELECT 1")
|
|
||||||
.execute(&self.users_service.pool)
|
|
||||||
.await
|
|
||||||
.is_ok()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
93
database-service/src/grpc/character_service.rs
Normal file
93
database-service/src/grpc/character_service.rs
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
use crate::grpc::{CharacterRequest, CharacterResponse, CreateCharacterRequest, DeleteCharacterRequest, Empty};
|
||||||
|
use crate::grpc::character_service_server::CharacterService;
|
||||||
|
use crate::grpc::database_service::MyDatabaseService;
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl CharacterService for MyDatabaseService {
|
||||||
|
async fn get_character(
|
||||||
|
&self,
|
||||||
|
request: Request<CharacterRequest>,
|
||||||
|
) -> Result<Response<CharacterResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
let repo = &self.db.character_repo;
|
||||||
|
|
||||||
|
let character = repo
|
||||||
|
.get_character_by_id(req.character_id)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::not_found("Character not found"))?;
|
||||||
|
|
||||||
|
let response = CharacterResponse {
|
||||||
|
id: character.id,
|
||||||
|
user_id: character.user_id,
|
||||||
|
name: character.name,
|
||||||
|
level: character.level as i32,
|
||||||
|
experience: character.experience,
|
||||||
|
inventory: character.inventory.to_string(),
|
||||||
|
stats: character.stats.to_string(),
|
||||||
|
looks: character.looks.to_string(),
|
||||||
|
position: character.position.to_string(),
|
||||||
|
created_at: character.created_at.to_string(),
|
||||||
|
updated_at: character.updated_at.to_string(),
|
||||||
|
is_active: character.is_active,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Response::new(response))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_character(
|
||||||
|
&self,
|
||||||
|
request: Request<CreateCharacterRequest>,
|
||||||
|
) -> Result<Response<CharacterResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
let repo = &self.db.character_repo;
|
||||||
|
|
||||||
|
let character_id = repo
|
||||||
|
.create_character(
|
||||||
|
req.user_id,
|
||||||
|
&req.name,
|
||||||
|
serde_json::from_str(&req.inventory).unwrap_or_default(),
|
||||||
|
serde_json::from_str(&req.stats).unwrap_or_default(),
|
||||||
|
serde_json::from_str(&req.looks).unwrap_or_default(),
|
||||||
|
serde_json::from_str(&req.position).unwrap_or_default(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::internal("Failed to create character"))?;
|
||||||
|
|
||||||
|
let character = repo
|
||||||
|
.get_character_by_id(character_id)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::not_found("Character not found"))?;
|
||||||
|
|
||||||
|
let response = CharacterResponse {
|
||||||
|
id: character.id,
|
||||||
|
user_id: character.user_id,
|
||||||
|
name: character.name,
|
||||||
|
level: character.level as i32,
|
||||||
|
experience: character.experience,
|
||||||
|
inventory: character.inventory.to_string(),
|
||||||
|
stats: character.stats.to_string(),
|
||||||
|
looks: character.looks.to_string(),
|
||||||
|
position: character.position.to_string(),
|
||||||
|
created_at: character.created_at.to_string(),
|
||||||
|
updated_at: character.updated_at.to_string(),
|
||||||
|
is_active: character.is_active,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Response::new(response))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_character(
|
||||||
|
&self,
|
||||||
|
request: Request<DeleteCharacterRequest>,
|
||||||
|
) -> Result<Response<Empty>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
let repo = &self.db.character_repo;
|
||||||
|
|
||||||
|
repo.delete_character(req.character_id)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::internal("Failed to delete character"))?;
|
||||||
|
|
||||||
|
Ok(Response::new(Empty {}))
|
||||||
|
}
|
||||||
|
}
|
||||||
7
database-service/src/grpc/database_service.rs
Normal file
7
database-service/src/grpc/database_service.rs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
use crate::db::Database;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct MyDatabaseService {
|
||||||
|
pub db: Arc<Database>, // Use the Database struct from users.rs
|
||||||
|
}
|
||||||
6
database-service/src/grpc/mod.rs
Normal file
6
database-service/src/grpc/mod.rs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
pub mod database_service;
|
||||||
|
pub mod user_service;
|
||||||
|
mod character_service;
|
||||||
|
|
||||||
|
tonic::include_proto!("user_db_api");
|
||||||
|
tonic::include_proto!("character_db_api");
|
||||||
@@ -1,22 +1,17 @@
|
|||||||
use crate::database::{CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse};
|
use crate::grpc::{CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse};
|
||||||
use crate::db::Database;
|
use crate::grpc::user_service_server::UserService;
|
||||||
|
use crate::grpc::database_service::MyDatabaseService;
|
||||||
use tonic::{Request, Response, Status};
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
use crate::database::database_service_server::DatabaseService;
|
|
||||||
|
|
||||||
pub struct MyDatabaseService {
|
|
||||||
pub db: Database, // Use the Database struct from users.rs
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl DatabaseService for MyDatabaseService {
|
impl UserService for MyDatabaseService {
|
||||||
async fn get_user(
|
async fn get_user(
|
||||||
&self,
|
&self,
|
||||||
request: Request<GetUserRequest>,
|
request: Request<GetUserRequest>,
|
||||||
) -> Result<Response<GetUserResponse>, Status> {
|
) -> Result<Response<GetUserResponse>, Status> {
|
||||||
let req = request.into_inner();
|
let req = request.into_inner();
|
||||||
|
|
||||||
let user = self.db.users_service.get_user_by_id(req.user_id)
|
let user = self.db.user_repo.get_user_by_id(req.user_id)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| Status::not_found("User not found"))?;
|
.map_err(|_| Status::not_found("User not found"))?;
|
||||||
|
|
||||||
@@ -35,7 +30,7 @@ impl DatabaseService for MyDatabaseService {
|
|||||||
) -> Result<Response<CreateUserResponse>, Status> {
|
) -> Result<Response<CreateUserResponse>, Status> {
|
||||||
let req = request.into_inner();
|
let req = request.into_inner();
|
||||||
|
|
||||||
let user_id = self.db.users_service.create_user(&req.username, &req.email, &req.hashed_password)
|
let user_id = self.db.user_repo.create_user(&req.username, &req.email, &req.hashed_password, &[])
|
||||||
.await
|
.await
|
||||||
.map_err(|_| Status::internal("Failed to create user"))?;
|
.map_err(|_| Status::internal("Failed to create user"))?;
|
||||||
|
|
||||||
@@ -49,7 +44,7 @@ impl DatabaseService for MyDatabaseService {
|
|||||||
) -> Result<Response<GetUserResponse>, Status> {
|
) -> Result<Response<GetUserResponse>, Status> {
|
||||||
let req = request.into_inner();
|
let req = request.into_inner();
|
||||||
|
|
||||||
let user = self.db.users_service.get_user_by_username(&req.username)
|
let user = self.db.user_repo.get_user_by_username(&req.username)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| Status::not_found("User not found"))?;
|
.map_err(|_| Status::not_found("User not found"))?;
|
||||||
|
|
||||||
@@ -68,7 +63,7 @@ impl DatabaseService for MyDatabaseService {
|
|||||||
) -> Result<Response<GetUserResponse>, Status> {
|
) -> Result<Response<GetUserResponse>, Status> {
|
||||||
let req = request.into_inner();
|
let req = request.into_inner();
|
||||||
|
|
||||||
let user = self.db.users_service.get_user_by_email(&req.email)
|
let user = self.db.user_repo.get_user_by_email(&req.email)
|
||||||
.await
|
.await
|
||||||
.map_err(|_| Status::not_found("User not found"))?;
|
.map_err(|_| Status::not_found("User not found"))?;
|
||||||
|
|
||||||
@@ -1,8 +1,5 @@
|
|||||||
pub mod users;
|
pub mod users;
|
||||||
|
pub mod characters;
|
||||||
pub mod redis_cache;
|
pub mod redis_cache;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
pub mod grpc;
|
pub mod grpc;
|
||||||
|
|
||||||
pub mod database {
|
|
||||||
tonic::include_proto!("database");
|
|
||||||
}
|
|
||||||
@@ -1,19 +1,18 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use database::database_service_server::DatabaseServiceServer;
|
|
||||||
use database_service::database;
|
|
||||||
use database_service::db::Database;
|
use database_service::db::Database;
|
||||||
use database_service::grpc::MyDatabaseService;
|
use database_service::grpc::database_service::MyDatabaseService;
|
||||||
|
use database_service::grpc::user_service_server::UserServiceServer;
|
||||||
|
use database_service::grpc::character_service_server::CharacterServiceServer;
|
||||||
use database_service::redis_cache::RedisCache;
|
use database_service::redis_cache::RedisCache;
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
use sqlx::postgres::PgPoolOptions;
|
use sqlx::postgres::PgPoolOptions;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::net::ToSocketAddrs;
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::{select, signal};
|
use tokio::{select, signal};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
use tracing::{info, Level};
|
use tracing::{info, Level};
|
||||||
use warp::Filter;
|
|
||||||
use utils::consul_registration;
|
use utils::consul_registration;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -32,15 +31,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string());
|
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string());
|
||||||
let service_address = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());;
|
let service_address = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||||
let service_port = port.clone();
|
let service_port = port.clone();
|
||||||
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
|
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
|
||||||
let health_check_endpoint_addr = format!("{}:{}", service_address, health_port);
|
|
||||||
|
|
||||||
// Register service with Consul
|
// Register service with Consul
|
||||||
let service_id = consul_registration::generate_service_id();
|
let service_id = consul_registration::generate_service_id();
|
||||||
let tags = vec!["version-1.0".to_string()];
|
let tags = vec!["version-1.0".to_string()];
|
||||||
let mut meta = HashMap::new();
|
let meta = HashMap::new();
|
||||||
consul_registration::register_service(
|
consul_registration::register_service(
|
||||||
&consul_url,
|
&consul_url,
|
||||||
service_id.as_str(),
|
service_id.as_str(),
|
||||||
@@ -63,17 +61,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
.await
|
.await
|
||||||
.expect("Failed to create PostgreSQL connection pool");
|
.expect("Failed to create PostgreSQL connection pool");
|
||||||
|
|
||||||
let redis_cache = RedisCache::new(&redis_url);
|
|
||||||
|
|
||||||
let cache = Arc::new(redis_cache); // Share the cache instance between tasks
|
let redis_cache = Arc::new(Mutex::new(RedisCache::new(&redis_url)));
|
||||||
let database_service = MyDatabaseService {
|
let db = Arc::new(Database::new(pool, redis_cache));
|
||||||
db: Database::new(pool, cache).await,
|
let my_service = MyDatabaseService { db };
|
||||||
};
|
|
||||||
|
|
||||||
// Pass `shared_cache` into services as needed
|
// Pass `shared_cache` into services as needed
|
||||||
info!("Database Service running on {}", address);
|
info!("Database Service running on {}", address);
|
||||||
tokio::spawn(Server::builder()
|
tokio::spawn(Server::builder()
|
||||||
.add_service(DatabaseServiceServer::new(database_service))
|
.add_service(UserServiceServer::new(my_service.clone()))
|
||||||
|
.add_service(CharacterServiceServer::new(my_service))
|
||||||
.serve(address));
|
.serve(address));
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
|
|||||||
@@ -1,168 +1,121 @@
|
|||||||
use crate::redis_cache::{Cache, RedisCache};
|
use sqlx::{FromRow, Row};
|
||||||
use serde::{Deserialize, Serialize};
|
use crate::redis_cache::{RedisCache, Cache}; // Import RedisCache and Cache Trait
|
||||||
use sqlx::Error;
|
use serde::{Serialize, Deserialize};
|
||||||
use sqlx::PgPool;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
#[derive(Debug, FromRow, Serialize, Deserialize)]
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
pub username: String,
|
pub username: String,
|
||||||
pub email: String,
|
pub email: String,
|
||||||
pub hashed_password: String,
|
pub hashed_password: String,
|
||||||
pub roles: Vec<String>,
|
pub roles: Vec<String>,
|
||||||
|
pub created_at: chrono::NaiveDateTime,
|
||||||
|
pub updated_at: chrono::NaiveDateTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct UsersService {
|
pub struct UserRepository {
|
||||||
pub pool: PgPool,
|
pool: sqlx::PgPool,
|
||||||
pub cache: Arc<RedisCache>, // Shared Redis cache
|
cache: Arc<Mutex<RedisCache>>, // Thread-safe RedisCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl UserRepository {
|
||||||
impl UsersService {
|
pub fn new(pool: sqlx::PgPool, cache: Arc<Mutex<RedisCache>>) -> Self {
|
||||||
pub async fn create_user(
|
Self { pool, cache }
|
||||||
&self,
|
|
||||||
username: &str,
|
|
||||||
email: &str,
|
|
||||||
hashed_password: &str,
|
|
||||||
) -> Result<i32, Error> {
|
|
||||||
let result = sqlx::query!(
|
|
||||||
r#"
|
|
||||||
INSERT INTO users (username, email, hashed_password)
|
|
||||||
VALUES ($1, $2, $3)
|
|
||||||
RETURNING id
|
|
||||||
"#,
|
|
||||||
username,
|
|
||||||
email,
|
|
||||||
hashed_password
|
|
||||||
)
|
|
||||||
.fetch_one(&self.pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(result.id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_user_by_id(&self, user_id: i32) -> Result<User, sqlx::Error> {
|
pub async fn get_user_by_id(&self, user_id: i32) -> Result<User, sqlx::Error> {
|
||||||
// Check Redis cache first
|
let cache_key = format!("user:{}", user_id);
|
||||||
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user:{}", user_id)).await {
|
|
||||||
return Ok(cached_user);
|
if let Some(user) = self.cache.lock().await.get::<User>(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? {
|
||||||
|
return Ok(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch from PostgreSQL
|
let user = sqlx::query_as::<_, User>(
|
||||||
let row = sqlx::query!(
|
"SELECT id, username, email, hashed_password, roles, created_at, updated_at FROM users WHERE id = $1",
|
||||||
"SELECT id, username, email, hashed_password, roles FROM users WHERE id = $1",
|
|
||||||
user_id
|
|
||||||
)
|
)
|
||||||
|
.bind(user_id)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let user = User {
|
self.cache.lock().await.set(&cache_key, &user, 300).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
id: row.id,
|
|
||||||
username: row.username,
|
|
||||||
email: row.email,
|
|
||||||
hashed_password: row.hashed_password,
|
|
||||||
roles: row.roles.unwrap_or_default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Store result in Redis
|
|
||||||
self.cache
|
|
||||||
.set(&format!("user:{}", user_id), &user, 3600)
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
|
||||||
|
|
||||||
Ok(user)
|
Ok(user)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_user_by_username(&self, username: &str) -> Result<User, Error> {
|
pub async fn get_user_by_username(&self, username: &str) -> Result<User, sqlx::Error> {
|
||||||
// Check Redis cache first
|
let cache_key = format!("user:username:{}", username);
|
||||||
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user_by_username:{}", username)).await {
|
|
||||||
return Ok(cached_user);
|
if let Some(user) = self.cache.lock().await.get::<User>(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? {
|
||||||
|
return Ok(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch from PostgreSQL
|
let user = sqlx::query_as::<_, User>(
|
||||||
let row = sqlx::query!(
|
"SELECT id, username, email, hashed_password, roles, created_at, updated_at FROM users WHERE username = $1",
|
||||||
"SELECT id, username, email, hashed_password, roles FROM users WHERE username = $1",
|
|
||||||
username
|
|
||||||
)
|
)
|
||||||
|
.bind(username)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let user = User {
|
self.cache.lock().await.set(&cache_key, &user, 300).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
id: row.id,
|
|
||||||
username: row.username,
|
|
||||||
email: row.email,
|
|
||||||
hashed_password: row.hashed_password,
|
|
||||||
roles: row.roles.unwrap_or_default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Store result in Redis
|
|
||||||
self.cache
|
|
||||||
.set(&format!("user_by_username:{}", username), &user, 3600)
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
|
||||||
|
|
||||||
Ok(user)
|
Ok(user)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_user_by_email(&self, email: &str) -> Result<User, Error> {
|
pub async fn get_user_by_email(&self, email: &str) -> Result<User, sqlx::Error> {
|
||||||
// Check Redis cache first
|
let cache_key = format!("user:email:{}", email);
|
||||||
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user_by_email:{}", email)).await {
|
|
||||||
return Ok(cached_user);
|
if let Some(user) = self.cache.lock().await.get::<User>(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)? {
|
||||||
|
return Ok(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch from PostgreSQL
|
let user = sqlx::query_as::<_, User>(
|
||||||
let row = sqlx::query!(
|
"SELECT id, username, email, hashed_password, roles, created_at, updated_at FROM users WHERE email = $1",
|
||||||
"SELECT id, username, email, hashed_password, roles FROM users WHERE email = $1",
|
|
||||||
email
|
|
||||||
)
|
)
|
||||||
|
.bind(email)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let user = User {
|
self.cache.lock().await.set(&cache_key, &user, 300).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
id: row.id,
|
|
||||||
username: row.username,
|
|
||||||
email: row.email,
|
|
||||||
hashed_password: row.hashed_password,
|
|
||||||
roles: row.roles.unwrap_or_default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Store result in Redis
|
|
||||||
self.cache
|
|
||||||
.set(&format!("user_by_email:{}", email), &user, 3600)
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
|
||||||
|
|
||||||
Ok(user)
|
Ok(user)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update_user_email(&self, user_id: i32, new_email: &str) -> Result<(), Error> {
|
pub async fn create_user(&self, username: &str, email: &str, hashed_password: &str, roles: &[String]) -> Result<i32, sqlx::Error> {
|
||||||
sqlx::query!(
|
let row = sqlx::query(
|
||||||
r#"
|
"INSERT INTO users (username, email, hashed_password, roles, created_at, updated_at) \
|
||||||
UPDATE users
|
VALUES ($1, $2, $3, $4, NOW(), NOW()) RETURNING id",
|
||||||
SET email = $1, updated_at = CURRENT_TIMESTAMP
|
|
||||||
WHERE id = $2
|
|
||||||
"#,
|
|
||||||
new_email,
|
|
||||||
user_id
|
|
||||||
)
|
)
|
||||||
|
.bind(username)
|
||||||
|
.bind(email)
|
||||||
|
.bind(hashed_password)
|
||||||
|
.bind(roles)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(row.get("id"))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_user_email(&self, user_id: i32, new_email: &str) -> Result<(), sqlx::Error> {
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE users SET email = $1, updated_at = NOW() WHERE id = $2",
|
||||||
|
)
|
||||||
|
.bind(new_email)
|
||||||
|
.bind(user_id)
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
let cache_key = format!("user:{}", user_id);
|
||||||
|
self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete_user(&self, user_id: i32) -> Result<(), Error> {
|
pub async fn delete_user(&self, user_id: i32) -> Result<(), sqlx::Error> {
|
||||||
sqlx::query!(
|
sqlx::query("DELETE FROM users WHERE id = $1")
|
||||||
r#"
|
.bind(user_id)
|
||||||
DELETE FROM users
|
|
||||||
WHERE id = $1
|
|
||||||
"#,
|
|
||||||
user_id
|
|
||||||
)
|
|
||||||
.execute(&self.pool)
|
.execute(&self.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
let cache_key = format!("user:{}", user_id);
|
||||||
|
self.cache.lock().await.delete(&cache_key).await.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
43
proto/character_db_api.proto
Normal file
43
proto/character_db_api.proto
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package character_db_api;
|
||||||
|
|
||||||
|
service CharacterService {
|
||||||
|
rpc GetCharacter (CharacterRequest) returns (CharacterResponse);
|
||||||
|
rpc CreateCharacter (CreateCharacterRequest) returns (CharacterResponse);
|
||||||
|
rpc DeleteCharacter (DeleteCharacterRequest) returns (Empty);
|
||||||
|
}
|
||||||
|
|
||||||
|
message CharacterRequest {
|
||||||
|
int32 character_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CreateCharacterRequest {
|
||||||
|
int32 user_id = 1;
|
||||||
|
string name = 2;
|
||||||
|
string inventory = 3; // JSON serialized
|
||||||
|
string stats = 4; // JSON serialized
|
||||||
|
string looks = 5; // JSON serialized
|
||||||
|
string position = 6; // JSON serialized
|
||||||
|
}
|
||||||
|
|
||||||
|
message DeleteCharacterRequest {
|
||||||
|
int32 character_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CharacterResponse {
|
||||||
|
int32 id = 1;
|
||||||
|
int32 user_id = 2;
|
||||||
|
string name = 3;
|
||||||
|
int32 level = 4;
|
||||||
|
int64 experience = 5;
|
||||||
|
string inventory = 6;
|
||||||
|
string stats = 7;
|
||||||
|
string looks = 8;
|
||||||
|
string position = 9;
|
||||||
|
string created_at = 10;
|
||||||
|
string updated_at = 11;
|
||||||
|
bool is_active = 12;
|
||||||
|
}
|
||||||
|
|
||||||
|
message Empty {}
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
syntax = "proto3";
|
syntax = "proto3";
|
||||||
|
|
||||||
package database;
|
package user_db_api;
|
||||||
|
|
||||||
service DatabaseService {
|
service UserService {
|
||||||
rpc GetUser(GetUserRequest) returns (GetUserResponse);
|
rpc GetUser(GetUserRequest) returns (GetUserResponse);
|
||||||
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
|
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
|
||||||
rpc GetUserByUsername(GetUserByUsernameRequest) returns (GetUserResponse);
|
rpc GetUserByUsername(GetUserByUsernameRequest) returns (GetUserResponse);
|
||||||
@@ -35,8 +35,6 @@ pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result
|
|||||||
Ok(nodes)
|
Ok(nodes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Example of filtering services with a specific tag
|
|
||||||
async fn get_services_with_tag(
|
async fn get_services_with_tag(
|
||||||
service_name: &str,
|
service_name: &str,
|
||||||
tag: &str,
|
tag: &str,
|
||||||
|
|||||||
@@ -11,6 +11,6 @@ fn main() {
|
|||||||
tonic_build::configure()
|
tonic_build::configure()
|
||||||
.build_server(false) // Generate gRPC client code
|
.build_server(false) // Generate gRPC client code
|
||||||
.compile_well_known_types(true)
|
.compile_well_known_types(true)
|
||||||
.compile_protos(&["../proto/database.proto", "../proto/auth.proto"], &["../proto"])
|
.compile_protos(&["../proto/user_db_api.proto", "../proto/auth.proto"], &["../proto"])
|
||||||
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user