Compare commits

...

6 Commits

Author SHA256 Message Date
f4bc414ebd - update: code update to use kube api instead of consul 2025-03-18 02:00:11 -04:00
4734b7560a Merge branch 'consul_services_update' into helm 2025-03-18 00:12:25 -04:00
e28219c8b7 - update: auto generate the database url using the saved secret 2025-03-18 00:11:08 -04:00
eebf5c58e0 - add: helm chart
- add: python script for building and pushing the containers
2025-03-17 23:59:33 -04:00
6a8ea2521a - add: logout route to api service 2025-03-17 21:52:12 -04:00
f353a73658 - update: characters cache key lifetime 2025-03-17 21:51:09 -04:00
23 changed files with 640 additions and 436 deletions

View File

@@ -7,7 +7,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{info, Level};
use utils::consul_registration;
use utils::service_discovery::get_service_address;
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
mod axum_gateway;
@@ -24,41 +24,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("API_SERVICE_PORT").unwrap_or_else(|_| "8080".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8079".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "api-service".to_string());
let service_address = env::var("API_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(8080),
tags,
meta,
Some("http"),
Some(&health_check_url),
)
.await?;
// Start health-check endpoint
consul_registration::start_health_check(addr.as_str()).await?;
let auth_node = get_service_address(&consul_url, "auth-service").await?;
let auth_node = get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?;
let auth_address = auth_node.get(0).unwrap();
let auth_service_address = format!(
"http://{}:{}",
auth_address.ServiceAddress, auth_address.ServicePort
);
let auth_service_address = format!("http://{}", auth_address);
// Connect to the gRPC auth-service
let grpc_client = AuthServiceClient::connect(auth_service_address.to_string()).await?;
@@ -69,10 +41,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(axum_gateway::serve_rest_api(grpc_client));
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

View File

@@ -12,7 +12,7 @@ use tonic::transport::Server;
use tracing::{debug, info, Level};
use utils::consul_registration;
use utils::multi_service_load_balancer::{LoadBalancingStrategy, MultiServiceLoadBalancer};
use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns};
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -29,59 +29,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string());
let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string());
let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string());
let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string());
let consul_url = format!("http://{}:{}", consul_address, consul_port);
let consul_dns_url = format!("{}:{}", consul_address, consul_dns_port);
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string());
let service_address = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let lb = MultiServiceLoadBalancer::new(&consul_dns_url, LoadBalancingStrategy::RoundRobin);
let mut db_url = "".to_string();
match lb.get_endpoint("database-service", "grpc").await? {
Some(endpoint) => {
db_url = format!("http://{}", endpoint);
},
None => {
println!("No endpoints available for database-service");
}
}
let mut session_service_address = "".to_string();
match lb.get_endpoint("session-service", "grpc").await? {
Some(endpoint) => {
session_service_address = format!("http://{}", endpoint);
},
None => {
println!("No endpoints available for session-service");
}
}
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
let session_service_address = format!("http://{}",get_kube_service_endpoints_by_dns("session-service","tcp","session-service").await?.get(0).unwrap());
let db_client = Arc::new(DatabaseClient::connect(&db_url).await?);
let session_client = Arc::new(SessionServiceClient::connect(session_service_address).await?);
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
None,
None,
)
.await?;
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
let auth_service = MyAuthService {
@@ -103,10 +56,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

View File

@@ -14,7 +14,7 @@ use std::str::FromStr;
use std::sync::Arc;
use tracing::Level;
use utils::consul_registration;
use utils::service_discovery::get_service_address;
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -29,40 +29,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("CHARACTER_SERVICE_PORT").unwrap_or_else(|_| "50053".to_string());
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "character-service".to_string());
let service_address =
env::var("CHARACTER_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let db_nodes = get_service_address(&consul_url, "database-service").await?;
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let mut meta = HashMap::new();
meta.insert("name".to_string(), "Rose".to_string());
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
None,
None,
)
.await?;
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
let db_address = db_nodes.get(0).unwrap();
let db_url = format!(
"http://{}:{}",
db_address.ServiceAddress, db_address.ServicePort
);
let character_db_client = Arc::new(CharacterDbClient::connect(&db_url).await?);
let character_service = MyCharacterService {
character_db_client,
@@ -77,9 +48,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
Ok(())
}

