Updated world service to become a manager of game logic instances for each map.

This commit is contained in:
2025-06-08 16:04:50 -04:00
parent 9088c04bc8
commit 6f18b53913
5 changed files with 302 additions and 9 deletions

52
proto/game_logic.proto Normal file
View File

@@ -0,0 +1,52 @@
syntax = "proto3";
package game_logic;
service GameLogicService {
rpc GetCharacter(CharacterRequest) returns (CharacterResponse);
rpc MoveCharacter(CharacterMoveRequest) returns (CharacterMoveResponse);
rpc GetTargetHp(ObjectHpRequest) returns (ObjectHpResponse);
}
message CharacterRequest {
string token = 1;
string user_id = 2;
string char_id = 3;
string session_id = 4;
}
message CharacterResponse {
int32 count = 1;
}
message CharacterMoveRequest {
string session_id = 1;
uint32 target_id = 2;
float x = 3;
float y = 4;
float z = 5;
}
message CharacterMoveResponse {
int32 id = 1;
int32 target_id = 2;
int32 distance = 3;
float x = 4;
float y = 5;
float z = 6;
}
message AttackRequest {
string session_id = 1;
uint32 target_id = 2;
}
message ObjectHpRequest {
string session_id = 1;
uint32 target_id = 2;
}
message ObjectHpResponse {
uint32 target_id = 1;
int32 hp = 2;
}

View File

@@ -7,13 +7,10 @@ edition = "2021"
utils = { path = "../utils" }
dotenv = "0.15"
tokio = { version = "1.41.1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "chrono"] }
tonic = "0.12.3"
prost = "0.13.4"
warp = "0.3.7"
tonic-health = "0.12.3"
serde_json = "1.0.140"
kube = { version = "1.1.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.25.0", features = ["latest"] }
tracing = "0.1.41"
[build-dependencies]
tonic-build = "0.12.3"
tonic-build = "0.12.3"

View File

@@ -11,6 +11,6 @@ fn main() {
tonic_build::configure()
.build_server(false) // Generate gRPC client code
.compile_well_known_types(true)
.compile_protos(&["../proto/user_db_api.proto", "../proto/auth.proto"], &["../proto"])
.compile_protos(&["../proto/user_db_api.proto", "../proto/auth.proto", "../proto/character.proto", "../proto/character_common.proto", "../proto/chat.proto", "../proto/game_logic.proto"], &["../proto"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}

View File

@@ -0,0 +1,174 @@
use kube::{
api::{Api, PostParams, DeleteParams},
Client,
};
use k8s_openapi::api::core::v1::Pod;
use serde_json::json;
use std::error::Error;
use tokio::time::{sleep, Duration, Instant};
/// Struct representing connection info for a game-logic instance.
#[derive(Debug)]
pub struct ConnectionInfo {
pub ip: String,
pub port: u16,
}
/// The `K8sOrchestrator` struct wraps a Kubernetes client and the
/// namespace where your game logic instances will be created.
pub struct K8sOrchestrator {
client: Client,
namespace: String,
}
impl K8sOrchestrator {
/// Creates a new orchestrator for the given namespace.
pub async fn new(namespace: &str) -> Result<Self, Box<dyn Error>> {
let client = Client::try_default().await?;
Ok(Self {
client,
namespace: namespace.to_string(),
})
}
/// Creates a new game-logic Pod with the given `instance_name` and container `image`.
/// Adjust the pod manifest as needed for your game-logic container.
pub async fn create_game_logic_instance(
&self,
instance_name: &str,
image: &str,
map_id: u32,
) -> Result<Pod, Box<dyn Error>> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
// Define the pod manifest for the new game-logic instance.
let map_id_str = map_id.to_string();
let pod_manifest = json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": instance_name,
"labels": {
"app": "game-logic",
"map_id": map_id_str
}
},
"spec": {
"containers": [{
"name": "game-logic",
"image": image,
"ports": [{
"containerPort": 50056,
"name": "grpc"
}],
"env": [{
"name": "MAP_ID",
"value": map_id_str
}],
"volumeMounts": [{
"name": "game-data",
"mountPath": "/opt/data",
"readOnly": true
}]
}],
"volumes": [{
"name": "game-data",
"persistentVolumeClaim": {
"claimName": "game-data-pvc"
}
}]
}
});
// Deserialize the JSON manifest into a Pod struct.
let pod: Pod = serde_json::from_value(pod_manifest)?;
// Create the Pod in Kubernetes.
let created_pod = pods.create(&PostParams::default(), &pod).await?;
Ok(created_pod)
}
/// Retrieves the updated Pod object for a given instance name.
pub async fn get_instance(&self, instance_name: &str)
-> Result<Pod, Box<dyn Error>> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
let pod = pods.get(instance_name).await?;
Ok(pod)
}
/// Checks the status of the game-logic Pod and returns its gRPC connection info.
/// It attempts to determine the port from the pod's container spec (searching for a port
/// named "grpc"). If not found, it falls back to the default port 50051.
pub async fn get_connection_info(&self, instance_name: &str)
-> Result<Option<ConnectionInfo>, Box<dyn Error>>
{
let pod = self.get_instance(instance_name).await?;
if let Some(status) = pod.status {
if let Some(pod_ip) = status.pod_ip {
// Try to extract the container port dynamically.
if let Some(spec) = pod.spec {
if let Some(container) = spec.containers.first() {
if let Some(ports) = &container.ports {
// Look for a port with the name "grpc"
if let Some(grpc_port) = ports.iter().find(|p| {
p.name.as_ref().map_or(false, |n| n == "grpc")
}) {
return Ok(Some(ConnectionInfo {
ip: pod_ip,
port: grpc_port.container_port as u16,
}));
}
// Or use the first container port if no named port was found.
if let Some(first_port) = ports.first() {
return Ok(Some(ConnectionInfo {
ip: pod_ip,
port: first_port.container_port as u16,
}));
}
}
}
}
// Use fallback port if no port information is available.
return Ok(Some(ConnectionInfo { ip: pod_ip, port: 50051 }));
}
}
Ok(None)
}
/// Polls for connection info until successful or a timeout is reached.
/// `timeout_secs` specifies the maximum time in seconds to wait.
pub async fn poll_connection_info(
&self,
instance_name: &str,
timeout_secs: u64,
) -> Result<ConnectionInfo, Box<dyn Error>> {
let start = Instant::now();
let timeout = Duration::from_secs(timeout_secs);
// Poll every 2 seconds
let poll_interval = Duration::from_secs(2);
loop {
if start.elapsed() > timeout {
return Err(format!(
"Timeout reached while polling connection info for pod {}",
instance_name
)
.into());
}
if let Some(conn_info) = self.get_connection_info(instance_name).await? {
return Ok(conn_info);
}
sleep(poll_interval).await;
}
}
/// Shuts down (deletes) the game-logic Pod with the given name.
pub async fn shutdown_instance(&self, instance_name: &str)
-> Result<(), Box<dyn Error>> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
// DeleteParams::default() is sufficient for a forceful deletion.
pods.delete(instance_name, &DeleteParams::default()).await?;
Ok(())
}
}

