Kubernetes & Helm Charts #10

Manually merged
Raven merged 12 commits from helm into betterauth 2025-03-20 17:37:39 -04:00
33 changed files with 716 additions and 514 deletions

View File

@@ -147,7 +147,7 @@ pub async fn serve_rest_api(
.layer(cors);
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("API_SERVICE_PORT").unwrap_or_else(|_| "8080".to_string());
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "8080".to_string());
let listener = tokio::net::TcpListener::bind(format!("{}:{}", addr, port))
.await
.unwrap();

View File

@@ -6,59 +6,29 @@ use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{info, Level};
use tracing_subscriber::EnvFilter;
use utils::consul_registration;
use utils::service_discovery::get_service_address;
use utils::logging;
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
mod axum_gateway;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(
Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()))
.unwrap_or_else(|_| Level::INFO),
)
.init();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name);
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("API_SERVICE_PORT").unwrap_or_else(|_| "8080".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8079".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "api-service".to_string());
let service_address = env::var("API_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(8080),
tags,
meta,
Some("http"),
Some(&health_check_url),
)
.await?;
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "8080".to_string());
// Start health-check endpoint
consul_registration::start_health_check(addr.as_str()).await?;
let auth_node = get_service_address(&consul_url, "auth-service").await?;
let auth_node = get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?;
let auth_address = auth_node.get(0).unwrap();
let auth_service_address = format!(
"http://{}:{}",
auth_address.ServiceAddress, auth_address.ServicePort
);
let auth_service_address = format!("http://{}", auth_address);
// Connect to the gRPC auth-service
let grpc_client = AuthServiceClient::connect(auth_service_address.to_string()).await?;
@@ -69,10 +39,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(axum_gateway::serve_rest_api(grpc_client));
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

View File

@@ -15,7 +15,7 @@ argon2 = "0.5.3"
serde = { version = "1.0", features = ["derive"] }
dotenv = "0.15"
tracing = "0.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }
prost = "0.13.4"
prost-types = "0.13.3"
chrono = { version = "0.4.38", features = ["serde"] }

View File

@@ -4,84 +4,31 @@ use auth_service::database_client::DatabaseClientTrait;
use auth_service::grpc::MyAuthService;
use auth_service::session::session_service_client::SessionServiceClient;
use dotenv::dotenv;
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use tonic::transport::Server;
use tracing::{debug, info, Level};
use utils::consul_registration;
use utils::multi_service_load_balancer::{LoadBalancingStrategy, MultiServiceLoadBalancer};
use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns};
use tracing::info;
use tracing_subscriber::{fmt, EnvFilter};
use utils::logging;
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load environment variables from .env
dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(
Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()))
.unwrap_or_else(|_| Level::INFO),
)
.init();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name);
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string());
let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string());
let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string());
let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string());
let consul_url = format!("http://{}:{}", consul_address, consul_port);
let consul_dns_url = format!("{}:{}", consul_address, consul_dns_port);
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string());
let service_address = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let lb = MultiServiceLoadBalancer::new(&consul_dns_url, LoadBalancingStrategy::RoundRobin);
let mut db_url = "".to_string();
match lb.get_endpoint("database-service", "grpc").await? {
Some(endpoint) => {
db_url = format!("http://{}", endpoint);
},
None => {
println!("No endpoints available for database-service");
}
}
let mut session_service_address = "".to_string();
match lb.get_endpoint("session-service", "grpc").await? {
Some(endpoint) => {
session_service_address = format!("http://{}", endpoint);
},
None => {
println!("No endpoints available for session-service");
}
}
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50051".to_string());
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
let session_service_address = format!("http://{}",get_kube_service_endpoints_by_dns("session-service","tcp","session-service").await?.get(0).unwrap());
let db_client = Arc::new(DatabaseClient::connect(&db_url).await?);
let session_client = Arc::new(SessionServiceClient::connect(session_service_address).await?);
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
None,
None,
)
.await?;
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
let auth_service = MyAuthService {
@@ -92,7 +39,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter.set_serving::<AuthServiceServer<MyAuthService>>().await;
println!("Authentication Service running on {}", addr);
info!("Authentication Service running on {}", addr);
// Start the gRPC server
tokio::spawn(
@@ -103,10 +50,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

View File

@@ -9,7 +9,7 @@ dotenv = "0.15"
tokio = { version = "1.41.1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }
tonic = "0.12.3"
prost = "0.13.4"
warp = "0.3.7"

View File

@@ -8,61 +8,28 @@ use crate::character_db_client::CharacterDbClient;
use crate::character_service::character::character_service_server::CharacterServiceServer;
use crate::character_service::MyCharacterService;
use dotenv::dotenv;
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use tracing::Level;
use utils::consul_registration;
use utils::service_discovery::get_service_address;
use tracing_subscriber::EnvFilter;
use utils::logging;
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(
Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()))
.unwrap_or_else(|_| Level::INFO),
)
.init();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name);
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("CHARACTER_SERVICE_PORT").unwrap_or_else(|_| "50053".to_string());
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50053".to_string());
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "character-service".to_string());
let service_address =
env::var("CHARACTER_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let db_nodes = get_service_address(&consul_url, "database-service").await?;
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let mut meta = HashMap::new();
meta.insert("name".to_string(), "Rose".to_string());
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
None,
None,
)
.await?;
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
let db_address = db_nodes.get(0).unwrap();
let db_url = format!(
"http://{}:{}",
db_address.ServiceAddress, db_address.ServicePort
);
let character_db_client = Arc::new(CharacterDbClient::connect(&db_url).await?);
let character_service = MyCharacterService {
character_db_client,
@@ -77,9 +44,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
Ok(())
}

