Tons of fixes
Added movement updates Updated how entities are checked Events sending between packet service all the way to the logic service
This commit is contained in:
267
world-service/src/game_logic_client.rs
Normal file
267
world-service/src/game_logic_client.rs
Normal file
@@ -0,0 +1,267 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use tonic::transport::Channel;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use futures::StreamExt;
|
||||
|
||||
pub mod world {
|
||||
tonic::include_proto!("world");
|
||||
}
|
||||
|
||||
pub mod game_logic {
|
||||
tonic::include_proto!("game_logic");
|
||||
}
|
||||
|
||||
use world::world_game_logic_service_client::WorldGameLogicServiceClient;
|
||||
use game_logic::game_logic_service_client::GameLogicServiceClient;
|
||||
|
||||
pub struct GameLogicClientManager {
|
||||
clients: Arc<Mutex<HashMap<u32, GameLogicClient>>>,
|
||||
}
|
||||
|
||||
pub struct GameLogicClient {
|
||||
pub map_id: u32,
|
||||
pub endpoint: String,
|
||||
pub service_client: Option<GameLogicServiceClient<Channel>>,
|
||||
pub world_client: Option<WorldGameLogicServiceClient<Channel>>,
|
||||
pub event_sender: Option<mpsc::UnboundedSender<world::GameLogicEvent>>,
|
||||
}
|
||||
|
||||
impl GameLogicClientManager {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
clients: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get retry configuration from environment variables with sensible defaults
|
||||
fn get_retry_config() -> (u32, Duration, Duration) {
|
||||
let max_retries = std::env::var("GAME_LOGIC_MAX_RETRIES")
|
||||
.unwrap_or_else(|_| "3".to_string())
|
||||
.parse::<u32>()
|
||||
.unwrap_or(5);
|
||||
|
||||
let initial_delay_ms = std::env::var("GAME_LOGIC_INITIAL_DELAY_MS")
|
||||
.unwrap_or_else(|_| "500".to_string())
|
||||
.parse::<u64>()
|
||||
.unwrap_or(500);
|
||||
|
||||
let max_delay_ms = std::env::var("GAME_LOGIC_MAX_DELAY_MS")
|
||||
.unwrap_or_else(|_| "10000".to_string())
|
||||
.parse::<u64>()
|
||||
.unwrap_or(10000);
|
||||
|
||||
(
|
||||
max_retries,
|
||||
Duration::from_millis(initial_delay_ms),
|
||||
Duration::from_millis(max_delay_ms),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn add_client(&self, map_id: u32, endpoint: String) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let (max_retries, initial_delay, max_delay) = Self::get_retry_config();
|
||||
self.add_client_with_retry(map_id, endpoint, max_retries, initial_delay, max_delay).await
|
||||
}
|
||||
|
||||
pub async fn add_client_with_retry(
|
||||
&self,
|
||||
map_id: u32,
|
||||
endpoint: String,
|
||||
max_retries: u32,
|
||||
initial_delay: Duration,
|
||||
max_delay: Duration,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut clients = self.clients.lock().await;
|
||||
|
||||
if clients.contains_key(&map_id) {
|
||||
warn!("Game logic client for map {} already exists", map_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Release the lock before attempting connections
|
||||
drop(clients);
|
||||
|
||||
let mut client = GameLogicClient {
|
||||
map_id,
|
||||
endpoint: endpoint.clone(),
|
||||
service_client: None,
|
||||
world_client: None,
|
||||
event_sender: None,
|
||||
};
|
||||
|
||||
// Retry logic for connecting to the game logic service
|
||||
let mut delay = initial_delay;
|
||||
let mut last_error = None;
|
||||
|
||||
for attempt in 0..=max_retries {
|
||||
match GameLogicServiceClient::connect(endpoint.clone()).await {
|
||||
Ok(service_client) => {
|
||||
client.service_client = Some(service_client);
|
||||
debug!("Connected to game logic service for map {} at {} (attempt {})", map_id, endpoint, attempt + 1);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
last_error = Some(e);
|
||||
if attempt < max_retries {
|
||||
warn!("Failed to connect to game logic service for map {} at {} (attempt {}): {}. Retrying in {:?}...",
|
||||
map_id, endpoint, attempt + 1, last_error.as_ref().unwrap(), delay);
|
||||
sleep(delay).await;
|
||||
delay = std::cmp::min(delay * 2, max_delay);
|
||||
} else {
|
||||
error!("Failed to connect to game logic service for map {} at {} after {} attempts: {}",
|
||||
map_id, endpoint, max_retries + 1, last_error.as_ref().unwrap());
|
||||
return Err(last_error.unwrap().into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset delay for the second connection
|
||||
delay = initial_delay;
|
||||
last_error = None;
|
||||
|
||||
// Retry logic for connecting to the world-game-logic service
|
||||
for attempt in 0..=max_retries {
|
||||
match WorldGameLogicServiceClient::connect(endpoint.clone()).await {
|
||||
Ok(world_client) => {
|
||||
client.world_client = Some(world_client);
|
||||
debug!("Connected to game logic world service for map {} at {} (attempt {})", map_id, endpoint, attempt + 1);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
last_error = Some(e);
|
||||
if attempt < max_retries {
|
||||
warn!("Failed to connect to game logic world service for map {} at {} (attempt {}): {}. Retrying in {:?}...",
|
||||
map_id, endpoint, attempt + 1, last_error.as_ref().unwrap(), delay);
|
||||
sleep(delay).await;
|
||||
delay = std::cmp::min(delay * 2, max_delay);
|
||||
} else {
|
||||
error!("Failed to connect to game logic world service for map {} at {} after {} attempts: {}",
|
||||
map_id, endpoint, max_retries + 1, last_error.as_ref().unwrap());
|
||||
return Err(last_error.unwrap().into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-acquire the lock and insert the client
|
||||
let mut clients = self.clients.lock().await;
|
||||
clients.insert(map_id, client);
|
||||
debug!("Successfully added game logic client for map {} at {}", map_id, endpoint);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_client(&self, map_id: u32) {
|
||||
let mut clients = self.clients.lock().await;
|
||||
if let Some(_client) = clients.remove(&map_id) {
|
||||
debug!("Removed game logic client for map {}", map_id);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_nearby_objects(
|
||||
&self,
|
||||
map_id: u32,
|
||||
client_id: u32,
|
||||
x: f32,
|
||||
y: f32,
|
||||
z: f32,
|
||||
) -> Result<game_logic::NearbyObjectsResponse, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut clients = self.clients.lock().await;
|
||||
|
||||
if let Some(client) = clients.get_mut(&map_id) {
|
||||
if let Some(service_client) = &mut client.service_client {
|
||||
let request = game_logic::NearbyObjectsRequest {
|
||||
client_id,
|
||||
x,
|
||||
y,
|
||||
z,
|
||||
map_id: map_id as i32,
|
||||
};
|
||||
|
||||
let response = service_client.get_nearby_objects(request).await?;
|
||||
return Ok(response.into_inner());
|
||||
}
|
||||
}
|
||||
|
||||
Err(format!("No game logic client found for map {}", map_id).into())
|
||||
}
|
||||
|
||||
pub async fn start_event_stream(
|
||||
&self,
|
||||
map_id: u32,
|
||||
outbound_receiver: mpsc::UnboundedReceiver<world::GameLogicEvent>,
|
||||
) -> Result<mpsc::UnboundedReceiver<world::GameLogicEvent>, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut clients = self.clients.lock().await;
|
||||
|
||||
if let Some(client) = clients.get_mut(&map_id) {
|
||||
if let Some(mut world_client) = client.world_client.take() {
|
||||
let (inbound_sender, inbound_receiver) = mpsc::unbounded_channel();
|
||||
|
||||
// Create the bidirectional stream
|
||||
let outbound_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(outbound_receiver);
|
||||
|
||||
let response = world_client.stream_game_events(outbound_stream).await?;
|
||||
let mut inbound_stream = response.into_inner();
|
||||
|
||||
// Spawn task to handle incoming events
|
||||
tokio::spawn(async move {
|
||||
while let Some(event) = inbound_stream.next().await {
|
||||
match event {
|
||||
Ok(game_event) => {
|
||||
debug!("Received event from game logic for map {}: {:?}", map_id, game_event);
|
||||
if let Err(e) = inbound_sender.send(game_event) {
|
||||
error!("Failed to forward event from game logic for map {}: {}", map_id, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error receiving event from game logic for map {}: {}", map_id, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!("Event stream from game logic for map {} ended", map_id);
|
||||
});
|
||||
|
||||
return Ok(inbound_receiver);
|
||||
}
|
||||
}
|
||||
|
||||
Err(format!("No world client found for map {}", map_id).into())
|
||||
}
|
||||
|
||||
pub async fn send_event(&self, map_id: u32, event: world::GameLogicEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let clients = self.clients.lock().await;
|
||||
|
||||
if let Some(client) = clients.get(&map_id) {
|
||||
if let Some(sender) = &client.event_sender {
|
||||
sender.send(event)?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
Err(format!("No event sender found for map {}", map_id).into())
|
||||
}
|
||||
|
||||
pub async fn list_connected_maps(&self) -> Vec<u32> {
|
||||
let clients = self.clients.lock().await;
|
||||
clients.keys().cloned().collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl GameLogicClient {
|
||||
pub async fn connect(map_id: u32, endpoint: String) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let service_client = GameLogicServiceClient::connect(endpoint.clone()).await?;
|
||||
let world_client = WorldGameLogicServiceClient::connect(endpoint.clone()).await?;
|
||||
|
||||
Ok(Self {
|
||||
map_id,
|
||||
endpoint,
|
||||
service_client: Some(service_client),
|
||||
world_client: Some(world_client),
|
||||
event_sender: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,7 @@ use kube::{
|
||||
api::{Api, PostParams, DeleteParams},
|
||||
Client,
|
||||
};
|
||||
use k8s_openapi::api::core::v1::Pod;
|
||||
use k8s_openapi::api::core::v1::{Pod, Service};
|
||||
use serde_json::json;
|
||||
use std::error::Error;
|
||||
use tokio::time::{sleep, Duration, Instant};
|
||||
@@ -36,8 +36,8 @@ impl K8sOrchestrator {
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a new game-logic Pod with the given `instance_name` and container `image`.
|
||||
/// Adjust the pod manifest as needed for your game-logic container.
|
||||
/// Creates a new game-logic Pod and Service with the given `instance_name` and container `image`.
|
||||
/// This creates both the Pod and a corresponding Service for proper networking.
|
||||
pub async fn create_game_logic_instance(
|
||||
&self,
|
||||
instance_name: &str,
|
||||
@@ -55,7 +55,8 @@ impl K8sOrchestrator {
|
||||
"name": instance_name,
|
||||
"labels": {
|
||||
"app": "game-logic",
|
||||
"map_id": map_id_str
|
||||
"map_id": map_id_str,
|
||||
"instance": instance_name
|
||||
}
|
||||
},
|
||||
"spec": {
|
||||
@@ -96,9 +97,55 @@ impl K8sOrchestrator {
|
||||
|
||||
// Create the Pod in Kubernetes.
|
||||
let created_pod = pods.create(&PostParams::default(), &pod).await?;
|
||||
|
||||
// Create a corresponding Service for the Pod
|
||||
self.create_service_for_instance(instance_name, map_id).await?;
|
||||
|
||||
Ok(created_pod)
|
||||
}
|
||||
|
||||
/// Creates a Kubernetes Service for the game-logic instance
|
||||
async fn create_service_for_instance(
|
||||
&self,
|
||||
instance_name: &str,
|
||||
map_id: u32,
|
||||
) -> Result<Service, Box<dyn Error>> {
|
||||
let services: Api<Service> = Api::namespaced(self.client.clone(), &self.namespace);
|
||||
|
||||
let map_id_str = map_id.to_string();
|
||||
let service_name = format!("{}-service", instance_name);
|
||||
|
||||
let service_manifest = json!({
|
||||
"apiVersion": "v1",
|
||||
"kind": "Service",
|
||||
"metadata": {
|
||||
"name": service_name,
|
||||
"labels": {
|
||||
"app": "game-logic",
|
||||
"map_id": map_id_str
|
||||
}
|
||||
},
|
||||
"spec": {
|
||||
"selector": {
|
||||
"app": "game-logic",
|
||||
"map_id": map_id_str,
|
||||
"instance": instance_name
|
||||
},
|
||||
"ports": [{
|
||||
"name": "grpc",
|
||||
"port": 50056,
|
||||
"targetPort": 50056,
|
||||
"protocol": "TCP"
|
||||
}],
|
||||
"type": "ClusterIP"
|
||||
}
|
||||
});
|
||||
|
||||
let service: Service = serde_json::from_value(service_manifest)?;
|
||||
let created_service = services.create(&PostParams::default(), &service).await?;
|
||||
Ok(created_service)
|
||||
}
|
||||
|
||||
/// Retrieves the updated Pod object for a given instance name.
|
||||
pub async fn get_instance(&self, instance_name: &str)
|
||||
-> Result<Pod, Box<dyn Error>> {
|
||||
@@ -107,40 +154,48 @@ impl K8sOrchestrator {
|
||||
Ok(pod)
|
||||
}
|
||||
|
||||
/// Checks the status of the game-logic Pod and returns its gRPC connection info.
|
||||
/// It attempts to determine the port from the pod's container spec (searching for a port
|
||||
/// named "grpc"). If not found, it falls back to the default port 50051.
|
||||
/// Gets connection info for the game-logic instance using the Kubernetes Service.
|
||||
/// This provides a stable endpoint that doesn't change when pods restart.
|
||||
pub async fn get_connection_info(&self, instance_name: &str)
|
||||
-> Result<Option<ConnectionInfo>, Box<dyn Error>>
|
||||
{
|
||||
// Check if the pod is ready first
|
||||
let pod = self.get_instance(instance_name).await?;
|
||||
if let Some(status) = pod.status {
|
||||
if let Some(pod_ip) = status.pod_ip {
|
||||
// Try to extract the container port dynamically.
|
||||
if let Some(spec) = pod.spec {
|
||||
if let Some(status) = &pod.status {
|
||||
// Check if pod is running and ready
|
||||
if status.phase.as_ref().map_or(false, |phase| phase == "Running") {
|
||||
// Use the Service DNS name instead of Pod IP
|
||||
let service_name = format!("{}-service", instance_name);
|
||||
let service_dns = format!("{}.{}.svc.cluster.local", service_name, self.namespace);
|
||||
|
||||
// Get the port from the pod spec or use default
|
||||
let port = if let Some(spec) = &pod.spec {
|
||||
if let Some(container) = spec.containers.first() {
|
||||
if let Some(ports) = &container.ports {
|
||||
// Look for a port with the name "grpc"
|
||||
if let Some(grpc_port) = ports.iter().find(|p| {
|
||||
p.name.as_ref().map_or(false, |n| n == "grpc")
|
||||
}) {
|
||||
return Ok(Some(ConnectionInfo {
|
||||
ip: pod_ip,
|
||||
port: grpc_port.container_port as u16,
|
||||
}));
|
||||
}
|
||||
// Or use the first container port if no named port was found.
|
||||
if let Some(first_port) = ports.first() {
|
||||
return Ok(Some(ConnectionInfo {
|
||||
ip: pod_ip,
|
||||
port: first_port.container_port as u16,
|
||||
}));
|
||||
grpc_port.container_port as u16
|
||||
} else if let Some(first_port) = ports.first() {
|
||||
first_port.container_port as u16
|
||||
} else {
|
||||
50056 // Default port
|
||||
}
|
||||
} else {
|
||||
50056 // Default port
|
||||
}
|
||||
} else {
|
||||
50056 // Default port
|
||||
}
|
||||
}
|
||||
// Use fallback port if no port information is available.
|
||||
return Ok(Some(ConnectionInfo { ip: pod_ip, port: 50051 }));
|
||||
} else {
|
||||
50056 // Default port
|
||||
};
|
||||
|
||||
return Ok(Some(ConnectionInfo {
|
||||
ip: service_dns,
|
||||
port,
|
||||
}));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
@@ -174,12 +229,19 @@ impl K8sOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
/// Shuts down (deletes) the game-logic Pod with the given name.
|
||||
/// Shuts down (deletes) the game-logic Pod and Service with the given name.
|
||||
pub async fn shutdown_instance(&self, instance_name: &str)
|
||||
-> Result<(), Box<dyn Error>> {
|
||||
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace);
|
||||
// DeleteParams::default() is sufficient for a forceful deletion.
|
||||
let services: Api<Service> = Api::namespaced(self.client.clone(), &self.namespace);
|
||||
|
||||
// Delete the Pod
|
||||
pods.delete(instance_name, &DeleteParams::default()).await?;
|
||||
|
||||
// Delete the corresponding Service
|
||||
let service_name = format!("{}-service", instance_name);
|
||||
services.delete(&service_name, &DeleteParams::default()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,21 @@
|
||||
mod k8s_orchestrator;
|
||||
mod world_service;
|
||||
mod game_logic_client;
|
||||
|
||||
use dotenv::dotenv;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::{sleep, Duration, timeout};
|
||||
use tokio::sync::oneshot;
|
||||
use tonic::transport::Server;
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns};
|
||||
use utils::{health_check, logging};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use crate::k8s_orchestrator::K8sOrchestrator;
|
||||
use crate::world_service::{MyWorldService, MyWorldGameLogicService};
|
||||
use crate::game_logic_client::GameLogicClientManager;
|
||||
use world_service::world::world_service_server::WorldServiceServer;
|
||||
use world_service::world::world_game_logic_service_server::WorldGameLogicServiceServer;
|
||||
|
||||
fn get_service_name() -> String {
|
||||
env::var("WORLD_SERVICE_NAME").unwrap_or_else(|_| "default-service".to_string())
|
||||
@@ -21,6 +31,70 @@ fn get_map_ids() -> Vec<u32> {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Get connection retry configuration from environment variables with sensible defaults
|
||||
fn get_connection_retry_config() -> (u32, Duration, Duration) {
|
||||
let max_retries = env::var("CONNECTION_INFO_MAX_RETRIES")
|
||||
.unwrap_or_else(|_| "3".to_string())
|
||||
.parse::<u32>()
|
||||
.unwrap_or(3);
|
||||
|
||||
let initial_delay_ms = env::var("CONNECTION_INFO_INITIAL_DELAY_MS")
|
||||
.unwrap_or_else(|_| "2000".to_string())
|
||||
.parse::<u64>()
|
||||
.unwrap_or(2000);
|
||||
|
||||
let max_delay_ms = env::var("CONNECTION_INFO_MAX_DELAY_MS")
|
||||
.unwrap_or_else(|_| "10000".to_string())
|
||||
.parse::<u64>()
|
||||
.unwrap_or(10000);
|
||||
|
||||
(
|
||||
max_retries,
|
||||
Duration::from_millis(initial_delay_ms),
|
||||
Duration::from_millis(max_delay_ms),
|
||||
)
|
||||
}
|
||||
|
||||
/// Retry wrapper for getting connection info with exponential backoff
|
||||
async fn get_connection_info_with_retry(
|
||||
orchestrator: &K8sOrchestrator,
|
||||
instance_name: &str,
|
||||
poll_timeout_secs: u64,
|
||||
max_retries: u32,
|
||||
initial_delay: Duration,
|
||||
max_delay: Duration,
|
||||
) -> Result<crate::k8s_orchestrator::ConnectionInfo, String> {
|
||||
let mut delay = initial_delay;
|
||||
let mut last_error_msg = String::new();
|
||||
|
||||
for attempt in 0..=max_retries {
|
||||
match orchestrator.poll_connection_info(instance_name, poll_timeout_secs).await {
|
||||
Ok(conn_info) => {
|
||||
if attempt > 0 {
|
||||
info!("Successfully retrieved connection info for {} instance after {} attempts", instance_name, attempt + 1);
|
||||
}
|
||||
return Ok(conn_info);
|
||||
}
|
||||
Err(e) => {
|
||||
last_error_msg = e.to_string();
|
||||
if attempt < max_retries {
|
||||
warn!("Failed to retrieve connection info for {} instance (attempt {}): {}. Retrying in {:?}...",
|
||||
instance_name, attempt + 1, last_error_msg, delay);
|
||||
sleep(delay).await;
|
||||
delay = std::cmp::min(delay * 2, max_delay);
|
||||
} else {
|
||||
error!("Failed to retrieve connection info for {} instance after {} attempts: {}",
|
||||
instance_name, max_retries + 1, last_error_msg);
|
||||
return Err(last_error_msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This should never be reached due to the loop logic above
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
dotenv().ok();
|
||||
@@ -49,7 +123,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
Err(e) => {
|
||||
if e.to_string().contains("AlreadyExists") {
|
||||
warn!("Game-logic instance already exists: {}", e);
|
||||
debug!("Game-logic instance already exists: {}", e);
|
||||
// No reason to return an error here.
|
||||
//TODO: We may want to check to make sure the pod is working correctly.
|
||||
} else {
|
||||
@@ -60,45 +134,228 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
}
|
||||
|
||||
for instance_name in instance_names.clone() {
|
||||
match orchestrator.poll_connection_info(&instance_name, 30).await {
|
||||
Ok(conn_info) => {
|
||||
// Create game logic client manager and world service
|
||||
let game_logic_manager = Arc::new(GameLogicClientManager::new());
|
||||
let world_service_shared = Arc::new(MyWorldService::new_with_game_logic_manager(game_logic_manager.clone()));
|
||||
let world_service = MyWorldService::new_with_game_logic_manager(game_logic_manager.clone());
|
||||
let world_game_logic_service = MyWorldGameLogicService::new(world_service_shared.clone());
|
||||
|
||||
// Get retry configuration for connection info
|
||||
let (conn_max_retries, conn_initial_delay, conn_max_delay) = get_connection_retry_config();
|
||||
|
||||
// Connect to game logic instances in parallel using futures::future::join_all
|
||||
info!("Connecting to {} game logic instances in parallel...", map_ids.len());
|
||||
|
||||
// Create an Arc to share the orchestrator across tasks
|
||||
let orchestrator = Arc::new(orchestrator);
|
||||
|
||||
let connection_futures: Vec<_> = map_ids.iter().zip(instance_names.clone())
|
||||
.map(|(map_id, instance_name)| {
|
||||
let orchestrator = orchestrator.clone();
|
||||
let game_logic_manager = game_logic_manager.clone();
|
||||
let map_id = *map_id;
|
||||
let instance_name = instance_name.clone();
|
||||
|
||||
async move {
|
||||
// Handle the connection info retrieval and convert error to String immediately
|
||||
let conn_info = match get_connection_info_with_retry(
|
||||
&orchestrator,
|
||||
&instance_name,
|
||||
30, // poll timeout in seconds (existing behavior)
|
||||
conn_max_retries,
|
||||
conn_initial_delay,
|
||||
conn_max_delay
|
||||
).await {
|
||||
Ok(info) => info,
|
||||
Err(e) => {
|
||||
let error_msg = format!("Map {}: Connection info retrieval failed - {}", map_id, e);
|
||||
error!("Error retrieving connection info for {} instance after retries: {}", instance_name, e);
|
||||
warn!("Skipping map {} due to connection info retrieval failure", map_id);
|
||||
return Err(error_msg);
|
||||
}
|
||||
};
|
||||
|
||||
debug!("Successfully retrieved connection info for {} instance: {:?}", instance_name, conn_info);
|
||||
//TODO: Store the connection info for later use.
|
||||
let endpoint = format!("http://{}:{}", conn_info.ip, conn_info.port);
|
||||
info!("Attempting to connect to game logic service for map {} at endpoint: {}", map_id, endpoint);
|
||||
|
||||
// Try to resolve the DNS name to see if it's reachable
|
||||
match tokio::net::lookup_host(&format!("{}:{}", conn_info.ip, conn_info.port)).await {
|
||||
Ok(addrs) => {
|
||||
let resolved_addrs: Vec<_> = addrs.collect();
|
||||
info!("DNS resolution successful for {}: {:?}", conn_info.ip, resolved_addrs);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("DNS resolution failed for {}: {}", conn_info.ip, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Give the service a moment to be ready
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||
|
||||
// Try alternative endpoint formats if the full DNS name doesn't work
|
||||
let alternative_endpoints = vec![
|
||||
// endpoint.clone(), // this one doesn't seem to work
|
||||
format!("http://{}-service:{}", instance_name, conn_info.port), // Short service name
|
||||
format!("http://{}:{}", conn_info.ip.split('.').next().unwrap_or(&conn_info.ip), conn_info.port), // Just service name without domain
|
||||
];
|
||||
|
||||
let mut connection_successful = false;
|
||||
let mut last_connection_error = String::new();
|
||||
|
||||
// Try each endpoint format
|
||||
for (i, test_endpoint) in alternative_endpoints.iter().enumerate() {
|
||||
debug!("Trying endpoint format {}: {}", i + 1, test_endpoint);
|
||||
match game_logic_manager.add_client(map_id, test_endpoint.clone()).await {
|
||||
Ok(()) => {
|
||||
info!("Successfully connected to game logic service for map {} using endpoint: {}", map_id, test_endpoint);
|
||||
connection_successful = true;
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to connect using endpoint {}: {}", test_endpoint, e);
|
||||
last_connection_error = format!("Endpoint {}: {}", test_endpoint, e);
|
||||
if i < alternative_endpoints.len() - 1 {
|
||||
debug!("Trying next endpoint format...");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if connection_successful {
|
||||
Ok(map_id)
|
||||
} else {
|
||||
error!("Failed to connect to game logic service for map {} using any endpoint format. Last error: {}", map_id, last_connection_error);
|
||||
Err(format!("Map {}: Failed to connect using any endpoint format - {}", map_id, last_connection_error))
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error retrieving connection info for {} instance: {}", instance_name, e);
|
||||
return Err(e);
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Wait for all connection attempts to complete
|
||||
let connection_results = futures::future::join_all(connection_futures).await;
|
||||
|
||||
// Process connection results
|
||||
let mut successful_connections = 0;
|
||||
let mut failed_connections = Vec::new();
|
||||
|
||||
for result in connection_results {
|
||||
match result {
|
||||
Ok(map_id) => {
|
||||
successful_connections += 1;
|
||||
debug!("Connection for map {} completed successfully", map_id);
|
||||
}
|
||||
Err(error_msg) => {
|
||||
failed_connections.push(error_msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Game logic connection summary: {} successful, {} failed",
|
||||
successful_connections, failed_connections.len());
|
||||
|
||||
if !failed_connections.is_empty() {
|
||||
warn!("Failed connections: {:?}", failed_connections);
|
||||
}
|
||||
|
||||
if successful_connections == 0 {
|
||||
error!("No game logic instances could be connected! Service may not function properly.");
|
||||
} else {
|
||||
info!("World service startup completed with {} active game logic connections", successful_connections);
|
||||
}
|
||||
|
||||
// Set the gRPC server address
|
||||
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
|
||||
let port = env::var("SERVICE_PORT").unwrap_or_else(|_| "50054".to_string());
|
||||
let db_url = format!(
|
||||
"http://{}",
|
||||
get_kube_service_endpoints_by_dns("database-service", "tcp", "database-service")
|
||||
.await?
|
||||
.get(0)
|
||||
.unwrap()
|
||||
);
|
||||
let chat_service = format!(
|
||||
"http://{}",
|
||||
get_kube_service_endpoints_by_dns("chat-service", "tcp", "chat-service")
|
||||
.await?
|
||||
.get(0)
|
||||
.unwrap()
|
||||
);
|
||||
// let db_url = format!(
|
||||
// "http://{}",
|
||||
// get_kube_service_endpoints_by_dns("database-service", "tcp", "database-service")
|
||||
// .await?
|
||||
// .get(0)
|
||||
// .unwrap()
|
||||
// );
|
||||
// let chat_service = format!(
|
||||
// "http://{}",
|
||||
// get_kube_service_endpoints_by_dns("chat-service", "tcp", "chat-service")
|
||||
// .await?
|
||||
// .get(0)
|
||||
// .unwrap()
|
||||
// );
|
||||
|
||||
// Start gRPC server with graceful shutdown support
|
||||
let grpc_addr = format!("{}:{}", addr, port).parse()?;
|
||||
info!("Starting World Service gRPC server on {}", grpc_addr);
|
||||
|
||||
// Create shutdown signal channel
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
||||
|
||||
let server_task = tokio::spawn(async move {
|
||||
let server = Server::builder()
|
||||
.add_service(WorldServiceServer::new(world_service))
|
||||
.add_service(WorldGameLogicServiceServer::new(world_game_logic_service))
|
||||
.serve_with_shutdown(grpc_addr, async {
|
||||
shutdown_rx.await.ok();
|
||||
debug!("gRPC server shutdown signal received");
|
||||
});
|
||||
|
||||
if let Err(e) = server.await {
|
||||
error!("gRPC server error: {}", e);
|
||||
} else {
|
||||
debug!("gRPC server shut down gracefully");
|
||||
}
|
||||
});
|
||||
|
||||
// Register service with Consul
|
||||
health_check::start_health_check(addr.as_str()).await?;
|
||||
|
||||
// Wait for shutdown signal
|
||||
info!("World service is running. Waiting for shutdown signal...");
|
||||
utils::signal_handler::wait_for_signal().await;
|
||||
|
||||
// Shutdown all game-logic instances
|
||||
let instances: Vec<_> = instance_names.iter().map(|instance_name| orchestrator.shutdown_instance(instance_name)).collect();
|
||||
for instance in instances {
|
||||
instance.await?;
|
||||
|
||||
info!("Shutdown signal received. Beginning graceful shutdown...");
|
||||
|
||||
// Step 1: Signal the gRPC server to stop accepting new connections
|
||||
if let Err(_) = shutdown_tx.send(()) {
|
||||
warn!("Failed to send shutdown signal to gRPC server (receiver may have been dropped)");
|
||||
}
|
||||
|
||||
// Step 2: Wait for the gRPC server to finish with a timeout
|
||||
match timeout(Duration::from_secs(30), server_task).await {
|
||||
Ok(result) => {
|
||||
if let Err(e) = result {
|
||||
error!("gRPC server task failed: {}", e);
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!("gRPC server shutdown timed out after 30 seconds");
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: Shutdown all game-logic instances
|
||||
info!("Shutting down {} game-logic instances...", instance_names.len());
|
||||
let mut shutdown_errors = Vec::new();
|
||||
|
||||
for instance_name in &instance_names {
|
||||
match timeout(Duration::from_secs(10), orchestrator.shutdown_instance(instance_name)).await {
|
||||
Ok(Ok(())) => {
|
||||
info!("Successfully shut down game-logic instance: {}", instance_name);
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!("Failed to shutdown game-logic instance {}: {}", instance_name, e);
|
||||
shutdown_errors.push(format!("Instance {}: {}", instance_name, e));
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Timeout shutting down game-logic instance: {}", instance_name);
|
||||
shutdown_errors.push(format!("Instance {}: timeout", instance_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if shutdown_errors.is_empty() {
|
||||
info!("All components shut down successfully");
|
||||
} else {
|
||||
warn!("Some components failed to shut down cleanly: {:?}", shutdown_errors);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
559
world-service/src/world_service.rs
Normal file
559
world-service/src/world_service.rs
Normal file
@@ -0,0 +1,559 @@
|
||||
use futures::{Stream, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use crate::game_logic_client::{GameLogicClientManager, game_logic as client_game_logic};
|
||||
|
||||
pub mod world {
|
||||
tonic::include_proto!("world");
|
||||
}
|
||||
|
||||
pub mod game_logic {
|
||||
tonic::include_proto!("game_logic");
|
||||
}
|
||||
|
||||
use world::world_service_server::WorldService;
|
||||
use world::world_game_logic_service_server::WorldGameLogicService;
|
||||
use world::*;
|
||||
|
||||
pub struct MyWorldService {
|
||||
pub client_connections: Arc<Mutex<HashMap<String, ClientConnection>>>,
|
||||
pub game_logic_connections: Arc<Mutex<HashMap<u32, GameLogicConnection>>>,
|
||||
pub game_logic_manager: Option<Arc<GameLogicClientManager>>,
|
||||
}
|
||||
|
||||
pub struct ClientConnection {
|
||||
pub session_id: String,
|
||||
pub client_id: String,
|
||||
pub map_id: i32,
|
||||
pub x: f32,
|
||||
pub y: f32,
|
||||
pub z: f32,
|
||||
pub sender: mpsc::UnboundedSender<WorldEvent>,
|
||||
}
|
||||
|
||||
pub struct GameLogicConnection {
|
||||
pub map_id: u32,
|
||||
pub sender: mpsc::UnboundedSender<world::GameLogicEvent>,
|
||||
}
|
||||
|
||||
impl MyWorldService {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
client_connections: Arc::new(Mutex::new(HashMap::new())),
|
||||
game_logic_connections: Arc::new(Mutex::new(HashMap::new())),
|
||||
game_logic_manager: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_game_logic_manager(game_logic_manager: Arc<GameLogicClientManager>) -> Self {
|
||||
Self {
|
||||
client_connections: Arc::new(Mutex::new(HashMap::new())),
|
||||
game_logic_connections: Arc::new(Mutex::new(HashMap::new())),
|
||||
game_logic_manager: Some(game_logic_manager),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_client_connection(&self, session_id: String, client_id: String, map_id: i32, sender: mpsc::UnboundedSender<WorldEvent>) {
|
||||
let mut connections = self.client_connections.lock().unwrap();
|
||||
connections.insert(session_id.clone(), ClientConnection {
|
||||
session_id,
|
||||
client_id,
|
||||
map_id,
|
||||
x: 0.0,
|
||||
y: 0.0,
|
||||
z: 0.0,
|
||||
sender,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn remove_client_connection(&self, session_id: &str) {
|
||||
let mut connections = self.client_connections.lock().unwrap();
|
||||
connections.remove(session_id);
|
||||
}
|
||||
|
||||
pub fn add_game_logic_connection(&self, map_id: u32, sender: mpsc::UnboundedSender<world::GameLogicEvent>) {
|
||||
let mut connections = self.game_logic_connections.lock().unwrap();
|
||||
connections.insert(map_id, GameLogicConnection {
|
||||
map_id,
|
||||
sender,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn remove_game_logic_connection(&self, map_id: u32) {
|
||||
let mut connections = self.game_logic_connections.lock().unwrap();
|
||||
connections.remove(&map_id);
|
||||
}
|
||||
|
||||
pub fn broadcast_to_clients_in_map(&self, map_id: i32, event: WorldEvent) {
|
||||
let connections = self.client_connections.lock().unwrap();
|
||||
for connection in connections.values() {
|
||||
if connection.map_id == map_id {
|
||||
if let Err(e) = connection.sender.send(event.clone()) {
|
||||
warn!("Failed to send event to client {}: {}", connection.client_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_to_game_logic(&self, map_id: u32, event: world::GameLogicEvent) {
|
||||
let connections = self.game_logic_connections.lock().unwrap();
|
||||
if let Some(connection) = connections.get(&map_id) {
|
||||
if let Err(e) = connection.sender.send(event) {
|
||||
warn!("Failed to send event to game logic for map {}: {}", map_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_nearby_objects_for_client(&self, session_id: &str, x: f32, y: f32, z: f32, map_id: i32, radius: f32) -> Vec<WorldObject> {
|
||||
// This is a placeholder implementation
|
||||
// In a real implementation, you would query the game logic service or maintain a spatial index
|
||||
debug!("Getting nearby objects for client {} at ({}, {}, {}) in map {} with radius {}",
|
||||
session_id, x, y, z, map_id, radius);
|
||||
|
||||
// Return empty list for now - this will be populated by game logic service
|
||||
vec![]
|
||||
}
|
||||
|
||||
/// Send nearby objects to a connecting client
|
||||
pub async fn send_nearby_objects_to_client(&self, session_id: &str, client_id: &str, x: f32, y: f32, z: f32, map_id: i32) {
|
||||
debug!("Sending nearby objects to client {} at ({}, {}, {}) in map {}", client_id, x, y, z, map_id);
|
||||
|
||||
// Get the client connection to send events
|
||||
let client_sender = {
|
||||
let connections = self.client_connections.lock().unwrap();
|
||||
connections.get(session_id).map(|conn| conn.sender.clone())
|
||||
};
|
||||
|
||||
let Some(sender) = client_sender else {
|
||||
warn!("No client connection found for session {}", session_id);
|
||||
return;
|
||||
};
|
||||
|
||||
// Get nearby objects from game logic service if available
|
||||
if let Some(game_logic_manager) = &self.game_logic_manager {
|
||||
// Get client_id from the client connection
|
||||
let client_id = {
|
||||
let connections = self.client_connections.lock().unwrap();
|
||||
connections.get(session_id).map(|conn| conn.client_id.parse::<u32>().unwrap_or(0)).unwrap_or(0)
|
||||
};
|
||||
|
||||
match game_logic_manager.get_nearby_objects(map_id as u32, client_id, x, y, z).await {
|
||||
Ok(response) => {
|
||||
debug!("Received {} nearby objects from game logic service", response.objects.len());
|
||||
|
||||
if !response.objects.is_empty() {
|
||||
// Convert game logic objects to WorldObjects for batch sending
|
||||
let world_objects: Vec<WorldObject> = response.objects
|
||||
.into_iter()
|
||||
.filter_map(|obj| {
|
||||
// Filter out players for now, only include NPCs and Mobs
|
||||
match obj.r#type {
|
||||
1 => {
|
||||
// Player - skip for now
|
||||
debug!("Skipping player object {} for nearby objects", obj.id);
|
||||
None
|
||||
}
|
||||
2 | 3 => {
|
||||
// NPC or Mob - include in batch
|
||||
Some(WorldObject {
|
||||
id: obj.id as u32,
|
||||
object_type: obj.r#type,
|
||||
x: obj.x,
|
||||
y: obj.y,
|
||||
z: obj.z,
|
||||
map_id: map_id,
|
||||
name: format!("Object_{}", obj.id), // Default name
|
||||
hp: obj.hp, // Default HP
|
||||
max_hp: obj.max_hp, // Default max HP
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
debug!("Unknown object type {} for object {}", obj.r#type, obj.id);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !world_objects.is_empty() {
|
||||
// Send all nearby objects in a single batch update
|
||||
let batch_event = WorldEvent {
|
||||
client_ids: vec![client_id.to_string()],
|
||||
event: Some(world_event::Event::NearbyUpdate(NearbyObjectsUpdate {
|
||||
objects: world_objects.clone(),
|
||||
})),
|
||||
};
|
||||
|
||||
if let Err(e) = sender.send(batch_event) {
|
||||
warn!("Failed to send nearby objects batch update to client {}: {}", client_id, e);
|
||||
} else {
|
||||
debug!("Sent {} nearby objects in batch to client {}", world_objects.len(), client_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to get nearby objects from game logic service for client {}: {}", client_id, e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("No game logic manager available, skipping nearby objects");
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a game logic object to a WorldEvent based on its type
|
||||
fn convert_game_logic_object_to_world_event(&self, obj: &client_game_logic::Object, client_id: &str) -> Option<WorldEvent> {
|
||||
// Object types: 1 = Player, 2 = NPC, 3 = Mob (based on common MMORPG conventions)
|
||||
match obj.r#type {
|
||||
2 => {
|
||||
// NPC
|
||||
Some(WorldEvent {
|
||||
client_ids: vec![client_id.to_string()],
|
||||
event: Some(world_event::Event::NpcSpawn(NpcSpawnEvent {
|
||||
id: obj.id as u32,
|
||||
pos_x: obj.x,
|
||||
pos_y: obj.y,
|
||||
dest_pos_x: obj.x,
|
||||
dest_pos_y: obj.y,
|
||||
command: 0,
|
||||
target_id: 0,
|
||||
move_mode: 0,
|
||||
hp: 100, // Default HP
|
||||
team_id: 0,
|
||||
status_flag: 0,
|
||||
npc_id: obj.id as u32,
|
||||
quest_id: 0,
|
||||
angle: 0.0,
|
||||
event_status: 0,
|
||||
})),
|
||||
})
|
||||
}
|
||||
3 => {
|
||||
// Mob
|
||||
Some(WorldEvent {
|
||||
client_ids: vec![client_id.to_string()],
|
||||
event: Some(world_event::Event::MobSpawn(MobSpawnEvent {
|
||||
id: obj.id as u32,
|
||||
pos_x: obj.x,
|
||||
pos_y: obj.y,
|
||||
dest_pos_x: obj.x,
|
||||
dest_pos_y: obj.y,
|
||||
command: 0,
|
||||
target_id: 0,
|
||||
move_mode: 0,
|
||||
hp: 100, // Default HP
|
||||
team_id: 0,
|
||||
status_flag: 0,
|
||||
npc_id: obj.id as u32,
|
||||
quest_id: 0,
|
||||
})),
|
||||
})
|
||||
}
|
||||
1 => {
|
||||
// Player - for now we don't send player spawn events to other players on connect
|
||||
// This would typically be handled differently (e.g., through a separate player list)
|
||||
debug!("Skipping player object {} for nearby objects", obj.id);
|
||||
None
|
||||
}
|
||||
_ => {
|
||||
debug!("Unknown object type {} for object {}", obj.r#type, obj.id);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl WorldService for MyWorldService {
|
||||
async fn get_character(&self, request: Request<CharacterRequest>) -> Result<Response<CharacterResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
debug!("GetCharacter request: {:?}", req);
|
||||
|
||||
let response = CharacterResponse {
|
||||
count: 1,
|
||||
};
|
||||
Ok(Response::new(response))
|
||||
}
|
||||
|
||||
async fn change_map(&self, request: Request<ChangeMapRequest>) -> Result<Response<ChangeMapResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
debug!("ChangeMap request: {:?}", req);
|
||||
|
||||
let response = ChangeMapResponse {
|
||||
id: req.id,
|
||||
map_id: 1,
|
||||
x: req.x,
|
||||
y: req.y,
|
||||
move_mode: 0,
|
||||
ride_mode: 0,
|
||||
};
|
||||
Ok(Response::new(response))
|
||||
}
|
||||
|
||||
async fn move_character(&self, request: Request<CharacterMoveRequest>) -> Result<Response<CharacterMoveResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
debug!("MoveCharacter request: {:?}", req);
|
||||
|
||||
// Update client position
|
||||
{
|
||||
let mut connections = self.client_connections.lock().unwrap();
|
||||
if let Some(connection) = connections.get_mut(&req.session_id) {
|
||||
connection.x = req.x;
|
||||
connection.y = req.y;
|
||||
connection.z = req.z;
|
||||
}
|
||||
}
|
||||
|
||||
let response = CharacterMoveResponse {
|
||||
id: 1,
|
||||
target_id: req.target_id as i32,
|
||||
distance: 0,
|
||||
x: req.x,
|
||||
y: req.y,
|
||||
z: req.z,
|
||||
};
|
||||
Ok(Response::new(response))
|
||||
}
|
||||
|
||||
async fn get_target_hp(&self, request: Request<ObjectHpRequest>) -> Result<Response<ObjectHpResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
debug!("GetTargetHp request: {:?}", req);
|
||||
|
||||
let response = ObjectHpResponse {
|
||||
target_id: req.target_id,
|
||||
hp: 100, // Placeholder
|
||||
};
|
||||
Ok(Response::new(response))
|
||||
}
|
||||
|
||||
async fn get_nearby_objects(&self, request: Request<NearbyObjectsRequest>) -> Result<Response<NearbyObjectsResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
debug!("GetNearbyObjects request: {:?}", req);
|
||||
|
||||
let objects = self.get_nearby_objects_for_client(&req.session_id, req.x, req.y, req.z, req.map_id, req.radius);
|
||||
|
||||
let response = NearbyObjectsResponse {
|
||||
objects,
|
||||
};
|
||||
Ok(Response::new(response))
|
||||
}
|
||||
|
||||
type StreamClientEventsStream = Pin<Box<dyn Stream<Item = Result<WorldEvent, Status>> + Send + Sync + 'static>>;
|
||||
|
||||
async fn stream_client_events(
|
||||
&self,
|
||||
request: Request<Streaming<ClientEvent>>,
|
||||
) -> Result<Response<Self::StreamClientEventsStream>, Status> {
|
||||
trace!("New client stream connection established");
|
||||
|
||||
let mut inbound_stream = request.into_inner();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let client_connections = self.client_connections.clone();
|
||||
let game_logic_connections = self.game_logic_connections.clone();
|
||||
let game_logic_manager = self.game_logic_manager.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut current_session_id: Option<String> = None;
|
||||
|
||||
while let Some(event) = inbound_stream.next().await {
|
||||
match event {
|
||||
Ok(client_event) => {
|
||||
debug!("Received client event: {:?}", client_event);
|
||||
|
||||
match client_event.event {
|
||||
Some(client_event::Event::Connect(connect_event)) => {
|
||||
debug!("Client {} connected to map {} at position ({}, {}, {})",
|
||||
client_event.client_id, client_event.map_id,
|
||||
connect_event.x, connect_event.y, connect_event.z);
|
||||
|
||||
let session_id = client_event.session_id.clone();
|
||||
let client_id = client_event.client_id.clone();
|
||||
let map_id = client_event.map_id;
|
||||
let x = connect_event.x;
|
||||
let y = connect_event.y;
|
||||
let z = connect_event.z;
|
||||
|
||||
// Track the session ID for cleanup
|
||||
current_session_id = Some(session_id.clone());
|
||||
|
||||
// Add client connection to the HashMap with position from connect event
|
||||
{
|
||||
let mut connections = client_connections.lock().unwrap();
|
||||
connections.insert(session_id.clone(), ClientConnection {
|
||||
session_id: session_id.clone(),
|
||||
client_id: client_id.clone(),
|
||||
map_id,
|
||||
x,
|
||||
y,
|
||||
z,
|
||||
sender: tx.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// Send all nearby objects to the client
|
||||
// Create a service instance with the proper game logic manager
|
||||
let world_service = MyWorldService {
|
||||
client_connections: client_connections.clone(),
|
||||
game_logic_connections: game_logic_connections.clone(),
|
||||
game_logic_manager: game_logic_manager.clone(),
|
||||
};
|
||||
|
||||
// Send nearby objects to the connecting client
|
||||
world_service.send_nearby_objects_to_client(&session_id, &client_id, x, y, z, map_id).await;
|
||||
}
|
||||
Some(client_event::Event::Disconnect(_)) => {
|
||||
debug!("Client {} disconnected", client_event.client_id);
|
||||
// Handle client disconnection
|
||||
}
|
||||
Some(client_event::Event::Move(move_event)) => {
|
||||
debug!("Client {} moved to ({}, {}, {})", client_event.client_id, move_event.x, move_event.y, move_event.z);
|
||||
|
||||
// Update client position
|
||||
{
|
||||
let mut connections = client_connections.lock().unwrap();
|
||||
if let Some(connection) = connections.get_mut(&client_event.session_id) {
|
||||
connection.x = move_event.x;
|
||||
connection.y = move_event.y;
|
||||
connection.z = move_event.z;
|
||||
}
|
||||
}
|
||||
|
||||
// Send PlayerMoveEvent to game logic service
|
||||
let player_move_event = world::GameLogicEvent {
|
||||
client_ids: vec![],
|
||||
map_id: client_event.map_id,
|
||||
event: Some(world::game_logic_event::Event::PlayerMove(world::PlayerMoveEvent {
|
||||
session_id: client_event.session_id.clone(),
|
||||
client_id: client_event.client_id.clone(),
|
||||
x: move_event.x,
|
||||
y: move_event.y,
|
||||
z: move_event.z,
|
||||
})),
|
||||
};
|
||||
|
||||
// Send to game logic service for the appropriate map
|
||||
let game_logic_connections = game_logic_connections.lock().unwrap();
|
||||
if let Some(connection) = game_logic_connections.get(&(client_event.map_id as u32)) {
|
||||
if let Err(e) = connection.sender.send(player_move_event) {
|
||||
warn!("Failed to send player move event to game logic for map {}: {}", client_event.map_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(client_event::Event::MapChange(map_change_event)) => {
|
||||
debug!("Client {} changed from map {} to map {}", client_event.client_id, map_change_event.old_map_id, map_change_event.new_map_id);
|
||||
// Handle map change
|
||||
}
|
||||
None => {
|
||||
warn!("Received client event with no event data");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error receiving client event: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up client connection when stream ends
|
||||
if let Some(session_id) = current_session_id {
|
||||
let mut connections = client_connections.lock().unwrap();
|
||||
connections.remove(&session_id);
|
||||
debug!("Client connection {} removed from connections", session_id);
|
||||
}
|
||||
|
||||
trace!("Client event stream ended");
|
||||
});
|
||||
|
||||
let outbound_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|msg| Ok(msg));
|
||||
Ok(Response::new(Box::pin(outbound_stream) as Self::StreamClientEventsStream))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MyWorldGameLogicService {
|
||||
pub world_service: Arc<MyWorldService>,
|
||||
}
|
||||
|
||||
impl MyWorldGameLogicService {
|
||||
pub fn new(world_service: Arc<MyWorldService>) -> Self {
|
||||
Self {
|
||||
world_service,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl WorldGameLogicService for MyWorldGameLogicService {
|
||||
type StreamGameEventsStream = Pin<Box<dyn Stream<Item = Result<world::GameLogicEvent, Status>> + Send + Sync + 'static>>;
|
||||
|
||||
async fn stream_game_events(
|
||||
&self,
|
||||
request: Request<Streaming<world::GameLogicEvent>>,
|
||||
) -> Result<Response<Self::StreamGameEventsStream>, Status> {
|
||||
trace!("New game logic stream connection established");
|
||||
|
||||
let mut inbound_stream = request.into_inner();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let world_service = self.world_service.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(event) = inbound_stream.next().await {
|
||||
match event {
|
||||
Ok(game_event) => {
|
||||
debug!("Received game event: {:?}", game_event);
|
||||
|
||||
// Convert game logic events to world events and broadcast to relevant clients
|
||||
match game_event.event {
|
||||
Some(world::game_logic_event::Event::NpcSpawn(npc_spawn)) => {
|
||||
let _world_event = WorldEvent {
|
||||
client_ids: game_event.client_ids.clone(),
|
||||
event: Some(world_event::Event::NpcSpawn(npc_spawn)),
|
||||
};
|
||||
// Broadcast to clients - for now broadcast to all clients in the map
|
||||
// In a real implementation, you'd determine the map from the event
|
||||
// world_service.broadcast_to_clients_in_map(game_event.map_id, world_event);
|
||||
}
|
||||
Some(world::game_logic_event::Event::MobSpawn(mob_spawn)) => {
|
||||
let _world_event = WorldEvent {
|
||||
client_ids: game_event.client_ids.clone(),
|
||||
event: Some(world_event::Event::MobSpawn(mob_spawn)),
|
||||
};
|
||||
// Broadcast to clients
|
||||
}
|
||||
Some(world::game_logic_event::Event::ObjectDespawn(despawn)) => {
|
||||
let _world_event = WorldEvent {
|
||||
client_ids: game_event.client_ids.clone(),
|
||||
event: Some(world_event::Event::ObjectDespawn(despawn)),
|
||||
};
|
||||
// Broadcast to clients
|
||||
}
|
||||
_ => {
|
||||
debug!("Unhandled game logic event type");
|
||||
}
|
||||
}
|
||||
|
||||
// Echo the event back for now
|
||||
if let Err(e) = tx.send(game_event) {
|
||||
error!("Failed to send game event: {:?}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error receiving game event: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("Game logic event stream ended");
|
||||
});
|
||||
|
||||
let outbound_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx).map(|msg| Ok(msg));
|
||||
Ok(Response::new(Box::pin(outbound_stream) as Self::StreamGameEventsStream))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user