Files
osirose-new/packet-service/src/router.rs

114 lines
5.3 KiB
Rust

use crate::auth_client::AuthClient;
use crate::bufferpool::BufferPool;
use crate::character_client::CharacterClient;
use crate::connection_service::ConnectionService;
use crate::handlers::*;
use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED};
use crate::packet::Packet;
use crate::packet_type::PacketType;
use std::error::Error;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tracing::{debug, warn};
/// Dispatches packets received from a client connection to their handlers.
///
/// Every field is reference-counted, so cloning a router only copies the
/// handles and all clones talk to the same underlying clients and service.
#[derive(Debug, Clone)]
pub struct PacketRouter {
    /// Auth client behind an async mutex; passed to the auth handlers and
    /// locked directly when a connection's session is logged out.
    pub auth_client: Arc<Mutex<AuthClient>>,
    /// Character client behind an async mutex; passed to the character and
    /// world handlers.
    pub character_client: Arc<Mutex<CharacterClient>>,
    /// Per-connection state store, looked up by connection id.
    pub connection_service: Arc<ConnectionService>,
}
impl PacketRouter {
    /// Size in bytes of the fixed header that starts every packet. The first
    /// two header bytes hold the total packet length (little-endian, header
    /// included).
    const HEADER_LEN: usize = 6;

    /// Serves a single client connection until the peer disconnects, the
    /// buffer pool stops handing out buffers, or an error occurs.
    ///
    /// Regardless of how the read loop ends, the session recorded for
    /// `connection_id` (if any) is logged out and the active-connection gauge
    /// is decremented, so neither leaks when the loop fails.
    ///
    /// # Errors
    ///
    /// Returns the read-loop error if there was one (socket I/O failure, a
    /// packet larger than the pooled buffer, or a handler error); otherwise
    /// returns the logout error, if any.
    pub async fn handle_connection(
        &self,
        stream: &mut TcpStream,
        pool: Arc<BufferPool>,
        connection_id: String,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        ACTIVE_CONNECTIONS.inc();

        let served = self.serve_packets(stream, &pool, &connection_id).await;

        // Cleanup runs in its own fallible scope so that a failure here can
        // never skip the gauge decrement below.
        let logged_out = async {
            if let Some(state) = self.connection_service.get_connection(&connection_id) {
                let session_id = state.session_id.unwrap_or_default();
                if !session_id.is_empty() {
                    let mut auth_client = self.auth_client.lock().await;
                    auth_client.logout(&session_id).await?;
                } else {
                    // The peer address is only used for logging; a socket that
                    // can no longer report it must not turn into an error.
                    match stream.peer_addr() {
                        Ok(addr) => warn!("No session found for {}", addr),
                        Err(_) => warn!("No session found for connection {}", connection_id),
                    }
                }
            }
            Ok::<(), Box<dyn Error + Send + Sync>>(())
        }
        .await;

        ACTIVE_CONNECTIONS.dec();

        // Only one error can be returned; make sure the other is not lost.
        if let (Err(_), Err(logout_err)) = (&served, &logged_out) {
            warn!(
                "Logout for connection {} also failed: {}",
                connection_id, logout_err
            );
        }
        served.and(logged_out)
    }

    /// Runs the receive loop: acquires a buffer, reads and routes one packet,
    /// and hands the buffer back to the pool before looking at the outcome,
    /// so the buffer is released on success, disconnect and error alike.
    async fn serve_packets(
        &self,
        stream: &mut TcpStream,
        pool: &BufferPool,
        connection_id: &str,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        while let Some(mut buffer) = pool.acquire().await {
            let outcome = self
                .serve_one_packet(stream, &mut buffer, connection_id)
                .await;
            pool.release(buffer).await;
            if !outcome? {
                break;
            }
        }
        Ok(())
    }