View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -0,0 +1,24 @@
apiVersion: v2
name: osirose-new
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.1.0"

View File

@@ -0,0 +1,62 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "osirose-new.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "osirose-new.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "osirose-new.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "osirose-new.labels" -}}
helm.sh/chart: {{ include "osirose-new.chart" . }}
{{ include "osirose-new.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "osirose-new.selectorLabels" -}}
app.kubernetes.io/name: {{ include "osirose-new.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "osirose-new.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "osirose-new.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,14 @@
{{- range .Values.services }}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .name }}-env
data:
{{- range $key, $value := $.Values.global.env }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- range $key, $value := .env }}
{{ $key }}: "{{ $value }}"
{{- end }}
---
{{- end }}

View File

@@ -0,0 +1,38 @@
{{- range .Values.services }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .name }}
labels:
app: {{ .name }}
{{- if .annotations }}
annotations:
{{- range $key, $value := .annotations }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
spec:
replicas: {{ .replicas }}
selector:
matchLabels:
app: {{ .name }}
template:
metadata:
labels:
app: {{ .name }}
spec:
containers:
- name: {{ .name }}
image: "{{ $.Values.repository }}/{{ .image }}"
ports:
- containerPort: {{ .port }}
env:
- name: DATABASE_URL
value: "postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@postgres:5432/$(POSTGRES_DB)"
envFrom:
- configMapRef:
name: {{ .name }}-env
- secretRef:
name: postgres-secrets
---
{{- end }}

View File

@@ -0,0 +1,33 @@
{{- if .Values.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "osirose-new.fullname" . }}
labels:
{{- include "osirose-new.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "osirose-new.fullname" . }}
minReplicas: {{ .Values.autoscaling.minReplicas }}
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
metrics:
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
{{- end }}
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- end }}
---
{{- end }}

View File

@@ -0,0 +1,25 @@
{{- range .Values.services }}
{{- if .ingress }}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ .name }}-ingress
labels:
app: {{ .name }}
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
spec:
rules:
- host: {{ .ingress.hostname }}
http:
paths:
- path: {{ .ingress.path }}
pathType: Prefix
backend:
service:
name: {{ .name }}
port:
number: {{ .port }}
---
{{- end }}
{{- end }}

View File

