Updated how we handle client ids in the world service and logic service
Implemented the bidirectional comms stream between the world service and game logic service
This commit is contained in:
@@ -5,14 +5,7 @@ use tokio::time::{sleep, Duration};
|
||||
use tonic::transport::Channel;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use futures::StreamExt;
|
||||
|
||||
pub mod world {
|
||||
tonic::include_proto!("world");
|
||||
}
|
||||
|
||||
pub mod game_logic {
|
||||
tonic::include_proto!("game_logic");
|
||||
}
|
||||
use crate::proto::{world, game_logic};
|
||||
|
||||
use world::world_game_logic_service_client::WorldGameLogicServiceClient;
|
||||
use game_logic::game_logic_service_client::GameLogicServiceClient;
|
||||
@@ -163,7 +156,7 @@ impl GameLogicClientManager {
|
||||
pub async fn get_nearby_objects(
|
||||
&self,
|
||||
map_id: u32,
|
||||
client_id: u32,
|
||||
client_id: &str,
|
||||
x: f32,
|
||||
y: f32,
|
||||
z: f32,
|
||||
@@ -173,7 +166,7 @@ impl GameLogicClientManager {
|
||||
if let Some(client) = clients.get_mut(&map_id) {
|
||||
if let Some(service_client) = &mut client.service_client {
|
||||
let request = game_logic::NearbyObjectsRequest {
|
||||
client_id,
|
||||
client_id: client_id.parse().unwrap(),
|
||||
x,
|
||||
y,
|
||||
z,
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
mod k8s_orchestrator;
|
||||
mod world_service;
|
||||
mod game_logic_client;
|
||||
mod proto;
|
||||
|
||||
use dotenv::dotenv;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::{sleep, Duration, timeout};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::transport::Server;
|
||||
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_endpoints_by_dns};
|
||||
use utils::{health_check, logging};
|
||||
@@ -14,8 +16,7 @@ use tracing::{debug, error, info, warn};
|
||||
use crate::k8s_orchestrator::K8sOrchestrator;
|
||||
use crate::world_service::{MyWorldService, MyWorldGameLogicService};
|
||||
use crate::game_logic_client::GameLogicClientManager;
|
||||
use world_service::world::world_service_server::WorldServiceServer;
|
||||
use world_service::world::world_game_logic_service_server::WorldGameLogicServiceServer;
|
||||
use crate::proto::world::{GameLogicEvent, world_service_server::WorldServiceServer, world_game_logic_service_server::WorldGameLogicServiceServer};
|
||||
|
||||
fn get_service_name() -> String {
|
||||
env::var("WORLD_SERVICE_NAME").unwrap_or_else(|_| "default-service".to_string())
|
||||
@@ -137,7 +138,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Create game logic client manager and world service
|
||||
let game_logic_manager = Arc::new(GameLogicClientManager::new());
|
||||
let world_service_shared = Arc::new(MyWorldService::new_with_game_logic_manager(game_logic_manager.clone()));
|
||||
let world_service = MyWorldService::new_with_game_logic_manager(game_logic_manager.clone());
|
||||
let world_service = world_service_shared.clone();
|
||||
let world_game_logic_service = MyWorldGameLogicService::new(world_service_shared.clone());
|
||||
|
||||
// Get retry configuration for connection info
|
||||
@@ -262,6 +263,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
error!("No game logic instances could be connected! Service may not function properly.");
|
||||
} else {
|
||||
info!("World service startup completed with {} active game logic connections", successful_connections);
|
||||
|
||||
// Start bidirectional streaming for all connected game logic services
|
||||
debug!("Starting bidirectional event streams for {} game logic connections...", successful_connections);
|
||||
let connected_maps = game_logic_manager.list_connected_maps().await;
|
||||
|
||||
for map_id in connected_maps {
|
||||
// Create channels for bidirectional communication
|
||||
let (outbound_tx, outbound_rx): (mpsc::UnboundedSender<GameLogicEvent>, mpsc::UnboundedReceiver<GameLogicEvent>) = mpsc::unbounded_channel();
|
||||
|
||||
// Start the event stream
|
||||
match game_logic_manager.start_event_stream(map_id, outbound_rx).await {
|
||||
Ok(inbound_rx) => {
|
||||
debug!("Successfully started event stream for map {}", map_id);
|
||||
|
||||
// Add the game logic connection to the world service
|
||||
world_service_shared.add_game_logic_connection(map_id, outbound_tx.clone());
|
||||
|
||||
// Spawn a task to handle incoming events from game logic
|
||||
let world_service_clone = world_service_shared.clone();
|
||||
let game_logic_manager_clone = game_logic_manager.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut inbound_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(inbound_rx);
|
||||
while let Some(event) = inbound_stream.next().await {
|
||||
match &event.event {
|
||||
_ => {
|
||||
debug!("Received event from game logic service for map {}: {:?}", map_id, event);
|
||||
// TODO: Process the event and potentially broadcast to clients
|
||||
}
|
||||
}
|
||||
}
|
||||
warn!("Event stream from game logic service for map {} ended", map_id);
|
||||
|
||||
// TODO: Implement reconnection logic here
|
||||
// For now, just log the disconnection
|
||||
error!("Game logic stream for map {} disconnected, reconnection not yet implemented", map_id);
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to start event stream for map {}: {}", map_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Event stream initialization completed");
|
||||
}
|
||||
|
||||
// Set the gRPC server address
|
||||
@@ -291,7 +336,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
let server_task = tokio::spawn(async move {
|
||||
let server = Server::builder()
|
||||
.add_service(WorldServiceServer::new(world_service))
|
||||
.add_service(WorldServiceServer::new((*world_service).clone()))
|
||||
.add_service(WorldGameLogicServiceServer::new(world_game_logic_service))
|
||||
.serve_with_shutdown(grpc_addr, async {
|
||||
shutdown_rx.await.ok();
|
||||
|
||||
9
world-service/src/proto.rs
Normal file
9
world-service/src/proto.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
// Shared proto definitions to avoid duplication across modules
|
||||
|
||||
pub mod world {
|
||||
tonic::include_proto!("world");
|
||||
}
|
||||
|
||||
pub mod game_logic {
|
||||
tonic::include_proto!("game_logic");
|
||||
}
|
||||
@@ -6,15 +6,9 @@ use tonic::{Request, Response, Status, Streaming};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use crate::game_logic_client::{GameLogicClientManager, game_logic as client_game_logic};
|
||||
|
||||
pub mod world {
|
||||
tonic::include_proto!("world");
|
||||
}
|
||||
|
||||
pub mod game_logic {
|
||||
tonic::include_proto!("game_logic");
|
||||
}
|
||||
use crate::game_logic_client::GameLogicClientManager;
|
||||
use crate::proto::game_logic as client_game_logic;
|
||||
use crate::proto::{world, game_logic};
|
||||
|
||||
use world::world_service_server::WorldService;
|
||||
use world::world_game_logic_service_server::WorldGameLogicService;
|
||||
@@ -26,6 +20,16 @@ pub struct MyWorldService {
|
||||
pub game_logic_manager: Option<Arc<GameLogicClientManager>>,
|
||||
}
|
||||
|
||||
impl Clone for MyWorldService {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
client_connections: self.client_connections.clone(),
|
||||
game_logic_connections: self.game_logic_connections.clone(),
|
||||
game_logic_manager: self.game_logic_manager.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ClientConnection {
|
||||
pub session_id: String,
|
||||
pub client_id: String,
|
||||
@@ -139,10 +143,10 @@ impl MyWorldService {
|
||||
// Get client_id from the client connection
|
||||
let client_id = {
|
||||
let connections = self.client_connections.lock().unwrap();
|
||||
connections.get(session_id).map(|conn| conn.client_id.parse::<u32>().unwrap_or(0)).unwrap_or(0)
|
||||
connections.get(session_id).map(|conn| conn.client_id.clone()).unwrap_or("".to_string())
|
||||
};
|
||||
|
||||
match game_logic_manager.get_nearby_objects(map_id as u32, client_id, x, y, z).await {
|
||||
match game_logic_manager.get_nearby_objects(map_id as u32, &client_id, x, y, z).await {
|
||||
Ok(response) => {
|
||||
debug!("Received {} nearby objects from game logic service", response.objects.len());
|
||||
|
||||
@@ -396,6 +400,29 @@ impl WorldService for MyWorldService {
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let player_connect_event = world::GameLogicEvent {
|
||||
client_ids: vec![],
|
||||
map_id: client_event.map_id,
|
||||
event: Some(world::game_logic_event::Event::PlayerConnect(world::PlayerConnectEvent {
|
||||
session_id: client_event.session_id.clone(),
|
||||
client_id: client_event.client_id.clone(),
|
||||
map_id: client_event.map_id,
|
||||
x: connect_event.x,
|
||||
y: connect_event.y,
|
||||
z: connect_event.z,
|
||||
})),
|
||||
};
|
||||
|
||||
// Send to game logic service for the appropriate map
|
||||
let game_logic_connections = game_logic_connections.lock().unwrap();
|
||||
if let Some(connection) = game_logic_connections.get(&(client_event.map_id as u32)) {
|
||||
if let Err(e) = connection.sender.send(player_connect_event) {
|
||||
warn!("Failed to send player connect event to game logic for map {}: {}", client_event.map_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send all nearby objects to the client
|
||||
// Create a service instance with the proper game logic manager
|
||||
let world_service = MyWorldService {
|
||||
@@ -415,19 +442,26 @@ impl WorldService for MyWorldService {
|
||||
debug!("Client {} moved to ({}, {}, {})", client_event.client_id, move_event.x, move_event.y, move_event.z);
|
||||
|
||||
// Update client position
|
||||
{
|
||||
let actual_map_id = {
|
||||
let mut connections = client_connections.lock().unwrap();
|
||||
if let Some(connection) = connections.get_mut(&client_event.session_id) {
|
||||
connection.x = move_event.x;
|
||||
connection.y = move_event.y;
|
||||
connection.z = move_event.z;
|
||||
connection.map_id
|
||||
} else {
|
||||
warn!("No connection found for session_id: {}", client_event.session_id);
|
||||
return; // Skip processing if no connection found
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
debug!("Using actual map_id {} for client {} (client_event.map_id was {})",
|
||||
actual_map_id, client_event.client_id, client_event.map_id);
|
||||
|
||||
// Send PlayerMoveEvent to game logic service
|
||||
let player_move_event = world::GameLogicEvent {
|
||||
client_ids: vec![],
|
||||
map_id: client_event.map_id,
|
||||
map_id: actual_map_id,
|
||||
event: Some(world::game_logic_event::Event::PlayerMove(world::PlayerMoveEvent {
|
||||
session_id: client_event.session_id.clone(),
|
||||
client_id: client_event.client_id.clone(),
|
||||
@@ -439,10 +473,15 @@ impl WorldService for MyWorldService {
|
||||
|
||||
// Send to game logic service for the appropriate map
|
||||
let game_logic_connections = game_logic_connections.lock().unwrap();
|
||||
if let Some(connection) = game_logic_connections.get(&(client_event.map_id as u32)) {
|
||||
let target_map_id = actual_map_id as u32;
|
||||
|
||||
if let Some(connection) = game_logic_connections.get(&target_map_id) {
|
||||
if let Err(e) = connection.sender.send(player_move_event) {
|
||||
warn!("Failed to send player move event to game logic for map {}: {}", client_event.map_id, e);
|
||||
}
|
||||
} else {
|
||||
warn!("No game logic connection found for map {} (target_map_id: {}).",
|
||||
actual_map_id, target_map_id);
|
||||
}
|
||||
}
|
||||
Some(client_event::Event::MapChange(map_change_event)) => {
|
||||
|
||||
Reference in New Issue
Block a user