- add: initial database and auth services
This commit is contained in:
21
database-service/Cargo.toml
Normal file
21
database-service/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "database-service"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.12.3"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.41.1", features = ["full"] }
|
||||
sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-native-tls", "chrono"] }
|
||||
tonic = "0.12.3"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
dotenv = "0.15"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] }
|
||||
prost = "0.13.3"
|
||||
prost-types = "0.13.3"
|
||||
redis = "0.27.5"
|
||||
deadpool-redis = "0.18.0"
|
||||
serde_json = "1.0.133"
|
||||
8
database-service/build.rs
Normal file
8
database-service/build.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
fn main() {
|
||||
tonic_build::configure()
|
||||
.build_server(true)
|
||||
.compile_well_known_types(true)
|
||||
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
|
||||
.compile_protos(&["../proto/database.proto"], &["../proto"])
|
||||
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||
}
|
||||
24
database-service/src/db.rs
Normal file
24
database-service/src/db.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use crate::redis_cache::RedisCache;
|
||||
use crate::users::UsersService;
|
||||
|
||||
pub struct Database {
|
||||
pub users_service: UsersService, // User-specific functionality
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub async fn new(pool: PgPool, cache: Arc<RedisCache>) -> Self {
|
||||
let users_service = UsersService { pool, cache };
|
||||
|
||||
Self { users_service }
|
||||
}
|
||||
|
||||
pub async fn health_check(&self) -> bool {
|
||||
// Simple query to check database health
|
||||
sqlx::query("SELECT 1")
|
||||
.execute(&self.users_service.pool)
|
||||
.await
|
||||
.is_ok()
|
||||
}
|
||||
}
|
||||
63
database-service/src/grpc.rs
Normal file
63
database-service/src/grpc.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use crate::db::Database;
|
||||
use crate::database::{CreateUserRequest, CreateUserResponse, GetUserRequest, GetUserByUsernameRequest, GetUserResponse};
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
use crate::database::database_service_server::{DatabaseService};
|
||||
use tracing::{debug, error};
|
||||
|
||||
pub struct MyDatabaseService {
|
||||
pub db: Database, // Use the Database struct from users.rs
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl DatabaseService for MyDatabaseService {
|
||||
async fn get_user(
|
||||
&self,
|
||||
request: Request<GetUserRequest>,
|
||||
) -> Result<Response<GetUserResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
|
||||
let user = self.db.users_service.get_user_by_id(req.user_id)
|
||||
.await
|
||||
.map_err(|_| Status::not_found("User not found"))?;
|
||||
|
||||
Ok(Response::new(GetUserResponse {
|
||||
user_id: user.id,
|
||||
username: user.username,
|
||||
email: user.email,
|
||||
hashed_password: user.hashed_password,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn get_user_by_username(
|
||||
&self,
|
||||
request: Request<GetUserByUsernameRequest>,
|
||||
) -> Result<Response<GetUserResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
|
||||
let user = self.db.users_service.get_user_by_username(&req.username)
|
||||
.await
|
||||
.map_err(|_| Status::not_found("User not found"))?;
|
||||
|
||||
Ok(Response::new(GetUserResponse {
|
||||
user_id: user.id,
|
||||
username: user.username,
|
||||
email: user.email,
|
||||
hashed_password: user.hashed_password,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn create_user(
|
||||
&self,
|
||||
request: Request<CreateUserRequest>,
|
||||
) -> Result<Response<CreateUserResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
|
||||
let user_id = self.db.users_service.create_user(&req.username, &req.email, &req.hashed_password)
|
||||
.await
|
||||
.map_err(|_| Status::internal("Failed to create user"))?;
|
||||
|
||||
// Return the newly created user ID
|
||||
Ok(Response::new(CreateUserResponse { user_id: user_id }))
|
||||
}
|
||||
}
|
||||
8
database-service/src/lib.rs
Normal file
8
database-service/src/lib.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
pub mod users;
|
||||
pub mod redis_cache;
|
||||
pub mod db;
|
||||
pub mod grpc;
|
||||
|
||||
pub mod database {
|
||||
tonic::include_proto!("database");
|
||||
}
|
||||
48
database-service/src/main.rs
Normal file
48
database-service/src/main.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use dotenv::dotenv;
|
||||
use std::env;
|
||||
use tonic::transport::Server;
|
||||
use database_service::db::Database;
|
||||
use database_service::redis_cache::RedisCache;
|
||||
use database::database_service_server::DatabaseServiceServer;
|
||||
use std::sync::Arc;
|
||||
use database_service::database;
|
||||
use database_service::grpc::MyDatabaseService;
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
dotenv().ok();
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with_thread_names(true)
|
||||
.with_timer(tracing_subscriber::fmt::time::ChronoLocal::rfc_3339())
|
||||
.init();
|
||||
|
||||
let addr = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1:50052".to_string());
|
||||
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||
|
||||
let addr = addr.parse().expect("Invalid address");
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(&database_url)
|
||||
.await
|
||||
.expect("Failed to create PostgreSQL connection pool");
|
||||
|
||||
let redis_cache = RedisCache::new(&redis_url);
|
||||
|
||||
let cache = Arc::new(redis_cache); // Share the cache instance between tasks
|
||||
let database_service = MyDatabaseService {
|
||||
db: Database::new(pool, cache).await,
|
||||
};
|
||||
|
||||
// Pass `shared_cache` into services as needed
|
||||
println!("Database Service running on {}", addr);
|
||||
Server::builder()
|
||||
.add_service(DatabaseServiceServer::new(database_service))
|
||||
.serve(addr)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
60
database-service/src/redis_cache.rs
Normal file
60
database-service/src/redis_cache.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use deadpool_redis::{Config, Pool, Runtime};
|
||||
use redis::{AsyncCommands, RedisError};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
pub struct RedisCache {
|
||||
pub pool: Pool,
|
||||
}
|
||||
|
||||
impl RedisCache {
|
||||
pub fn new(redis_url: &str) -> Self {
|
||||
let cfg = Config::from_url(redis_url);
|
||||
let pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool");
|
||||
RedisCache { pool }
|
||||
}
|
||||
|
||||
pub async fn set<T: Serialize + std::marker::Send + std::marker::Sync>(
|
||||
&self,
|
||||
key: &String,
|
||||
value: &T,
|
||||
ttl: u64,
|
||||
) -> Result<(), redis::RedisError> {
|
||||
let mut conn = self.pool.get().await
|
||||
.map_err(|err| {
|
||||
redis::RedisError::from((
|
||||
redis::ErrorKind::IoError,
|
||||
"Failed to get Redis connection",
|
||||
format!("{:?}", err),
|
||||
))
|
||||
})?;
|
||||
let serialized_value = serde_json::to_string(value)
|
||||
.map_err(|err| RedisError::from((
|
||||
redis::ErrorKind::IoError,
|
||||
"Serialization error",
|
||||
format!("Serialization error: {}", err),
|
||||
)))?;
|
||||
conn.set_ex(key, serialized_value, ttl).await
|
||||
}
|
||||
|
||||
pub async fn get<T: DeserializeOwned>(&self, key: &String) -> Result<Option<T>, redis::RedisError> {
|
||||
let mut conn = self.pool.get().await
|
||||
.map_err(|err| {
|
||||
redis::RedisError::from((
|
||||
redis::ErrorKind::IoError,
|
||||
"Failed to get Redis connection",
|
||||
format!("{:?}", err),
|
||||
))
|
||||
})?;
|
||||
if let Some(serialized_value) = conn.get::<_, Option<String>>(key).await? {
|
||||
let deserialized_value = serde_json::from_str(&serialized_value)
|
||||
.map_err(|err| RedisError::from((
|
||||
redis::ErrorKind::IoError,
|
||||
"Deserialization error",
|
||||
format!("Deserialization error: {}", err),
|
||||
)))?;
|
||||
Ok(Some(deserialized_value))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
123
database-service/src/users.rs
Normal file
123
database-service/src/users.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use sqlx::Error;
|
||||
use sqlx::PgPool;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use std::sync::Arc;
|
||||
use crate::redis_cache::RedisCache;
|
||||
use tracing::{debug, error};
|
||||
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct User {
|
||||
pub id: i32,
|
||||
pub username: String,
|
||||
pub email: String,
|
||||
pub hashed_password: String,
|
||||
}
|
||||
|
||||
pub struct UsersService {
|
||||
pub pool: PgPool,
|
||||
pub cache: Arc<RedisCache>, // Shared Redis cache
|
||||
}
|
||||
|
||||
|
||||
impl UsersService {
|
||||
pub async fn create_user(
|
||||
&self,
|
||||
username: &str,
|
||||
email: &str,
|
||||
hashed_password: &str,
|
||||
) -> Result<i32, Error> {
|
||||
let result = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO users (username, email, hashed_password)
|
||||
VALUES ($1, $2, $3)
|
||||
RETURNING id
|
||||
"#,
|
||||
username,
|
||||
email,
|
||||
hashed_password
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(result.id)
|
||||
}
|
||||
|
||||
pub async fn get_user_by_id(&self, user_id: i32) -> Result<User, sqlx::Error> {
|
||||
// Check Redis cache first
|
||||
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user:{}", user_id)).await {
|
||||
return Ok(cached_user);
|
||||
}
|
||||
|
||||
// Fetch from PostgreSQL
|
||||
let user = sqlx::query_as!(
|
||||
User,
|
||||
"SELECT id, username, email, hashed_password FROM users WHERE id = $1",
|
||||
user_id
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
// Store result in Redis
|
||||
self.cache
|
||||
.set(&format!("user:{}", user_id), &user, 3600)
|
||||
.await
|
||||
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
pub async fn get_user_by_username(&self, username: &str) -> Result<User, Error> {
|
||||
// Check Redis cache first
|
||||
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user_by_username:{}", username)).await {
|
||||
return Ok(cached_user);
|
||||
}
|
||||
|
||||
// Fetch from PostgreSQL
|
||||
let user = sqlx::query_as!(
|
||||
User,
|
||||
"SELECT id, username, email, hashed_password FROM users WHERE username = $1",
|
||||
username
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
// Store result in Redis
|
||||
self.cache
|
||||
.set(&format!("user_by_username:{}", username), &user, 3600)
|
||||
.await
|
||||
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
||||
|
||||
Ok(user)
|
||||
}
|
||||
|
||||
pub async fn update_user_email(&self, user_id: i32, new_email: &str) -> Result<(), Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
UPDATE users
|
||||
SET email = $1, updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = $2
|
||||
"#,
|
||||
new_email,
|
||||
user_id
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_user(&self, user_id: i32) -> Result<(), Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
DELETE FROM users
|
||||
WHERE id = $1
|
||||
"#,
|
||||
user_id
|
||||
)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
30
database-service/tests/get_user.rs
Normal file
30
database-service/tests/get_user.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use sqlx::{PgPool, Executor};
|
||||
use tokio;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_user() {
|
||||
// Set up a temporary in-memory PostgreSQL database
|
||||
let pool = PgPool::connect("postgres://user:password@localhost/test_database").await.unwrap();
|
||||
|
||||
// Create the test table
|
||||
pool.execute(
|
||||
r#"
|
||||
CREATE TABLE users (
|
||||
user_id TEXT PRIMARY KEY,
|
||||
username TEXT NOT NULL,
|
||||
email TEXT NOT NULL,
|
||||
hashed_password TEXT NOT NULL
|
||||
);
|
||||
INSERT INTO users (user_id, username, email, hashed_password)
|
||||
VALUES ('123', 'test_user', 'test@example.com', 'hashed_password_example');
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Test the `get_user` function
|
||||
let user = get_user(&pool, "123").await.unwrap();
|
||||
assert_eq!(user.user_id, "123");
|
||||
assert_eq!(user.username, "test_user");
|
||||
assert_eq!(user.email, "test@example.com");
|
||||
}
|
||||
25
database-service/tests/grpc_get_user.rs
Normal file
25
database-service/tests/grpc_get_user.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
use tonic::{Request, Response};
|
||||
use database_service::database::database_service_server::DatabaseService;
|
||||
use database_service::database::GetUserRequest;
|
||||
use database_service::MyDatabaseService;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_grpc_get_user() {
|
||||
let pool = setup_test_pool().await; // Set up your test pool
|
||||
let cache = setup_test_cache().await; // Set up mock Redis cache
|
||||
|
||||
let service = MyDatabaseService { pool, cache };
|
||||
|
||||
// Create a mock gRPC request
|
||||
let request = Request::new(GetUserRequest {
|
||||
user_id: 123,
|
||||
});
|
||||
|
||||
// Call the service
|
||||
let response = service.get_user(request).await.unwrap().into_inner();
|
||||
|
||||
// Validate the response
|
||||
assert_eq!(response.user_id, 123);
|
||||
assert_eq!(response.username, "test_user");
|
||||
assert_eq!(response.email, "test@example.com");
|
||||
}
|
||||
14
database-service/tests/integration.rs
Normal file
14
database-service/tests/integration.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use dotenv::dotenv;
|
||||
use database_service::db::Database;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_health_check() {
|
||||
dotenv().ok();
|
||||
let database_url = std::env::var("DATABASE_URL").unwrap();
|
||||
let db = Database::new(&database_url).await;
|
||||
assert!(db.health_check().await);
|
||||
}
|
||||
}
|
||||
19
database-service/tests/redis_cache.rs
Normal file
19
database-service/tests/redis_cache.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
use deadpool_redis::{Config, Pool, Runtime};
|
||||
use redis::AsyncCommands;
|
||||
use database_service::redis_cache::RedisCache;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_redis_cache() {
|
||||
let redis_url = "redis://127.0.0.1:6379";
|
||||
let cache = RedisCache::new(redis_url);
|
||||
|
||||
let key = &"test_key".to_string();
|
||||
let value = "test_value";
|
||||
|
||||
// Test setting a value
|
||||
cache.set(key, &value, 10).await.unwrap();
|
||||
|
||||
// Test getting the value
|
||||
let cached_value: Option<String> = cache.get(key).await.unwrap();
|
||||
assert_eq!(cached_value, Some("test_value".to_string()));
|
||||
}
|
||||
Reference in New Issue
Block a user