Compare commits
6 Commits
6a35b5b373
...
969a85d899
| Author | SHA256 | Date | |
|---|---|---|---|
| 969a85d899 | |||
| 8499655fe9 | |||
| ab7728837c | |||
| 113ab5a4ac | |||
| 3fc6c6252c | |||
| 3ff22c9a5b |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -4,9 +4,13 @@
|
|||||||
debug/
|
debug/
|
||||||
target/
|
target/
|
||||||
|
|
||||||
|
auth-service/.env
|
||||||
|
database-service/.env
|
||||||
|
|
||||||
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
|
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
|
||||||
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
|
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
|
||||||
Cargo.lock
|
auth-service/Cargo.lock
|
||||||
|
database-service/Cargo.lock
|
||||||
|
|
||||||
# These are backup files generated by rustfmt
|
# These are backup files generated by rustfmt
|
||||||
**/*.rs.bk
|
**/*.rs.bk
|
||||||
|
|||||||
30
auth-service/Cargo.toml
Normal file
30
auth-service/Cargo.toml
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
[package]
|
||||||
|
name = "auth-service"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
mocks = []
|
||||||
|
consul = []
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tokio = { version = "1.41.1", features = ["full"] }
|
||||||
|
tonic = "0.12.3"
|
||||||
|
jsonwebtoken = "9.3.0"
|
||||||
|
argon2 = "0.5.3"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
dotenv = "0.15"
|
||||||
|
tracing = "0.1"
|
||||||
|
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] }
|
||||||
|
prost = "0.13.3"
|
||||||
|
prost-types = "0.13.3"
|
||||||
|
chrono = { version = "0.4.38", features = ["serde"] }
|
||||||
|
async-trait = "0.1.83"
|
||||||
|
mockall = "0.13.1"
|
||||||
|
rand = "0.8.5"
|
||||||
|
warp = "0.3.7"
|
||||||
|
reqwest = { version = "0.12.9", features = ["json"] }
|
||||||
|
consul = "0.4.2"
|
||||||
|
|
||||||
|
[build-dependencies]
|
||||||
|
tonic-build = "0.12.3"
|
||||||
16
auth-service/build.rs
Normal file
16
auth-service/build.rs
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
fn main() {
|
||||||
|
// gRPC Server code
|
||||||
|
tonic_build::configure()
|
||||||
|
.build_server(true) // Generate gRPC server code
|
||||||
|
.compile_well_known_types(true)
|
||||||
|
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
|
||||||
|
.compile_protos(&["../proto/auth.proto"], &["../proto"])
|
||||||
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
|
|
||||||
|
// gRPC Client code
|
||||||
|
tonic_build::configure()
|
||||||
|
.build_server(false) // Generate gRPC client code
|
||||||
|
.compile_well_known_types(true)
|
||||||
|
.compile_protos(&["../proto/database.proto"], &["../proto"])
|
||||||
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
|
}
|
||||||
59
auth-service/src/consul_registration.rs
Normal file
59
auth-service/src/consul_registration.rs
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
use reqwest::Client;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct ConsulRegistration {
|
||||||
|
name: String,
|
||||||
|
address: String,
|
||||||
|
port: u16,
|
||||||
|
check: ConsulCheck,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct ConsulCheck {
|
||||||
|
http: String,
|
||||||
|
interval: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn register_service(
|
||||||
|
consul_url: &str,
|
||||||
|
service_name: &str,
|
||||||
|
service_address: &str,
|
||||||
|
service_port: u16,
|
||||||
|
health_check_url: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let registration = ConsulRegistration {
|
||||||
|
name: service_name.to_string(),
|
||||||
|
address: service_address.to_string(),
|
||||||
|
port: service_port,
|
||||||
|
check: ConsulCheck {
|
||||||
|
http: health_check_url.to_string(),
|
||||||
|
interval: "10s".to_string(), // Health check interval
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let client = Client::new();
|
||||||
|
let consul_register_url = format!("{}/v1/agent/service/register", consul_url);
|
||||||
|
|
||||||
|
client
|
||||||
|
.put(&consul_register_url)
|
||||||
|
.json(®istration)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.error_for_status()?; // Ensure response is successful
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn deregister_service(consul_url: &str, service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let client = Client::new();
|
||||||
|
let consul_deregister_url = format!("{}/v1/agent/service/deregister/{}", consul_url, service_name);
|
||||||
|
|
||||||
|
client
|
||||||
|
.put(&consul_deregister_url)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.error_for_status()?; // Ensure response is successful
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
109
auth-service/src/database_client.rs
Normal file
109
auth-service/src/database_client.rs
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
use crate::database::{database_service_client::DatabaseServiceClient, CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use std::error::Error;
|
||||||
|
use tonic::transport::Channel;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait DatabaseClientTrait: Sized {
|
||||||
|
async fn connect(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>>;
|
||||||
|
async fn get_user_by_userid(&mut self, user_id: i32) -> Result<GetUserResponse, Box<dyn std::error::Error>>;
|
||||||
|
async fn get_user_by_username(&mut self, user_id: &str) -> Result<GetUserResponse, Box<dyn std::error::Error>>;
|
||||||
|
async fn get_user_by_email(&mut self, email: &str) -> Result<GetUserResponse, Box<dyn std::error::Error>>;
|
||||||
|
async fn create_user(&mut self, username: &str, email: &str, password: &str) -> Result<CreateUserResponse, Box<dyn std::error::Error>>;
|
||||||
|
async fn store_password_reset(&mut self, email: &str, reset_token: &str, expires_at: DateTime<Utc>) -> Result<(), Box<dyn std::error::Error>>;
|
||||||
|
async fn get_password_reset(&self, reset_token: &str) -> Result<Option<PasswordReset>, Box<dyn std::error::Error>>;
|
||||||
|
async fn delete_password_reset(&self, reset_token: &str) -> Result<(), Box<dyn std::error::Error>>;
|
||||||
|
async fn update_user_password(&self, email: &str, hashed_password: &str) -> Result<(), Box<dyn std::error::Error>>;
|
||||||
|
}
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DatabaseClient {
|
||||||
|
client: DatabaseServiceClient<Channel>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct PasswordReset {
|
||||||
|
pub email: String,
|
||||||
|
pub reset_token: String,
|
||||||
|
pub expires_at: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl DatabaseClientTrait for DatabaseClient {
|
||||||
|
async fn connect(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
|
let client = DatabaseServiceClient::connect(endpoint.to_string()).await?;
|
||||||
|
Ok(Self { client })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_by_userid(
|
||||||
|
&mut self,
|
||||||
|
user_id: i32,
|
||||||
|
) -> Result<GetUserResponse, Box<dyn std::error::Error>> {
|
||||||
|
let request = tonic::Request::new(GetUserRequest {
|
||||||
|
user_id,
|
||||||
|
});
|
||||||
|
let response = self.client.get_user(request).await?;
|
||||||
|
Ok(response.into_inner())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_by_username(
|
||||||
|
&mut self,
|
||||||
|
username: &str,
|
||||||
|
) -> Result<GetUserResponse, Box<dyn std::error::Error>> {
|
||||||
|
let request = tonic::Request::new(GetUserByUsernameRequest {
|
||||||
|
username: username.to_string(),
|
||||||
|
});
|
||||||
|
let response = self.client.get_user_by_username(request).await?;
|
||||||
|
Ok(response.into_inner())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_by_email(&mut self, email: &str) -> Result<GetUserResponse, Box<dyn Error>> {
|
||||||
|
let request = tonic::Request::new(GetUserByEmailRequest {
|
||||||
|
email: email.to_string(),
|
||||||
|
});
|
||||||
|
let response = self.client.get_user_by_email(request).await?;
|
||||||
|
Ok(response.into_inner())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_user(&mut self, username: &str, email: &str, password: &str) -> Result<CreateUserResponse, Box<dyn Error>> {
|
||||||
|
let request = tonic::Request::new(CreateUserRequest {
|
||||||
|
username: username.to_string(),
|
||||||
|
email: email.to_string(),
|
||||||
|
hashed_password: password.to_string(),
|
||||||
|
});
|
||||||
|
let response = self.client.create_user(request).await?;
|
||||||
|
Ok(response.into_inner())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn store_password_reset(
|
||||||
|
&mut self,
|
||||||
|
email: &str,
|
||||||
|
reset_token: &str,
|
||||||
|
expires_at: DateTime<Utc>,
|
||||||
|
) -> Result<(), Box<dyn Error>> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_password_reset(
|
||||||
|
&self,
|
||||||
|
reset_token: &str,
|
||||||
|
) -> Result<Option<PasswordReset>, Box<dyn std::error::Error>> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_password_reset(
|
||||||
|
&self,
|
||||||
|
reset_token: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_user_password(
|
||||||
|
&self,
|
||||||
|
email: &str,
|
||||||
|
hashed_password: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
151
auth-service/src/grpc.rs
Normal file
151
auth-service/src/grpc.rs
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
use crate::auth::auth_service_server::AuthService;
|
||||||
|
use crate::auth::{LoginRequest, LoginResponse, PasswordResetRequest, PasswordResetResponse, RegisterRequest, RegisterResponse, ResetPasswordRequest, ResetPasswordResponse, ValidateTokenRequest, ValidateTokenResponse};
|
||||||
|
use crate::database_client::{DatabaseClientTrait};
|
||||||
|
use crate::jwt::{generate_token, validate_token};
|
||||||
|
use crate::users::{hash_password, verify_user};
|
||||||
|
use chrono::{Duration, Utc};
|
||||||
|
use rand::Rng;
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
pub struct MyAuthService<T: DatabaseClientTrait + Clone> {
|
||||||
|
pub db_client: T,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl<T: DatabaseClientTrait + Send + Sync + Clone + 'static> AuthService for MyAuthService<T> {
|
||||||
|
async fn login(
|
||||||
|
&self,
|
||||||
|
request: Request<LoginRequest>,
|
||||||
|
) -> Result<Response<LoginResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
info!("Login attempt for username: {}", req.username);
|
||||||
|
|
||||||
|
if let Some(user_id) = verify_user(self.db_client.clone(), &req.username, &req.password).await {
|
||||||
|
let token = generate_token(&user_id, vec!["user".to_string()])
|
||||||
|
.map_err(|_| Status::internal("Token generation failed"))?;
|
||||||
|
|
||||||
|
info!("Login successful for username: {}", req.username);
|
||||||
|
Ok(Response::new(LoginResponse { token, user_id }))
|
||||||
|
} else {
|
||||||
|
warn!("Invalid login attempt for username: {}", req.username);
|
||||||
|
Err(Status::unauthenticated("Invalid credentials"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn validate_token(
|
||||||
|
&self,
|
||||||
|
request: Request<ValidateTokenRequest>,
|
||||||
|
) -> Result<Response<ValidateTokenResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
match validate_token(&req.token) {
|
||||||
|
Ok(user_id) => Ok(Response::new(ValidateTokenResponse {
|
||||||
|
valid: true,
|
||||||
|
user_id,
|
||||||
|
})),
|
||||||
|
Err(_) => Ok(Response::new(ValidateTokenResponse {
|
||||||
|
valid: false,
|
||||||
|
user_id: "".to_string(),
|
||||||
|
})),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn register(
|
||||||
|
&self,
|
||||||
|
request: Request<RegisterRequest>,
|
||||||
|
) -> Result<Response<RegisterResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
// Hash the password
|
||||||
|
let hashed_password = hash_password(&req.password);
|
||||||
|
|
||||||
|
// Create user in the database
|
||||||
|
let user = self.db_client.clone().create_user(&req.username, &req.email, &hashed_password)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Status::internal(format!("Database error: {}", e)))?;
|
||||||
|
|
||||||
|
Ok(Response::new(RegisterResponse { user_id: user.user_id }))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn request_password_reset(
|
||||||
|
&self,
|
||||||
|
request: Request<PasswordResetRequest>,
|
||||||
|
) -> Result<Response<PasswordResetResponse>, Status> {
|
||||||
|
let email = request.into_inner().email;
|
||||||
|
|
||||||
|
let user = self.db_client.clone().get_user_by_email(&email).await;
|
||||||
|
|
||||||
|
// Check if the email exists
|
||||||
|
if user.ok().is_some() {
|
||||||
|
// Generate a reset token
|
||||||
|
let reset_token: String = rand::thread_rng()
|
||||||
|
.sample_iter(&rand::distributions::Alphanumeric)
|
||||||
|
.take(32)
|
||||||
|
.map(char::from)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Set token expiration (e.g., 1 hour)
|
||||||
|
let expires_at = Utc::now() + Duration::hours(1);
|
||||||
|
|
||||||
|
// Store the reset token in the database
|
||||||
|
self.db_client.clone()
|
||||||
|
.store_password_reset(&email, &reset_token, expires_at)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Status::internal(format!("Database error: {}", e)))?;
|
||||||
|
|
||||||
|
// Send the reset email
|
||||||
|
// send_email(&email, "Password Reset Request", &format!(
|
||||||
|
// "Click the link to reset your password: https://example.com/reset?token={}",
|
||||||
|
// reset_token
|
||||||
|
// ))
|
||||||
|
// .map_err(|e| Status::internal(format!("Email error: {}", e)))?;
|
||||||
|
|
||||||
|
Ok(Response::new(PasswordResetResponse {
|
||||||
|
message: "Password reset email sent".to_string(),
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
// Respond with a generic message to avoid information leaks
|
||||||
|
Ok(Response::new(PasswordResetResponse {
|
||||||
|
message: "If the email exists, a reset link has been sent.".to_string(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn reset_password(
|
||||||
|
&self,
|
||||||
|
request: Request<ResetPasswordRequest>,
|
||||||
|
) -> Result<Response<ResetPasswordResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
// Validate the reset token
|
||||||
|
if let Some(password_reset) = self.db_client.clone().get_password_reset(&req.reset_token).await
|
||||||
|
.map_err(|e| Status::internal(format!("Database error: {}", e)))? {
|
||||||
|
if password_reset.expires_at < Utc::now() {
|
||||||
|
return Err(Status::unauthenticated("Token expired"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hash the new password
|
||||||
|
let hashed_password = hash_password(&req.new_password);
|
||||||
|
|
||||||
|
// Update the user's password
|
||||||
|
self.db_client
|
||||||
|
.update_user_password(&password_reset.email, &hashed_password)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Status::internal(format!("Database error: {}", e)))?;
|
||||||
|
|
||||||
|
// Delete the reset token
|
||||||
|
self.db_client
|
||||||
|
.delete_password_reset(&req.reset_token)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Status::internal(format!("Database error: {}", e)))?;
|
||||||
|
|
||||||
|
Ok(Response::new(ResetPasswordResponse {
|
||||||
|
message: "Password successfully reset".to_string(),
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
Err(Status::unauthenticated("Invalid reset token"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
36
auth-service/src/jwt.rs
Normal file
36
auth-service/src/jwt.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::env;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
struct Claims {
|
||||||
|
sub: String, // Subject (user ID)
|
||||||
|
roles: Vec<String>, // Roles/permissions
|
||||||
|
exp: usize, // Expiration time
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn generate_token(user_id: &str, roles: Vec<String>) -> Result<String, jsonwebtoken::errors::Error> {
|
||||||
|
let secret = env::var("JWT_SECRET").expect("JWT_SECRET must be set");
|
||||||
|
let expiration = chrono::Utc::now()
|
||||||
|
.checked_add_signed(chrono::Duration::days(1))
|
||||||
|
.expect("valid timestamp")
|
||||||
|
.timestamp() as usize;
|
||||||
|
|
||||||
|
let claims = Claims {
|
||||||
|
sub: user_id.to_owned(),
|
||||||
|
roles,
|
||||||
|
exp: expiration,
|
||||||
|
};
|
||||||
|
|
||||||
|
encode(&Header::default(), &claims, &EncodingKey::from_secret(secret.as_ref()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn validate_token(token: &str) -> Result<String, jsonwebtoken::errors::Error> {
|
||||||
|
let secret = env::var("JWT_SECRET").expect("JWT_SECRET must be set");
|
||||||
|
let token_data = decode::<Claims>(
|
||||||
|
token,
|
||||||
|
&DecodingKey::from_secret(secret.as_ref()),
|
||||||
|
&Validation::default(),
|
||||||
|
)?;
|
||||||
|
Ok(token_data.claims.sub)
|
||||||
|
}
|
||||||
14
auth-service/src/lib.rs
Normal file
14
auth-service/src/lib.rs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
pub mod grpc;
|
||||||
|
pub mod jwt;
|
||||||
|
pub mod database_client;
|
||||||
|
|
||||||
|
pub mod users;
|
||||||
|
pub mod auth {
|
||||||
|
tonic::include_proto!("auth"); // Path matches the package name in auth.proto
|
||||||
|
}
|
||||||
|
pub mod database {
|
||||||
|
tonic::include_proto!("database"); // Matches package name in database.proto
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub mod mocks;
|
||||||
78
auth-service/src/main.rs
Normal file
78
auth-service/src/main.rs
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
use crate::service_discovery::get_service_address;
|
||||||
|
use auth_service::auth::auth_service_server::AuthServiceServer;
|
||||||
|
use auth_service::database_client::DatabaseClient;
|
||||||
|
use auth_service::database_client::DatabaseClientTrait;
|
||||||
|
use auth_service::grpc::MyAuthService;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use std::env;
|
||||||
|
use std::net::ToSocketAddrs;
|
||||||
|
use tonic::transport::Server;
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
|
mod consul_registration;
|
||||||
|
mod service_discovery;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
// Load environment variables from .env
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||||
|
.with_thread_names(true)
|
||||||
|
.with_timer(tracing_subscriber::fmt::time::ChronoLocal::rfc_3339())
|
||||||
|
.init();
|
||||||
|
|
||||||
|
// Set the gRPC server address
|
||||||
|
let addr = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||||
|
let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string());
|
||||||
|
|
||||||
|
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||||
|
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string());
|
||||||
|
let service_address = addr.as_str();
|
||||||
|
let service_port = port.clone();
|
||||||
|
let health_check_url = format!("http://{}:{}/health", service_address, service_port);
|
||||||
|
let health_check_endpoint_addr = format!("{}:8081", service_address);
|
||||||
|
|
||||||
|
// Register service with Consul
|
||||||
|
consul_registration::register_service(
|
||||||
|
&consul_url,
|
||||||
|
service_name.as_str(),
|
||||||
|
service_address,
|
||||||
|
service_port.parse().unwrap_or(50052),
|
||||||
|
&health_check_url,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Start health-check endpoint
|
||||||
|
let health_route = warp::path!("health")
|
||||||
|
.map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK));
|
||||||
|
|
||||||
|
tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap()));
|
||||||
|
|
||||||
|
let db_address = get_service_address(&consul_url, "database-service").await?;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::signal::ctrl_c().await.unwrap();
|
||||||
|
consul_registration::deregister_service(&consul_url, service_name.as_str()).await.expect("");
|
||||||
|
});
|
||||||
|
|
||||||
|
let db_url = format!("http://{}:{}", db_address.Address, db_address.Port);
|
||||||
|
let database_client = DatabaseClient::connect(&db_url).await?;
|
||||||
|
|
||||||
|
let full_addr = format!("{}:{}", &addr, port);
|
||||||
|
let address = full_addr.parse().expect("Invalid address");
|
||||||
|
let auth_service = MyAuthService {
|
||||||
|
db_client: database_client,
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("Authentication Service running on {}", addr);
|
||||||
|
|
||||||
|
// Start the gRPC server
|
||||||
|
Server::builder()
|
||||||
|
.add_service(AuthServiceServer::new(auth_service))
|
||||||
|
.serve(address)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
30
auth-service/src/mocks/database_client_mock.rs
Normal file
30
auth-service/src/mocks/database_client_mock.rs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
use crate::database::{CreateUserResponse, GetUserResponse};
|
||||||
|
use crate::database_client::{DatabaseClientTrait, PasswordReset};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use mockall::{mock, predicate::*};
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mock! {
|
||||||
|
pub DatabaseClient {}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl DatabaseClientTrait for DatabaseClient {
|
||||||
|
async fn connect(endpoint: &str) -> Result<Self, Box<dyn std::error::Error>>;
|
||||||
|
async fn get_user_by_userid(&mut self, user_id: i32) -> Result<GetUserResponse, Box<dyn std::error::Error>>;
|
||||||
|
async fn get_user_by_username(&mut self, user_id: &str) -> Result<GetUserResponse, Box<dyn std::error::Error>>;
|
||||||
|
async fn get_user_by_email(&mut self, email: &str) -> Result<GetUserResponse, Box<dyn Error>>;
|
||||||
|
async fn create_user(&mut self, username: &str, email: &str, password: &str) -> Result<CreateUserResponse, Box<dyn std::error::Error>>;
|
||||||
|
async fn store_password_reset(&mut self, email: &str, reset_token: &str, expires_at: DateTime<Utc>) -> Result<(), Box<dyn Error>>;
|
||||||
|
async fn get_password_reset(&self, reset_token: &str) -> Result<Option<PasswordReset>, Box<dyn Error>>;
|
||||||
|
async fn delete_password_reset(&self, reset_token: &str) -> Result<(), Box<dyn Error>>;
|
||||||
|
async fn update_user_password(&self, email: &str, hashed_password: &str) -> Result<(), Box<dyn Error>>;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for MockDatabaseClient {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
MockDatabaseClient::new() // Create a new mock instance
|
||||||
|
}
|
||||||
|
}
|
||||||
2
auth-service/src/mocks/mod.rs
Normal file
2
auth-service/src/mocks/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
#[cfg(test)]
|
||||||
|
pub mod database_client_mock;
|
||||||
48
auth-service/src/service_discovery.rs
Normal file
48
auth-service/src/service_discovery.rs
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct Address {
|
||||||
|
Address: String,
|
||||||
|
Port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct TaggedAddresses {
|
||||||
|
lan_ipv4: Address,
|
||||||
|
wan_ipv4: Address,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct Weights {
|
||||||
|
Passing: u8,
|
||||||
|
Warning: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct Service {
|
||||||
|
pub(crate) ID: String,
|
||||||
|
pub(crate) Service: String,
|
||||||
|
pub(crate) Tags: Vec<String>,
|
||||||
|
pub(crate) Port: u16,
|
||||||
|
pub(crate) Address: String,
|
||||||
|
pub(crate) TaggedAddresses: TaggedAddresses,
|
||||||
|
pub(crate) Weights: Weights,
|
||||||
|
pub(crate) EnableTagOverride: bool,
|
||||||
|
pub(crate) ContentHash: String,
|
||||||
|
pub(crate) Datacenter: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result<(Service), Box<dyn std::error::Error>> {
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let consul_service_url = format!("{}/v1/agent/service/{}", consul_url, service_name);
|
||||||
|
|
||||||
|
let response = client
|
||||||
|
.get(&consul_service_url)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.error_for_status()?
|
||||||
|
.json::<Service>()
|
||||||
|
.await?; // Ensure response is successful
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
32
auth-service/src/users.rs
Normal file
32
auth-service/src/users.rs
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
use crate::database_client::DatabaseClientTrait;
|
||||||
|
|
||||||
|
use argon2::{
|
||||||
|
password_hash::{
|
||||||
|
rand_core::OsRng,
|
||||||
|
PasswordHash, PasswordHasher, PasswordVerifier, SaltString
|
||||||
|
},
|
||||||
|
Argon2
|
||||||
|
};
|
||||||
|
|
||||||
|
pub fn hash_password(password: &str) -> String {
|
||||||
|
let salt = SaltString::generate(&mut OsRng);
|
||||||
|
let argon2 = Argon2::default();
|
||||||
|
argon2.hash_password(password.as_ref(), &salt).unwrap().to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn verify_password(password: &str, hash: &str) -> bool {
|
||||||
|
let parsed_hash = PasswordHash::new(&hash).unwrap();
|
||||||
|
Argon2::default().verify_password(password.as_bytes(), &parsed_hash).is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn verify_user<T: DatabaseClientTrait>(mut db_client: T,
|
||||||
|
username: &str, password: &str) -> Option<String> {
|
||||||
|
// Placeholder: Replace with a gRPC call to the Database Service
|
||||||
|
let user = db_client.get_user_by_username(username).await.ok()?;
|
||||||
|
|
||||||
|
if verify_password(password, &user.hashed_password) {
|
||||||
|
Some(user.user_id.to_string())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
66
auth-service/tests/integration.rs
Normal file
66
auth-service/tests/integration.rs
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use dotenv::dotenv;
|
||||||
|
// use auth_service::mocks::database_client_mock::MockDatabaseClient;
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_login() {
|
||||||
|
// dotenv().ok();
|
||||||
|
// let mut db_client = MockDatabaseClient::new();
|
||||||
|
//
|
||||||
|
// db_client
|
||||||
|
// .expect_get_user_by_username()
|
||||||
|
// .with(mockall::predicate::eq("test"))
|
||||||
|
// .returning(|user_id| {
|
||||||
|
// Ok(GetUserResponse {
|
||||||
|
// user_id: 1,
|
||||||
|
// username: "test".to_string(),
|
||||||
|
// email: "test@test.com".to_string(),
|
||||||
|
// hashed_password: "test".to_string(),
|
||||||
|
// })
|
||||||
|
// });
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// let auth_service = MyAuthService {
|
||||||
|
// db_client,
|
||||||
|
// };
|
||||||
|
//
|
||||||
|
// // Create a test LoginRequest
|
||||||
|
// let request = Request::new(LoginRequest {
|
||||||
|
// username: "test".into(),
|
||||||
|
// password: "test".into(),
|
||||||
|
// });
|
||||||
|
//
|
||||||
|
// // Call the login method
|
||||||
|
// let response = auth_service.login(request).await.unwrap().into_inner();
|
||||||
|
//
|
||||||
|
// // Verify the response
|
||||||
|
// assert!(!response.token.is_empty());
|
||||||
|
// assert_eq!(response.user_id, "1"); // Replace with the expected user ID
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_validate_token() {
|
||||||
|
dotenv().ok();
|
||||||
|
// let addr = std::env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1:50052".to_string());
|
||||||
|
// let db_client = DatabaseClient::connect(&addr).await.unwrap();
|
||||||
|
//
|
||||||
|
// let auth_service = MyAuthService {
|
||||||
|
// db_client,
|
||||||
|
// };
|
||||||
|
//
|
||||||
|
// // Generate a token for testing
|
||||||
|
// let token = jwt::generate_token("123", Vec::from(["".to_string()])).unwrap();
|
||||||
|
//
|
||||||
|
// // Create a ValidateTokenRequest
|
||||||
|
// let request = Request::new(ValidateTokenRequest { token });
|
||||||
|
//
|
||||||
|
// // Call the validate_token method
|
||||||
|
// let response = auth_service.validate_token(request).await.unwrap().into_inner();
|
||||||
|
//
|
||||||
|
// // Verify the response
|
||||||
|
// assert!(response.valid);
|
||||||
|
// assert_eq!(response.user_id, "123");
|
||||||
|
}
|
||||||
|
}
|
||||||
29
database-service/Cargo.toml
Normal file
29
database-service/Cargo.toml
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
[package]
|
||||||
|
name = "database-service"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
mocks = []
|
||||||
|
consul = []
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tokio = { version = "1.41.1", features = ["full"] }
|
||||||
|
sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-native-tls", "chrono"] }
|
||||||
|
tonic = "0.12.3"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
dotenv = "0.15"
|
||||||
|
tracing = "0.1"
|
||||||
|
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] }
|
||||||
|
prost = "0.13.3"
|
||||||
|
prost-types = "0.13.3"
|
||||||
|
redis = "0.27.5"
|
||||||
|
deadpool-redis = "0.18.0"
|
||||||
|
serde_json = "1.0.133"
|
||||||
|
async-trait = "0.1.83"
|
||||||
|
mockall = "0.13.1"
|
||||||
|
reqwest = { version = "0.12.9", features = ["json"] }
|
||||||
|
warp = "0.3.7"
|
||||||
|
|
||||||
|
[build-dependencies]
|
||||||
|
tonic-build = "0.12.3"
|
||||||
8
database-service/build.rs
Normal file
8
database-service/build.rs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
fn main() {
|
||||||
|
tonic_build::configure()
|
||||||
|
.build_server(true)
|
||||||
|
.compile_well_known_types(true)
|
||||||
|
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
|
||||||
|
.compile_protos(&["../proto/database.proto"], &["../proto"])
|
||||||
|
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
|
||||||
|
}
|
||||||
59
database-service/src/consul_registration.rs
Normal file
59
database-service/src/consul_registration.rs
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
use reqwest::Client;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct ConsulRegistration {
|
||||||
|
name: String,
|
||||||
|
address: String,
|
||||||
|
port: u16,
|
||||||
|
check: ConsulCheck,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct ConsulCheck {
|
||||||
|
http: String,
|
||||||
|
interval: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn register_service(
|
||||||
|
consul_url: &str,
|
||||||
|
service_name: &str,
|
||||||
|
service_address: &str,
|
||||||
|
service_port: u16,
|
||||||
|
health_check_url: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let registration = ConsulRegistration {
|
||||||
|
name: service_name.to_string(),
|
||||||
|
address: service_address.to_string(),
|
||||||
|
port: service_port,
|
||||||
|
check: ConsulCheck {
|
||||||
|
http: health_check_url.to_string(),
|
||||||
|
interval: "10s".to_string(), // Health check interval
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let client = Client::new();
|
||||||
|
let consul_register_url = format!("{}/v1/agent/service/register", consul_url);
|
||||||
|
|
||||||
|
client
|
||||||
|
.put(&consul_register_url)
|
||||||
|
.json(®istration)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.error_for_status()?; // Ensure response is successful
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn deregister_service(consul_url: &str, service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let client = Client::new();
|
||||||
|
let consul_deregister_url = format!("{}/v1/agent/service/deregister/{}", consul_url, service_name);
|
||||||
|
|
||||||
|
client
|
||||||
|
.put(&consul_deregister_url)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.error_for_status()?; // Ensure response is successful
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
24
database-service/src/db.rs
Normal file
24
database-service/src/db.rs
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
use crate::redis_cache::RedisCache;
|
||||||
|
use crate::users::UsersService;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
pub struct Database {
|
||||||
|
pub users_service: UsersService, // User-specific functionality
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Database {
|
||||||
|
pub async fn new(pool: PgPool, cache: Arc<RedisCache>) -> Self {
|
||||||
|
let users_service = UsersService { pool, cache };
|
||||||
|
|
||||||
|
Self { users_service }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn health_check(&self) -> bool {
|
||||||
|
// Simple query to check database health
|
||||||
|
sqlx::query("SELECT 1")
|
||||||
|
.execute(&self.users_service.pool)
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
}
|
||||||
|
}
|
||||||
83
database-service/src/grpc.rs
Normal file
83
database-service/src/grpc.rs
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
use crate::database::{CreateUserRequest, CreateUserResponse, GetUserByEmailRequest, GetUserByUsernameRequest, GetUserRequest, GetUserResponse};
|
||||||
|
use crate::db::Database;
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
|
use crate::database::database_service_server::DatabaseService;
|
||||||
|
|
||||||
|
pub struct MyDatabaseService {
|
||||||
|
pub db: Database, // Use the Database struct from users.rs
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl DatabaseService for MyDatabaseService {
|
||||||
|
async fn get_user(
|
||||||
|
&self,
|
||||||
|
request: Request<GetUserRequest>,
|
||||||
|
) -> Result<Response<GetUserResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
let user = self.db.users_service.get_user_by_id(req.user_id)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::not_found("User not found"))?;
|
||||||
|
|
||||||
|
Ok(Response::new(GetUserResponse {
|
||||||
|
user_id: user.id,
|
||||||
|
username: user.username,
|
||||||
|
email: user.email,
|
||||||
|
hashed_password: user.hashed_password,
|
||||||
|
roles: user.roles,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_user(
|
||||||
|
&self,
|
||||||
|
request: Request<CreateUserRequest>,
|
||||||
|
) -> Result<Response<CreateUserResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
let user_id = self.db.users_service.create_user(&req.username, &req.email, &req.hashed_password)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::internal("Failed to create user"))?;
|
||||||
|
|
||||||
|
// Return the newly created user ID
|
||||||
|
Ok(Response::new(CreateUserResponse { user_id }))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_by_username(
|
||||||
|
&self,
|
||||||
|
request: Request<GetUserByUsernameRequest>,
|
||||||
|
) -> Result<Response<GetUserResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
let user = self.db.users_service.get_user_by_username(&req.username)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::not_found("User not found"))?;
|
||||||
|
|
||||||
|
Ok(Response::new(GetUserResponse {
|
||||||
|
user_id: user.id,
|
||||||
|
username: user.username,
|
||||||
|
email: user.email,
|
||||||
|
hashed_password: user.hashed_password,
|
||||||
|
roles: user.roles,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_by_email(
|
||||||
|
&self,
|
||||||
|
request: Request<GetUserByEmailRequest>,
|
||||||
|
) -> Result<Response<GetUserResponse>, Status> {
|
||||||
|
let req = request.into_inner();
|
||||||
|
|
||||||
|
let user = self.db.users_service.get_user_by_email(&req.email)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Status::not_found("User not found"))?;
|
||||||
|
|
||||||
|
Ok(Response::new(GetUserResponse {
|
||||||
|
user_id: user.id,
|
||||||
|
username: user.username,
|
||||||
|
email: user.email,
|
||||||
|
hashed_password: user.hashed_password,
|
||||||
|
roles: user.roles,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
8
database-service/src/lib.rs
Normal file
8
database-service/src/lib.rs
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
pub mod users;
|
||||||
|
pub mod redis_cache;
|
||||||
|
pub mod db;
|
||||||
|
pub mod grpc;
|
||||||
|
|
||||||
|
pub mod database {
|
||||||
|
tonic::include_proto!("database");
|
||||||
|
}
|
||||||
82
database-service/src/main.rs
Normal file
82
database-service/src/main.rs
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
use database::database_service_server::DatabaseServiceServer;
|
||||||
|
use database_service::database;
|
||||||
|
use database_service::db::Database;
|
||||||
|
use database_service::grpc::MyDatabaseService;
|
||||||
|
use database_service::redis_cache::RedisCache;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use sqlx::postgres::PgPoolOptions;
|
||||||
|
use std::env;
|
||||||
|
use std::net::ToSocketAddrs;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tonic::transport::Server;
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
|
mod consul_registration;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
dotenv().ok();
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||||
|
.with_thread_names(true)
|
||||||
|
.with_timer(tracing_subscriber::fmt::time::ChronoLocal::rfc_3339())
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let addr = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||||
|
let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string());
|
||||||
|
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||||
|
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||||
|
|
||||||
|
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||||
|
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string());
|
||||||
|
let service_address = addr.as_str();
|
||||||
|
let service_port = port.clone();
|
||||||
|
let health_check_url = format!("http://{}:{}/health", service_address, service_port);
|
||||||
|
let health_check_endpoint_addr = format!("{}:8080", service_address);
|
||||||
|
|
||||||
|
// Register service with Consul
|
||||||
|
consul_registration::register_service(
|
||||||
|
&consul_url,
|
||||||
|
service_name.as_str(),
|
||||||
|
service_address,
|
||||||
|
service_port.parse().unwrap_or(50052),
|
||||||
|
&health_check_url,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Start health-check endpoint
|
||||||
|
let health_route = warp::path!("health")
|
||||||
|
.map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK));
|
||||||
|
|
||||||
|
|
||||||
|
tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap()));
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::signal::ctrl_c().await.unwrap();
|
||||||
|
consul_registration::deregister_service(&consul_url, service_name.as_str()).await.expect("");
|
||||||
|
});
|
||||||
|
|
||||||
|
let full_addr = format!("{}:{}", &addr, port);
|
||||||
|
let address = full_addr.parse().expect("Invalid address");
|
||||||
|
let pool = PgPoolOptions::new()
|
||||||
|
.max_connections(5)
|
||||||
|
.connect(&database_url)
|
||||||
|
.await
|
||||||
|
.expect("Failed to create PostgreSQL connection pool");
|
||||||
|
|
||||||
|
let redis_cache = RedisCache::new(&redis_url);
|
||||||
|
|
||||||
|
let cache = Arc::new(redis_cache); // Share the cache instance between tasks
|
||||||
|
let database_service = MyDatabaseService {
|
||||||
|
db: Database::new(pool, cache).await,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Pass `shared_cache` into services as needed
|
||||||
|
println!("Database Service running on {}", address);
|
||||||
|
Server::builder()
|
||||||
|
.add_service(DatabaseServiceServer::new(database_service))
|
||||||
|
.serve(address)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
80
database-service/src/redis_cache.rs
Normal file
80
database-service/src/redis_cache.rs
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use deadpool_redis::{Config, Pool, Runtime};
|
||||||
|
use redis::{AsyncCommands, RedisError};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait Cache {
|
||||||
|
async fn set<T: Serialize + Send + Sync>(
|
||||||
|
&self,
|
||||||
|
key: &String,
|
||||||
|
value: &T,
|
||||||
|
ttl: u64,
|
||||||
|
) -> Result<(), redis::RedisError>;
|
||||||
|
|
||||||
|
async fn get<T: for<'de> serde::Deserialize<'de> + Send + Sync>(
|
||||||
|
&self,
|
||||||
|
key: &String,
|
||||||
|
) -> Result<Option<T>, redis::RedisError>;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RedisCache {
|
||||||
|
pub pool: Pool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RedisCache {
|
||||||
|
pub fn new(redis_url: &str) -> Self {
|
||||||
|
let cfg = Config::from_url(redis_url);
|
||||||
|
let pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool");
|
||||||
|
RedisCache { pool }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Cache for RedisCache {
|
||||||
|
async fn set<T: Serialize + Send + Sync>(
|
||||||
|
&self,
|
||||||
|
key: &String,
|
||||||
|
value: &T,
|
||||||
|
ttl: u64,
|
||||||
|
) -> Result<(), redis::RedisError> {
|
||||||
|
let mut conn = self.pool.get().await
|
||||||
|
.map_err(|err| {
|
||||||
|
redis::RedisError::from((
|
||||||
|
redis::ErrorKind::IoError,
|
||||||
|
"Failed to get Redis connection",
|
||||||
|
format!("{:?}", err),
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
let serialized_value = serde_json::to_string(value)
|
||||||
|
.map_err(|err| RedisError::from((
|
||||||
|
redis::ErrorKind::IoError,
|
||||||
|
"Serialization error",
|
||||||
|
format!("Serialization error: {}", err),
|
||||||
|
)))?;
|
||||||
|
conn.set_ex(key, serialized_value, ttl).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get<T: for<'de> Deserialize<'de> + Send + Sync>(&self, key: &String) -> Result<Option<T>, redis::RedisError> {
|
||||||
|
let mut conn = self.pool.get().await
|
||||||
|
.map_err(|err| {
|
||||||
|
redis::RedisError::from((
|
||||||
|
redis::ErrorKind::IoError,
|
||||||
|
"Failed to get Redis connection",
|
||||||
|
format!("{:?}", err),
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
if let Some(serialized_value) = conn.get::<_, Option<String>>(key).await? {
|
||||||
|
let deserialized_value = serde_json::from_str(&serialized_value)
|
||||||
|
.map_err(|err| RedisError::from((
|
||||||
|
redis::ErrorKind::IoError,
|
||||||
|
"Deserialization error",
|
||||||
|
format!("Deserialization error: {}", err),
|
||||||
|
)))?;
|
||||||
|
Ok(Some(deserialized_value))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
168
database-service/src/users.rs
Normal file
168
database-service/src/users.rs
Normal file
@@ -0,0 +1,168 @@
|
|||||||
|
use crate::redis_cache::{Cache, RedisCache};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use sqlx::Error;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct User {
|
||||||
|
pub id: i32,
|
||||||
|
pub username: String,
|
||||||
|
pub email: String,
|
||||||
|
pub hashed_password: String,
|
||||||
|
pub roles: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct UsersService {
|
||||||
|
pub pool: PgPool,
|
||||||
|
pub cache: Arc<RedisCache>, // Shared Redis cache
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl UsersService {
|
||||||
|
pub async fn create_user(
|
||||||
|
&self,
|
||||||
|
username: &str,
|
||||||
|
email: &str,
|
||||||
|
hashed_password: &str,
|
||||||
|
) -> Result<i32, Error> {
|
||||||
|
let result = sqlx::query!(
|
||||||
|
r#"
|
||||||
|
INSERT INTO users (username, email, hashed_password)
|
||||||
|
VALUES ($1, $2, $3)
|
||||||
|
RETURNING id
|
||||||
|
"#,
|
||||||
|
username,
|
||||||
|
email,
|
||||||
|
hashed_password
|
||||||
|
)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(result.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_user_by_id(&self, user_id: i32) -> Result<User, sqlx::Error> {
|
||||||
|
// Check Redis cache first
|
||||||
|
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user:{}", user_id)).await {
|
||||||
|
return Ok(cached_user);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch from PostgreSQL
|
||||||
|
let row = sqlx::query!(
|
||||||
|
"SELECT id, username, email, hashed_password, roles FROM users WHERE id = $1",
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let user = User {
|
||||||
|
id: row.id,
|
||||||
|
username: row.username,
|
||||||
|
email: row.email,
|
||||||
|
hashed_password: row.hashed_password,
|
||||||
|
roles: row.roles.unwrap_or_default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Store result in Redis
|
||||||
|
self.cache
|
||||||
|
.set(&format!("user:{}", user_id), &user, 3600)
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
||||||
|
|
||||||
|
Ok(user)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_user_by_username(&self, username: &str) -> Result<User, Error> {
|
||||||
|
// Check Redis cache first
|
||||||
|
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user_by_username:{}", username)).await {
|
||||||
|
return Ok(cached_user);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch from PostgreSQL
|
||||||
|
let row = sqlx::query!(
|
||||||
|
"SELECT id, username, email, hashed_password, roles FROM users WHERE username = $1",
|
||||||
|
username
|
||||||
|
)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let user = User {
|
||||||
|
id: row.id,
|
||||||
|
username: row.username,
|
||||||
|
email: row.email,
|
||||||
|
hashed_password: row.hashed_password,
|
||||||
|
roles: row.roles.unwrap_or_default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Store result in Redis
|
||||||
|
self.cache
|
||||||
|
.set(&format!("user_by_username:{}", username), &user, 3600)
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
||||||
|
|
||||||
|
Ok(user)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_user_by_email(&self, email: &str) -> Result<User, Error> {
|
||||||
|
// Check Redis cache first
|
||||||
|
if let Ok(Some(cached_user)) = self.cache.get::<User>(&format!("user_by_email:{}", email)).await {
|
||||||
|
return Ok(cached_user);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch from PostgreSQL
|
||||||
|
let row = sqlx::query!(
|
||||||
|
"SELECT id, username, email, hashed_password, roles FROM users WHERE email = $1",
|
||||||
|
email
|
||||||
|
)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let user = User {
|
||||||
|
id: row.id,
|
||||||
|
username: row.username,
|
||||||
|
email: row.email,
|
||||||
|
hashed_password: row.hashed_password,
|
||||||
|
roles: row.roles.unwrap_or_default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Store result in Redis
|
||||||
|
self.cache
|
||||||
|
.set(&format!("user_by_email:{}", email), &user, 3600)
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|err| eprintln!("Failed to cache user: {:?}", err));
|
||||||
|
|
||||||
|
Ok(user)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_user_email(&self, user_id: i32, new_email: &str) -> Result<(), Error> {
|
||||||
|
sqlx::query!(
|
||||||
|
r#"
|
||||||
|
UPDATE users
|
||||||
|
SET email = $1, updated_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE id = $2
|
||||||
|
"#,
|
||||||
|
new_email,
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete_user(&self, user_id: i32) -> Result<(), Error> {
|
||||||
|
sqlx::query!(
|
||||||
|
r#"
|
||||||
|
DELETE FROM users
|
||||||
|
WHERE id = $1
|
||||||
|
"#,
|
||||||
|
user_id
|
||||||
|
)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
29
database-service/tests/get_user.rs
Normal file
29
database-service/tests/get_user.rs
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
use tokio;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_get_user() {
|
||||||
|
// // Set up a temporary in-memory PostgreSQL database
|
||||||
|
// let pool = PgPool::connect("postgres://user:password@localhost/test_database").await.unwrap();
|
||||||
|
//
|
||||||
|
// // Create the test table
|
||||||
|
// pool.execute(
|
||||||
|
// r#"
|
||||||
|
// CREATE TABLE users (
|
||||||
|
// user_id TEXT PRIMARY KEY,
|
||||||
|
// username TEXT NOT NULL,
|
||||||
|
// email TEXT NOT NULL,
|
||||||
|
// hashed_password TEXT NOT NULL
|
||||||
|
// );
|
||||||
|
// INSERT INTO users (user_id, username, email, hashed_password)
|
||||||
|
// VALUES ('123', 'test_user', 'test@example.com', 'hashed_password_example');
|
||||||
|
// "#,
|
||||||
|
// )
|
||||||
|
// .await
|
||||||
|
// .unwrap();
|
||||||
|
//
|
||||||
|
// // Test the `get_user` function
|
||||||
|
// let user = get_user(&pool, "123").await.unwrap();
|
||||||
|
// assert_eq!(user.user_id, "123");
|
||||||
|
// assert_eq!(user.username, "test_user");
|
||||||
|
// assert_eq!(user.email, "test@example.com");
|
||||||
|
}
|
||||||
20
database-service/tests/grpc_get_user.rs
Normal file
20
database-service/tests/grpc_get_user.rs
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
#[tokio::test]
|
||||||
|
async fn test_grpc_get_user() {
|
||||||
|
// let pool = setup_test_pool().await; // Set up your test pool
|
||||||
|
// let cache = setup_test_cache().await; // Set up mock Redis cache
|
||||||
|
//
|
||||||
|
// let service = MyDatabaseService { pool, cache };
|
||||||
|
//
|
||||||
|
// // Create a mock gRPC request
|
||||||
|
// let request = Request::new(GetUserRequest {
|
||||||
|
// user_id: 123,
|
||||||
|
// });
|
||||||
|
//
|
||||||
|
// // Call the service
|
||||||
|
// let response = service.get_user(request).await.unwrap().into_inner();
|
||||||
|
//
|
||||||
|
// // Validate the response
|
||||||
|
// assert_eq!(response.user_id, 123);
|
||||||
|
// assert_eq!(response.username, "test_user");
|
||||||
|
// assert_eq!(response.email, "test@example.com");
|
||||||
|
}
|
||||||
12
database-service/tests/integration.rs
Normal file
12
database-service/tests/integration.rs
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use dotenv::dotenv;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_health_check() {
|
||||||
|
dotenv().ok();
|
||||||
|
// let database_url = std::env::var("DATABASE_URL").unwrap();
|
||||||
|
// let db = Database::new(&database_url).await;
|
||||||
|
// assert!(db.health_check().await);
|
||||||
|
}
|
||||||
|
}
|
||||||
16
database-service/tests/redis_cache.rs
Normal file
16
database-service/tests/redis_cache.rs
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
#[tokio::test]
|
||||||
|
async fn test_redis_cache() {
|
||||||
|
// dotenv().ok();
|
||||||
|
// let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||||
|
// let cache = RedisCache::new(&redis_url);
|
||||||
|
//
|
||||||
|
// let key = &"test_key".to_string();
|
||||||
|
// let value = "test_value";
|
||||||
|
//
|
||||||
|
// // Test setting a value
|
||||||
|
// cache.set(key, &value, 10).await.unwrap();
|
||||||
|
//
|
||||||
|
// // Test getting the value
|
||||||
|
// let cached_value: Option<String> = cache.get(key).await.unwrap();
|
||||||
|
// assert_eq!(cached_value, Some("test_value".to_string()));
|
||||||
|
}
|
||||||
57
proto/auth.proto
Normal file
57
proto/auth.proto
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package auth;
|
||||||
|
|
||||||
|
service AuthService {
|
||||||
|
rpc Login(LoginRequest) returns (LoginResponse);
|
||||||
|
rpc ValidateToken(ValidateTokenRequest) returns (ValidateTokenResponse);
|
||||||
|
rpc Register (RegisterRequest) returns (RegisterResponse);
|
||||||
|
rpc RequestPasswordReset (PasswordResetRequest) returns (PasswordResetResponse);
|
||||||
|
rpc ResetPassword (ResetPasswordRequest) returns (ResetPasswordResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
message LoginRequest {
|
||||||
|
string username = 1;
|
||||||
|
string password = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message LoginResponse {
|
||||||
|
string token = 1;
|
||||||
|
string user_id = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ValidateTokenRequest {
|
||||||
|
string token = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ValidateTokenResponse {
|
||||||
|
bool valid = 1;
|
||||||
|
string user_id = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RegisterRequest {
|
||||||
|
string username = 1;
|
||||||
|
string email = 2;
|
||||||
|
string password = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RegisterResponse {
|
||||||
|
int32 user_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message PasswordResetRequest {
|
||||||
|
string email = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message PasswordResetResponse {
|
||||||
|
string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResetPasswordRequest {
|
||||||
|
string reset_token = 1;
|
||||||
|
string new_password = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResetPasswordResponse {
|
||||||
|
string message = 1;
|
||||||
|
}
|
||||||
40
proto/database.proto
Normal file
40
proto/database.proto
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package database;
|
||||||
|
|
||||||
|
service DatabaseService {
|
||||||
|
rpc GetUser(GetUserRequest) returns (GetUserResponse);
|
||||||
|
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
|
||||||
|
rpc GetUserByUsername(GetUserByUsernameRequest) returns (GetUserResponse);
|
||||||
|
rpc GetUserByEmail(GetUserByEmailRequest) returns (GetUserResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetUserRequest {
|
||||||
|
int32 user_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetUserByUsernameRequest {
|
||||||
|
string username = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetUserByEmailRequest {
|
||||||
|
string email = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetUserResponse {
|
||||||
|
int32 user_id = 1;
|
||||||
|
string username = 2;
|
||||||
|
string email = 3;
|
||||||
|
string hashed_password = 4;
|
||||||
|
repeated string roles = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CreateUserRequest {
|
||||||
|
string username = 1;
|
||||||
|
string email = 2;
|
||||||
|
string hashed_password = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CreateUserResponse {
|
||||||
|
int32 user_id = 1;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user