diff --git a/api-service/src/axum_gateway.rs b/api-service/src/axum_gateway.rs index 9452483..f9a8025 100644 --- a/api-service/src/axum_gateway.rs +++ b/api-service/src/axum_gateway.rs @@ -147,7 +147,7 @@ pub async fn serve_rest_api( .layer(cors); let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("API_SERVICE_PORT").unwrap_or_else(|_| "8080".to_string()); + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "8080".to_string()); let listener = tokio::net::TcpListener::bind(format!("{}:{}", addr, port)) .await .unwrap(); diff --git a/api-service/src/main.rs b/api-service/src/main.rs index 3d06bff..1887613 100644 --- a/api-service/src/main.rs +++ b/api-service/src/main.rs @@ -6,59 +6,29 @@ use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{info, Level}; +use tracing_subscriber::EnvFilter; use utils::consul_registration; -use utils::service_discovery::get_service_address; +use utils::logging; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; mod axum_gateway; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); - tracing_subscriber::fmt() - .with_max_level( - Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) - .unwrap_or_else(|_| Level::INFO), - ) - .init(); + let app_name = env!("CARGO_PKG_NAME"); + logging::setup_logging(app_name); // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("API_SERVICE_PORT").unwrap_or_else(|_| "8080".to_string()); - let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8079".to_string()); - - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "api-service".to_string()); - let service_address = env::var("API_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - let health_check_url = format!("http://{}:{}/health", service_address, health_port); - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(8080), - tags, - meta, - Some("http"), - Some(&health_check_url), - ) - .await?; + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "8080".to_string()); // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; - let auth_node = get_service_address(&consul_url, "auth-service").await?; + let auth_node = get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?; let auth_address = auth_node.get(0).unwrap(); - let auth_service_address = format!( - "http://{}:{}", - auth_address.ServiceAddress, auth_address.ServicePort - ); + let auth_service_address = format!("http://{}", auth_address); // Connect to the gRPC auth-service let grpc_client = AuthServiceClient::connect(auth_service_address.to_string()).await?; @@ -69,10 +39,5 @@ async fn main() -> Result<(), Box> { tokio::spawn(axum_gateway::serve_rest_api(grpc_client)); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/auth-service/Cargo.toml b/auth-service/Cargo.toml index 722006e..97ffc30 100644 --- a/auth-service/Cargo.toml +++ b/auth-service/Cargo.toml @@ -15,7 +15,7 @@ argon2 = "0.5.3" serde = { version = "1.0", features = ["derive"] } dotenv = "0.15" tracing = "0.1" -tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] } +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] } prost = "0.13.4" prost-types = "0.13.3" chrono = { version = "0.4.38", features = ["serde"] } diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index 16cbb5e..88ccad8 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -4,84 +4,31 @@ use auth_service::database_client::DatabaseClientTrait; use auth_service::grpc::MyAuthService; use auth_service::session::session_service_client::SessionServiceClient; use dotenv::dotenv; -use std::collections::HashMap; use std::env; -use std::str::FromStr; use std::sync::Arc; use tonic::transport::Server; -use tracing::{debug, info, Level}; -use utils::consul_registration; -use utils::multi_service_load_balancer::{LoadBalancingStrategy, MultiServiceLoadBalancer}; -use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns}; +use tracing::info; +use tracing_subscriber::{fmt, EnvFilter}; +use utils::logging; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; #[tokio::main] async fn main() -> Result<(), Box> { // Load environment variables from .env dotenv().ok(); - tracing_subscriber::fmt() - .with_max_level( - Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) - .unwrap_or_else(|_| Level::INFO), - ) - .init(); - + let app_name = env!("CARGO_PKG_NAME"); + logging::setup_logging(app_name); + // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string()); - - let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string()); - let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string()); - let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string()); - let consul_url = format!("http://{}:{}", consul_address, consul_port); - let consul_dns_url = format!("{}:{}", consul_address, consul_dns_port); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string()); - let service_address = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - - let lb = MultiServiceLoadBalancer::new(&consul_dns_url, LoadBalancingStrategy::RoundRobin); - - let mut db_url = "".to_string(); - match lb.get_endpoint("database-service", "grpc").await? { - Some(endpoint) => { - db_url = format!("http://{}", endpoint); - }, - None => { - println!("No endpoints available for database-service"); - } - } - - let mut session_service_address = "".to_string(); - match lb.get_endpoint("session-service", "grpc").await? { - Some(endpoint) => { - session_service_address = format!("http://{}", endpoint); - }, - None => { - println!("No endpoints available for session-service"); - } - } + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50051".to_string()); + let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap()); + let session_service_address = format!("http://{}",get_kube_service_endpoints_by_dns("session-service","tcp","session-service").await?.get(0).unwrap()); let db_client = Arc::new(DatabaseClient::connect(&db_url).await?); let session_client = Arc::new(SessionServiceClient::connect(session_service_address).await?); - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - None, - None, - ) - .await?; - let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); let auth_service = MyAuthService { @@ -92,7 +39,7 @@ async fn main() -> Result<(), Box> { let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); health_reporter.set_serving::>().await; - println!("Authentication Service running on {}", addr); + info!("Authentication Service running on {}", addr); // Start the gRPC server tokio::spawn( @@ -103,10 +50,5 @@ async fn main() -> Result<(), Box> { ); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/character-service/Cargo.toml b/character-service/Cargo.toml index 8c063c5..a19b863 100644 --- a/character-service/Cargo.toml +++ b/character-service/Cargo.toml @@ -9,7 +9,7 @@ dotenv = "0.15" tokio = { version = "1.41.1", features = ["full"] } serde = { version = "1.0", features = ["derive"] } tracing = "0.1" -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] } tonic = "0.12.3" prost = "0.13.4" warp = "0.3.7" diff --git a/character-service/src/main.rs b/character-service/src/main.rs index cac965e..b2041c2 100644 --- a/character-service/src/main.rs +++ b/character-service/src/main.rs @@ -8,61 +8,28 @@ use crate::character_db_client::CharacterDbClient; use crate::character_service::character::character_service_server::CharacterServiceServer; use crate::character_service::MyCharacterService; use dotenv::dotenv; -use std::collections::HashMap; use std::env; use std::str::FromStr; use std::sync::Arc; use tracing::Level; -use utils::consul_registration; -use utils::service_discovery::get_service_address; +use tracing_subscriber::EnvFilter; +use utils::logging; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); - tracing_subscriber::fmt() - .with_max_level( - Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) - .unwrap_or_else(|_| Level::INFO), - ) - .init(); + let app_name = env!("CARGO_PKG_NAME"); + logging::setup_logging(app_name); // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("CHARACTER_SERVICE_PORT").unwrap_or_else(|_| "50053".to_string()); + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50053".to_string()); + let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap()); - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "character-service".to_string()); - let service_address = - env::var("CHARACTER_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - let db_nodes = get_service_address(&consul_url, "database-service").await?; - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let mut meta = HashMap::new(); - meta.insert("name".to_string(), "Rose".to_string()); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - None, - None, - ) - .await?; let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); - let db_address = db_nodes.get(0).unwrap(); - let db_url = format!( - "http://{}:{}", - db_address.ServiceAddress, db_address.ServicePort - ); let character_db_client = Arc::new(CharacterDbClient::connect(&db_url).await?); let character_service = MyCharacterService { character_db_client, @@ -77,9 +44,5 @@ async fn main() -> Result<(), Box> { .await?; utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); Ok(()) } diff --git a/charts/osirose-new/.helmignore b/charts/osirose-new/.helmignore new file mode 100644 index 0000000..1b9a9cc --- /dev/null +++ b/charts/osirose-new/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/charts/osirose-new/Chart.yaml b/charts/osirose-new/Chart.yaml new file mode 100644 index 0000000..ad5bcc6 --- /dev/null +++ b/charts/osirose-new/Chart.yaml @@ -0,0 +1,24 @@ +apiVersion: v2 +name: osirose-new +description: A Helm chart for Kubernetes + +# A chart can be either an 'application' or a 'library' chart. +# +# Application charts are a collection of templates that can be packaged into versioned archives +# to be deployed. +# +# Library charts provide useful utilities or functions for the chart developer. They're included as +# a dependency of application charts to inject those utilities and functions into the rendering +# pipeline. Library charts do not define any templates and therefore cannot be deployed. +type: application + +# This is the chart version. This version number should be incremented each time you make changes +# to the chart and its templates, including the app version. +# Versions are expected to follow Semantic Versioning (https://semver.org/) +version: 0.1.0 + +# This is the version number of the application being deployed. This version number should be +# incremented each time you make changes to the application. Versions are not expected to +# follow Semantic Versioning. They should reflect the version the application is using. +# It is recommended to use it with quotes. +appVersion: "0.1.0" diff --git a/charts/osirose-new/templates/_helpers.tpl b/charts/osirose-new/templates/_helpers.tpl new file mode 100644 index 0000000..cdb19e2 --- /dev/null +++ b/charts/osirose-new/templates/_helpers.tpl @@ -0,0 +1,62 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "osirose-new.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "osirose-new.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- if contains $name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "osirose-new.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "osirose-new.labels" -}} +helm.sh/chart: {{ include "osirose-new.chart" . }} +{{ include "osirose-new.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "osirose-new.selectorLabels" -}} +app.kubernetes.io/name: {{ include "osirose-new.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* +Create the name of the service account to use +*/}} +{{- define "osirose-new.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "osirose-new.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} diff --git a/charts/osirose-new/templates/configmap.yaml b/charts/osirose-new/templates/configmap.yaml new file mode 100644 index 0000000..93920d0 --- /dev/null +++ b/charts/osirose-new/templates/configmap.yaml @@ -0,0 +1,14 @@ +{{- range .Values.services }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .name }}-env +data: +{{- range $key, $value := $.Values.global.env }} + {{ $key }}: "{{ $value }}" +{{- end }} +{{- range $key, $value := .env }} + {{ $key }}: "{{ $value }}" +{{- end }} +--- +{{- end }} diff --git a/charts/osirose-new/templates/deployment.yaml b/charts/osirose-new/templates/deployment.yaml new file mode 100644 index 0000000..6b79f98 --- /dev/null +++ b/charts/osirose-new/templates/deployment.yaml @@ -0,0 +1,38 @@ +{{- range .Values.services }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .name }} + labels: + app: {{ .name }} + {{- if .annotations }} + annotations: + {{- range $key, $value := .annotations }} + {{ $key }}: "{{ $value }}" + {{- end }} + {{- end }} +spec: + replicas: {{ .replicas }} + selector: + matchLabels: + app: {{ .name }} + template: + metadata: + labels: + app: {{ .name }} + spec: + containers: + - name: {{ .name }} + image: "{{ $.Values.repository }}/{{ .image }}" + ports: + - containerPort: {{ .port }} + env: + - name: DATABASE_URL + value: "postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@postgres:5432/$(POSTGRES_DB)" + envFrom: + - configMapRef: + name: {{ .name }}-env + - secretRef: + name: postgres-secrets +--- +{{- end }} diff --git a/charts/osirose-new/templates/hpa.yaml b/charts/osirose-new/templates/hpa.yaml new file mode 100644 index 0000000..492594a --- /dev/null +++ b/charts/osirose-new/templates/hpa.yaml @@ -0,0 +1,33 @@ +{{- if .Values.autoscaling.enabled }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "osirose-new.fullname" . }} + labels: + {{- include "osirose-new.labels" . | nindent 4 }} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "osirose-new.fullname" . }} + minReplicas: {{ .Values.autoscaling.minReplicas }} + maxReplicas: {{ .Values.autoscaling.maxReplicas }} + metrics: + {{- if .Values.autoscaling.targetCPUUtilizationPercentage }} + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }} + {{- end }} + {{- if .Values.autoscaling.targetMemoryUtilizationPercentage }} + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }} + {{- end }} +--- +{{- end }} diff --git a/charts/osirose-new/templates/ingress.yaml b/charts/osirose-new/templates/ingress.yaml new file mode 100644 index 0000000..a98f3ab --- /dev/null +++ b/charts/osirose-new/templates/ingress.yaml @@ -0,0 +1,25 @@ +{{- range .Values.services }} +{{- if .ingress }} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ .name }}-ingress + labels: + app: {{ .name }} + annotations: + nginx.ingress.kubernetes.io/rewrite-target: / +spec: + rules: + - host: {{ .ingress.hostname }} + http: + paths: + - path: {{ .ingress.path }} + pathType: Prefix + backend: + service: + name: {{ .name }} + port: + number: {{ .port }} +--- +{{- end }} +{{- end }} diff --git a/charts/osirose-new/templates/service.yaml b/charts/osirose-new/templates/service.yaml new file mode 100644 index 0000000..b8f07a4 --- /dev/null +++ b/charts/osirose-new/templates/service.yaml @@ -0,0 +1,26 @@ +{{- range .Values.services }} +{{- if .service }} +apiVersion: v1 +kind: Service +metadata: + name: {{ .name }} + labels: + app: {{ .name }} + {{- if .service.annotations }} + annotations: + {{- range $key, $value := .service.annotations }} + {{ $key }}: "{{ $value }}" + {{- end }} + {{- end }} +spec: + type: {{ .service.type | default "ClusterIP" }} + ports: + - name: {{ .service.portName | default "tcp" }} + port: {{ .service.port }} + targetPort: {{ .service.targetPort | default .service.port }} + protocol: {{ .service.protocol | default "TCP" }} + selector: + app: {{ .name }} +{{- end }} +--- +{{- end }} diff --git a/charts/osirose-new/templates/tests/test-connection.yaml b/charts/osirose-new/templates/tests/test-connection.yaml new file mode 100644 index 0000000..d3b5e3d --- /dev/null +++ b/charts/osirose-new/templates/tests/test-connection.yaml @@ -0,0 +1,20 @@ +{{- if .Values.tests.enabled }} +{{- range .Values.tests.services }} +apiVersion: v1 +kind: Pod +metadata: + name: {{ .name }}-test + annotations: + "helm.sh/hook": test + "helm.sh/hook-delete-policy": before-hook-creation + labels: + app: {{ .name }} +spec: + containers: + - name: {{ .name }}-test + image: {{ .image }} + command: {{ .testCommand }} + restartPolicy: Never +--- +{{- end }} +{{- end }} diff --git a/charts/osirose-new/values.yaml b/charts/osirose-new/values.yaml new file mode 100644 index 0000000..15af58e --- /dev/null +++ b/charts/osirose-new/values.yaml @@ -0,0 +1,116 @@ + +repository: gitea.azgstudio.com/raven + +autoscaling: + enabled: false + +global: + env: + LOG_LEVEL: "debug" + APP_ENV: "dev" + DATABASE_URL: "" # This is a placeholder. Will be dynamically constructed + REDIS_URL: "redis://valkey:6379/0" + LISTEN_ADDR: "0.0.0.0" + +services: + - name: api-service + replicas: 1 + image: api-service:latest + port: 8080 + env: + SERVICE_PORT: 8080 + service: + type: LoadBalancer + portName: api-service + port: 8080 + targetPort: 8080 + protocol: TCP + ingress: + enabled: true + hostname: game-api.azgstudio.com + path: "/" + port: 8080 + + - name: auth-service + replicas: 1 + image: auth-service:latest + port: 50051 + env: + SERVICE_PORT: 50051 + service: + portName: auth-service + port: 50051 + targetPort: 50051 + protocol: TCP + + - name: character-service + replicas: 1 + image: character-service:latest + port: 50053 + env: + SERVICE_PORT: 50053 + service: + annotations: + name: "AZG Studio" + portName: character-service + port: 50053 + targetPort: 50053 + protocol: TCP + + - name: database-service + replicas: 1 + image: database-service:latest + port: 50052 + env: + SERVICE_PORT: 50052 + service: + portName: database-service + port: 50052 + targetPort: 50052 + protocol: TCP + + - name: packet-service + replicas: 1 + image: packet-service:latest + port: 29000 + env: + SERVICE_PORT: 29000 + service: + type: LoadBalancer + portName: game-packet-service + port: 29000 + targetPort: 29000 + protocol: TCP + + - name: session-service + replicas: 1 + image: session-service:latest + port: 50055 + env: + SERVICE_PORT: 50055 + service: + portName: session-service + port: 50055 + targetPort: 50055 + protocol: TCP + + - name: world-service + replicas: 1 + image: world-service:latest + port: 50054 + env: + SERVICE_PORT: 50054 + service: + annotations: + name: "Athena" + portName: world-service + port: 50054 + targetPort: 50054 + protocol: TCP + +tests: + enabled: false + services: + - name: api-service + testCommand: ["curl", "-f", "http://api-service:8080/health"] + image: curlimages/curl:latest diff --git a/database-service/Cargo.toml b/database-service/Cargo.toml index 1da6f24..6b84728 100644 --- a/database-service/Cargo.toml +++ b/database-service/Cargo.toml @@ -15,7 +15,7 @@ chrono = { version = "0.4.39", features = ["serde"] } serde = { version = "1.0", features = ["derive"] } dotenv = "0.15" tracing = "0.1" -tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] } +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] } prost = "0.13.3" serde_json = "1.0.133" async-trait = "0.1.83" diff --git a/database-service/src/characters.rs b/database-service/src/characters.rs index c476840..66d3b57 100644 --- a/database-service/src/characters.rs +++ b/database-service/src/characters.rs @@ -61,7 +61,7 @@ impl CharacterRepository { self.cache .lock() .await - .set(&cache_key, &character, 300) + .set(&cache_key, &character, 0) .await .map_err(|_| sqlx::Error::RowNotFound)?; Ok(character) @@ -164,7 +164,7 @@ impl CharacterRepository { self.cache .lock() .await - .set(&cache_key, &characters, 300) + .set(&cache_key, &characters, 0) .await .map_err(|_| sqlx::Error::RowNotFound)?; Ok(characters) diff --git a/database-service/src/main.rs b/database-service/src/main.rs index 3668113..4387b5b 100644 --- a/database-service/src/main.rs +++ b/database-service/src/main.rs @@ -4,61 +4,29 @@ use database_service::grpc::database_service::MyDatabaseService; use database_service::grpc::user_service_server::UserServiceServer; use dotenv::dotenv; use sqlx::postgres::PgPoolOptions; -use std::collections::HashMap; use std::env; +use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use tokio::sync::Mutex; use tonic::transport::Server; use tracing::{info, Level}; -use utils::consul_registration; +use tracing_subscriber::EnvFilter; +use utils::logging; use utils::redis_cache::RedisCache; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); - tracing_subscriber::fmt() - .with_max_level( - Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) - .unwrap_or_else(|_| Level::INFO), - ) - .init(); + let app_name = env!("CARGO_PKG_NAME"); + logging::setup_logging(app_name); let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string()); + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50052".to_string()); - let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let redis_url = - std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); + let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string()); - let service_address = - env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - None, - None, - ) - .await?; - - consul_registration::start_health_check(addr.as_str()).await?; - - let full_addr = format!("{}:{}", &addr, port); - let address = full_addr.parse().expect("Invalid address"); let pool = PgPoolOptions::new() .max_connections(5) .connect(&database_url) @@ -72,8 +40,7 @@ async fn main() -> Result<(), Box> { let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); health_reporter.set_serving::>().await; - // Pass `shared_cache` into services as needed - info!("Database Service running on {}", address); + let address = SocketAddr::new(addr.parse()?, port.parse()?); tokio::spawn( Server::builder() .add_service(health_service) @@ -81,11 +48,8 @@ async fn main() -> Result<(), Box> { .add_service(CharacterDbServiceServer::new(my_service)) .serve(address), ); + info!("Database Service running on {}", address); utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/database-service/src/users.rs b/database-service/src/users.rs index e538173..36bffc9 100644 --- a/database-service/src/users.rs +++ b/database-service/src/users.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sqlx::{FromRow, Row}; +use sqlx::{FromRow}; use std::sync::Arc; use tokio::sync::Mutex; use utils::redis_cache::{Cache, RedisCache}; diff --git a/packet-service/Cargo.toml b/packet-service/Cargo.toml index b829ddd..6c9c111 100644 --- a/packet-service/Cargo.toml +++ b/packet-service/Cargo.toml @@ -13,7 +13,7 @@ tokio = { version = "1.41.1", features = ["full"] } serde = { version = "1.0", features = ["derive"] } bytes = { version = "1.8.0", features = ["std", "serde"] } tracing = "0.1" -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] } bincode = { version = "2.0.0", features = ["derive", "serde"] } thiserror = "2.0.3" lazy_static = "1.5.0" diff --git a/packet-service/src/handlers/auth.rs b/packet-service/src/handlers/auth.rs index 9394690..870cd05 100644 --- a/packet-service/src/handlers/auth.rs +++ b/packet-service/src/handlers/auth.rs @@ -25,6 +25,7 @@ use tonic::{Code, Status}; use tracing::{debug, error, info, warn}; use utils::null_string::NullTerminatedString; use utils::service_discovery; +use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_info}; pub(crate) async fn handle_alive_req( stream: &mut TcpStream, @@ -161,60 +162,59 @@ pub(crate) async fn handle_login_req( state.session_id = Some(response.session_id.parse().unwrap()); } - let consul_url = - env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let servers = - service_discovery::get_service_address(&consul_url, "character-service") - .await - .unwrap_or_else(|err| { - warn!(err); - Vec::new() - }); - - if servers.len() == 0 { - let data = SrvLoginReply { - result: srv_login_reply::Result::Failed, - right: 0, - type_: 0, - servers_info: Vec::new(), - }; - let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; - send_packet(stream, &response_packet).await?; - return Ok(()); - } - - let mut server_info: Vec = Vec::new(); let mut id = 0; - for server in servers { - let mut name = server - .ServiceMeta - .get("name") - .unwrap_or(&"".to_string()) - .clone(); - let is_test = server.ServiceTags.contains(&"test".to_string()) - || server.ServiceTags.contains(&"staging".to_string()); - if is_test { - name = format!("@{}", name); - } else { - name = format!(" {}", name); - } - server_info.push(ServerInfo { - test: u8::from(is_test), - name: NullTerminatedString::new(&name), - id, - }); - id = id + 1; - } - debug!("Server info: {:?}", server_info); + let mut server_info: Vec = Vec::new(); + match get_service_info("default", "character-service").await { + Ok(service_info) => { + if let Some(annotations) = service_info.annotations { + let mut server_name = "".to_string(); + let mut is_test = false; + for (key, value) in annotations { + match key.as_str() { + "name" => { + server_name = value; + } + "tags" => { - let data = SrvLoginReply { - result: srv_login_reply::Result::Ok, - right: 0, - type_: 0, - servers_info: server_info, - }; - let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; - send_packet(stream, &response_packet).await?; + } + _ => {} + } + } + + if is_test { + server_name = format!("@{}", server_name); + } else { + server_name = format!(" {}", server_name); + } + + server_info.push(ServerInfo { + test: u8::from(is_test), + name: NullTerminatedString::new(&server_name), + id, + }); + + let data = SrvLoginReply { + result: srv_login_reply::Result::Ok, + right: 0, + type_: 0, + servers_info: server_info, + }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + } + } + Err(err) => { + let data = SrvLoginReply { + result: srv_login_reply::Result::Failed, + right: 0, + type_: 0, + servers_info: Vec::new(), + }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + return Ok(()); + } + } } } Err(status) => { @@ -300,52 +300,47 @@ pub(crate) async fn handle_channel_list_req( let request = CliChannelListReq::decode(packet.payload.as_slice()); debug!("{:?}", request); - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let channels = service_discovery::get_service_address(&consul_url, "world-service") - .await - .unwrap_or_else(|err| { - warn!(err); - Vec::new() - }); - - if channels.len() == 0 { - let data = SrvChannelListReply { - id: request?.server_id, - channels: Vec::new(), - }; - let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; - send_packet(stream, &response_packet).await?; - return Ok(()); - } - - debug!("Server info: {:?}", channels); - let mut channel_info: Vec = Vec::new(); let mut id = 1; - for channel in channels { - let name = format!( - "{}", - channel - .ServiceMeta - .get("name") - .unwrap_or(&"".to_string()) - .clone() - ); - channel_info.push(ChannelInfo { - id: id, - low_age: 0, - high_age: 0, - capacity: 0, - name: NullTerminatedString::new(&name), - }); - id = id + 1; - } - debug!("Channel info: {:?}", channel_info); + let mut channel_info: Vec = Vec::new(); + match get_service_info("default", "world-service").await { + Ok(service_info) => { + if let Some(annotations) = service_info.annotations { + let mut server_name = "".to_string(); + for (key, value) in annotations { + match key.as_str() { + "name" => { + server_name = value; + } + "tags" => {} + _ => {} + } + } + channel_info.push(ChannelInfo { + id: id, + low_age: 0, + high_age: 0, + capacity: 0, + name: NullTerminatedString::new(&server_name), + }); + id = id + 1; - let data = SrvChannelListReply { - id: request?.server_id, - channels: channel_info, - }; - let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; - send_packet(stream, &response_packet).await?; + let data = SrvChannelListReply { + id: request?.server_id, + channels: channel_info, + }; + let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; + send_packet(stream, &response_packet).await?; + } + } + Err(err) => { + let data = SrvChannelListReply { + id: request?.server_id, + channels: Vec::new(), + }; + let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?; + send_packet(stream, &response_packet).await?; + return Ok(()); + } + } Ok(()) } diff --git a/packet-service/src/handlers/character.rs b/packet-service/src/handlers/character.rs index 354695a..ca1462c 100644 --- a/packet-service/src/handlers/character.rs +++ b/packet-service/src/handlers/character.rs @@ -292,9 +292,9 @@ pub(crate) async fn handle_select_char_req( for item in items { if item.slot < MAX_VISIBLE_ITEMS as i32 { - let slot = convert_type_to_body_part(item.slot) as usize - 2; + let slot = convert_type_to_body_part(item.slot) as isize - 2; if slot >= 0 { - equipped_item_list[slot] = EquippedItem { + equipped_item_list[slot as usize] = EquippedItem { id: item.item_id as u16, gem_opt: item.gem_option as u16, socket: item.socket as i8, diff --git a/packet-service/src/main.rs b/packet-service/src/main.rs index f33d816..baa8a78 100644 --- a/packet-service/src/main.rs +++ b/packet-service/src/main.rs @@ -20,8 +20,9 @@ use tokio::sync::{Mutex, Semaphore}; use tokio::{select, signal}; use tracing::Level; use tracing::{debug, error, info, warn}; -use utils::consul_registration; -use utils::service_discovery::get_service_address; +use tracing_subscriber::EnvFilter; +use utils::{consul_registration, logging}; +use utils::service_discovery::{get_kube_service_endpoints_by_dns}; use warp::Filter; mod auth_client; @@ -58,61 +59,20 @@ const MAX_CONCURRENT_CONNECTIONS: usize = 100; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); - tracing_subscriber::fmt() - .with_max_level( - Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) - .unwrap_or_else(|_| Level::INFO), - ) - .init(); + let app_name = env!("CARGO_PKG_NAME"); + logging::setup_logging(app_name); // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string()); + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "29000".to_string()); let metrics_port = env::var("PACKET_METRICS_PORT").unwrap_or_else(|_| "4001".to_string()); - let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string()); - - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "packet-service".to_string()); - let service_address = - env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - let health_check_url = format!("http://{}:{}/health", service_address, health_port); - let auth_node = get_service_address(&consul_url, "auth-service").await?; - let character_node = get_service_address(&consul_url, "character-service").await?; - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "tcp".to_string()]; - let mut meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50052), - tags, - meta, - Some("http"), - Some(&health_check_url), - ) - .await?; + let auth_url = format!("http://{}",get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?.get(0).unwrap()); + let character_url = format!("http://{}",get_kube_service_endpoints_by_dns("character-service","tcp","character-service").await?.get(0).unwrap()); // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; - let auth_address = auth_node.get(0).unwrap(); - let auth_url = format!( - "http://{}:{}", - auth_address.ServiceAddress, auth_address.ServicePort - ); let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?)); - - let character_address = character_node.get(0).unwrap(); - let character_url = format!( - "http://{}:{}", - character_address.ServiceAddress, character_address.ServicePort - ); let character_client = Arc::new(Mutex::new(CharacterClient::connect(&character_url).await?)); let full_addr = format!("{}:{}", &addr, port); @@ -160,10 +120,5 @@ async fn main() -> Result<(), Box> { prometheus_exporter::start(binding.parse().unwrap()).unwrap(); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); - info!("service {} deregistered", service_name); Ok(()) } diff --git a/scripts/build_and_push.py b/scripts/build_and_push.py new file mode 100644 index 0000000..df85507 --- /dev/null +++ b/scripts/build_and_push.py @@ -0,0 +1,72 @@ +import subprocess +import os + +# Define your images, tags, and Dockerfile paths +images = ["api-service", "auth-service", "character-service", "database-service", "packet-service", "session-service", "world-service"] +dockerfile_paths = [ + "../api-service/Dockerfile", + "../auth-service/Dockerfile", + "../character-service/Dockerfile", + "../database-service/Dockerfile", + "../packet-service/Dockerfile", + "../session-service/Dockerfile", + "../world-service/Dockerfile", +] + +common_tag = "latest" +version_tag = "v0.1.1" +image_tag_prefix = "gitea.azgstudio.com/raven/" +build_context = "../" + +def run_command(command): + """Run a shell command and handle errors.""" + try: + subprocess.run(command, check=True) + except subprocess.CalledProcessError as e: + print(f"Error: Command '{' '.join(command)}' failed with exit code {e.returncode}") + exit(1) + + +def build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context): + """Build all Docker images.""" + for image, dockerfile_path in zip(images, dockerfile_paths): + # Add the prefix to the image name + full_image_name = f"{image_tag_prefix}{image}" + + # Build the image with both tags + print(f"Building {full_image_name}:{version_tag} and {full_image_name}:{common_tag} using Dockerfile at {dockerfile_path}...") + run_command([ + "docker", "build", + "-t", f"{full_image_name}:{version_tag}", + "-t", f"{full_image_name}:{common_tag}", + "-f", dockerfile_path, + build_context + ]) + + +def push_images(images, common_tag, version_tag, image_tag_prefix): + """Push all Docker images.""" + for image in images: + # Add the prefix to the image name + full_image_name = f"{image_tag_prefix}{image}" + + # Push both tags + print(f"Pushing {full_image_name}:{version_tag}...") + run_command(["docker", "push", f"{full_image_name}:{version_tag}"]) + + print(f"Pushing {full_image_name}:{common_tag}...") + run_command(["docker", "push", f"{full_image_name}:{common_tag}"]) + + +if __name__ == "__main__": + script_dir = os.path.dirname(os.path.abspath(__file__)) + os.chdir(script_dir) + + # Build all images first + print("Starting the build phase...") + build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context) + + # Push all images after builds are complete + print("Starting the push phase...") + push_images(images, common_tag, version_tag, image_tag_prefix) + diff --git a/session-service/Cargo.toml b/session-service/Cargo.toml index 4a72afa..0d0c113 100644 --- a/session-service/Cargo.toml +++ b/session-service/Cargo.toml @@ -9,7 +9,7 @@ dotenv = "0.15" tokio = { version = "1.41.1", features = ["full"] } serde = { version = "1.0", features = ["derive"] } tracing = "0.1" -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] } tonic = "0.12.3" prost = "0.13.4" warp = "0.3.7" diff --git a/session-service/src/main.rs b/session-service/src/main.rs index bce8341..1d451b4 100644 --- a/session-service/src/main.rs +++ b/session-service/src/main.rs @@ -10,7 +10,8 @@ use std::sync::Arc; use tokio::sync::Mutex; use tonic::transport::Server; use tracing::Level; -use utils::consul_registration; +use tracing_subscriber::{fmt, EnvFilter}; +use utils::{consul_registration, logging}; use utils::redis_cache::RedisCache; pub mod common { @@ -24,42 +25,14 @@ pub mod api { #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); - tracing_subscriber::fmt() - .with_max_level( - Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) - .unwrap_or_else(|_| Level::INFO), - ) - .init(); + let app_name = env!("CARGO_PKG_NAME"); + logging::setup_logging(app_name); // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("SESSION_SERVICE_PORT").unwrap_or_else(|_| "50055".to_string()); + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50055".to_string()); + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - let redis_url = - std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); - let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "session-service".to_string()); - let service_address = - env::var("SESSION_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - - // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let meta = HashMap::new(); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50055), - tags, - meta, - None, - None, - ) - .await?; let full_addr = format!("{}:{}", &addr, port); let address = full_addr.parse().expect("Invalid address"); @@ -77,9 +50,5 @@ async fn main() -> Result<(), Box> { ); utils::signal_handler::wait_for_signal().await; - - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); Ok(()) } diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 72dbab9..5b39e48 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -17,3 +17,6 @@ async-trait = "0.1.87" serde_json = "1.0.140" hickory-resolver = "0.24.4" rand = "0.8.5" +kube = { version = "0.99.0", features = ["derive"] } +k8s-openapi = { version = "0.24.0", features = ["v1_32"] } +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 06e1115..5d375c2 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -4,3 +4,4 @@ pub mod redis_cache; pub mod service_discovery; pub mod signal_handler; pub mod multi_service_load_balancer; +pub mod logging; diff --git a/utils/src/logging.rs b/utils/src/logging.rs new file mode 100644 index 0000000..49911a1 --- /dev/null +++ b/utils/src/logging.rs @@ -0,0 +1,19 @@ +use std::env; +use tracing::{debug, error, info, trace, warn}; +use tracing_subscriber::EnvFilter; + +pub fn setup_logging(app_name: &str) { + let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()); + let filter = EnvFilter::try_new(format!("{app_name}={log_level},utils={log_level}")) + .unwrap_or_else(|_| EnvFilter::new(format!("{app_name}=info,utils=info"))); + + tracing_subscriber::fmt() + .with_env_filter(filter) + .init(); + + error!("Error messages enabled"); + warn!("Warning messages enabled"); + info!("Info messages enabled"); + debug!("Debug messages enabled"); + trace!("Trace messages enabled"); +} \ No newline at end of file diff --git a/utils/src/service_discovery.rs b/utils/src/service_discovery.rs index 82471e2..bbac3a1 100644 --- a/utils/src/service_discovery.rs +++ b/utils/src/service_discovery.rs @@ -1,11 +1,11 @@ use hickory_resolver::config::*; use hickory_resolver::{Resolver, TokioAsyncResolver}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; -use tokio::runtime::Runtime; -use tracing::log::debug; +use kube::{Client, Api}; +use k8s_openapi::api::core::v1::Service; +use std::collections::{BTreeMap}; +use hickory_resolver::system_conf::read_system_conf; pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &str, service_name: &str) -> Result, Box> { let mut rc = ResolverConfig::new(); @@ -29,68 +29,92 @@ pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &s Ok(endpoints) } -#[derive(Debug, Deserialize)] -pub struct ServiceNode { - pub ServiceAddress: String, - pub ServicePort: u16, - pub ServiceTags: Vec, - pub ServiceMeta: HashMap, +pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol: &str, service_name: &str) -> Result, Box> { + let (config, options) = read_system_conf()?; + let resolver = TokioAsyncResolver::tokio(config, options); + + let srv_name = format!("_{}._{}.{}", port_name, service_protocol, service_name); + let srv_record = resolver.srv_lookup(&srv_name).await?; + + let mut endpoints = Vec::new(); + for record in srv_record { + let hostname = record.target(); + let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?; + for response in lookup_responses { + endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?); + } + } + + Ok(endpoints) } -pub async fn get_service_address( - consul_url: &str, +#[derive(Debug)] +pub struct ServiceInfo { + pub name: String, + pub namespace: String, + pub annotations: Option>, + pub labels: Option>, +} + +pub async fn get_service_info( + namespace: &str, service_name: &str, -) -> Result, Box> { - let client = reqwest::Client::new(); - let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name); +) -> Result> { + let client = Client::try_default().await?; - let response = client.get(&consul_service_url).send().await?; + // Create an API object for services in the specified namespace + let services: Api = Api::namespaced(client, namespace); - if !response.status().is_success() { - return Err(format!( - "Failed to fetch service nodes for '{}': {}", - service_name, - response.status() - ) - .into()); - } + // Get the service object + let service = services.get(service_name).await?; - // Deserialize the response into a Vec - let nodes: Vec = response.json().await?; + // Extract metadata + let name = service.metadata.name.unwrap_or_default(); + let namespace = service.metadata.namespace.unwrap_or_default(); + let annotations = service.metadata.annotations.clone(); + let labels = service.metadata.labels.clone(); - if nodes.is_empty() { - Err(format!("No nodes found for service '{}'", service_name).into()) - } else { - Ok(nodes) - } + // Return the service info + Ok(ServiceInfo { + name, + namespace, + annotations, + labels, + }) } -async fn get_services_with_tag( - service_name: &str, - tag: &str, - consul_url: &str, -) -> Result, Box> { - let url = format!("{}/v1/catalog/service/{}", consul_url, service_name); - let client = reqwest::Client::new(); - let response = client.get(&url).send().await?; +pub async fn get_services_by_label( + namespace: &str, + label_selector: &str, +) -> Result, Box> { + let client = Client::try_default().await?; - if !response.status().is_success() { - return Err(format!( - "Failed to fetch service nodes for '{}': {}", - service_name, - response.status() - ) - .into()); + // Create an API object for services in the specified namespace + let services: Api = Api::namespaced(client, namespace); + + // Use ListParams to filter services by label + let list_params = kube::api::ListParams::default().labels(label_selector); + + // List services that match the label selector + let service_list = services.list(&list_params).await?; + + // Convert the list of services into a vector of ServiceInfo + let mut service_infos = Vec::new(); + for service in service_list.items { + let name = service.metadata.name.clone().unwrap_or_default(); + let namespace = service.metadata.namespace.clone().unwrap_or_default(); + + // Convert BTreeMap to HashMap for annotations and labels + let annotations = service.metadata.annotations.map(|btree| btree.into_iter().collect()); + let labels = service.metadata.labels.map(|btree| btree.into_iter().collect()); + + service_infos.push(ServiceInfo { + name, + namespace, + annotations, + labels, + }); } - // Deserialize the response into a Vec - let nodes: Vec = response.json().await?; - - // Filter nodes that include the specified tag - let filtered_nodes = nodes - .into_iter() - .filter(|node| node.ServiceTags.contains(&tag.to_string())) - .collect(); - - Ok(filtered_nodes) + Ok(service_infos) } diff --git a/world-service/Cargo.toml b/world-service/Cargo.toml index ae131d1..9b5c615 100644 --- a/world-service/Cargo.toml +++ b/world-service/Cargo.toml @@ -9,7 +9,7 @@ dotenv = "0.15" tokio = { version = "1.41.1", features = ["full"] } serde = { version = "1.0", features = ["derive"] } tracing = "0.1" -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] } tonic = "0.12.3" prost = "0.13.4" warp = "0.3.7" diff --git a/world-service/src/main.rs b/world-service/src/main.rs index abb26e5..0d7a054 100644 --- a/world-service/src/main.rs +++ b/world-service/src/main.rs @@ -1,72 +1,21 @@ use dotenv::dotenv; -use std::collections::HashMap; use std::env; -use std::str::FromStr; -use tracing::{debug, Level}; -use utils::consul_registration; -use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns}; +use utils::{consul_registration, logging}; +use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns}; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); - tracing_subscriber::fmt() - .with_max_level( - Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())) - .unwrap_or_else(|_| Level::INFO), - ) - .init(); + let app_name = env!("CARGO_PKG_NAME"); + logging::setup_logging(app_name); // Set the gRPC server address let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); - let port = env::var("WORLD_SERVICE_PORT").unwrap_or_else(|_| "50054".to_string()); - let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8084".to_string()); - - // let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); - let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string()); - let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string()); - let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string()); - let consul_url = format!("http://{}:{}", consul_address, consul_port); - let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "world-service".to_string()); - let service_address = - env::var("WORLD_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); - let service_port = port.clone(); - let health_check_url = format!("http://{}:{}/health", service_address, health_port); - let db_nodes = get_service_address(&consul_url, "database-service").await?; - - let temp_db_nodes = get_service_endpoints_by_dns(format!("{}:{}", consul_address, consul_dns_port).as_str(), "grpc", "database-service").await?; - debug!("{:?}", temp_db_nodes); + let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50054".to_string()); + let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap()); // Register service with Consul - let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME")); - let version = env!("CARGO_PKG_VERSION").to_string(); - let tags = vec![version, "grpc".to_string()]; - let mut meta = HashMap::new(); - meta.insert("name".to_string(), "Athena".to_string()); - consul_registration::register_service( - &consul_url, - service_id.as_str(), - service_name.as_str(), - service_address.as_str(), - service_port.parse().unwrap_or(50054), - tags, - meta, - Some("http"), - Some(&health_check_url), - ) - .await?; - - // Start health-check endpoint consul_registration::start_health_check(addr.as_str()).await?; - - let db_address = db_nodes.get(0).unwrap(); - let db_url = format!( - "http://{}:{}", - db_address.ServiceAddress, db_address.ServicePort - ); - utils::signal_handler::wait_for_signal().await; - consul_registration::deregister_service(&consul_url, service_id.as_str()) - .await - .expect(""); Ok(()) }