View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -0,0 +1,24 @@
apiVersion: v2
name: osirose-new
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.1.0"

View File

@@ -0,0 +1,62 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "osirose-new.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "osirose-new.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "osirose-new.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "osirose-new.labels" -}}
helm.sh/chart: {{ include "osirose-new.chart" . }}
{{ include "osirose-new.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "osirose-new.selectorLabels" -}}
app.kubernetes.io/name: {{ include "osirose-new.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "osirose-new.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "osirose-new.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,14 @@
{{- range .Values.services }}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .name }}-env
data:
{{- range $key, $value := $.Values.global.env }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- range $key, $value := .env }}
{{ $key }}: "{{ $value }}"
{{- end }}
---
{{- end }}

View File

@@ -0,0 +1,38 @@
{{- range .Values.services }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .name }}
labels:
app: {{ .name }}
spec:
replicas: {{ .replicas }}
selector:
matchLabels:
app: {{ .name }}
template:
metadata:
labels:
app: {{ .name }}
spec:
containers:
- name: {{ .name }}
image: "{{ $.Values.repository }}/{{ .image }}"
ports:
- containerPort: {{ .port }}
env:
- name: DATABASE_URL
value: "postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@postgres:5432/$(POSTGRES_DB)"
envFrom:
- configMapRef:
name: {{ .name }}-env
- secretRef:
name: postgres-secrets
volumeMounts:
- name: service-ids
mountPath: /services
volumes:
- name: service-ids
emptyDir: {}
---
{{- end }}

View File

@@ -0,0 +1,33 @@
{{- if .Values.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "osirose-new.fullname" . }}
labels:
{{- include "osirose-new.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "osirose-new.fullname" . }}
minReplicas: {{ .Values.autoscaling.minReplicas }}
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
metrics:
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
{{- end }}
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- end }}
---
{{- end }}

View File

@@ -0,0 +1,25 @@
{{- range .Values.services }}
{{- if .ingress.enabled }}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ .name }}-ingress
labels:
app: {{ .name }}
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /
spec:
rules:
- host: {{ .ingress.hostname }}
http:
paths:
- path: {{ .ingress.path }}
pathType: Prefix
backend:
service:
name: {{ .name }}
port:
number: {{ .port }}
---
{{- end }}
{{- end }}

View File

@@ -0,0 +1,20 @@
{{- range .Values.services }}
{{- if .tcp.enabled }}
apiVersion: v1
kind: Service
metadata:
name: {{ .name }}
labels:
app: {{ .name }}
spec:
type: {{ .tcp.type | default "ClusterIP" }}
ports:
- name: {{ .tcp.portName | default "tcp" }}
port: {{ .tcp.port }}
targetPort: {{ .tcp.targetPort | default .tcp.port }}
protocol: {{ .tcp.protocol | default "TCP" }}
selector:
app: {{ .name }}
{{- end }}
---
{{- end }}

View File

@@ -0,0 +1,20 @@
{{- if .Values.tests.enabled }}
{{- range .Values.tests.services }}
apiVersion: v1
kind: Pod
metadata:
name: {{ .name }}-test
annotations:
"helm.sh/hook": test
"helm.sh/hook-delete-policy": before-hook-creation
labels:
app: {{ .name }}
spec:
containers:
- name: {{ .name }}-test
image: {{ .image }}
command: {{ .testCommand }}
restartPolicy: Never
---
{{- end }}
{{- end }}

View File