View File

@@ -1,13 +1,77 @@
mod k8s_orchestrator;
use dotenv::dotenv;
use std::env;
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns};
use utils::{health_check, logging};
use tracing::{debug, error, info, warn};
use crate::k8s_orchestrator::K8sOrchestrator;
fn get_service_name() -> String {
env::var("WORLD_SERVICE_NAME").unwrap_or_else(|_| "default-service".to_string())
}
fn get_map_ids() -> Vec<u32> {
// Get the `MAP_IDS` environment variable, such as "42,43,44,45"
let map_ids_str = env::var("MAP_IDS").unwrap_or_default();
// Split the string by commas and parse each into a u32. Ignore invalid entries.
map_ids_str
.split(',')
.filter_map(|s| s.trim().parse::<u32>().ok())
.collect()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
let app_name = env!("CARGO_PKG_NAME");
logging::setup_logging(app_name, &["world_service", "health_check"]);
// Get the list of map IDs from the environment variable
let map_ids = get_map_ids();
// Get the service name from the environment variable
let service_name = get_service_name();
let instance_names = map_ids.iter().map(|map_id| format!("world-{}-{}", service_name, map_id).to_lowercase()).collect::<Vec<_>>();
// Create a game-logic instance for each map ID we want to manage
let orchestrator = K8sOrchestrator::new("default").await?;
let image = "gitea.azgstudio.com/raven/game-logic-service:latest";
for (map_id, instance_name) in map_ids.iter().zip(instance_names.iter()) {
match orchestrator.create_game_logic_instance(&instance_name, image, *map_id).await
{
Ok(created_pod) => {
debug!(
"Successfully created game-logic instance: {:?}",
created_pod.metadata.name,
);
}
Err(e) => {
if e.to_string().contains("AlreadyExists") {
info!("Game-logic instance already exists: {}", e);
// No reason to return an error here.
//TODO: We may want to check to make sure the pod is working correctly.
} else {
error!("Error creating game-logic instance: {}", e);
return Err(e);
}
}
}
}
for instance_name in instance_names.clone() {
match orchestrator.poll_connection_info(&instance_name, 30).await {
Ok(conn_info) => {
debug!("Successfully retrieved connection info for {} instance: {:?}", instance_name, conn_info);
//TODO: Store the connection info for later use.
}
Err(e) => {
error!("Error retrieving connection info for {} instance: {}", instance_name, e);
return Err(e);
}
}
}
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
@@ -30,5 +94,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Register service with Consul
health_check::start_health_check(addr.as_str()).await?;
utils::signal_handler::wait_for_signal().await;
// Shutdown all game-logic instances
let instances: Vec<_> = instance_names.iter().map(|instance_name| orchestrator.shutdown_instance(instance_name)).collect();
for instance in instances {
instance.await?;
}
Ok(())
}