- fix: no longer fail to parse packet if multiple were received in one read call
- update: max packet size to match client
This commit is contained in:
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{Semaphore, Mutex};
|
use tokio::sync::{Semaphore, Mutex};
|
||||||
|
|
||||||
const MAX_PACKET_SIZE: usize = 1024;
|
const MAX_PACKET_SIZE: usize = 0xFFF;
|
||||||
|
|
||||||
pub struct BufferPool {
|
pub struct BufferPool {
|
||||||
buffers: Mutex<VecDeque<Vec<u8>>>,
|
buffers: Mutex<VecDeque<Vec<u8>>>,
|
||||||
|
|||||||
@@ -1,9 +1,18 @@
|
|||||||
use utils::null_string::NullTerminatedString;
|
use crate::auth_client::AuthClient;
|
||||||
|
use crate::character_client::CharacterClient;
|
||||||
|
use crate::connection_service::ConnectionService;
|
||||||
|
use crate::dataconsts::*;
|
||||||
|
use crate::enums;
|
||||||
use crate::packet::{send_packet, Packet, PacketPayload};
|
use crate::packet::{send_packet, Packet, PacketPayload};
|
||||||
use crate::packet_type::PacketType;
|
use crate::packet_type::PacketType;
|
||||||
use crate::packets::cli_char_list_req::CliCharListReq;
|
use crate::packets::cli_char_list_req::CliCharListReq;
|
||||||
|
use crate::packets::cli_create_char_req::CliCreateCharReq;
|
||||||
|
use crate::packets::cli_delete_char_req::CliDeleteCharReq;
|
||||||
|
use crate::packets::cli_select_char_req::CliSelectCharReq;
|
||||||
|
use crate::packets::srv_create_char_reply::SrvCreateCharReply;
|
||||||
|
use crate::packets::srv_delete_char_reply::SrvDeleteCharReply;
|
||||||
|
use crate::packets::srv_switch_server::SrvSwitchServer;
|
||||||
use crate::packets::*;
|
use crate::packets::*;
|
||||||
use crate::dataconsts::*;
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
@@ -12,15 +21,7 @@ use tokio::net::TcpStream;
|
|||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tonic::{Code, Status};
|
use tonic::{Code, Status};
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
use crate::auth_client::AuthClient;
|
use utils::null_string::NullTerminatedString;
|
||||||
use crate::character_client::CharacterClient;
|
|
||||||
use crate::connection_service::ConnectionService;
|
|
||||||
use crate::enums;
|
|
||||||
use crate::packets::cli_create_char_req::CliCreateCharReq;
|
|
||||||
use crate::packets::cli_delete_char_req::CliDeleteCharReq;
|
|
||||||
use crate::packets::cli_select_char_req::CliSelectCharReq;
|
|
||||||
use crate::packets::srv_create_char_reply::SrvCreateCharReply;
|
|
||||||
use crate::packets::srv_delete_char_reply::SrvDeleteCharReply;
|
|
||||||
|
|
||||||
fn convert_slot(slot: i32) -> srv_char_list_reply::EquippedPosition {
|
fn convert_slot(slot: i32) -> srv_char_list_reply::EquippedPosition {
|
||||||
match enums::EquippedPosition::from_i32(slot) {
|
match enums::EquippedPosition::from_i32(slot) {
|
||||||
@@ -32,7 +33,7 @@ fn convert_slot(slot: i32) -> srv_char_list_reply::EquippedPosition {
|
|||||||
Some(enums::EquippedPosition::Boots) => srv_char_list_reply::EquippedPosition::Boots,
|
Some(enums::EquippedPosition::Boots) => srv_char_list_reply::EquippedPosition::Boots,
|
||||||
Some(enums::EquippedPosition::WeaponR) => srv_char_list_reply::EquippedPosition::WeaponR,
|
Some(enums::EquippedPosition::WeaponR) => srv_char_list_reply::EquippedPosition::WeaponR,
|
||||||
Some(enums::EquippedPosition::WeaponL) => srv_char_list_reply::EquippedPosition::WeaponL,
|
Some(enums::EquippedPosition::WeaponL) => srv_char_list_reply::EquippedPosition::WeaponL,
|
||||||
_ => srv_char_list_reply::EquippedPosition::MaxItems
|
_ => srv_char_list_reply::EquippedPosition::MaxItems,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,7 +53,7 @@ pub(crate) async fn handle_char_list_req(stream: &mut TcpStream, packet: Packet,
|
|||||||
let mut character_client = character_client.lock().await;
|
let mut character_client = character_client.lock().await;
|
||||||
let character_list = character_client.get_character_list(&user_id.to_string()).await?;
|
let character_list = character_client.get_character_list(&user_id.to_string()).await?;
|
||||||
let mut characters = vec![];
|
let mut characters = vec![];
|
||||||
let mut character_id_list:Vec<u8> = Vec::new();
|
let mut character_id_list: Vec<u8> = Vec::new();
|
||||||
for character in character_list.characters {
|
for character in character_list.characters {
|
||||||
let mut item_list: [EquippedItem; (MAX_VISIBLE_ITEMS as usize)] = core::array::from_fn(|i| EquippedItem::default());
|
let mut item_list: [EquippedItem; (MAX_VISIBLE_ITEMS as usize)] = core::array::from_fn(|i| EquippedItem::default());
|
||||||
|
|
||||||
@@ -90,7 +91,7 @@ pub(crate) async fn handle_char_list_req(stream: &mut TcpStream, packet: Packet,
|
|||||||
let data = SrvCharListReply { characters };
|
let data = SrvCharListReply { characters };
|
||||||
let response_packet = Packet::new(PacketType::PakccCharListReply, &data)?;
|
let response_packet = Packet::new(PacketType::PakccCharListReply, &data)?;
|
||||||
send_packet(stream, &response_packet).await?;
|
send_packet(stream, &response_packet).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -104,7 +105,7 @@ pub(crate) async fn handle_create_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
user_id = state.user_id.expect("Missing user id in connection state");
|
user_id = state.user_id.expect("Missing user id in connection state");
|
||||||
session_id = state.session_id.expect("Missing session id in connection state");
|
session_id = state.session_id.expect("Missing session id in connection state");
|
||||||
}
|
}
|
||||||
|
|
||||||
// send the data to the character service to create the character
|
// send the data to the character service to create the character
|
||||||
let mut character_client = character_client.lock().await;
|
let mut character_client = character_client.lock().await;
|
||||||
let create_character_response = character_client.create_character(&user_id.to_string(), request.name, request.race, request.face, request.hair, request.stone).await?;
|
let create_character_response = character_client.create_character(&user_id.to_string(), request.name, request.race, request.face, request.hair, request.stone).await?;
|
||||||
@@ -116,9 +117,9 @@ pub(crate) async fn handle_create_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
3 => srv_create_char_reply::Result::InvalidValue,
|
3 => srv_create_char_reply::Result::InvalidValue,
|
||||||
4 => srv_create_char_reply::Result::TooManyChars,
|
4 => srv_create_char_reply::Result::TooManyChars,
|
||||||
5 => srv_create_char_reply::Result::Blocked,
|
5 => srv_create_char_reply::Result::Blocked,
|
||||||
_ => srv_create_char_reply::Result::Failed
|
_ => srv_create_char_reply::Result::Failed,
|
||||||
};
|
};
|
||||||
|
|
||||||
let data = SrvCreateCharReply { result, platininum: 0 };
|
let data = SrvCreateCharReply { result, platininum: 0 };
|
||||||
let response_packet = Packet::new(PacketType::PakccCreateCharReply, &data)?;
|
let response_packet = Packet::new(PacketType::PakccCreateCharReply, &data)?;
|
||||||
send_packet(stream, &response_packet).await?;
|
send_packet(stream, &response_packet).await?;
|
||||||
@@ -132,8 +133,8 @@ pub(crate) async fn handle_delete_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
|
|
||||||
let mut user_id = 0;
|
let mut user_id = 0;
|
||||||
let session_id;
|
let session_id;
|
||||||
let mut character_id_list:Vec<u8> = Vec::new();
|
let mut character_id_list: Vec<u8> = Vec::new();
|
||||||
|
|
||||||
if let Some(mut state) = connection_service.get_connection(&connection_id) {
|
if let Some(mut state) = connection_service.get_connection(&connection_id) {
|
||||||
user_id = state.user_id.expect("Missing user id in connection state");
|
user_id = state.user_id.expect("Missing user id in connection state");
|
||||||
session_id = state.session_id.expect("Missing session id in connection state");
|
session_id = state.session_id.expect("Missing session id in connection state");
|
||||||
@@ -144,7 +145,10 @@ pub(crate) async fn handle_delete_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
let delete_response = character_client.delete_character(&user_id.to_string(), &character_id_list[request.char_id as usize].to_string(), request.is_delete as i32).await?;
|
let delete_response = character_client.delete_character(&user_id.to_string(), &character_id_list[request.char_id as usize].to_string(), request.is_delete as i32).await?;
|
||||||
|
|
||||||
let character_name = request.name;
|
let character_name = request.name;
|
||||||
let data = SrvDeleteCharReply { remaining_time: delete_response.remaining_time as u32, name: character_name };
|
let data = SrvDeleteCharReply {
|
||||||
|
remaining_time: delete_response.remaining_time as u32,
|
||||||
|
name: character_name,
|
||||||
|
};
|
||||||
let response_packet = Packet::new(PacketType::PakccDeleteCharReply, &data)?;
|
let response_packet = Packet::new(PacketType::PakccDeleteCharReply, &data)?;
|
||||||
send_packet(stream, &response_packet).await?;
|
send_packet(stream, &response_packet).await?;
|
||||||
|
|
||||||
@@ -155,19 +159,29 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
use crate::packets::srv_select_char_reply::*;
|
use crate::packets::srv_select_char_reply::*;
|
||||||
use crate::packets::srv_inventory_data::*;
|
use crate::packets::srv_inventory_data::*;
|
||||||
use crate::packets::srv_quest_data::*;
|
use crate::packets::srv_quest_data::*;
|
||||||
|
use crate::packets::srv_select_char_reply::*;
|
||||||
use crate::packets::srv_billing_message::*;
|
use crate::packets::srv_billing_message::*;
|
||||||
use crate::types::{HotbarItem, StatusEffect};
|
use crate::types::{HotbarItem, StatusEffect};
|
||||||
let request = CliSelectCharReq::decode(packet.payload.as_slice())?;
|
let request = CliSelectCharReq::decode(packet.payload.as_slice())?;
|
||||||
debug!("{:?}", request);
|
debug!("{:?}", request);
|
||||||
|
|
||||||
let mut user_id = 0;
|
let mut user_id = 0;
|
||||||
let mut character_id_list:Vec<u8> = Vec::new();
|
let mut character_id_list: Vec<u8> = Vec::new();
|
||||||
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
||||||
user_id = state.user_id.expect("Missing user id in connection state");
|
user_id = state.user_id.expect("Missing user id in connection state");
|
||||||
character_id_list = state.character_list.clone().expect("Missing character id list");
|
character_id_list = state.character_list.clone().expect("Missing character id list");
|
||||||
state.character_id = Some(request.char_id as i8);
|
state.character_id = Some(request.char_id as i8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let data = SrvSwitchServer {
|
||||||
|
port: 0,
|
||||||
|
session_id: 0,
|
||||||
|
session_seed: 0,
|
||||||
|
ip: NullTerminatedString("".to_string()),
|
||||||
|
};
|
||||||
|
let response_packet = Packet::new(PacketType::PakccSwitchServer, &data)?;
|
||||||
|
send_packet(stream, &response_packet).await?;
|
||||||
|
|
||||||
let mut character_client = character_client.lock().await;
|
let mut character_client = character_client.lock().await;
|
||||||
let character_data = character_client.get_character(&user_id.to_string(), character_id_list[request.char_id as usize]).await?;
|
let character_data = character_client.get_character(&user_id.to_string(), character_id_list[request.char_id as usize]).await?;
|
||||||
|
|
||||||
@@ -195,7 +209,7 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
header: srv_inventory_data::Header {
|
header: srv_inventory_data::Header {
|
||||||
type_: item.item_type as u8,
|
type_: item.item_type as u8,
|
||||||
id: item.item_id as u16,
|
id: item.item_id as u16,
|
||||||
is_created:item.is_created as u8,
|
is_created: item.is_created as u8,
|
||||||
},
|
},
|
||||||
data: srv_inventory_data::Data {
|
data: srv_inventory_data::Data {
|
||||||
gem_opt: item.gem_option as u16,
|
gem_opt: item.gem_option as u16,
|
||||||
@@ -214,8 +228,8 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
let data = SrvSelectCharReply {
|
let data = SrvSelectCharReply {
|
||||||
race: looks.race as u8,
|
race: looks.race as u8,
|
||||||
map: position.map_id as u16,
|
map: position.map_id as u16,
|
||||||
x: position.x,
|
x: position.x * 100.0,
|
||||||
y: position.y,
|
y: position.y * 100.0,
|
||||||
spawn: position.spawn_id as u16,
|
spawn: position.spawn_id as u16,
|
||||||
body_face: looks.face as u32,
|
body_face: looks.face as u32,
|
||||||
body_hair: looks.hair as u32,
|
body_hair: looks.hair as u32,
|
||||||
@@ -254,7 +268,7 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
pat_cooldown_time: stats.pat_cooldown_time as u32,
|
pat_cooldown_time: stats.pat_cooldown_time as u32,
|
||||||
skills: [0u16; (MAX_SKILL_COUNT as usize)],
|
skills: [0u16; (MAX_SKILL_COUNT as usize)],
|
||||||
hotbar: hotbar_list,
|
hotbar: hotbar_list,
|
||||||
tag: 0,
|
tag: user_id as u32,
|
||||||
name: request.name,
|
name: request.name,
|
||||||
};
|
};
|
||||||
let response_packet = Packet::new(PacketType::PakwcSelectCharReply, &data)?;
|
let response_packet = Packet::new(PacketType::PakwcSelectCharReply, &data)?;
|
||||||
@@ -281,7 +295,7 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
switches: [0; (MAX_SWITCHES as usize)],
|
switches: [0; (MAX_SWITCHES as usize)],
|
||||||
wishlist,
|
wishlist,
|
||||||
};
|
};
|
||||||
let response_packet = Packet::new(PacketType::PakwcInventoryData, &data)?;
|
let response_packet = Packet::new(PacketType::PakwcQuestData, &data)?;
|
||||||
send_packet(stream, &response_packet).await?;
|
send_packet(stream, &response_packet).await?;
|
||||||
|
|
||||||
// Send the billing message (we don't actually use this so we just send the defaults to allow)
|
// Send the billing message (we don't actually use this so we just send the defaults to allow)
|
||||||
@@ -293,4 +307,4 @@ pub(crate) async fn handle_select_char_req(stream: &mut TcpStream, packet: Packe
|
|||||||
send_packet(stream, &response_packet).await?;
|
send_packet(stream, &response_packet).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
use bincode::{Encode, Decode};
|
use crate::packet_type::PacketType;
|
||||||
|
use bincode::{Decode, Encode};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tracing::{debug, error};
|
use tracing::{debug, error};
|
||||||
use crate::packet_type::PacketType;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Packet {
|
pub struct Packet {
|
||||||
@@ -15,8 +15,7 @@ pub struct Packet {
|
|||||||
|
|
||||||
pub trait PacketPayload: Encode + Decode {
|
pub trait PacketPayload: Encode + Decode {
|
||||||
fn encode(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
|
fn encode(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let config = bincode::config::standard()
|
let config = bincode::config::standard().with_fixed_int_encoding();
|
||||||
.with_fixed_int_encoding();
|
|
||||||
Ok(bincode::encode_to_vec(self, config)?)
|
Ok(bincode::encode_to_vec(self, config)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -24,8 +23,7 @@ pub trait PacketPayload: Encode + Decode {
|
|||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
{
|
{
|
||||||
let config = bincode::config::standard()
|
let config = bincode::config::standard().with_fixed_int_encoding();
|
||||||
.with_fixed_int_encoding();
|
|
||||||
Ok(bincode::decode_from_slice(data, config)?.0)
|
Ok(bincode::decode_from_slice(data, config)?.0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -47,6 +45,7 @@ impl Packet {
|
|||||||
|
|
||||||
pub fn from_raw(data: &[u8]) -> Result<Self, Box<dyn Error + Send + Sync>> {
|
pub fn from_raw(data: &[u8]) -> Result<Self, Box<dyn Error + Send + Sync>> {
|
||||||
if data.len() < 6 {
|
if data.len() < 6 {
|
||||||
|
error!("Invalid packet: Size too small {}", data.len());
|
||||||
return Err("Invalid packet: Too short".into());
|
return Err("Invalid packet: Too short".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,18 +55,26 @@ impl Packet {
|
|||||||
let packet_crc = u16::from_le_bytes(data[4..6].try_into()?);
|
let packet_crc = u16::from_le_bytes(data[4..6].try_into()?);
|
||||||
|
|
||||||
// Validate the size
|
// Validate the size
|
||||||
if packet_size as usize != data.len() {
|
if packet_size as usize > data.len() {
|
||||||
error!("Invalid packet: Size mismatch, expected: {}, actual: {}", packet_size, data.len());
|
error!(
|
||||||
|
"Invalid packet {:#X}: Size mismatch, expected: {}, received: {}",
|
||||||
|
raw_packet_type,
|
||||||
|
packet_size,
|
||||||
|
data.len()
|
||||||
|
);
|
||||||
return Err("Invalid packet: Size mismatch".into());
|
return Err("Invalid packet: Size mismatch".into());
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("size: {:#X}, raw_type: {:#X}, crc: {:#X}", packet_size, raw_packet_type, packet_crc);
|
debug!(
|
||||||
|
"size: {:#X}, raw_type: {:#X}, crc: {:#X}",
|
||||||
|
packet_size, raw_packet_type, packet_crc
|
||||||
|
);
|
||||||
|
|
||||||
// Convert raw packet type into `PacketType` enum
|
// Convert raw packet type into `PacketType` enum
|
||||||
let packet_type = PacketType::try_from(raw_packet_type)?;
|
let packet_type = PacketType::try_from(raw_packet_type)?;
|
||||||
|
|
||||||
// Extract the payload
|
// Extract the payload
|
||||||
let payload = data[6..].to_vec();
|
let payload = data[6..packet_size as usize].to_vec();
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
packet_size,
|
packet_size,
|
||||||
@@ -89,7 +96,7 @@ impl Packet {
|
|||||||
|
|
||||||
raw
|
raw
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn parse<T: PacketPayload>(&self) -> Result<T, Box<dyn Error + Send + Sync>> {
|
pub fn parse<T: PacketPayload>(&self) -> Result<T, Box<dyn Error + Send + Sync>> {
|
||||||
<T as PacketPayload>::decode(&self.payload)
|
<T as PacketPayload>::decode(&self.payload)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,14 +25,24 @@ impl PacketRouter {
|
|||||||
ACTIVE_CONNECTIONS.inc();
|
ACTIVE_CONNECTIONS.inc();
|
||||||
while let Some(mut buffer) = pool.acquire().await {
|
while let Some(mut buffer) = pool.acquire().await {
|
||||||
// Read data into the buffer
|
// Read data into the buffer
|
||||||
let n = stream.read(&mut buffer).await?;
|
let mut header_handle = stream.take(6);
|
||||||
|
let n = header_handle.read(&mut buffer).await?;
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
break; // Connection closed
|
break; // Connection closed
|
||||||
}
|
}
|
||||||
|
let packet_size = u16::from_le_bytes(buffer[0..2].try_into()?) as usize;
|
||||||
|
if packet_size > 6 {
|
||||||
|
let mut body_handle = stream.take((packet_size-6) as u64);
|
||||||
|
let n = body_handle.read(&mut buffer[6..]).await?;
|
||||||
|
if n == 0 {
|
||||||
|
break; // Connection closed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
PACKETS_RECEIVED.inc();
|
PACKETS_RECEIVED.inc();
|
||||||
|
|
||||||
// Process the packet
|
// Process the packet
|
||||||
match Packet::from_raw(&buffer[..n]) {
|
match Packet::from_raw(&buffer[..packet_size]) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
debug!("Parsed Packet: {:?}", packet);
|
debug!("Parsed Packet: {:?}", packet);
|
||||||
// Handle the parsed packet (route it, process it, etc.)
|
// Handle the parsed packet (route it, process it, etc.)
|
||||||
|
|||||||
Reference in New Issue
Block a user