Compare commits
6 Commits
cf9efc9866
...
f4bc414ebd
| Author | SHA256 | Date | |
|---|---|---|---|
|
f4bc414ebd
|
|||
|
4734b7560a
|
|||
|
e28219c8b7
|
|||
|
eebf5c58e0
|
|||
|
6a8ea2521a
|
|||
|
f353a73658
|
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{info, Level};
|
||||
use utils::consul_registration;
|
||||
use utils::service_discovery::get_service_address;
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
|
||||
|
||||
mod axum_gateway;
|
||||
|
||||
@@ -24,41 +24,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Set the gRPC server address
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("API_SERVICE_PORT").unwrap_or_else(|_| "8080".to_string());
|
||||
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8079".to_string());
|
||||
|
||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "api-service".to_string());
|
||||
let service_address = env::var("API_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let service_port = port.clone();
|
||||
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
|
||||
|
||||
// Register service with Consul
|
||||
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
|
||||
let version = env!("CARGO_PKG_VERSION").to_string();
|
||||
let tags = vec![version];
|
||||
let meta = HashMap::new();
|
||||
consul_registration::register_service(
|
||||
&consul_url,
|
||||
service_id.as_str(),
|
||||
service_name.as_str(),
|
||||
service_address.as_str(),
|
||||
service_port.parse().unwrap_or(8080),
|
||||
tags,
|
||||
meta,
|
||||
Some("http"),
|
||||
Some(&health_check_url),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Start health-check endpoint
|
||||
consul_registration::start_health_check(addr.as_str()).await?;
|
||||
|
||||
let auth_node = get_service_address(&consul_url, "auth-service").await?;
|
||||
let auth_node = get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?;
|
||||
let auth_address = auth_node.get(0).unwrap();
|
||||
let auth_service_address = format!(
|
||||
"http://{}:{}",
|
||||
auth_address.ServiceAddress, auth_address.ServicePort
|
||||
);
|
||||
let auth_service_address = format!("http://{}", auth_address);
|
||||
|
||||
// Connect to the gRPC auth-service
|
||||
let grpc_client = AuthServiceClient::connect(auth_service_address.to_string()).await?;
|
||||
@@ -69,10 +41,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tokio::spawn(axum_gateway::serve_rest_api(grpc_client));
|
||||
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
|
||||
consul_registration::deregister_service(&consul_url, service_id.as_str())
|
||||
.await
|
||||
.expect("");
|
||||
info!("service {} deregistered", service_name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use tonic::transport::Server;
|
||||
use tracing::{debug, info, Level};
|
||||
use utils::consul_registration;
|
||||
use utils::multi_service_load_balancer::{LoadBalancingStrategy, MultiServiceLoadBalancer};
|
||||
use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns};
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
@@ -29,59 +29,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Set the gRPC server address
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string());
|
||||
|
||||
let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string());
|
||||
let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string());
|
||||
let consul_url = format!("http://{}:{}", consul_address, consul_port);
|
||||
let consul_dns_url = format!("{}:{}", consul_address, consul_dns_port);
|
||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string());
|
||||
let service_address = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let service_port = port.clone();
|
||||
|
||||
let lb = MultiServiceLoadBalancer::new(&consul_dns_url, LoadBalancingStrategy::RoundRobin);
|
||||
|
||||
let mut db_url = "".to_string();
|
||||
match lb.get_endpoint("database-service", "grpc").await? {
|
||||
Some(endpoint) => {
|
||||
db_url = format!("http://{}", endpoint);
|
||||
},
|
||||
None => {
|
||||
println!("No endpoints available for database-service");
|
||||
}
|
||||
}
|
||||
|
||||
let mut session_service_address = "".to_string();
|
||||
match lb.get_endpoint("session-service", "grpc").await? {
|
||||
Some(endpoint) => {
|
||||
session_service_address = format!("http://{}", endpoint);
|
||||
},
|
||||
None => {
|
||||
println!("No endpoints available for session-service");
|
||||
}
|
||||
}
|
||||
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
|
||||
let session_service_address = format!("http://{}",get_kube_service_endpoints_by_dns("session-service","tcp","session-service").await?.get(0).unwrap());
|
||||
|
||||
let db_client = Arc::new(DatabaseClient::connect(&db_url).await?);
|
||||
let session_client = Arc::new(SessionServiceClient::connect(session_service_address).await?);
|
||||
|
||||
// Register service with Consul
|
||||
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
|
||||
let version = env!("CARGO_PKG_VERSION").to_string();
|
||||
let tags = vec![version, "grpc".to_string()];
|
||||
let meta = HashMap::new();
|
||||
consul_registration::register_service(
|
||||
&consul_url,
|
||||
service_id.as_str(),
|
||||
service_name.as_str(),
|
||||
service_address.as_str(),
|
||||
service_port.parse().unwrap_or(50052),
|
||||
tags,
|
||||
meta,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let full_addr = format!("{}:{}", &addr, port);
|
||||
let address = full_addr.parse().expect("Invalid address");
|
||||
let auth_service = MyAuthService {
|
||||
@@ -103,10 +56,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
);
|
||||
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
|
||||
consul_registration::deregister_service(&consul_url, service_id.as_str())
|
||||
.await
|
||||
.expect("");
|
||||
info!("service {} deregistered", service_name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tracing::Level;
|
||||
use utils::consul_registration;
|
||||
use utils::service_discovery::get_service_address;
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
@@ -29,40 +29,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Set the gRPC server address
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("CHARACTER_SERVICE_PORT").unwrap_or_else(|_| "50053".to_string());
|
||||
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
|
||||
|
||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "character-service".to_string());
|
||||
let service_address =
|
||||
env::var("CHARACTER_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let service_port = port.clone();
|
||||
let db_nodes = get_service_address(&consul_url, "database-service").await?;
|
||||
|
||||
// Register service with Consul
|
||||
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
|
||||
let version = env!("CARGO_PKG_VERSION").to_string();
|
||||
let tags = vec![version, "grpc".to_string()];
|
||||
let mut meta = HashMap::new();
|
||||
meta.insert("name".to_string(), "Rose".to_string());
|
||||
consul_registration::register_service(
|
||||
&consul_url,
|
||||
service_id.as_str(),
|
||||
service_name.as_str(),
|
||||
service_address.as_str(),
|
||||
service_port.parse().unwrap_or(50052),
|
||||
tags,
|
||||
meta,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let full_addr = format!("{}:{}", &addr, port);
|
||||
let address = full_addr.parse().expect("Invalid address");
|
||||
let db_address = db_nodes.get(0).unwrap();
|
||||
let db_url = format!(
|
||||
"http://{}:{}",
|
||||
db_address.ServiceAddress, db_address.ServicePort
|
||||
);
|
||||
let character_db_client = Arc::new(CharacterDbClient::connect(&db_url).await?);
|
||||
let character_service = MyCharacterService {
|
||||
character_db_client,
|
||||
@@ -77,9 +48,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.await?;
|
||||
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
|
||||
consul_registration::deregister_service(&consul_url, service_id.as_str())
|
||||
.await
|
||||
.expect("");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
23
charts/osirose-new/.helmignore
Normal file
23
charts/osirose-new/.helmignore
Normal file
@@ -0,0 +1,23 @@
|
||||
# Patterns to ignore when building packages.
|
||||
# This supports shell glob matching, relative path matching, and
|
||||
# negation (prefixed with !). Only one pattern per line.
|
||||
.DS_Store
|
||||
# Common VCS dirs
|
||||
.git/
|
||||
.gitignore
|
||||
.bzr/
|
||||
.bzrignore
|
||||
.hg/
|
||||
.hgignore
|
||||
.svn/
|
||||
# Common backup files
|
||||
*.swp
|
||||
*.bak
|
||||
*.tmp
|
||||
*.orig
|
||||
*~
|
||||
# Various IDEs
|
||||
.project
|
||||
.idea/
|
||||
*.tmproj
|
||||
.vscode/
|
||||
24
charts/osirose-new/Chart.yaml
Normal file
24
charts/osirose-new/Chart.yaml
Normal file
@@ -0,0 +1,24 @@
|
||||
apiVersion: v2
|
||||
name: osirose-new
|
||||
description: A Helm chart for Kubernetes
|
||||
|
||||
# A chart can be either an 'application' or a 'library' chart.
|
||||
#
|
||||
# Application charts are a collection of templates that can be packaged into versioned archives
|
||||
# to be deployed.
|
||||
#
|
||||
# Library charts provide useful utilities or functions for the chart developer. They're included as
|
||||
# a dependency of application charts to inject those utilities and functions into the rendering
|
||||
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
|
||||
type: application
|
||||
|
||||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
||||
version: 0.1.0
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application. Versions are not expected to
|
||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
||||
# It is recommended to use it with quotes.
|
||||
appVersion: "0.1.0"
|
||||
62
charts/osirose-new/templates/_helpers.tpl
Normal file
62
charts/osirose-new/templates/_helpers.tpl
Normal file
@@ -0,0 +1,62 @@
|
||||
{{/*
|
||||
Expand the name of the chart.
|
||||
*/}}
|
||||
{{- define "osirose-new.name" -}}
|
||||
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create a default fully qualified app name.
|
||||
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
|
||||
If release name contains chart name it will be used as a full name.
|
||||
*/}}
|
||||
{{- define "osirose-new.fullname" -}}
|
||||
{{- if .Values.fullnameOverride }}
|
||||
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
|
||||
{{- else }}
|
||||
{{- $name := default .Chart.Name .Values.nameOverride }}
|
||||
{{- if contains $name .Release.Name }}
|
||||
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
|
||||
{{- else }}
|
||||
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create chart name and version as used by the chart label.
|
||||
*/}}
|
||||
{{- define "osirose-new.chart" -}}
|
||||
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Common labels
|
||||
*/}}
|
||||
{{- define "osirose-new.labels" -}}
|
||||
helm.sh/chart: {{ include "osirose-new.chart" . }}
|
||||
{{ include "osirose-new.selectorLabels" . }}
|
||||
{{- if .Chart.AppVersion }}
|
||||
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
|
||||
{{- end }}
|
||||
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Selector labels
|
||||
*/}}
|
||||
{{- define "osirose-new.selectorLabels" -}}
|
||||
app.kubernetes.io/name: {{ include "osirose-new.name" . }}
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
Create the name of the service account to use
|
||||
*/}}
|
||||
{{- define "osirose-new.serviceAccountName" -}}
|
||||
{{- if .Values.serviceAccount.create }}
|
||||
{{- default (include "osirose-new.fullname" .) .Values.serviceAccount.name }}
|
||||
{{- else }}
|
||||
{{- default "default" .Values.serviceAccount.name }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
14
charts/osirose-new/templates/configmap.yaml
Normal file
14
charts/osirose-new/templates/configmap.yaml
Normal file
@@ -0,0 +1,14 @@
|
||||
{{- range .Values.services }}
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: {{ .name }}-env
|
||||
data:
|
||||
{{- range $key, $value := $.Values.global.env }}
|
||||
{{ $key }}: "{{ $value }}"
|
||||
{{- end }}
|
||||
{{- range $key, $value := .env }}
|
||||
{{ $key }}: "{{ $value }}"
|
||||
{{- end }}
|
||||
---
|
||||
{{- end }}
|
||||
38
charts/osirose-new/templates/deployment.yaml
Normal file
38
charts/osirose-new/templates/deployment.yaml
Normal file
@@ -0,0 +1,38 @@
|
||||
{{- range .Values.services }}
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: {{ .name }}
|
||||
labels:
|
||||
app: {{ .name }}
|
||||
spec:
|
||||
replicas: {{ .replicas }}
|
||||
selector:
|
||||
matchLabels:
|
||||
app: {{ .name }}
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: {{ .name }}
|
||||
spec:
|
||||
containers:
|
||||
- name: {{ .name }}
|
||||
image: "{{ $.Values.repository }}/{{ .image }}"
|
||||
ports:
|
||||
- containerPort: {{ .port }}
|
||||
env:
|
||||
- name: DATABASE_URL
|
||||
value: "postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@postgres:5432/$(POSTGRES_DB)"
|
||||
envFrom:
|
||||
- configMapRef:
|
||||
name: {{ .name }}-env
|
||||
- secretRef:
|
||||
name: postgres-secrets
|
||||
volumeMounts:
|
||||
- name: service-ids
|
||||
mountPath: /services
|
||||
volumes:
|
||||
- name: service-ids
|
||||
emptyDir: {}
|
||||
---
|
||||
{{- end }}
|
||||
33
charts/osirose-new/templates/hpa.yaml
Normal file
33
charts/osirose-new/templates/hpa.yaml
Normal file
@@ -0,0 +1,33 @@
|
||||
{{- if .Values.autoscaling.enabled }}
|
||||
apiVersion: autoscaling/v2
|
||||
kind: HorizontalPodAutoscaler
|
||||
metadata:
|
||||
name: {{ include "osirose-new.fullname" . }}
|
||||
labels:
|
||||
{{- include "osirose-new.labels" . | nindent 4 }}
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
name: {{ include "osirose-new.fullname" . }}
|
||||
minReplicas: {{ .Values.autoscaling.minReplicas }}
|
||||
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
|
||||
metrics:
|
||||
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
|
||||
- type: Resource
|
||||
resource:
|
||||
name: cpu
|
||||
target:
|
||||
type: Utilization
|
||||
averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
|
||||
{{- end }}
|
||||
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
|
||||
- type: Resource
|
||||
resource:
|
||||
name: memory
|
||||
target:
|
||||
type: Utilization
|
||||
averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
|
||||
{{- end }}
|
||||
---
|
||||
{{- end }}
|
||||
25
charts/osirose-new/templates/ingress.yaml
Normal file
25
charts/osirose-new/templates/ingress.yaml
Normal file
@@ -0,0 +1,25 @@
|
||||
{{- range .Values.services }}
|
||||
{{- if .ingress.enabled }}
|
||||
apiVersion: networking.k8s.io/v1
|
||||
kind: Ingress
|
||||
metadata:
|
||||
name: {{ .name }}-ingress
|
||||
labels:
|
||||
app: {{ .name }}
|
||||
annotations:
|
||||
nginx.ingress.kubernetes.io/rewrite-target: /
|
||||
spec:
|
||||
rules:
|
||||
- host: {{ .ingress.hostname }}
|
||||
http:
|
||||
paths:
|
||||
- path: {{ .ingress.path }}
|
||||
pathType: Prefix
|
||||
backend:
|
||||
service:
|
||||
name: {{ .name }}
|
||||
port:
|
||||
number: {{ .port }}
|
||||
---
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
20
charts/osirose-new/templates/service.yaml
Normal file
20
charts/osirose-new/templates/service.yaml
Normal file
@@ -0,0 +1,20 @@
|
||||
{{- range .Values.services }}
|
||||
{{- if .tcp.enabled }}
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: {{ .name }}
|
||||
labels:
|
||||
app: {{ .name }}
|
||||
spec:
|
||||
type: {{ .tcp.type | default "ClusterIP" }}
|
||||
ports:
|
||||
- name: {{ .tcp.portName | default "tcp" }}
|
||||
port: {{ .tcp.port }}
|
||||
targetPort: {{ .tcp.targetPort | default .tcp.port }}
|
||||
protocol: {{ .tcp.protocol | default "TCP" }}
|
||||
selector:
|
||||
app: {{ .name }}
|
||||
{{- end }}
|
||||
---
|
||||
{{- end }}
|
||||
20
charts/osirose-new/templates/tests/test-connection.yaml
Normal file
20
charts/osirose-new/templates/tests/test-connection.yaml
Normal file
@@ -0,0 +1,20 @@
|
||||
{{- if .Values.tests.enabled }}
|
||||
{{- range .Values.tests.services }}
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
name: {{ .name }}-test
|
||||
annotations:
|
||||
"helm.sh/hook": test
|
||||
"helm.sh/hook-delete-policy": before-hook-creation
|
||||
labels:
|
||||
app: {{ .name }}
|
||||
spec:
|
||||
containers:
|
||||
- name: {{ .name }}-test
|
||||
image: {{ .image }}
|
||||
command: {{ .testCommand }}
|
||||
restartPolicy: Never
|
||||
---
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
115
charts/osirose-new/values.yaml
Normal file
115
charts/osirose-new/values.yaml
Normal file
@@ -0,0 +1,115 @@
|
||||
|
||||
repository: gitea.azgstudio.com/raven
|
||||
|
||||
autoscaling:
|
||||
enabled: false
|
||||
|
||||
global:
|
||||
env:
|
||||
LOG_LEVEL: "debug"
|
||||
APP_ENV: "dev"
|
||||
DATABASE_URL: "" # This is a placeholder. Will be dynamically constructed
|
||||
REDIS_URL: "redis://valkey:6379/0"
|
||||
LISTEN_ADDR: "0.0.0.0"
|
||||
|
||||
services:
|
||||
- name: api-service
|
||||
replicas: 1
|
||||
image: api-service:latest
|
||||
port: 8080
|
||||
tcp:
|
||||
enabled: true
|
||||
portName: api-service
|
||||
port: 8080
|
||||
targetPort: 8080
|
||||
protocol: TCP
|
||||
ingress:
|
||||
enabled: true
|
||||
hostname: game-api.azgstudio.com
|
||||
path: "/"
|
||||
port: 8080
|
||||
|
||||
- name: auth-service
|
||||
replicas: 1
|
||||
image: auth-service:latest
|
||||
port: 50051
|
||||
tcp:
|
||||
enabled: true
|
||||
portName: auth-service
|
||||
port: 50051
|
||||
targetPort: 50051
|
||||
protocol: TCP
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
- name: character-service
|
||||
replicas: 1
|
||||
image: character-service:latest
|
||||
port: 50053
|
||||
tcp:
|
||||
enabled: true
|
||||
portName: character-service
|
||||
port: 50053
|
||||
targetPort: 50053
|
||||
protocol: TCP
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
- name: database-service
|
||||
replicas: 1
|
||||
image: database-service:latest
|
||||
port: 50052
|
||||
tcp:
|
||||
enabled: true
|
||||
portName: database-service
|
||||
port: 50052
|
||||
targetPort: 50052
|
||||
protocol: TCP
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
- name: packet-service
|
||||
replicas: 1
|
||||
image: packet-service:latest
|
||||
port: 29000
|
||||
tcp:
|
||||
enabled: true
|
||||
portName: game-packet-service
|
||||
port: 29000
|
||||
targetPort: 29000
|
||||
protocol: TCP
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
- name: session-service
|
||||
replicas: 1
|
||||
image: session-service:latest
|
||||
port: 50055
|
||||
tcp:
|
||||
enabled: true
|
||||
portName: session-service
|
||||
port: 50055
|
||||
targetPort: 50055
|
||||
protocol: TCP
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
- name: world-service
|
||||
replicas: 1
|
||||
image: world-service:latest
|
||||
port: 50054
|
||||
tcp:
|
||||
enabled: true
|
||||
portName: world-service
|
||||
port: 50054
|
||||
targetPort: 50054
|
||||
protocol: TCP
|
||||
ingress:
|
||||
enabled: false
|
||||
|
||||
tests:
|
||||
enabled: false
|
||||
services:
|
||||
- name: api-service
|
||||
testCommand: ["curl", "-f", "http://api-service:8080/health"]
|
||||
image: curlimages/curl:latest
|
||||
@@ -61,7 +61,7 @@ impl CharacterRepository {
|
||||
self.cache
|
||||
.lock()
|
||||
.await
|
||||
.set(&cache_key, &character, 300)
|
||||
.set(&cache_key, &character, 0)
|
||||
.await
|
||||
.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||
Ok(character)
|
||||
@@ -164,7 +164,7 @@ impl CharacterRepository {
|
||||
self.cache
|
||||
.lock()
|
||||
.await
|
||||
.set(&cache_key, &characters, 300)
|
||||
.set(&cache_key, &characters, 0)
|
||||
.await
|
||||
.map_err(|_| sqlx::Error::RowNotFound)?;
|
||||
Ok(characters)
|
||||
|
||||
@@ -27,35 +27,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string());
|
||||
|
||||
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
let redis_url =
|
||||
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||
|
||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string());
|
||||
let service_address =
|
||||
env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let service_port = port.clone();
|
||||
|
||||
// Register service with Consul
|
||||
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
|
||||
let version = env!("CARGO_PKG_VERSION").to_string();
|
||||
let tags = vec![version, "grpc".to_string()];
|
||||
let meta = HashMap::new();
|
||||
consul_registration::register_service(
|
||||
&consul_url,
|
||||
service_id.as_str(),
|
||||
service_name.as_str(),
|
||||
service_address.as_str(),
|
||||
service_port.parse().unwrap_or(50052),
|
||||
tags,
|
||||
meta,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
consul_registration::start_health_check(addr.as_str()).await?;
|
||||
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||
|
||||
let full_addr = format!("{}:{}", &addr, port);
|
||||
let address = full_addr.parse().expect("Invalid address");
|
||||
@@ -83,9 +56,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
);
|
||||
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
consul_registration::deregister_service(&consul_url, service_id.as_str())
|
||||
.await
|
||||
.expect("");
|
||||
info!("service {} deregistered", service_name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use tonic::{Code, Status};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use utils::null_string::NullTerminatedString;
|
||||
use utils::service_discovery;
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_info};
|
||||
|
||||
pub(crate) async fn handle_alive_req(
|
||||
stream: &mut TcpStream,
|
||||
@@ -161,60 +162,59 @@ pub(crate) async fn handle_login_req(
|
||||
state.session_id = Some(response.session_id.parse().unwrap());
|
||||
}
|
||||
|
||||
let consul_url =
|
||||
env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let servers =
|
||||
service_discovery::get_service_address(&consul_url, "character-service")
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
warn!(err);
|
||||
Vec::new()
|
||||
});
|
||||
|
||||
if servers.len() == 0 {
|
||||
let data = SrvLoginReply {
|
||||
result: srv_login_reply::Result::Failed,
|
||||
right: 0,
|
||||
type_: 0,
|
||||
servers_info: Vec::new(),
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut server_info: Vec<ServerInfo> = Vec::new();
|
||||
let mut id = 0;
|
||||
for server in servers {
|
||||
let mut name = server
|
||||
.ServiceMeta
|
||||
.get("name")
|
||||
.unwrap_or(&"".to_string())
|
||||
.clone();
|
||||
let is_test = server.ServiceTags.contains(&"test".to_string())
|
||||
|| server.ServiceTags.contains(&"staging".to_string());
|
||||
if is_test {
|
||||
name = format!("@{}", name);
|
||||
} else {
|
||||
name = format!(" {}", name);
|
||||
}
|
||||
server_info.push(ServerInfo {
|
||||
test: u8::from(is_test),
|
||||
name: NullTerminatedString::new(&name),
|
||||
id,
|
||||
});
|
||||
id = id + 1;
|
||||
}
|
||||
debug!("Server info: {:?}", server_info);
|
||||
let mut server_info: Vec<ServerInfo> = Vec::new();
|
||||
match get_service_info("default", "character-service").await {
|
||||
Ok(service_info) => {
|
||||
if let Some(annotations) = service_info.annotations {
|
||||
let mut server_name = "".to_string();
|
||||
let mut is_test = false;
|
||||
for (key, value) in annotations {
|
||||
match key.as_str() {
|
||||
"name" => {
|
||||
server_name = value;
|
||||
}
|
||||
"tags" => {
|
||||
|
||||
let data = SrvLoginReply {
|
||||
result: srv_login_reply::Result::Ok,
|
||||
right: 0,
|
||||
type_: 0,
|
||||
servers_info: server_info,
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if is_test {
|
||||
server_name = format!("@{}", server_name);
|
||||
} else {
|
||||
server_name = format!(" {}", server_name);
|
||||
}
|
||||
|
||||
server_info.push(ServerInfo {
|
||||
test: u8::from(is_test),
|
||||
name: NullTerminatedString::new(&server_name),
|
||||
id,
|
||||
});
|
||||
|
||||
let data = SrvLoginReply {
|
||||
result: srv_login_reply::Result::Ok,
|
||||
right: 0,
|
||||
type_: 0,
|
||||
servers_info: server_info,
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let data = SrvLoginReply {
|
||||
result: srv_login_reply::Result::Failed,
|
||||
right: 0,
|
||||
type_: 0,
|
||||
servers_info: Vec::new(),
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(status) => {
|
||||
@@ -300,52 +300,47 @@ pub(crate) async fn handle_channel_list_req(
|
||||
let request = CliChannelListReq::decode(packet.payload.as_slice());
|
||||
debug!("{:?}", request);
|
||||
|
||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let channels = service_discovery::get_service_address(&consul_url, "world-service")
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
warn!(err);
|
||||
Vec::new()
|
||||
});
|
||||
|
||||
if channels.len() == 0 {
|
||||
let data = SrvChannelListReply {
|
||||
id: request?.server_id,
|
||||
channels: Vec::new(),
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
debug!("Server info: {:?}", channels);
|
||||
let mut channel_info: Vec<ChannelInfo> = Vec::new();
|
||||
let mut id = 1;
|
||||
for channel in channels {
|
||||
let name = format!(
|
||||
"{}",
|
||||
channel
|
||||
.ServiceMeta
|
||||
.get("name")
|
||||
.unwrap_or(&"".to_string())
|
||||
.clone()
|
||||
);
|
||||
channel_info.push(ChannelInfo {
|
||||
id: id,
|
||||
low_age: 0,
|
||||
high_age: 0,
|
||||
capacity: 0,
|
||||
name: NullTerminatedString::new(&name),
|
||||
});
|
||||
id = id + 1;
|
||||
}
|
||||
debug!("Channel info: {:?}", channel_info);
|
||||
let mut channel_info: Vec<ChannelInfo> = Vec::new();
|
||||
match get_service_info("default", "world-service").await {
|
||||
Ok(service_info) => {
|
||||
if let Some(annotations) = service_info.annotations {
|
||||
let mut server_name = "".to_string();
|
||||
for (key, value) in annotations {
|
||||
match key.as_str() {
|
||||
"name" => {
|
||||
server_name = value;
|
||||
}
|
||||
"tags" => {}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
channel_info.push(ChannelInfo {
|
||||
id: id,
|
||||
low_age: 0,
|
||||
high_age: 0,
|
||||
capacity: 0,
|
||||
name: NullTerminatedString::new(&server_name),
|
||||
});
|
||||
id = id + 1;
|
||||
|
||||
let data = SrvChannelListReply {
|
||||
id: request?.server_id,
|
||||
channels: channel_info,
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
let data = SrvChannelListReply {
|
||||
id: request?.server_id,
|
||||
channels: channel_info,
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let data = SrvChannelListReply {
|
||||
id: request?.server_id,
|
||||
channels: Vec::new(),
|
||||
};
|
||||
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
|
||||
send_packet(stream, &response_packet).await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -292,9 +292,9 @@ pub(crate) async fn handle_select_char_req(
|
||||
|
||||
for item in items {
|
||||
if item.slot < MAX_VISIBLE_ITEMS as i32 {
|
||||
let slot = convert_type_to_body_part(item.slot) as usize - 2;
|
||||
let slot = convert_type_to_body_part(item.slot) as isize - 2;
|
||||
if slot >= 0 {
|
||||
equipped_item_list[slot] = EquippedItem {
|
||||
equipped_item_list[slot as usize] = EquippedItem {
|
||||
id: item.item_id as u16,
|
||||
gem_opt: item.gem_option as u16,
|
||||
socket: item.socket as i8,
|
||||
|
||||
@@ -21,7 +21,7 @@ use tokio::{select, signal};
|
||||
use tracing::Level;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use utils::consul_registration;
|
||||
use utils::service_discovery::get_service_address;
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
|
||||
use warp::Filter;
|
||||
|
||||
mod auth_client;
|
||||
@@ -69,50 +69,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string());
|
||||
let metrics_port = env::var("PACKET_METRICS_PORT").unwrap_or_else(|_| "4001".to_string());
|
||||
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string());
|
||||
|
||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "packet-service".to_string());
|
||||
let service_address =
|
||||
env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let service_port = port.clone();
|
||||
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
|
||||
let auth_node = get_service_address(&consul_url, "auth-service").await?;
|
||||
let character_node = get_service_address(&consul_url, "character-service").await?;
|
||||
|
||||
// Register service with Consul
|
||||
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
|
||||
let version = env!("CARGO_PKG_VERSION").to_string();
|
||||
let tags = vec![version, "tcp".to_string()];
|
||||
let mut meta = HashMap::new();
|
||||
consul_registration::register_service(
|
||||
&consul_url,
|
||||
service_id.as_str(),
|
||||
service_name.as_str(),
|
||||
service_address.as_str(),
|
||||
service_port.parse().unwrap_or(50052),
|
||||
tags,
|
||||
meta,
|
||||
Some("http"),
|
||||
Some(&health_check_url),
|
||||
)
|
||||
.await?;
|
||||
let auth_url = format!("http://{}",get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?.get(0).unwrap());
|
||||
let character_url = format!("http://{}",get_kube_service_endpoints_by_dns("character-service","tcp","character-service").await?.get(0).unwrap());
|
||||
|
||||
// Start health-check endpoint
|
||||
consul_registration::start_health_check(addr.as_str()).await?;
|
||||
|
||||
let auth_address = auth_node.get(0).unwrap();
|
||||
let auth_url = format!(
|
||||
"http://{}:{}",
|
||||
auth_address.ServiceAddress, auth_address.ServicePort
|
||||
);
|
||||
let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?));
|
||||
|
||||
let character_address = character_node.get(0).unwrap();
|
||||
let character_url = format!(
|
||||
"http://{}:{}",
|
||||
character_address.ServiceAddress, character_address.ServicePort
|
||||
);
|
||||
let character_client = Arc::new(Mutex::new(CharacterClient::connect(&character_url).await?));
|
||||
|
||||
let full_addr = format!("{}:{}", &addr, port);
|
||||
@@ -160,10 +123,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
prometheus_exporter::start(binding.parse().unwrap()).unwrap();
|
||||
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
|
||||
consul_registration::deregister_service(&consul_url, service_id.as_str())
|
||||
.await
|
||||
.expect("");
|
||||
info!("service {} deregistered", service_name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
72
scripts/build_and_push.py
Normal file
72
scripts/build_and_push.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
# Define your images, tags, and Dockerfile paths
|
||||
images = ["api-service", "auth-service", "character-service", "database-service", "packet-service", "session-service", "world-service"]
|
||||
dockerfile_paths = [
|
||||
"../api-service/Dockerfile",
|
||||
"../auth-service/Dockerfile",
|
||||
"../character-service/Dockerfile",
|
||||
"../database-service/Dockerfile",
|
||||
"../packet-service/Dockerfile",
|
||||
"../session-service/Dockerfile",
|
||||
"../world-service/Dockerfile",
|
||||
]
|
||||
|
||||
common_tag = "latest"
|
||||
version_tag = "v0.1.1"
|
||||
image_tag_prefix = "gitea.azgstudio.com/raven/"
|
||||
build_context = "../"
|
||||
|
||||
def run_command(command):
|
||||
"""Run a shell command and handle errors."""
|
||||
try:
|
||||
subprocess.run(command, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error: Command '{' '.join(command)}' failed with exit code {e.returncode}")
|
||||
exit(1)
|
||||
|
||||
|
||||
def build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context):
|
||||
"""Build all Docker images."""
|
||||
for image, dockerfile_path in zip(images, dockerfile_paths):
|
||||
# Add the prefix to the image name
|
||||
full_image_name = f"{image_tag_prefix}{image}"
|
||||
|
||||
# Build the image with both tags
|
||||
print(f"Building {full_image_name}:{version_tag} and {full_image_name}:{common_tag} using Dockerfile at {dockerfile_path}...")
|
||||
run_command([
|
||||
"docker", "build",
|
||||
"-t", f"{full_image_name}:{version_tag}",
|
||||
"-t", f"{full_image_name}:{common_tag}",
|
||||
"-f", dockerfile_path,
|
||||
build_context
|
||||
])
|
||||
|
||||
|
||||
def push_images(images, common_tag, version_tag, image_tag_prefix):
|
||||
"""Push all Docker images."""
|
||||
for image in images:
|
||||
# Add the prefix to the image name
|
||||
full_image_name = f"{image_tag_prefix}{image}"
|
||||
|
||||
# Push both tags
|
||||
print(f"Pushing {full_image_name}:{version_tag}...")
|
||||
run_command(["docker", "push", f"{full_image_name}:{version_tag}"])
|
||||
|
||||
print(f"Pushing {full_image_name}:{common_tag}...")
|
||||
run_command(["docker", "push", f"{full_image_name}:{common_tag}"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
os.chdir(script_dir)
|
||||
|
||||
# Build all images first
|
||||
print("Starting the build phase...")
|
||||
build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context)
|
||||
|
||||
# Push all images after builds are complete
|
||||
print("Starting the push phase...")
|
||||
push_images(images, common_tag, version_tag, image_tag_prefix)
|
||||
|
||||
@@ -34,32 +34,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Set the gRPC server address
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("SESSION_SERVICE_PORT").unwrap_or_else(|_| "50055".to_string());
|
||||
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||
|
||||
let redis_url =
|
||||
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "session-service".to_string());
|
||||
let service_address =
|
||||
env::var("SESSION_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let service_port = port.clone();
|
||||
|
||||
// Register service with Consul
|
||||
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
|
||||
let version = env!("CARGO_PKG_VERSION").to_string();
|
||||
let tags = vec![version, "grpc".to_string()];
|
||||
let meta = HashMap::new();
|
||||
consul_registration::register_service(
|
||||
&consul_url,
|
||||
service_id.as_str(),
|
||||
service_name.as_str(),
|
||||
service_address.as_str(),
|
||||
service_port.parse().unwrap_or(50055),
|
||||
tags,
|
||||
meta,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let full_addr = format!("{}:{}", &addr, port);
|
||||
let address = full_addr.parse().expect("Invalid address");
|
||||
@@ -77,9 +53,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
);
|
||||
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
|
||||
consul_registration::deregister_service(&consul_url, service_id.as_str())
|
||||
.await
|
||||
.expect("");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -17,3 +17,5 @@ async-trait = "0.1.87"
|
||||
serde_json = "1.0.140"
|
||||
hickory-resolver = "0.24.4"
|
||||
rand = "0.8.5"
|
||||
kube = { version = "0.99.0", features = ["derive"] }
|
||||
k8s-openapi = { version = "0.24.0", features = ["v1_32"] }
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use hickory_resolver::config::*;
|
||||
use hickory_resolver::{Resolver, TokioAsyncResolver};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::str::FromStr;
|
||||
use tokio::runtime::Runtime;
|
||||
use tracing::log::debug;
|
||||
use kube::{Client, Api};
|
||||
use k8s_openapi::api::core::v1::Service;
|
||||
use std::collections::{BTreeMap};
|
||||
use hickory_resolver::system_conf::read_system_conf;
|
||||
|
||||
pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
|
||||
let mut rc = ResolverConfig::new();
|
||||
@@ -29,68 +29,92 @@ pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &s
|
||||
Ok(endpoints)
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ServiceNode {
|
||||
pub ServiceAddress: String,
|
||||
pub ServicePort: u16,
|
||||
pub ServiceTags: Vec<String>,
|
||||
pub ServiceMeta: HashMap<String, String>,
|
||||
pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
|
||||
let (config, options) = read_system_conf()?;
|
||||
let resolver = TokioAsyncResolver::tokio(config, options);
|
||||
|
||||
let srv_name = format!("_{}._{}._{}", port_name, service_protocol, service_name);
|
||||
let srv_record = resolver.srv_lookup(&srv_name).await?;
|
||||
|
||||
let mut endpoints = Vec::new();
|
||||
for record in srv_record {
|
||||
let hostname = record.target();
|
||||
let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?;
|
||||
for response in lookup_responses {
|
||||
endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(endpoints)
|
||||
}
|
||||
|
||||
pub async fn get_service_address(
|
||||
consul_url: &str,
|
||||
#[derive(Debug)]
|
||||
pub struct ServiceInfo {
|
||||
pub name: String,
|
||||
pub namespace: String,
|
||||
pub annotations: Option<BTreeMap<String, String>>,
|
||||
pub labels: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
pub async fn get_service_info(
|
||||
namespace: &str,
|
||||
service_name: &str,
|
||||
) -> Result<Vec<ServiceNode>, Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name);
|
||||
) -> Result<ServiceInfo, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let client = Client::try_default().await?;
|
||||
|
||||
let response = client.get(&consul_service_url).send().await?;
|
||||
// Create an API object for services in the specified namespace
|
||||
let services: Api<Service> = Api::namespaced(client, namespace);
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(format!(
|
||||
"Failed to fetch service nodes for '{}': {}",
|
||||
service_name,
|
||||
response.status()
|
||||
)
|
||||
.into());
|
||||
}
|
||||
// Get the service object
|
||||
let service = services.get(service_name).await?;
|
||||
|
||||
// Deserialize the response into a Vec<ServiceNode>
|
||||
let nodes: Vec<ServiceNode> = response.json().await?;
|
||||
// Extract metadata
|
||||
let name = service.metadata.name.unwrap_or_default();
|
||||
let namespace = service.metadata.namespace.unwrap_or_default();
|
||||
let annotations = service.metadata.annotations.clone();
|
||||
let labels = service.metadata.labels.clone();
|
||||
|
||||
if nodes.is_empty() {
|
||||
Err(format!("No nodes found for service '{}'", service_name).into())
|
||||
} else {
|
||||
Ok(nodes)
|
||||
}
|
||||
// Return the service info
|
||||
Ok(ServiceInfo {
|
||||
name,
|
||||
namespace,
|
||||
annotations,
|
||||
labels,
|
||||
})
|
||||
}
|
||||
async fn get_services_with_tag(
|
||||
service_name: &str,
|
||||
tag: &str,
|
||||
consul_url: &str,
|
||||
) -> Result<Vec<ServiceNode>, Box<dyn std::error::Error>> {
|
||||
let url = format!("{}/v1/catalog/service/{}", consul_url, service_name);
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let response = client.get(&url).send().await?;
|
||||
pub async fn get_services_by_label(
|
||||
namespace: &str,
|
||||
label_selector: &str,
|
||||
) -> Result<Vec<ServiceInfo>, Box<dyn std::error::Error>> {
|
||||
let client = Client::try_default().await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(format!(
|
||||
"Failed to fetch service nodes for '{}': {}",
|
||||
service_name,
|
||||
response.status()
|
||||
)
|
||||
.into());
|
||||
// Create an API object for services in the specified namespace
|
||||
let services: Api<Service> = Api::namespaced(client, namespace);
|
||||
|
||||
// Use ListParams to filter services by label
|
||||
let list_params = kube::api::ListParams::default().labels(label_selector);
|
||||
|
||||
// List services that match the label selector
|
||||
let service_list = services.list(&list_params).await?;
|
||||
|
||||
// Convert the list of services into a vector of ServiceInfo
|
||||
let mut service_infos = Vec::new();
|
||||
for service in service_list.items {
|
||||
let name = service.metadata.name.clone().unwrap_or_default();
|
||||
let namespace = service.metadata.namespace.clone().unwrap_or_default();
|
||||
|
||||
// Convert BTreeMap to HashMap for annotations and labels
|
||||
let annotations = service.metadata.annotations.map(|btree| btree.into_iter().collect());
|
||||
let labels = service.metadata.labels.map(|btree| btree.into_iter().collect());
|
||||
|
||||
service_infos.push(ServiceInfo {
|
||||
name,
|
||||
namespace,
|
||||
annotations,
|
||||
labels,
|
||||
});
|
||||
}
|
||||
|
||||
// Deserialize the response into a Vec<ServiceNode>
|
||||
let nodes: Vec<ServiceNode> = response.json().await?;
|
||||
|
||||
// Filter nodes that include the specified tag
|
||||
let filtered_nodes = nodes
|
||||
.into_iter()
|
||||
.filter(|node| node.ServiceTags.contains(&tag.to_string()))
|
||||
.collect();
|
||||
|
||||
Ok(filtered_nodes)
|
||||
Ok(service_infos)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::env;
|
||||
use std::str::FromStr;
|
||||
use tracing::{debug, Level};
|
||||
use utils::consul_registration;
|
||||
use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns};
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
@@ -19,54 +19,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Set the gRPC server address
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("WORLD_SERVICE_PORT").unwrap_or_else(|_| "50054".to_string());
|
||||
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8084".to_string());
|
||||
|
||||
// let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string());
|
||||
let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string());
|
||||
let consul_url = format!("http://{}:{}", consul_address, consul_port);
|
||||
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "world-service".to_string());
|
||||
let service_address =
|
||||
env::var("WORLD_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let service_port = port.clone();
|
||||
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
|
||||
let db_nodes = get_service_address(&consul_url, "database-service").await?;
|
||||
|
||||
let temp_db_nodes = get_service_endpoints_by_dns(format!("{}:{}", consul_address, consul_dns_port).as_str(), "grpc", "database-service").await?;
|
||||
debug!("{:?}", temp_db_nodes);
|
||||
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
|
||||
|
||||
// Register service with Consul
|
||||
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
|
||||
let version = env!("CARGO_PKG_VERSION").to_string();
|
||||
let tags = vec![version, "grpc".to_string()];
|
||||
let mut meta = HashMap::new();
|
||||
meta.insert("name".to_string(), "Athena".to_string());
|
||||
consul_registration::register_service(
|
||||
&consul_url,
|
||||
service_id.as_str(),
|
||||
service_name.as_str(),
|
||||
service_address.as_str(),
|
||||
service_port.parse().unwrap_or(50054),
|
||||
tags,
|
||||
meta,
|
||||
Some("http"),
|
||||
Some(&health_check_url),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Start health-check endpoint
|
||||
consul_registration::start_health_check(addr.as_str()).await?;
|
||||
|
||||
let db_address = db_nodes.get(0).unwrap();
|
||||
let db_url = format!(
|
||||
"http://{}:{}",
|
||||
db_address.ServiceAddress, db_address.ServicePort
|
||||
);
|
||||
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
consul_registration::deregister_service(&consul_url, service_id.as_str())
|
||||
.await
|
||||
.expect("");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user