@@ -0,0 +1,115 @@
repository: gitea.azgstudio.com/raven
autoscaling:
enabled: false
global:
env:
LOG_LEVEL: "debug"
APP_ENV: "dev"
DATABASE_URL: "" # This is a placeholder. Will be dynamically constructed
REDIS_URL: "redis://valkey:6379/0"
LISTEN_ADDR: "0.0.0.0"
services:
- name: api-service
replicas: 1
image: api-service:latest
port: 8080
tcp:
enabled: true
portName: api-service
port: 8080
targetPort: 8080
protocol: TCP
ingress:
enabled: true
hostname: game-api.azgstudio.com
path: "/"
port: 8080
- name: auth-service
replicas: 1
image: auth-service:latest
port: 50051
tcp:
enabled: true
portName: auth-service
port: 50051
targetPort: 50051
protocol: TCP
ingress:
enabled: false
- name: character-service
replicas: 1
image: character-service:latest
port: 50053
tcp:
enabled: true
portName: character-service
port: 50053
targetPort: 50053
protocol: TCP
ingress:
enabled: false
- name: database-service
replicas: 1
image: database-service:latest
port: 50052
tcp:
enabled: true
portName: database-service
port: 50052
targetPort: 50052
protocol: TCP
ingress:
enabled: false
- name: packet-service
replicas: 1
image: packet-service:latest
port: 29000
tcp:
enabled: true
portName: game-packet-service
port: 29000
targetPort: 29000
protocol: TCP
ingress:
enabled: false
- name: session-service
replicas: 1
image: session-service:latest
port: 50055
tcp:
enabled: true
portName: session-service
port: 50055
targetPort: 50055
protocol: TCP
ingress:
enabled: false
- name: world-service
replicas: 1
image: world-service:latest
port: 50054
tcp:
enabled: true
portName: world-service
port: 50054
targetPort: 50054
protocol: TCP
ingress:
enabled: false
tests:
enabled: false
services:
- name: api-service
testCommand: ["curl", "-f", "http://api-service:8080/health"]
image: curlimages/curl:latest

View File

@@ -61,7 +61,7 @@ impl CharacterRepository {
self.cache
.lock()
.await
.set(&cache_key, &character, 300)
.set(&cache_key, &character, 0)
.await
.map_err(|_| sqlx::Error::RowNotFound)?;
Ok(character)
@@ -164,7 +164,7 @@ impl CharacterRepository {
self.cache
.lock()
.await
.set(&cache_key, &characters, 300)
.set(&cache_key, &characters, 0)
.await
.map_err(|_| sqlx::Error::RowNotFound)?;
Ok(characters)

View File

@@ -27,35 +27,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string());
let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string());
let service_address =
env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
None,
None,
)
.await?;
consul_registration::start_health_check(addr.as_str()).await?;
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
@@ -83,9 +56,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

View File

