use crate::auth_client::AuthClient; use crate::bufferpool::BufferPool; use crate::character_client::CharacterClient; use crate::connection_service::ConnectionService; use crate::handlers::*; use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED, PACKET_PROCESSING_TIME}; use crate::packet::Packet; use crate::packet_type::PacketType; use std::error::Error; use std::sync::Arc; use std::time::Instant; use tokio::io::{AsyncReadExt, ReadHalf}; use tokio::net::TcpStream; use tokio::sync::Mutex; use tracing::{debug, warn}; #[derive(Clone, Debug)] pub struct PacketRouter { pub auth_client: Arc>, pub character_client: Arc>, pub connection_service: Arc, } impl PacketRouter { pub async fn handle_connection( &self, mut stream: ReadHalf, pool: Arc, connection_id: String, peer_addr: String, ) -> Result<(), Box> { ACTIVE_CONNECTIONS.inc(); while let Some(mut buffer) = pool.acquire().await { // Read data into the buffer let packet_size: usize; { stream.read_exact(&mut buffer[..6]).await?; packet_size = u16::from_le_bytes(buffer[0..2].try_into()?) as usize; if packet_size > 6 { stream.read_exact(&mut buffer[6..packet_size]).await?; } } PACKETS_RECEIVED.inc(); // Process the packet let start = Instant::now(); match Packet::from_raw(&buffer[..packet_size]) { Ok(packet) => { debug!("Parsed Packet: {:?}", packet); self.route_packet(packet, connection_id.clone()).await?; } Err(e) => warn!("Failed to parse packet: {}", e), } let duration = start.elapsed(); PACKET_PROCESSING_TIME.observe(duration.as_secs_f64()); pool.release(buffer).await; } if let Some(state) = self.connection_service.get_connection(&connection_id) { let session_id = state.session_id.unwrap_or_default(); if !session_id.is_empty() { let mut auth_client = self.auth_client.lock().await; auth_client.logout(&session_id).await?; } else { warn!("No session found for {}", peer_addr); } } ACTIVE_CONNECTIONS.dec(); Ok(()) } #[rustfmt::skip] pub async fn route_packet( &self, packet: Packet, connection_id: String, ) -> Result<(), Box> { debug!("Routing packet: {:?}", packet); match packet.packet_type { // Generic Server Packets PacketType::PakcsAlive => auth::handle_alive_req( packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await, PacketType::PakcsAcceptReq => auth::handle_accept_req(packet, self.connection_service.clone(), connection_id).await, PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await, // Login Packets PacketType::PakcsLoginTokenReq => auth::handle_login_req(packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await, PacketType::PakcsLogoutReq => auth::handle_logout_req(packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await, PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(packet, self.connection_service.clone(), connection_id).await, PacketType::PakcsChannelListReq => auth::handle_channel_list_req(packet, self.connection_service.clone(), connection_id).await, // Character Packets PacketType::PakcsCharListReq => character::handle_char_list_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, PacketType::PakcsCreateCharReq => character::handle_create_char_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, PacketType::PakcsSelectCharReq => character::handle_select_char_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, // World Packets PacketType::PakcsChangeMapReq => world::handle_change_map_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await, PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(packet, self.connection_service.clone(), connection_id).await, PacketType::PakcsToggleMove => world::handle_togggle_move_req(packet, self.connection_service.clone(), connection_id).await, PacketType::PakcsSetAnimation => world::handle_set_animation_req(packet, self.connection_service.clone(), connection_id).await, // Chat Packets PacketType::PakcsNormalChat => chat::handle_normal_chat(packet, self.connection_service.clone(), connection_id).await, PacketType::PakcsShoutChat => chat::handle_shout_chat(packet, self.connection_service.clone(), connection_id).await, // 1 => chat::handle_chat(packet).await?, // 2 => movement::handle_movement(packet).await?, _ => { warn!("Unhandled packet type: {:?}", packet.packet_type); Ok(()) } } } }