- removed: api-service
- removed: session-service - updated: moved health check out of consul registration - updated: get service info to pull the service from the default namespace for the service account - updated: the rest of the services to be able to handle the new database tables
This commit is contained in:
@@ -7,6 +7,7 @@ fn main() {
|
||||
&[
|
||||
"../proto/user_db_api.proto",
|
||||
"../proto/character_db_api.proto",
|
||||
"../proto/session_db_api.proto",
|
||||
],
|
||||
&["../proto"],
|
||||
)
|
||||
|
||||
@@ -49,9 +49,9 @@ impl CharacterRepository {
|
||||
|
||||
// Fetch from database
|
||||
let character = sqlx::query_as::<_, Character>(
|
||||
"SELECT id, userId as user_id, name, money, inventory, stats, skills, looks, position, \
|
||||
createdAt as created_at, updatedAt as updated_at, extract(epoch from (deletedAt - now()))::BIGINT as deleted_at, isActive as is_active \
|
||||
FROM character WHERE id = $1 AND isActive = true",
|
||||
"SELECT id, \"userId\" as user_id, name, money, inventory, stats, skills, looks, position, \
|
||||
\"createdAt\" as created_at, \"updatedAt\" as updated_at, extract(epoch from (\"deletedAt\" - now()))::BIGINT as deleted_at, \"isActive\" as is_active \
|
||||
FROM character WHERE id = $1 AND \"isActive\" = true",
|
||||
)
|
||||
.bind(character_id)
|
||||
.fetch_one(&self.pool)
|
||||
@@ -78,7 +78,7 @@ impl CharacterRepository {
|
||||
position: serde_json::Value,
|
||||
) -> Result<i32, sqlx::Error> {
|
||||
let result = sqlx::query(
|
||||
"INSERT INTO character (userId, name, inventory, stats, skills, looks, position, createdAt, updatedAt, isActive) \
|
||||
"INSERT INTO character (\"userId\", name, inventory, stats, skills, looks, position, \"createdAt\", \"updatedAt\", \"isActive\") \
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), NOW(), true) RETURNING id",
|
||||
)
|
||||
.bind(&user_id)
|
||||
@@ -107,9 +107,9 @@ impl CharacterRepository {
|
||||
character_id: i32,
|
||||
delete_type: i32,
|
||||
) -> Result<i64, sqlx::Error> {
|
||||
let mut query = "UPDATE character SET updatedAt = NOW(), deletedAt = NOW() + '24 hours' WHERE id = $1 RETURNING userId, extract(epoch from (deletedAt - now()))::BIGINT as deletedAt";
|
||||
let mut query = "UPDATE character SET \"updatedAt\" = NOW(), \"deletedAt\" = NOW() + '24 hours' WHERE id = $1 RETURNING \"userId\", extract(epoch from (\"deletedAt\" - now()))::BIGINT as deleted_at";
|
||||
if 0 == delete_type {
|
||||
query = "UPDATE character SET updatedAt = NOW(), deletedAt = null WHERE id = $1 RETURNING userId, 0::BIGINT as deletedAt";
|
||||
query = "UPDATE character SET \"updatedAt\" = NOW(), \"deletedAt\" = null WHERE id = $1 RETURNING \"userId\", 0::BIGINT as deleted_at";
|
||||
}
|
||||
let result = sqlx::query(query)
|
||||
.bind(character_id)
|
||||
@@ -136,7 +136,7 @@ impl CharacterRepository {
|
||||
|
||||
pub async fn get_characters_by_user(
|
||||
&self,
|
||||
user_id: i32,
|
||||
user_id: String,
|
||||
) -> Result<Vec<Character>, sqlx::Error> {
|
||||
let cache_key = format!("character:user:{}", user_id);
|
||||
|
||||
@@ -154,7 +154,7 @@ impl CharacterRepository {
|
||||
|
||||
// Fetch from database
|
||||
let characters = sqlx::query_as::<_, Character>(
|
||||
"SELECT id, userId as user_id, name, money, inventory, stats, skills, looks, position, createdAt as created_at, updatedAt as updated_at, extract(epoch from (deletedAt - now()))::BIGINT as deleted_at, isActive as is_active FROM character WHERE userId = $1 AND isActive = true",
|
||||
"SELECT id, \"userId\" as user_id, name, money, inventory, stats, skills, looks, position, \"createdAt\" as created_at, \"updatedAt\" as updated_at, extract(epoch from (\"deletedAt\" - now()))::BIGINT as deleted_at, \"isActive\" as is_active FROM character WHERE \"userId\" = $1 AND \"isActive\" = true",
|
||||
)
|
||||
.bind(user_id)
|
||||
.fetch_all(&self.pool)
|
||||
|
||||
@@ -4,20 +4,24 @@ use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use utils::redis_cache::RedisCache;
|
||||
use crate::sessions::SessionRepository;
|
||||
|
||||
pub struct Database {
|
||||
pub user_repo: Arc<UserRepository>,
|
||||
pub character_repo: Arc<CharacterRepository>,
|
||||
pub session_repo: Arc<SessionRepository>,
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub fn new(pool: PgPool, redis_cache: Arc<Mutex<RedisCache>>) -> Self {
|
||||
let user_repo = Arc::new(UserRepository::new(pool.clone(), redis_cache.clone()));
|
||||
let character_repo = Arc::new(CharacterRepository::new(pool.clone(), redis_cache));
|
||||
let character_repo = Arc::new(CharacterRepository::new(pool.clone(), redis_cache.clone()));
|
||||
let session_repo = Arc::new(SessionRepository::new(pool.clone(), redis_cache));
|
||||
|
||||
Self {
|
||||
user_repo,
|
||||
character_repo,
|
||||
session_repo
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
mod character_service;
|
||||
pub mod database_service;
|
||||
pub mod user_service;
|
||||
mod character_service;
|
||||
mod user_service;
|
||||
mod session_service;
|
||||
|
||||
tonic::include_proto!("user_db_api");
|
||||
tonic::include_proto!("character_db_api");
|
||||
tonic::include_proto!("session_db_api");
|
||||
|
||||
25
database-service/src/grpc/session_service.rs
Normal file
25
database-service/src/grpc/session_service.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
use crate::grpc::database_service::MyDatabaseService;
|
||||
use crate::grpc::session_service_server::SessionService;
|
||||
use tonic::{Request, Response, Status};
|
||||
use tracing::debug;
|
||||
use crate::grpc::{GetSessionRequest, GetSessionResponse};
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl SessionService for MyDatabaseService {
|
||||
async fn get_session(
|
||||
&self,
|
||||
request: Request<GetSessionRequest>,
|
||||
) -> Result<Response<GetSessionResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
debug!("get_session: {:?}", req);
|
||||
|
||||
let session = self.db.session_repo.get_session(&req.session_id).await
|
||||
.map_err(|_| Status::not_found("Session not found"))?;
|
||||
|
||||
debug!("session: {:?}", session);
|
||||
Ok(Response::new(GetSessionResponse {
|
||||
session_id: session.id,
|
||||
user_id: session.user_id,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -2,3 +2,4 @@ pub mod characters;
|
||||
pub mod db;
|
||||
pub mod grpc;
|
||||
pub mod users;
|
||||
pub mod sessions;
|
||||
|
||||
@@ -12,6 +12,7 @@ use tokio::sync::Mutex;
|
||||
use tonic::transport::Server;
|
||||
use tracing::{info, Level};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use database_service::grpc::session_service_server::SessionServiceServer;
|
||||
use utils::logging;
|
||||
use utils::redis_cache::RedisCache;
|
||||
|
||||
@@ -19,7 +20,7 @@ use utils::redis_cache::RedisCache;
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
dotenv().ok();
|
||||
let app_name = env!("CARGO_PKG_NAME");
|
||||
logging::setup_logging(app_name);
|
||||
logging::setup_logging(app_name, &["database_service"]);
|
||||
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50052".to_string());
|
||||
@@ -45,7 +46,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
Server::builder()
|
||||
.add_service(health_service)
|
||||
.add_service(UserServiceServer::new(my_service.clone()))
|
||||
.add_service(CharacterDbServiceServer::new(my_service))
|
||||
.add_service(CharacterDbServiceServer::new(my_service.clone()))
|
||||
.add_service(SessionServiceServer::new(my_service))
|
||||
.serve(address),
|
||||
);
|
||||
info!("Database Service running on {}", address);
|
||||
|
||||
49
database-service/src/sessions.rs
Normal file
49
database-service/src/sessions.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{FromRow, Row};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use utils::redis_cache::{Cache, RedisCache};
|
||||
use tracing::debug;
|
||||
|
||||
#[derive(Debug, FromRow, Serialize, Deserialize)]
|
||||
pub struct Session {
|
||||
pub id: String,
|
||||
pub user_id: String,
|
||||
}
|
||||
|
||||
pub struct SessionRepository {
|
||||
pool: sqlx::PgPool,
|
||||
cache: Arc<Mutex<RedisCache>>,
|
||||
}
|
||||
|
||||
impl SessionRepository {
|
||||
pub fn new(pool: sqlx::PgPool, cache: Arc<Mutex<RedisCache>>) -> Self {
|
||||
Self { pool, cache }
|
||||
}
|
||||
|
||||
pub async fn get_session(&self, session_id: &str) -> Result<Session, sqlx::Error> {
|
||||
let cache_key = format!("session:{}", session_id);
|
||||
|
||||
if let Some(session) = self.cache.lock().await
|
||||
.get::<Session>(&cache_key).await
|
||||
.map_err(|_| sqlx::Error::RowNotFound)?
|
||||
{
|
||||
return Ok(session);
|
||||
}
|
||||
|
||||
// Fetch from database
|
||||
let session = sqlx::query_as::<_, Session>(
|
||||
"SELECT id, \"userId\" as user_id FROM session WHERE id = $1",
|
||||
)
|
||||
.bind(session_id)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
debug!("session: {:?}", session);
|
||||
|
||||
self.cache.lock().await
|
||||
.set(&cache_key, &session, 0).await
|
||||
.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||
Ok(session)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user