@@ -0,0 +1,26 @@
{{- range .Values.services }}
{{- if .service }}
apiVersion: v1
kind: Service
metadata:
name: {{ .name }}
labels:
app: {{ .name }}
{{- if .service.annotations }}
annotations:
{{- range $key, $value := .service.annotations }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
spec:
type: {{ .service.type | default "ClusterIP" }}
ports:
- name: {{ .service.portName | default "tcp" }}
port: {{ .service.port }}
targetPort: {{ .service.targetPort | default .service.port }}
protocol: {{ .service.protocol | default "TCP" }}
selector:
app: {{ .name }}
{{- end }}
---
{{- end }}

View File

@@ -0,0 +1,20 @@
{{- if .Values.tests.enabled }}
{{- range .Values.tests.services }}
apiVersion: v1
kind: Pod
metadata:
name: {{ .name }}-test
annotations:
"helm.sh/hook": test
"helm.sh/hook-delete-policy": before-hook-creation
labels:
app: {{ .name }}
spec:
containers:
- name: {{ .name }}-test
image: {{ .image }}
command: {{ .testCommand }}
restartPolicy: Never
---
{{- end }}
{{- end }}

View File

@@ -0,0 +1,116 @@
repository: gitea.azgstudio.com/raven
autoscaling:
enabled: false
global:
env:
LOG_LEVEL: "debug"
APP_ENV: "dev"
DATABASE_URL: "" # This is a placeholder. Will be dynamically constructed
REDIS_URL: "redis://valkey:6379/0"
LISTEN_ADDR: "0.0.0.0"
services:
- name: api-service
replicas: 1
image: api-service:latest
port: 8080
env:
SERVICE_PORT: 8080
service:
type: LoadBalancer
portName: api-service
port: 8080
targetPort: 8080
protocol: TCP
ingress:
enabled: true
hostname: game-api.azgstudio.com
path: "/"
port: 8080
- name: auth-service
replicas: 1
image: auth-service:latest
port: 50051
env:
SERVICE_PORT: 50051
service:
portName: auth-service
port: 50051
targetPort: 50051
protocol: TCP
- name: character-service
replicas: 1
image: character-service:latest
port: 50053
env:
SERVICE_PORT: 50053
service:
annotations:
name: "AZG Studio"
portName: character-service
port: 50053
targetPort: 50053
protocol: TCP
- name: database-service
replicas: 1
image: database-service:latest
port: 50052
env:
SERVICE_PORT: 50052
service:
portName: database-service
port: 50052
targetPort: 50052
protocol: TCP
- name: packet-service
replicas: 1
image: packet-service:latest
port: 29000
env:
SERVICE_PORT: 29000
service:
type: LoadBalancer
portName: game-packet-service
port: 29000
targetPort: 29000
protocol: TCP
- name: session-service
replicas: 1
image: session-service:latest
port: 50055
env:
SERVICE_PORT: 50055
service:
portName: session-service
port: 50055
targetPort: 50055
protocol: TCP
- name: world-service
replicas: 1
image: world-service:latest
port: 50054
env:
SERVICE_PORT: 50054
service:
annotations:
name: "Athena"
portName: world-service
port: 50054
targetPort: 50054
protocol: TCP
tests:
enabled: false
services:
- name: api-service
testCommand: ["curl", "-f", "http://api-service:8080/health"]
image: curlimages/curl:latest

View File

@@ -15,7 +15,7 @@ chrono = { version = "0.4.39", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
dotenv = "0.15"
tracing = "0.1"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }
prost = "0.13.3"
serde_json = "1.0.133"
async-trait = "0.1.83"

View File

@@ -61,7 +61,7 @@ impl CharacterRepository {
self.cache
.lock()
.await
.set(&cache_key, &character, 300)
.set(&cache_key, &character, 0)
.await
.map_err(|_| sqlx::Error::RowNotFound)?;
Ok(character)
@@ -164,7 +164,7 @@ impl CharacterRepository {
self.cache
.lock()
.await
.set(&cache_key, &characters, 300)
.set(&cache_key, &characters, 0)
.await
.map_err(|_| sqlx::Error::RowNotFound)?;
Ok(characters)

View File

@@ -4,61 +4,29 @@ use database_service::grpc::database_service::MyDatabaseService;
use database_service::grpc::user_service_server::UserServiceServer;
use dotenv::dotenv;
use sqlx::postgres::PgPoolOptions;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Server;
use tracing::{info, Level};
use utils::consul_registration;
use tracing_subscriber::EnvFilter;
use utils::logging;
use utils::redis_cache::RedisCache;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(
Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()))
.unwrap_or_else(|_| Level::INFO),
)
.init();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name);
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string());
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50052".to_string());
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string());
let service_address =
env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
None,
None,
)
.await?;
consul_registration::start_health_check(addr.as_str()).await?;
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(&database_url)
@@ -72,8 +40,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter.set_serving::<UserServiceServer<MyDatabaseService>>().await;
// Pass `shared_cache` into services as needed
info!("Database Service running on {}", address);
let address = SocketAddr::new(addr.parse()?, port.parse()?);
tokio::spawn(
Server::builder()
.add_service(health_service)
@@ -81,11 +48,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.add_service(CharacterDbServiceServer::new(my_service))
.serve(address),
);
info!("Database Service running on {}", address);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

