From ab7728837cff57788d390e50946eef7c995a18d302a1d4909ac47ab081ac6f4c Mon Sep 17 00:00:00 2001 From: raven <7156279+RavenX8@users.noreply.github.com> Date: Tue, 26 Nov 2024 01:59:01 -0500 Subject: [PATCH] - add: service discovery protocol using consul --- auth-service/Cargo.toml | 5 ++ auth-service/src/consul_registration.rs | 59 +++++++++++++++++++++ auth-service/src/main.rs | 53 ++++++++++++++---- auth-service/src/service_discovery.rs | 53 ++++++++++++++++++ database-service/Cargo.toml | 10 +++- database-service/src/consul_registration.rs | 59 +++++++++++++++++++++ database-service/src/main.rs | 43 +++++++++++++-- 7 files changed, 266 insertions(+), 16 deletions(-) create mode 100644 auth-service/src/consul_registration.rs create mode 100644 auth-service/src/service_discovery.rs create mode 100644 database-service/src/consul_registration.rs diff --git a/auth-service/Cargo.toml b/auth-service/Cargo.toml index 663065c..5a0e741 100644 --- a/auth-service/Cargo.toml +++ b/auth-service/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [features] mocks = [] +consul = [] [dependencies] tokio = { version = "1.41.1", features = ["full"] } @@ -20,6 +21,10 @@ prost-types = "0.13.3" chrono = { version = "0.4.38", features = ["serde"] } async-trait = "0.1.83" mockall = "0.13.1" +rand = "0.8.5" +warp = "0.3.7" +reqwest = { version = "0.12.9", features = ["json"] } +consul = "0.4.2" [build-dependencies] tonic-build = "0.12.3" diff --git a/auth-service/src/consul_registration.rs b/auth-service/src/consul_registration.rs new file mode 100644 index 0000000..be77976 --- /dev/null +++ b/auth-service/src/consul_registration.rs @@ -0,0 +1,59 @@ +use reqwest::Client; +use serde::Serialize; + +#[derive(Serialize)] +struct ConsulRegistration { + name: String, + address: String, + port: u16, + check: ConsulCheck, +} + +#[derive(Serialize)] +struct ConsulCheck { + http: String, + interval: String, +} + +pub async fn register_service( + consul_url: &str, + service_name: &str, + service_address: &str, + service_port: u16, + health_check_url: &str, +) -> Result<(), Box> { + let registration = ConsulRegistration { + name: service_name.to_string(), + address: service_address.to_string(), + port: service_port, + check: ConsulCheck { + http: health_check_url.to_string(), + interval: "10s".to_string(), // Health check interval + }, + }; + + let client = Client::new(); + let consul_register_url = format!("{}/v1/agent/service/register", consul_url); + + client + .put(&consul_register_url) + .json(®istration) + .send() + .await? + .error_for_status()?; // Ensure response is successful + + Ok(()) +} + +pub async fn deregister_service(consul_url: &str, service_name: &str) -> Result<(), Box> { + let client = Client::new(); + let consul_deregister_url = format!("{}/v1/agent/service/deregister/{}", consul_url, service_name); + + client + .put(&consul_deregister_url) + .send() + .await? + .error_for_status()?; // Ensure response is successful + + Ok(()) +} \ No newline at end of file diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index 525a936..3fd0764 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -1,17 +1,16 @@ +use warp::Filter; use dotenv::dotenv; use std::env; +use std::net::ToSocketAddrs; use tonic::transport::Server; use auth_service::grpc::MyAuthService; use auth_service::database_client::DatabaseClient; use auth_service::database_client::DatabaseClientTrait; use auth_service::auth::auth_service_server::AuthServiceServer; +use crate::service_discovery::get_service_address; -pub mod auth { - tonic::include_proto!("auth"); // Path matches the package name in auth.proto -} -pub mod database { - tonic::include_proto!("database"); // Matches package name in database.proto -} +mod consul_registration; +mod service_discovery; #[tokio::main] async fn main() -> Result<(), Box> { @@ -25,11 +24,45 @@ async fn main() -> Result<(), Box> { .init(); // Set the gRPC server address - let addr = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1:50051".to_string()); + let addr = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string()); let db_addr = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "http://127.0.0.1:50052".to_string()); - let database_client = DatabaseClient::connect(&db_addr).await?; - let addr = addr.parse().expect("Invalid address"); + let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); + let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string()); + let service_address = addr.as_str(); + let service_port = port.clone(); + let health_check_url = format!("http://{}:{}/health", service_address, service_port); + let health_check_endpoint_addr = format!("{}:8081", service_address); + + // Register service with Consul + consul_registration::register_service( + &consul_url, + service_name.as_str(), + service_address, + service_port.parse().unwrap_or(50052), + &health_check_url, + ) + .await?; + + // Start health-check endpoint + let health_route = warp::path!("health") + .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); + + tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); + + let db_address = get_service_address(&consul_url, "database-service").await?; + + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + consul_registration::deregister_service(&consul_url, service_name.as_str()).await.expect(""); + }); + + let db_url = format!("http://{}:{}", db_address.Address, db_address.Port); + let database_client = DatabaseClient::connect(&db_url).await?; + + let full_addr = format!("{}:{}", &addr, port); + let address = full_addr.parse().expect("Invalid address"); let auth_service = MyAuthService { db_client: database_client, }; @@ -39,7 +72,7 @@ async fn main() -> Result<(), Box> { // Start the gRPC server Server::builder() .add_service(AuthServiceServer::new(auth_service)) - .serve(addr) + .serve(address) .await?; Ok(()) diff --git a/auth-service/src/service_discovery.rs b/auth-service/src/service_discovery.rs new file mode 100644 index 0000000..f0ecc20 --- /dev/null +++ b/auth-service/src/service_discovery.rs @@ -0,0 +1,53 @@ +use std::collections::HashMap; +use std::iter::Map; +use consul::Client; +use consul::health::Health; +use reqwest::Response; +use serde::Deserialize; + +#[derive(Deserialize)] +struct Address { + Address: String, + Port: u16, +} + +#[derive(Deserialize)] +struct TaggedAddresses { + lan_ipv4: Address, + wan_ipv4: Address, +} + +#[derive(Deserialize)] +struct Weights { + Passing: u8, + Warning: u8, +} + +#[derive(Deserialize)] +pub struct Service { + pub(crate) ID: String, + pub(crate) Service: String, + pub(crate) Tags: Vec, + pub(crate) Port: u16, + pub(crate) Address: String, + pub(crate) TaggedAddresses: TaggedAddresses, + pub(crate) Weights: Weights, + pub(crate) EnableTagOverride: bool, + pub(crate) ContentHash: String, + pub(crate) Datacenter: String, +} + +pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result<(Service), Box> { + let client = reqwest::Client::new(); + let consul_service_url = format!("{}/v1/agent/service/{}", consul_url, service_name); + + let response = client + .get(&consul_service_url) + .send() + .await? + .error_for_status()? + .json::() + .await?; // Ensure response is successful + + Ok(response) +} diff --git a/database-service/Cargo.toml b/database-service/Cargo.toml index 9dc1e8b..6c77e20 100644 --- a/database-service/Cargo.toml +++ b/database-service/Cargo.toml @@ -3,8 +3,9 @@ name = "database-service" version = "0.1.0" edition = "2021" -[build-dependencies] -tonic-build = "0.12.3" +[features] +mocks = [] +consul = [] [dependencies] tokio = { version = "1.41.1", features = ["full"] } @@ -21,3 +22,8 @@ deadpool-redis = "0.18.0" serde_json = "1.0.133" async-trait = "0.1.83" mockall = "0.13.1" +reqwest = { version = "0.12.9", features = ["json"] } +warp = "0.3.7" + +[build-dependencies] +tonic-build = "0.12.3" \ No newline at end of file diff --git a/database-service/src/consul_registration.rs b/database-service/src/consul_registration.rs new file mode 100644 index 0000000..be77976 --- /dev/null +++ b/database-service/src/consul_registration.rs @@ -0,0 +1,59 @@ +use reqwest::Client; +use serde::Serialize; + +#[derive(Serialize)] +struct ConsulRegistration { + name: String, + address: String, + port: u16, + check: ConsulCheck, +} + +#[derive(Serialize)] +struct ConsulCheck { + http: String, + interval: String, +} + +pub async fn register_service( + consul_url: &str, + service_name: &str, + service_address: &str, + service_port: u16, + health_check_url: &str, +) -> Result<(), Box> { + let registration = ConsulRegistration { + name: service_name.to_string(), + address: service_address.to_string(), + port: service_port, + check: ConsulCheck { + http: health_check_url.to_string(), + interval: "10s".to_string(), // Health check interval + }, + }; + + let client = Client::new(); + let consul_register_url = format!("{}/v1/agent/service/register", consul_url); + + client + .put(&consul_register_url) + .json(®istration) + .send() + .await? + .error_for_status()?; // Ensure response is successful + + Ok(()) +} + +pub async fn deregister_service(consul_url: &str, service_name: &str) -> Result<(), Box> { + let client = Client::new(); + let consul_deregister_url = format!("{}/v1/agent/service/deregister/{}", consul_url, service_name); + + client + .put(&consul_deregister_url) + .send() + .await? + .error_for_status()?; // Ensure response is successful + + Ok(()) +} \ No newline at end of file diff --git a/database-service/src/main.rs b/database-service/src/main.rs index ef621df..58368d1 100644 --- a/database-service/src/main.rs +++ b/database-service/src/main.rs @@ -1,5 +1,7 @@ +use warp::Filter; use dotenv::dotenv; use std::env; +use std::net::ToSocketAddrs; use tonic::transport::Server; use database_service::db::Database; use database_service::redis_cache::RedisCache; @@ -8,7 +10,9 @@ use std::sync::Arc; use database_service::database; use database_service::grpc::MyDatabaseService; use sqlx::postgres::PgPoolOptions; +use tokio::task; +mod consul_registration; #[tokio::main] async fn main() -> Result<(), Box> { @@ -19,11 +23,42 @@ async fn main() -> Result<(), Box> { .with_timer(tracing_subscriber::fmt::time::ChronoLocal::rfc_3339()) .init(); - let addr = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1:50052".to_string()); + let addr = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string()); let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - let addr = addr.parse().expect("Invalid address"); + let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); + let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string()); + let service_address = addr.as_str(); + let service_port = port.clone(); + let health_check_url = format!("http://{}:{}/health", service_address, service_port); + let health_check_endpoint_addr = format!("{}:8080", service_address); + + // Register service with Consul + consul_registration::register_service( + &consul_url, + service_name.as_str(), + service_address, + service_port.parse().unwrap_or(50052), + &health_check_url, + ) + .await?; + + // Start health-check endpoint + let health_route = warp::path!("health") + .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); + + + tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); + + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + consul_registration::deregister_service(&consul_url, service_name.as_str()).await.expect(""); + }); + + let full_addr = format!("{}:{}", &addr, port); + let address = full_addr.parse().expect("Invalid address"); let pool = PgPoolOptions::new() .max_connections(5) .connect(&database_url) @@ -38,10 +73,10 @@ async fn main() -> Result<(), Box> { }; // Pass `shared_cache` into services as needed - println!("Database Service running on {}", addr); + println!("Database Service running on {}", address); Server::builder() .add_service(DatabaseServiceServer::new(database_service)) - .serve(addr) + .serve(address) .await?; Ok(())