    /// Reads one complete packet into `frame`, parses it and routes it.
    ///
    /// Returns `Ok(true)` when a packet was consumed and the connection is
    /// still open, and `Ok(false)` when the peer closed the connection
    /// (cleanly or in the middle of a packet).
    ///
    /// # Errors
    ///
    /// Fails on socket I/O errors, when `frame` cannot hold the header or the
    /// declared packet size, or when the packet handler fails.
    async fn serve_one_packet(
        &self,
        stream: &mut TcpStream,
        frame: &mut [u8],
        connection_id: &str,
    ) -> Result<bool, Box<dyn Error + Send + Sync>> {
        let header_len = Self::HEADER_LEN;
        if frame.len() < header_len {
            return Err(format!(
                "pooled buffer of {} bytes cannot hold a {}-byte packet header",
                frame.len(),
                header_len
            )
            .into());
        }

        let header_read = Self::read_until_full(stream, &mut frame[..header_len]).await?;
        if header_read < header_len {
            if header_read > 0 {
                warn!(
                    "Connection {} closed after {} of {} header bytes",
                    connection_id, header_read, header_len
                );
            }
            return Ok(false);
        }

        let packet_size = usize::from(u16::from_le_bytes([frame[0], frame[1]]));
        // The size comes straight from the client; refuse anything that would
        // index past the end of the pooled buffer instead of panicking.
        if packet_size > frame.len() {
            return Err(format!(
                "declared packet size {} exceeds the {}-byte receive buffer",
                packet_size,
                frame.len()
            )
            .into());
        }

        if packet_size > header_len {
            let body = &mut frame[header_len..packet_size];
            let body_len = body.len();
            let body_read = Self::read_until_full(stream, body).await?;
            if body_read < body_len {
                warn!(
                    "Connection {} closed after {} of {} body bytes",
                    connection_id, body_read, body_len
                );
                return Ok(false);
            }
        }

        PACKETS_RECEIVED.inc();

        // A declared size below the header length is passed through as-is so
        // the parser can reject it; a parse failure is logged and the
        // connection keeps going.
        match Packet::from_raw(&frame[..packet_size]) {
            Ok(packet) => {
                debug!("Parsed Packet: {:?}", packet);
                self.route_packet(stream, packet, connection_id.to_owned())
                    .await?;
            }
            Err(e) => warn!("Failed to parse packet: {}", e),
        }
        Ok(true)
    }

    /// Reads from `stream` until `dest` is full or the peer closes the
    /// connection, and returns the number of bytes stored in `dest`.
    ///
    /// A single `read` may deliver fewer bytes than requested, so the call is
    /// repeated; a result smaller than `dest.len()` means end-of-stream.
    async fn read_until_full(
        stream: &mut TcpStream,
        dest: &mut [u8],
    ) -> Result<usize, std::io::Error> {
        let mut filled = 0;
        while filled < dest.len() {
            let n = stream.read(&mut dest[filled..]).await?;
            if n == 0 {
                break;
            }
            filled += n;
        }
        Ok(filled)
    }

    /// Dispatches a parsed packet to the handler for its packet type.
    ///
    /// Packet types without a handler are logged at `warn` level and treated
    /// as success.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the selected handler, or the failure
    /// to obtain the peer address for a login request.
    #[rustfmt::skip]
    pub async fn route_packet(
        &self,
        stream: &mut TcpStream,
        packet: Packet,
        connection_id: String,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        debug!("Routing packet: {:?}", packet);
        match packet.packet_type {
            // Generic Server Packets
            PacketType::PakcsAlive => auth::handle_alive_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
            PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
            PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
            // Login Packets
            PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id, stream.peer_addr()?).await,
            PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
            PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, self.connection_service.clone(), connection_id).await,
            PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
            // Character Packets
            PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
            PacketType::PakcsCreateCharReq => character::handle_create_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
            PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
            PacketType::PakcsSelectCharReq => character::handle_select_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
            // World Packets
            PacketType::PakcsChangeMapReq => world::handle_change_map_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
            PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(stream, packet, self.connection_service.clone(), connection_id).await,
            _ => {
                warn!("Unhandled packet type: {:?}", packet.packet_type);
                Ok(())
            }
        }
    }
}