- update: packet router to have the various services needed for the packets to be local to it.
- add: character service grpc client calls
This commit is contained in:
@@ -4,35 +4,84 @@ use crate::packet::Packet;
|
||||
use crate::packet_type::PacketType;
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, warn};
|
||||
use crate::bufferpool::BufferPool;
|
||||
use crate::character_client::CharacterClient;
|
||||
use crate::connection_service::ConnectionService;
|
||||
use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED};
|
||||
|
||||
pub async fn route_packet(stream: &mut TcpStream, packet: Packet, auth_client: Arc<Mutex<AuthClient>>, connection_service: Arc<ConnectionService>, connection_id: String) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
debug!("Routing packet: {:?}", packet);
|
||||
match packet.packet_type {
|
||||
PacketType::PakcsAlive => Ok(()),
|
||||
// Generic Server Packets
|
||||
PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
|
||||
PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, auth_client, connection_service, connection_id).await,
|
||||
// Login Packets
|
||||
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, auth_client, connection_service, connection_id, stream.peer_addr()?).await,
|
||||
PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, auth_client, connection_service, connection_id).await,
|
||||
PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, connection_service, connection_id).await,
|
||||
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
|
||||
|
||||
// Character Packets
|
||||
PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet, connection_service, connection_id).await,
|
||||
PacketType::PakcsCreateCharReq => character::handle_create_char_req(stream, packet, connection_service, connection_id).await,
|
||||
PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(stream, packet, connection_service, connection_id).await,
|
||||
PacketType::PakcsSelectCharReq => character::handle_select_char_req(stream, packet, connection_service, connection_id).await,
|
||||
|
||||
// 1 => chat::handle_chat(packet).await?,
|
||||
// 2 => movement::handle_movement(packet).await?,
|
||||
_ => {
|
||||
warn!("Unhandled packet type: {:?}", packet.packet_type);
|
||||
Ok(())
|
||||
},
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PacketRouter {
|
||||
pub auth_client: Arc<Mutex<AuthClient>>,
|
||||
pub character_client: Arc<Mutex<CharacterClient>>,
|
||||
pub connection_service: Arc<ConnectionService>,
|
||||
}
|
||||
|
||||
impl PacketRouter {
|
||||
pub async fn handle_connection(&self, stream: &mut TcpStream, pool: Arc<BufferPool>, connection_id: String) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
ACTIVE_CONNECTIONS.inc();
|
||||
while let Some(mut buffer) = pool.acquire().await {
|
||||
// Read data into the buffer
|
||||
let n = stream.read(&mut buffer).await?;
|
||||
if n == 0 {
|
||||
break; // Connection closed
|
||||
}
|
||||
PACKETS_RECEIVED.inc();
|
||||
|
||||
// Process the packet
|
||||
match Packet::from_raw(&buffer[..n]) {
|
||||
Ok(packet) => {
|
||||
debug!("Parsed Packet: {:?}", packet);
|
||||
// Handle the parsed packet (route it, process it, etc.)
|
||||
self.route_packet(stream, packet, connection_id.clone()).await?;
|
||||
}
|
||||
Err(e) => warn!("Failed to parse packet: {}", e),
|
||||
}
|
||||
|
||||
pool.release(buffer).await;
|
||||
}
|
||||
|
||||
if let Some(state) = self.connection_service.get_connection(&connection_id) {
|
||||
let session_id = state.session_id.unwrap_or_default();
|
||||
if !session_id.is_empty() {
|
||||
let mut auth_client = self.auth_client.lock().await;
|
||||
auth_client.logout(&session_id).await?;
|
||||
} else {
|
||||
warn!("No session found for {}", stream.peer_addr()?);
|
||||
}
|
||||
}
|
||||
ACTIVE_CONNECTIONS.dec();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn route_packet(&self, stream: &mut TcpStream, packet: Packet, connection_id: String) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
debug!("Routing packet: {:?}", packet);
|
||||
match packet.packet_type {
|
||||
PacketType::PakcsAlive => Ok(()),
|
||||
// Generic Server Packets
|
||||
PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
|
||||
PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
|
||||
// Login Packets
|
||||
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id, stream.peer_addr()?).await,
|
||||
PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
|
||||
PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, self.connection_service.clone(), connection_id).await,
|
||||
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
|
||||
|
||||
// Character Packets
|
||||
PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
|
||||
PacketType::PakcsCreateCharReq => character::handle_create_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
|
||||
PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
|
||||
PacketType::PakcsSelectCharReq => character::handle_select_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
|
||||
|
||||
// 1 => chat::handle_chat(packet).await?,
|
||||
// 2 => movement::handle_movement(packet).await?,
|
||||
_ => {
|
||||
warn!("Unhandled packet type: {:?}", packet.packet_type);
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user