@@ -25,6 +25,7 @@ use tonic::{Code, Status};
use tracing::{debug, error, info, warn};
use utils::null_string::NullTerminatedString;
use utils::service_discovery;
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_info};
pub(crate) async fn handle_alive_req(
stream: &mut TcpStream,
@@ -161,60 +162,59 @@ pub(crate) async fn handle_login_req(
state.session_id = Some(response.session_id.parse().unwrap());
}
let consul_url =
env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let servers =
service_discovery::get_service_address(&consul_url, "character-service")
.await
.unwrap_or_else(|err| {
warn!(err);
Vec::new()
});
if servers.len() == 0 {
let data = SrvLoginReply {
result: srv_login_reply::Result::Failed,
right: 0,
type_: 0,
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
let mut server_info: Vec<ServerInfo> = Vec::new();
let mut id = 0;
for server in servers {
let mut name = server
.ServiceMeta
.get("name")
.unwrap_or(&"".to_string())
.clone();
let is_test = server.ServiceTags.contains(&"test".to_string())
|| server.ServiceTags.contains(&"staging".to_string());
if is_test {
name = format!("@{}", name);
} else {
name = format!(" {}", name);
}
server_info.push(ServerInfo {
test: u8::from(is_test),
name: NullTerminatedString::new(&name),
id,
});
id = id + 1;
}
debug!("Server info: {:?}", server_info);
let mut server_info: Vec<ServerInfo> = Vec::new();
match get_service_info("default", "character-service").await {
Ok(service_info) => {
if let Some(annotations) = service_info.annotations {
let mut server_name = "".to_string();
let mut is_test = false;
for (key, value) in annotations {
match key.as_str() {
"name" => {
server_name = value;
}
"tags" => {
let data = SrvLoginReply {
result: srv_login_reply::Result::Ok,
right: 0,
type_: 0,
servers_info: server_info,
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
}
_ => {}
}
}
if is_test {
server_name = format!("@{}", server_name);
} else {
server_name = format!(" {}", server_name);
}
server_info.push(ServerInfo {
test: u8::from(is_test),
name: NullTerminatedString::new(&server_name),
id,
});
let data = SrvLoginReply {
result: srv_login_reply::Result::Ok,
right: 0,
type_: 0,
servers_info: server_info,
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
}
}
Err(err) => {
let data = SrvLoginReply {
result: srv_login_reply::Result::Failed,
right: 0,
type_: 0,
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
}
}
}
Err(status) => {
@@ -300,52 +300,47 @@ pub(crate) async fn handle_channel_list_req(
let request = CliChannelListReq::decode(packet.payload.as_slice());
debug!("{:?}", request);
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let channels = service_discovery::get_service_address(&consul_url, "world-service")
.await
.unwrap_or_else(|err| {
warn!(err);
Vec::new()
});
if channels.len() == 0 {
let data = SrvChannelListReply {
id: request?.server_id,
channels: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
debug!("Server info: {:?}", channels);
let mut channel_info: Vec<ChannelInfo> = Vec::new();
let mut id = 1;
for channel in channels {
let name = format!(
"{}",
channel
.ServiceMeta
.get("name")
.unwrap_or(&"".to_string())
.clone()
);
channel_info.push(ChannelInfo {
id: id,
low_age: 0,
high_age: 0,
capacity: 0,
name: NullTerminatedString::new(&name),
});
id = id + 1;
}
debug!("Channel info: {:?}", channel_info);
let mut channel_info: Vec<ChannelInfo> = Vec::new();
match get_service_info("default", "world-service").await {
Ok(service_info) => {
if let Some(annotations) = service_info.annotations {
let mut server_name = "".to_string();
for (key, value) in annotations {
match key.as_str() {
"name" => {
server_name = value;
}
"tags" => {}
_ => {}
}
}
channel_info.push(ChannelInfo {
id: id,
low_age: 0,
high_age: 0,
capacity: 0,
name: NullTerminatedString::new(&server_name),
});
id = id + 1;
let data = SrvChannelListReply {
id: request?.server_id,
channels: channel_info,
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
let data = SrvChannelListReply {
id: request?.server_id,
channels: channel_info,
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
}
}
Err(err) => {
let data = SrvChannelListReply {
id: request?.server_id,
channels: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
return Ok(());
}
}
Ok(())
}

View File

@@ -292,9 +292,9 @@ pub(crate) async fn handle_select_char_req(
for item in items {
if item.slot < MAX_VISIBLE_ITEMS as i32 {
let slot = convert_type_to_body_part(item.slot) as usize - 2;
let slot = convert_type_to_body_part(item.slot) as isize - 2;
if slot >= 0 {
equipped_item_list[slot] = EquippedItem {
equipped_item_list[slot as usize] = EquippedItem {
id: item.item_id as u16,
gem_opt: item.gem_option as u16,
socket: item.socket as i8,

View File

@@ -21,7 +21,7 @@ use tokio::{select, signal};
use tracing::Level;
use tracing::{debug, error, info, warn};
use utils::consul_registration;
use utils::service_discovery::get_service_address;
use utils::service_discovery::{get_kube_service_endpoints_by_dns};
use warp::Filter;
mod auth_client;
@@ -69,50 +69,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string());
let metrics_port = env::var("PACKET_METRICS_PORT").unwrap_or_else(|_| "4001".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "packet-service".to_string());
let service_address =
env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
let auth_node = get_service_address(&consul_url, "auth-service").await?;
let character_node = get_service_address(&consul_url, "character-service").await?;
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "tcp".to_string()];
let mut meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50052),
tags,
meta,
Some("http"),
Some(&health_check_url),
)
.await?;
let auth_url = format!("http://{}",get_kube_service_endpoints_by_dns("auth-service","tcp","auth-service").await?.get(0).unwrap());
let character_url = format!("http://{}",get_kube_service_endpoints_by_dns("character-service","tcp","character-service").await?.get(0).unwrap());
// Start health-check endpoint
consul_registration::start_health_check(addr.as_str()).await?;
let auth_address = auth_node.get(0).unwrap();
let auth_url = format!(
"http://{}:{}",
auth_address.ServiceAddress, auth_address.ServicePort
);
let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?));
let character_address = character_node.get(0).unwrap();
let character_url = format!(
"http://{}:{}",
character_address.ServiceAddress, character_address.ServicePort
);
let character_client = Arc::new(Mutex::new(CharacterClient::connect(&character_url).await?));
let full_addr = format!("{}:{}", &addr, port);
@@ -160,10 +123,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
prometheus_exporter::start(binding.parse().unwrap()).unwrap();
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
info!("service {} deregistered", service_name);
Ok(())
}

72
scripts/build_and_push.py Normal file
View File

@@ -0,0 +1,72 @@
import subprocess
import os
# Define your images, tags, and Dockerfile paths
images = ["api-service", "auth-service", "character-service", "database-service", "packet-service", "session-service", "world-service"]
dockerfile_paths = [
"../api-service/Dockerfile",
"../auth-service/Dockerfile",
"../character-service/Dockerfile",
"../database-service/Dockerfile",
"../packet-service/Dockerfile",
"../session-service/Dockerfile",
"../world-service/Dockerfile",
]
common_tag = "latest"
version_tag = "v0.1.1"
image_tag_prefix = "gitea.azgstudio.com/raven/"
build_context = "../"
def run_command(command):
"""Run a shell command and handle errors."""
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
print(f"Error: Command '{' '.join(command)}' failed with exit code {e.returncode}")
exit(1)
def build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context):
"""Build all Docker images."""
for image, dockerfile_path in zip(images, dockerfile_paths):
# Add the prefix to the image name
full_image_name = f"{image_tag_prefix}{image}"
# Build the image with both tags
print(f"Building {full_image_name}:{version_tag} and {full_image_name}:{common_tag} using Dockerfile at {dockerfile_path}...")
run_command([
"docker", "build",
"-t", f"{full_image_name}:{version_tag}",
"-t", f"{full_image_name}:{common_tag}",
"-f", dockerfile_path,
build_context
])
def push_images(images, common_tag, version_tag, image_tag_prefix):
"""Push all Docker images."""
for image in images:
# Add the prefix to the image name
full_image_name = f"{image_tag_prefix}{image}"
# Push both tags
print(f"Pushing {full_image_name}:{version_tag}...")
run_command(["docker", "push", f"{full_image_name}:{version_tag}"])
print(f"Pushing {full_image_name}:{common_tag}...")
run_command(["docker", "push", f"{full_image_name}:{common_tag}"])
if __name__ == "__main__":
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
# Build all images first
print("Starting the build phase...")
build_images(images, dockerfile_paths, common_tag, version_tag, image_tag_prefix, build_context)
# Push all images after builds are complete
print("Starting the push phase...")
push_images(images, common_tag, version_tag, image_tag_prefix)

View File

@@ -34,32 +34,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("SESSION_SERVICE_PORT").unwrap_or_else(|_| "50055".to_string());
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "session-service".to_string());
let service_address =
env::var("SESSION_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let meta = HashMap::new();
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50055),
tags,
meta,
None,
None,
)
.await?;
let full_addr = format!("{}:{}", &addr, port);
let address = full_addr.parse().expect("Invalid address");
@@ -77,9 +53,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
Ok(())
}

View File

@@ -17,3 +17,5 @@ async-trait = "0.1.87"
serde_json = "1.0.140"
hickory-resolver = "0.24.4"
rand = "0.8.5"
kube = { version = "0.99.0", features = ["derive"] }
k8s-openapi = { version = "0.24.0", features = ["v1_32"] }

View File

@@ -1,11 +1,11 @@
use hickory_resolver::config::*;
use hickory_resolver::{Resolver, TokioAsyncResolver};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::runtime::Runtime;
use tracing::log::debug;
use kube::{Client, Api};
use k8s_openapi::api::core::v1::Service;
use std::collections::{BTreeMap};
use hickory_resolver::system_conf::read_system_conf;
pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
let mut rc = ResolverConfig::new();
@@ -29,68 +29,92 @@ pub async fn get_service_endpoints_by_dns(consul_url: &str, service_protocol: &s
Ok(endpoints)
}
#[derive(Debug, Deserialize)]
pub struct ServiceNode {
pub ServiceAddress: String,
pub ServicePort: u16,
pub ServiceTags: Vec<String>,
pub ServiceMeta: HashMap<String, String>,
pub async fn get_kube_service_endpoints_by_dns(port_name: &str, service_protocol: &str, service_name: &str) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
let (config, options) = read_system_conf()?;
let resolver = TokioAsyncResolver::tokio(config, options);
let srv_name = format!("_{}._{}._{}", port_name, service_protocol, service_name);
let srv_record = resolver.srv_lookup(&srv_name).await?;
let mut endpoints = Vec::new();
for record in srv_record {
let hostname = record.target();
let lookup_responses = resolver.lookup_ip(hostname.to_string()).await?;
for response in lookup_responses {
endpoints.push(SocketAddr::from_str(&format!("{}:{}", &response.to_string(), record.port()))?);
}
}
Ok(endpoints)
}
pub async fn get_service_address(
consul_url: &str,
#[derive(Debug)]
pub struct ServiceInfo {
pub name: String,
pub namespace: String,
pub annotations: Option<BTreeMap<String, String>>,
pub labels: Option<BTreeMap<String, String>>,
}
pub async fn get_service_info(
namespace: &str,
service_name: &str,
) -> Result<Vec<ServiceNode>, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let consul_service_url = format!("{}/v1/catalog/service/{}", consul_url, service_name);
) -> Result<ServiceInfo, Box<dyn std::error::Error + Send + Sync>> {
let client = Client::try_default().await?;
let response = client.get(&consul_service_url).send().await?;
// Create an API object for services in the specified namespace
let services: Api<Service> = Api::namespaced(client, namespace);
if !response.status().is_success() {
return Err(format!(
"Failed to fetch service nodes for '{}': {}",
service_name,
response.status()
)
.into());
}
// Get the service object
let service = services.get(service_name).await?;
// Deserialize the response into a Vec<ServiceNode>
let nodes: Vec<ServiceNode> = response.json().await?;
// Extract metadata
let name = service.metadata.name.unwrap_or_default();
let namespace = service.metadata.namespace.unwrap_or_default();
let annotations = service.metadata.annotations.clone();
let labels = service.metadata.labels.clone();
if nodes.is_empty() {
Err(format!("No nodes found for service '{}'", service_name).into())
} else {
Ok(nodes)
}
// Return the service info
Ok(ServiceInfo {
name,
namespace,
annotations,
labels,
})
}
async fn get_services_with_tag(
service_name: &str,
tag: &str,
consul_url: &str,
) -> Result<Vec<ServiceNode>, Box<dyn std::error::Error>> {
let url = format!("{}/v1/catalog/service/{}", consul_url, service_name);
let client = reqwest::Client::new();
let response = client.get(&url).send().await?;
pub async fn get_services_by_label(
namespace: &str,
label_selector: &str,
) -> Result<Vec<ServiceInfo>, Box<dyn std::error::Error>> {
let client = Client::try_default().await?;
if !response.status().is_success() {
return Err(format!(
"Failed to fetch service nodes for '{}': {}",
service_name,
response.status()
)
.into());
// Create an API object for services in the specified namespace
let services: Api<Service> = Api::namespaced(client, namespace);
// Use ListParams to filter services by label
let list_params = kube::api::ListParams::default().labels(label_selector);
// List services that match the label selector
let service_list = services.list(&list_params).await?;
// Convert the list of services into a vector of ServiceInfo
let mut service_infos = Vec::new();
for service in service_list.items {
let name = service.metadata.name.clone().unwrap_or_default();
let namespace = service.metadata.namespace.clone().unwrap_or_default();
// Convert BTreeMap to HashMap for annotations and labels
let annotations = service.metadata.annotations.map(|btree| btree.into_iter().collect());
let labels = service.metadata.labels.map(|btree| btree.into_iter().collect());
service_infos.push(ServiceInfo {
name,
namespace,
annotations,
labels,
});
}
// Deserialize the response into a Vec<ServiceNode>
let nodes: Vec<ServiceNode> = response.json().await?;
// Filter nodes that include the specified tag
let filtered_nodes = nodes
.into_iter()
.filter(|node| node.ServiceTags.contains(&tag.to_string()))
.collect();
Ok(filtered_nodes)
Ok(service_infos)
}

View File

@@ -4,7 +4,7 @@ use std::env;
use std::str::FromStr;
use tracing::{debug, Level};
use utils::consul_registration;
use utils::service_discovery::{get_service_address, get_service_endpoints_by_dns};
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -19,54 +19,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("WORLD_SERVICE_PORT").unwrap_or_else(|_| "50054".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8084".to_string());
// let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
let consul_address = env::var("CONSUL_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string());
let consul_port = env::var("CONSUL_PORT").unwrap_or_else(|_| "8500".to_string());
let consul_dns_port = env::var("CONSUL_DNS_PORT").unwrap_or_else(|_| "8600".to_string());
let consul_url = format!("http://{}:{}", consul_address, consul_port);
let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "world-service".to_string());
let service_address =
env::var("WORLD_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let service_port = port.clone();
let health_check_url = format!("http://{}:{}/health", service_address, health_port);
let db_nodes = get_service_address(&consul_url, "database-service").await?;
let temp_db_nodes = get_service_endpoints_by_dns(format!("{}:{}", consul_address, consul_dns_port).as_str(), "grpc", "database-service").await?;
debug!("{:?}", temp_db_nodes);
let db_url = format!("http://{}",get_kube_service_endpoints_by_dns("database-service","tcp","database-service").await?.get(0).unwrap());
// Register service with Consul
let service_id = consul_registration::get_or_generate_service_id(env!("CARGO_PKG_NAME"));
let version = env!("CARGO_PKG_VERSION").to_string();
let tags = vec![version, "grpc".to_string()];
let mut meta = HashMap::new();
meta.insert("name".to_string(), "Athena".to_string());
consul_registration::register_service(
&consul_url,
service_id.as_str(),
service_name.as_str(),
service_address.as_str(),
service_port.parse().unwrap_or(50054),
tags,
meta,
Some("http"),
Some(&health_check_url),
)
.await?;
// Start health-check endpoint
consul_registration::start_health_check(addr.as_str()).await?;
let db_address = db_nodes.get(0).unwrap();
let db_url = format!(
"http://{}:{}",
db_address.ServiceAddress, db_address.ServicePort
);
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())
.await
.expect("");
Ok(())
}