View File

@@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, Row};
use sqlx::{FromRow};
use std::sync::Arc;
use tokio::sync::Mutex;
use utils::redis_cache::{Cache, RedisCache};

View File

@@ -13,7 +13,7 @@ tokio = { version = "1.41.1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
bytes = { version = "1.8.0", features = ["std", "serde"] }
tracing = "0.1"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }
bincode = { version = "2.0.0", features = ["derive", "serde"] }
thiserror = "2.0.3"
lazy_static = "1.5.0"

View File

@@ -25,6 +25,7 @@ use tonic::{Code, Status};
use tracing::{debug, error, info, warn};
use utils::null_string::NullTerminatedString;
use utils::service_discovery;
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_info};
pub(crate) async fn handle_alive_req(
stream: &mut TcpStream,
@@ -161,60 +162,59 @@ pub(crate) async fn handle_login_req(
state.session_id = Some(response.session_id.parse().unwrap());
}
let consul_url =
env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let servers =
service_discovery::get_service_address(&consul_url, "character-service")
.await
.unwrap_or_else(|err| {
warn!(err);
Vec::new()
});
if servers.len() == 0 {
let data = SrvLoginReply {
result: srv_login_reply::Result::Failed,
right: 0,
type_: 0,
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
let mut server_info: Vec<ServerInfo> = Vec::new();
let mut id = 0;
for server in servers {
let mut name = server
.ServiceMeta
.get("name")
.unwrap_or(&"".to_string())
.clone();
let is_test = server.ServiceTags.contains(&"test".to_string())
|| server.ServiceTags.contains(&"staging".to_string());
if is_test {
name = format!("@{}", name);
} else {
name = format!(" {}", name);
}
server_info.push(ServerInfo {
test: u8::from(is_test),
name: NullTerminatedString::new(&name),
id,
});
id = id + 1;
}
debug!("Server info: {:?}", server_info);
let mut server_info: Vec<ServerInfo> = Vec::new();
match get_service_info("default", "character-service").await {
Ok(service_info) => {
if let Some(annotations) = service_info.annotations {
let mut server_name = "".to_string();
let mut is_test = false;
for (key, value) in annotations {
match key.as_str() {
"name" => {
server_name = value;
}
"tags" => {
let data = SrvLoginReply {
result: srv_login_reply::Result::Ok,
right: 0,
type_: 0,
servers_info: server_info,
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
}
_ => {}
}
}
if is_test {
server_name = format!("@{}", server_name);
} else {
server_name = format!(" {}", server_name);
}
server_info.push(ServerInfo {
test: u8::from(is_test),
name: NullTerminatedString::new(&server_name),
id,
});
let data = SrvLoginReply {
result: srv_login_reply::Result::Ok,
right: 0,
type_: 0,
servers_info: server_info,
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
}
}
Err(err) => {
let data = SrvLoginReply {
result: srv_login_reply::Result::Failed,
right: 0,
type_: 0,
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
}
}
}
Err(status) => {
@@ -300,52 +300,47 @@ pub(crate) async fn handle_channel_list_req(
let request = CliChannelListReq::decode(packet.payload.as_slice());
debug!("{:?}", request);
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let channels = service_discovery::get_service_address(&consul_url, "world-service")
.await
.unwrap_or_else(|err| {
warn!(err);
Vec::new()
});
if channels.len() == 0 {
let data = SrvChannelListReply {
id: request?.server_id,
channels: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
debug!("Server info: {:?}", channels);
let mut channel_info: Vec<ChannelInfo> = Vec::new();
let mut id = 1;
for channel in channels {
let name = format!(
"{}",
channel
.ServiceMeta
.get("name")
.unwrap_or(&"".to_string())
.clone()
);
channel_info.push(ChannelInfo {
id: id,
low_age: 0,
high_age: 0,
capacity: 0,
name: NullTerminatedString::new(&name),
});
id = id + 1;
}
debug!("Channel info: {:?}", channel_info);
let mut channel_info: Vec<ChannelInfo> = Vec::new();
match get_service_info("default", "world-service").await {
Ok(service_info) => {
if let Some(annotations) = service_info.annotations {
let mut server_name = "".to_string();
for (key, value) in annotations {
match key.as_str() {
"name" => {
server_name = value;
}
"tags" => {}
_ => {}
}
}
channel_info.push(ChannelInfo {
id: id,
low_age: 0,
high_age: 0,
capacity: 0,
name: NullTerminatedString::new(&server_name),
});
id = id + 1;
let data = SrvChannelListReply {
id: request?.server_id,
channels: channel_info,
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
let data = SrvChannelListReply {
id: request?.server_id,
channels: channel_info,
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
}
}
Err(err) => {
let data = SrvChannelListReply {
id: request?.server_id,
channels: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
}
Ok(())
}

View File

@@ -292,9 +292,9 @@ pub(crate) async fn handle_select_char_req(
for item in items {
if item.slot < MAX_VISIBLE_ITEMS as i32 {
let slot = convert_type_to_body_part(item.slot) as usize - 2;
let slot = convert_type_to_body_part(item.slot) as isize - 2;
if slot >= 0 {
equipped_item_list[slot] = EquippedItem {
equipped_item_list[slot as usize] = EquippedItem {
id: item.item_id as u16,
gem_opt: item.gem_option as u16,
socket: item.socket as i8,

View File

@@ -20,8 +20,9 @@ use tokio::sync::{Mutex, Semaphore};
use tokio::{select, signal};
use tracing::Level;
use tracing::{debug, error, info, warn};
use utils::consul_registration;
use utils::service_discovery::get_service_address;
use tracing_subscriber::EnvFilter;
use utils::{consul_registration, logging};
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
use warp::Filter;
mod auth_client;
@@ -58,61 +59,20 @@ const MAX_CONCURRENT_CONNECTIONS: usize = 100;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(
Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()))
.unwrap_or_else(|_| Level::INFO),
)
.init();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name);
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string());
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "29000".to_string());
let metrics_port = env::var("PACKET_METRICS_PORT").unwrap_or_else(|_| "4001".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "packet-service".to_string());
let service_address =
env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
let auth_node = get_service_address(&consul_url, "auth-service").await?;
let character_node = get_service_address(&consul_url, "character-service").await?;
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "tcp".to_string()];
let mut meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
Some("http"),
Some(&health_check_url),
)
.await?;
let auth_url = format!("http://{}",get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?.get(0).unwrap());
let character_url = format!("http://{}",get_kube_service_endpoints_by_dns("character-service","tcp","character-service").await?.get(0).unwrap());
// Start health-check endpoint
consul_registration::start_health_check(addr.as_str()).await?;
let auth_address = auth_node.get(0).unwrap();
let auth_url = format!(
"http://{}:{}",
auth_address.ServiceAddress, auth_address.ServicePort
);
let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?));
let character_address = character_node.get(0).unwrap();
let character_url = format!(
"http://{}:{}",
character_address.ServiceAddress, character_address.ServicePort
);
let character_client = Arc::new(Mutex::new(CharacterClient::connect(&character_url).await?));
let full_addr = format!("{}:{}", &addr, port);
@@ -160,10 +120,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
prometheus_exporter::start(binding.parse().unwrap()).unwrap();
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

72
scripts/build_and_push.py Normal file
View File

@@ -0,0 +1,72 @@
import subprocess
import os
# Define your images, tags, and Dockerfile paths
images = ["api-service", "auth-service", "character-service", "database-service", "packet-service", "session-service", "world-service"]
dockerfile_paths = [
"../api-service/Dockerfile",
"../auth-service/Dockerfile",
"../character-service/Dockerfile",
"../database-service/Dockerfile",
"../packet-service/Dockerfile",
"../session-service/Dockerfile",
"../world-service/Dockerfile",
]
common_tag = "latest"
version_tag = "v0.1.1"
image_tag_prefix = "gitea.azgstudio.com/raven/"
build_context = "../"
def run_command(command):
"""Run a shell command and handle errors."""
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
print(f"Error: Command '{' '.join(command)}' failed with exit code {e.returncode}")
exit(1)
def build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context):
"""Build all Docker images."""
for image, dockerfile_path in zip(images, dockerfile_paths):
# Add the prefix to the image name
full_image_name = f"{image_tag_prefix}{image}"
# Build the image with both tags
print(f"Building {full_image_name}:{version_tag} and {full_image_name}:{common_tag} using Dockerfile at {dockerfile_path}...")
run_command([
"docker", "build",
"-t", f"{full_image_name}:{version_tag}",
"-t", f"{full_image_name}:{common_tag}",
"-f", dockerfile_path,
build_context
])
def push_images(images, common_tag, version_tag, image_tag_prefix):
"""Push all Docker images."""
for image in images:
# Add the prefix to the image name
full_image_name = f"{image_tag_prefix}{image}"
# Push both tags
print(f"Pushing {full_image_name}:{version_tag}...")
run_command(["docker", "push", f"{full_image_name}:{version_tag}"])
print(f"Pushing {full_image_name}:{common_tag}...")
run_command(["docker", "push", f"{full_image_name}:{common_tag}"])
if __name__ == "__main__":
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
# Build all images first
print("Starting the build phase...")
build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context)
# Push all images after builds are complete
print("Starting the push phase...")
push_images(images, common_tag, version_tag, image_tag_prefix)

View File

@@ -9,7 +9,7 @@ dotenv = "0.15"
tokio = { version = "1.41.1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }
tonic = "0.12.3"
prost = "0.13.4"
warp = "0.3.7"

View File

@@ -10,7 +10,8 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Server;
use tracing::Level;
use utils::consul_registration;
use tracing_subscriber::{fmt, EnvFilter};
use utils::{consul_registration, logging};
use utils::redis_cache::RedisCache;
pub mod common {
@@ -24,42 +25,14 @@ pub mod api {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(
Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()))
.unwrap_or_else(|_| Level::INFO),
)
.init();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name);
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("SESSION_SERVICE_PORT").unwrap_or_else(|_| "50055".to_string());
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50055".to_string());
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "session-service".to_string());
let service_address =
env::var("SESSION_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50055),
tags,
meta,
None,
None,
)
.await?;
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
@@ -77,9 +50,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
Ok(())
}

View File

@@ -17,3 +17,6 @@ async-trait = "0.1.87"
serde_json = "1.0.140"
hickory-resolver = "0.24.4"
rand = "0.8.5"
kube = { version = "0.99.0", features = ["derive"] }
k8s-openapi = { version = "0.24.0", features = ["v1_32"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }

View File

@@ -4,3 +4,4 @@ pub mod redis_cache;
pub mod service_discovery;
pub mod signal_handler;
pub mod multi_service_load_balancer;
pub mod logging;

19
utils/src/logging.rs Normal file
View File

@@ -0,0 +1,19 @@
use std::env;
use tracing::{debug, error, info, trace, warn};
use tracing_subscriber::EnvFilter;
pub fn setup_logging(app_name: &str) {
let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
let filter = EnvFilter::try_new(format!("{app_name}={log_level},utils={log_level}"))
.unwrap_or_else(|_| EnvFilter::new(format!("{app_name}=info,utils=info")));
tracing_subscriber::fmt()
.with_env_filter(filter)
.init();
error!("Error messages enabled");
warn!("Warning messages enabled");
info!("Info messages enabled");
debug!("Debug messages enabled");
trace!("Trace messages enabled");
}

View File

@@ -1,11 +1,11 @@
use hickory_resolver::config::*;
use hickory_resolver::{Resolver, TokioAsyncResolver};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::runtime::Runtime;
use tracing::log::debug;
use kube::{Client, Api};
use k8s_openapi::api::core::v1::Service;
use std::collections::{BTreeMap};
use hickory_resolver::system_conf::read_system_conf;
pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
let mut rc = ResolverConfig::new();
@@ -29,68 +29,92 @@ pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &s
Ok(endpoints)
}
#[derive(Debug, Deserialize)]
pub struct ServiceNode {
pub ServiceAddress: String,
pub ServicePort: u16,
pub ServiceTags: Vec<String>,
pub ServiceMeta: HashMap<String, String>,
pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
let (config, options) = read_system_conf()?;
let resolver = TokioAsyncResolver::tokio(config, options);
let srv_name = format!("_{}._{}.{}", port_name, service_protocol, service_name);
let srv_record = resolver.srv_lookup(&srv_name).await?;
let mut endpoints = Vec::new();
for record in srv_record {
let hostname = record.target();
let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?;
for response in lookup_responses {
endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?);
}
}
Ok(endpoints)
}
pub async fn get_service_address(
consul_url: &str,
#[derive(Debug)]
pub struct ServiceInfo {
pub name: String,
pub namespace: String,
pub annotations: Option<BTreeMap<String, String>>,
pub labels: Option<BTreeMap<String, String>>,
}
pub async fn get_service_info(
namespace: &str,
service_name: &str,
) -> Result<Vec<ServiceNode>, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name);
) -> Result<ServiceInfo, Box<dyn std::error::Error + Send + Sync>> {
let client = Client::try_default().await?;
let response = client.get(&consul_service_url).send().await?;
// Create an API object for services in the specified namespace
let services: Api<Service> = Api::namespaced(client, namespace);
if !response.status().is_success() {
return Err(format!(
"Failed to fetch service nodes for '{}': {}",
service_name,
response.status()
)
.into());
}
// Get the service object
let service = services.get(service_name).await?;
// Deserialize the response into a Vec<ServiceNode>
let nodes: Vec<ServiceNode> = response.json().await?;
// Extract metadata
let name = service.metadata.name.unwrap_or_default();
let namespace = service.metadata.namespace.unwrap_or_default();
let annotations = service.metadata.annotations.clone();
let labels = service.metadata.labels.clone();
if nodes.is_empty() {
Err(format!("No nodes found for service '{}'", service_name).into())
} else {
Ok(nodes)
}
// Return the service info
Ok(ServiceInfo {
name,
namespace,
annotations,
labels,
})
}
async fn get_services_with_tag(
service_name: &str,
tag: &str,
consul_url: &str,
) -> Result<Vec<ServiceNode>, Box<dyn std::error::Error>> {
let url = format!("{}/v1/catalog/service/{}", consul_url, service_name);
let client = reqwest::Client::new();
let response = client.get(&url).send().await?;
pub async fn get_services_by_label(
namespace: &str,
label_selector: &str,
) -> Result<Vec<ServiceInfo>, Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
if !response.status().is_success() {
return Err(format!(
"Failed to fetch service nodes for '{}': {}",
service_name,
response.status()
)
.into());
// Create an API object for services in the specified namespace
let services: Api<Service> = Api::namespaced(client, namespace);
// Use ListParams to filter services by label
let list_params = kube::api::ListParams::default().labels(label_selector);
// List services that match the label selector
let service_list = services.list(&list_params).await?;
// Convert the list of services into a vector of ServiceInfo
let mut service_infos = Vec::new();
for service in service_list.items {
let name = service.metadata.name.clone().unwrap_or_default();
let namespace = service.metadata.namespace.clone().unwrap_or_default();
// Convert BTreeMap to HashMap for annotations and labels
let annotations = service.metadata.annotations.map(|btree| btree.into_iter().collect());
let labels = service.metadata.labels.map(|btree| btree.into_iter().collect());
service_infos.push(ServiceInfo {
name,
namespace,
annotations,
labels,
});
}
// Deserialize the response into a Vec<ServiceNode>
let nodes: Vec<ServiceNode> = response.json().await?;
// Filter nodes that include the specified tag
let filtered_nodes = nodes
.into_iter()
.filter(|node| node.ServiceTags.contains(&tag.to_string()))
.collect();
Ok(filtered_nodes)
Ok(service_infos)
}

View File

@@ -9,7 +9,7 @@ dotenv = "0.15"
tokio = { version = "1.41.1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }
tonic = "0.12.3"
prost = "0.13.4"
warp = "0.3.7"

View File

@@ -1,72 +1,21 @@
use dotenv::dotenv;
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use tracing::{debug, Level};
use utils::consul_registration;
use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns};
use utils::{consul_registration, logging};
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
tracing_subscriber::fmt()
.with_max_level(
Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()))
.unwrap_or_else(|_| Level::INFO),
)
.init();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name);
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("WORLD_SERVICE_PORT").unwrap_or_else(|_| "50054".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8084".to_string());
// let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string());
let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string());
let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string());
let consul_url = format!("http://{}:{}", consul_address, consul_port);
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "world-service".to_string());
let service_address =
env::var("WORLD_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
let db_nodes = get_service_address(&consul_url, "database-service").await?;
let temp_db_nodes = get_service_endpoints_by_dns(format!("{}:{}", consul_address, consul_dns_port).as_str(), "grpc", "database-service").await?;
debug!("{:?}", temp_db_nodes);
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50054".to_string());
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let mut meta = HashMap::new();
meta.insert("name".to_string(), "Athena".to_string());
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50054),
tags,
meta,
Some("http"),
Some(&health_check_url),
)
.await?;
// Start health-check endpoint
consul_registration::start_health_check(addr.as_str()).await?;
let db_address = db_nodes.get(0).unwrap();
let db_url = format!(
"http://{}:{}",
db_address.ServiceAddress, db_address.ServicePort
);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
Ok(())
}