Updated handlers by spliting the TcpStream in half to allow reading and writing data at the same time.
This fixes an issue where you are unable to get chat messages until the client sends a packet to the server

Fixed client id's by adding the id manager
Added shout chat handling
This commit is contained in:
2025-06-07 00:36:02 -04:00
parent d4dadf5170
commit aa2be43f4e
17 changed files with 480 additions and 157 deletions

View File

@@ -8,15 +8,21 @@ pub struct GuildChat;
impl ChatChannel for GuildChat {
fn handle_message(&self, message: ChatMessage, sender_id: &str, clients: &Clients) {
// This is a placeholder. In a real implementation, verify
// guild membership by consulting your Character or Guild service.
let clients_lock = clients.lock().unwrap();
for (id, tx) in clients_lock.iter() {
// For demonstration, send only to clients whose IDs contain
// "guild". Replace this logic with your actual membership check.
if id != sender_id && id.contains("guild") {
let _ = tx.try_send(message.clone());
}
}
//TODO: Make sure the clients actually are apart of the same guild.
let new_message = ChatMessage {
client_id: sender_id.to_string(),
r#type: message.r#type,
message: message.message,
target_id: message.target_id,
sender: message.sender,
};
// let clients_lock = clients.lock().unwrap();
// for (id, tx) in clients_lock.iter() {
// if id != sender_id && id.contains("guild") {
// let _ = tx.try_send(new_message.clone());
// }
// }
}
}

View File

@@ -9,14 +9,18 @@ pub struct LocalChat;
impl ChatChannel for LocalChat {
fn handle_message(&self, message: ChatMessage, sender_id: &str, clients: &Clients) {
// In a full implementation, you might query for nearby clients.
// For demo purposes, we simply broadcast to all clients except the sender.
debug!("LocalChat::handle_message: {:?}", message);
let new_message = ChatMessage {
client_id: sender_id.to_string(), // Make sure the client isn't attempting bad things
r#type: message.r#type,
message: message.message,
target_id: message.target_id,
sender: message.sender,
};
let clients_lock = clients.lock().unwrap();
for (id, tx) in clients_lock.iter() {
// if id != sender_id {
let _ = tx.try_send(message.clone());
// }
let _ = tx.try_send(new_message.clone());
}
}
}

View File

@@ -8,13 +8,17 @@ pub struct ShoutChat;
impl ChatChannel for ShoutChat {
fn handle_message(&self, message: ChatMessage, sender_id: &str, clients: &Clients) {
// For demo purposes, we simply broadcast to all clients except the sender.
// TODO: make sure the clients are on the same map
let new_message = ChatMessage {
client_id: sender_id.to_string(),
r#type: message.r#type,
message: message.message,
target_id: message.target_id,
sender: message.sender,
};
let clients_lock = clients.lock().unwrap();
for (id, tx) in clients_lock.iter() {
if id != sender_id {
let _ = tx.try_send(message.clone());
}
let _ = tx.try_send(new_message.clone());
}
}
}

View File

@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tonic::{Request, Response, Status};
use tonic::metadata::MetadataMap;
use tracing::debug;
pub mod common {
@@ -37,6 +38,18 @@ impl MyChatService {
}
}
fn get_authenticated_id(metadata: &MetadataMap) -> Result<String, Status> {
if let Some(client_id_val) = metadata.get("x-client-id") {
// Convert the header to a string.
client_id_val
.to_str()
.map(ToString::to_string)
.map_err(|_| Status::unauthenticated("Invalid client ID header"))
} else {
Err(Status::unauthenticated("Missing client ID header"))
}
}
#[tonic::async_trait]
impl ChatService for MyChatService {
type ChatStreamStream =
@@ -48,11 +61,9 @@ impl ChatService for MyChatService {
) -> Result<Response<Self::ChatStreamStream>, Status> {
debug!("New chat client connected");
debug!("request: {:?}", request);
let mut inbound = request.into_inner();
// Create a new client ID. In production, use authenticated IDs.
let client_id = format!("client-{}", uuid::Uuid::new_v4());
let client_id = get_authenticated_id(request.metadata())?;
let mut inbound = request.into_inner();
// Create a channel for sending outbound messages to this client.
let (tx, rx) = tokio::sync::mpsc::channel(32);

View File

@@ -1,23 +1,30 @@
use crate::connection_state::ConnectionState;
use crate::id_manager::IdManager;
use dashmap::DashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
use tokio::net::TcpStream;
use tokio::io::{WriteHalf};
#[derive(Clone, Debug)]
pub struct ConnectionService {
pub connections: Arc<DashMap<String, ConnectionState>>, // Map connection ID to state
pub id_manager: Arc<Mutex<IdManager>>
}
impl ConnectionService {
pub fn new() -> Self {
Self {
connections: Arc::new(DashMap::new()),
id_manager: Arc::new(Mutex::new(IdManager::new())),
}
}
pub fn add_connection(&self) -> String {
pub fn add_connection(&self, writer: Arc<tokio::sync::Mutex<WriteHalf<TcpStream>>>) -> String {
let connection_id = Uuid::new_v4().to_string();
self.connections.insert(connection_id.clone(), ConnectionState::new());
let mut connection_state = ConnectionState::new();
connection_state.writer = Some(writer);
self.connections.insert(connection_id.clone(), connection_state);
connection_id
}
@@ -35,4 +42,14 @@ impl ConnectionService {
pub fn remove_connection(&self, connection_id: &str) {
self.connections.remove(connection_id);
}
pub fn next_id(&self) -> u16 {
let mut manager = self.id_manager.lock().unwrap();
manager.get_free_id()
}
pub fn free_id(&self, id: u16) {
let mut manager = self.id_manager.lock().unwrap();
manager.release_id(id);
}
}

View File

@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::fmt;
use tokio::net::TcpStream;
use tokio::io::{WriteHalf};
use crate::handlers::chat_client::ChatClientHandler;
@@ -9,9 +11,12 @@ pub struct ConnectionState {
pub user_id: Option<String>,
pub session_id: Option<String>,
pub character_id: Option<i8>,
pub character_name: Option<String>,
pub character_list: Option<Vec<u32>>,
pub additional_data: HashMap<String, String>, // Flexible data storage
pub client_id: u16,
pub chat_handler: Option<Arc<ChatClientHandler>>,
pub writer: Option<Arc<tokio::sync::Mutex<WriteHalf<TcpStream>>>>,
}
impl ConnectionState {
@@ -20,9 +25,12 @@ impl ConnectionState {
user_id: None,
session_id: None,
character_id: None,
character_name: None,
character_list: None,
additional_data: HashMap::new(),
client_id: 0,
chat_handler: None,
writer: None,
}
}
}
@@ -33,9 +41,12 @@ impl fmt::Debug for ConnectionState {
.field("user_id", &self.user_id)
.field("session_id", &self.session_id)
.field("character_id", &self.character_id)
.field("character_name", &self.character_name)
.field("character_list", &self.character_list)
.field("additional_data", &self.additional_data)
.field("client_id", &self.client_id)
.field("chat_handler", &self.chat_handler.as_ref().map(|_| "<chat handler>"))
.field("writer", &self.writer.as_ref().map(|_| "<writer>"))
.finish()
}
}

View File

@@ -20,7 +20,6 @@ use crate::handlers::chat::create_chat_client_handler;
use crate::handlers::chat_client::ChatClientHandler;
pub(crate) async fn handle_alive_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
@@ -42,8 +41,9 @@ pub(crate) async fn handle_alive_req(
}
pub(crate) async fn handle_accept_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::srv_accept_reply::SrvAcceptReply;
let data = SrvAcceptReply {
@@ -52,13 +52,15 @@ pub(crate) async fn handle_accept_req(
};
let response_packet = Packet::new(PacketType::PakssAcceptReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_join_server_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
@@ -82,19 +84,26 @@ pub(crate) async fn handle_join_server_req(
pay_flag: 0,
};
let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
return Err("Session not valid".into());
}
let client_id = state.client_id.clone();
let data = SrvJoinServerReply {
result: srv_join_server_reply::Result::Ok,
id: 1,
id: client_id as u32,
pay_flag: 0,
};
let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
} else {
Err("Unable to find connection state".into())
@@ -102,7 +111,6 @@ pub(crate) async fn handle_join_server_req(
}
pub(crate) async fn handle_logout_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
@@ -116,9 +124,12 @@ pub(crate) async fn handle_logout_req(
let data = SrvLogoutReply { wait_time: 1 };
let response_packet = Packet::new(PacketType::PakwcLogoutReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
locked_stream.shutdown().await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
locked_stream.shutdown().await?;
}
Ok(())
} else {
Err("Unable to find connection state".into())
@@ -126,7 +137,6 @@ pub(crate) async fn handle_logout_req(
}
pub(crate) async fn handle_login_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
@@ -152,14 +162,20 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
} else {
debug!("Successfully logged in");
let mut client_id = 0;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
state.user_id = Some(response.user_id);
state.session_id = Some(response.session_id.clone());
client_id = connection_service.next_id();
state.client_id = client_id;
}
let chat_url = format!(
@@ -171,10 +187,13 @@ pub(crate) async fn handle_login_req(
.unwrap()
);
let handler = ChatClientHandler::new(chat_url, connection_id.clone(), response.session_id.clone()).await?;
let handler = ChatClientHandler::new(chat_url, client_id.to_string(), response.session_id.clone()).await?;
let chat_handler = Arc::new(handler);
create_chat_client_handler(stream.clone(), chat_handler.clone()).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
create_chat_client_handler(writer_clone, chat_handler.clone()).await?;
}
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
state.chat_handler = Some(chat_handler);
@@ -216,8 +235,11 @@ pub(crate) async fn handle_login_req(
servers_info: server_info,
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
}
Err(err) => {
@@ -229,8 +251,11 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
return Ok(());
}
}
@@ -249,8 +274,11 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
Code::Unavailable => {
warn!("Login failed: Service is unavailable");
@@ -261,8 +289,11 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
_ => {
error!("Unexpected error: {}", tonic_status.message());
@@ -273,8 +304,11 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
}
}
@@ -285,7 +319,6 @@ pub(crate) async fn handle_login_req(
}
pub(crate) async fn handle_server_select_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
@@ -313,14 +346,18 @@ pub(crate) async fn handle_server_select_req(
};
let response_packet = Packet::new(PacketType::PaklcSrvSelectReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_channel_list_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_channel_list_req::CliChannelListReq;
use crate::packets::srv_channel_list_reply::{ChannelInfo, SrvChannelListReply};
@@ -356,8 +393,11 @@ pub(crate) async fn handle_channel_list_req(
channels: channel_info,
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
}
Err(err) => {
@@ -366,8 +406,11 @@ pub(crate) async fn handle_channel_list_req(
channels: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
return Ok(());
}
}

View File

@@ -68,7 +68,6 @@ pub(crate) fn convert_type_to_body_part(slot: i32) -> ItemType {
}
pub(crate) async fn handle_char_list_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -130,14 +129,16 @@ pub(crate) async fn handle_char_list_req(
let data = SrvCharListReply { characters };
let response_packet = Packet::new(PacketType::PakccCharListReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_create_char_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -179,14 +180,16 @@ pub(crate) async fn handle_create_char_req(
let data = SrvCreateCharReply { result, platininum: 0 };
let response_packet = Packet::new(PacketType::PakccCreateCharReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_delete_char_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -222,14 +225,16 @@ pub(crate) async fn handle_delete_char_req(
name: character_name,
};
let response_packet = Packet::new(PacketType::PakccDeleteCharReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_select_char_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -260,8 +265,9 @@ pub(crate) async fn handle_select_char_req(
ip: NullTerminatedString("".to_string()),
};
let response_packet = Packet::new(PacketType::PakccSwitchServer, &data)?;
{
let mut locked_stream = stream.lock().await;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
@@ -283,7 +289,11 @@ pub(crate) async fn handle_select_char_req(
core::array::from_fn(|i| EquippedItem::default());
let mut inventory: [srv_inventory_data::Item; (MAX_ITEMS as usize)] =
core::array::from_fn(|i| srv_inventory_data::Item::default());
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
state.character_name = Some(character.name.clone());
}
// Build the character learned skill list
let mut skill_list: [u16; (MAX_SKILL_COUNT as usize)] = [0u16; MAX_SKILL_COUNT as usize];
for index in 0..skills.len() {
@@ -372,8 +382,9 @@ pub(crate) async fn handle_select_char_req(
name,
};
let response_packet = Packet::new(PacketType::PakwcSelectCharReply, &data)?;
{
let mut locked_stream = stream.lock().await;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
@@ -383,8 +394,9 @@ pub(crate) async fn handle_select_char_req(
items: inventory,
};
let response_packet = Packet::new(PacketType::PakwcInventoryData, &data)?;
{
let mut locked_stream = stream.lock().await;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
@@ -404,8 +416,9 @@ pub(crate) async fn handle_select_char_req(
wishlist,
};
let response_packet = Packet::new(PacketType::PakwcQuestData, &data)?;
{
let mut locked_stream = stream.lock().await;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
@@ -415,8 +428,9 @@ pub(crate) async fn handle_select_char_req(
pay_flag: 2,
};
let response_packet = Packet::new(PacketType::PakwcBillingMessage, &data)?;
{
let mut locked_stream = stream.lock().await;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}

View File

@@ -5,6 +5,7 @@ use crate::packet_type::PacketType;
use chrono::{Local, Timelike};
use std::error::Error;
use std::sync::Arc;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tonic::transport::Channel;
@@ -15,7 +16,7 @@ use crate::handlers::chat_client::chat::{ChatMessage, MessageType};
use crate::handlers::chat_client::ChatClientHandler;
pub async fn create_chat_client_handler(
stream_for_task: Arc<Mutex<TcpStream>>,
stream_for_task: Arc<Mutex<WriteHalf<TcpStream>>>,
task_chat_handler: Arc<ChatClientHandler>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::srv_normal_chat::SrvNormalChat;
@@ -31,21 +32,20 @@ pub async fn create_chat_client_handler(
let mut rx = task_chat_handler.inbound_rx.lock().await;
while let Some(chat_msg) = rx.recv().await {
debug!("Packet-Service received chat message: {} (client_id: {}, type {})", chat_msg.message, chat_msg.client_id, chat_msg.r#type);
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
match chat_msg.r#type {
1 => {
// Normal Chat
let data = SrvNormalChat {
char_id: chat_msg.client_id.parse().unwrap_or(696),
char_id: chat_msg.client_id.parse().unwrap(),
message: NullTerminatedString(chat_msg.message),
};
// Send the packet to the client
let response_packet = Packet::new(PacketType::PakwcNormalChat, &data);
debug!("Attempting to send normal chat to client");
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send normal chat: {:?}", e);
@@ -54,11 +54,14 @@ pub async fn create_chat_client_handler(
2 => {
// Shout Chat
let data = SrvShoutChat {
sender: Default::default(),
sender: NullTerminatedString(chat_msg.sender),
message: NullTerminatedString(chat_msg.message),
};
let response_packet = Packet::new(PacketType::PakwcShoutChat, &data);
debug!("Attempting to send shout chat to client");
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send shout chat: {:?}", e);
@@ -66,25 +69,80 @@ pub async fn create_chat_client_handler(
}
3 => {
// Party Chat
let data = SrvPartyChat {
char_id: chat_msg.client_id.parse().unwrap(),
message: NullTerminatedString(chat_msg.message),
};
let response_packet = Packet::new(PacketType::PakwcPartyChat, &data);
debug!("Attempting to send party chat to client");
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send party chat: {:?}", e);
}
}
4 => {
// Whisper Chat
let data = SrvWhisperChat {
sender: NullTerminatedString(chat_msg.sender),
message: NullTerminatedString(chat_msg.message),
};
let response_packet = Packet::new(PacketType::PakwcWhisperChat, &data);
debug!("Attempting to send whisper chat to client");
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send whisper chat: {:?}", e);
}
}
5 => {
// Clan Chat
let data = SrvClanChat {
sender: NullTerminatedString(chat_msg.sender),
message: NullTerminatedString(chat_msg.message),
};
let response_packet = Packet::new(PacketType::PakwcClanChat, &data);
debug!("Attempting to send clan chat to client");
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send clan chat: {:?}", e);
}
}
6 => {
// Allied Chat
let data = SrvAlliedChat {
team: 0,
message: NullTerminatedString(chat_msg.message),
};
let response_packet = Packet::new(PacketType::PakwcAlliedChat, &data);
debug!("Attempting to send allied chat to client");
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send allied chat: {:?}", e);
}
}
_ => {
// Normal Chat
let data = SrvNormalChat {
char_id: 0,
char_id: chat_msg.client_id.parse().unwrap(),
message: NullTerminatedString(chat_msg.message),
};
// Send the packet to the client
let response_packet = Packet::new(PacketType::PakwcNormalChat, &data);
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send normal chat: {:?}", e);
@@ -101,7 +159,6 @@ pub async fn create_chat_client_handler(
}
pub(crate) async fn handle_normal_chat(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
@@ -114,23 +171,36 @@ pub(crate) async fn handle_normal_chat(
if let Some(mut state) = connection_service.get_connection(&connection_id) {
let user_id = state.user_id.clone().expect("Missing user id in connection state");
let message = ChatMessage {
client_id: crate::handlers::character::string_to_u32(&user_id).to_string(),
client_id: state.client_id.clone().to_string(),
r#type: MessageType::Normal as i32,
message: request.message.clone().0,
target_id: "".to_string(),
sender: state.character_name.clone().unwrap(),
};
state.chat_handler.unwrap().send_message(message).await;
}
Ok(())
}
pub(crate) async fn handle_shout_chat(
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_shout_chat::*;
use crate::packets::srv_shout_chat::*;
let request = CliShoutChat::decode(packet.payload.as_slice())?;
debug!("{:?}", request);
if let Some(mut state) = connection_service.get_connection(&connection_id) {
let message = ChatMessage {
client_id: state.client_id.clone().to_string(),
r#type: MessageType::Shout as i32,
message: request.message.clone().0,
target_id: "".to_string(),
sender: state.character_name.clone().unwrap(),
};
state.chat_handler.unwrap().send_message(message).await;
}
// We're not sending here because we should get a message back from the chat service
// let data = SrvNormalChat {
// char_id: 0,
// message: request.message,
// };
// let response_packet = Packet::new(PacketType::PakwcNormalChat, &data)?;
// let mut locked_stream = stream.lock().await;
// send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}

View File

@@ -3,6 +3,7 @@ use futures::StreamExt;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use std::error::Error;
use tracing::{debug, error};
pub mod chat {
tonic::include_proto!("chat");
@@ -61,17 +62,17 @@ impl ChatClientHandler {
// You might translate or process the chat_msg here,
// then forward it to your packet-service logic.
if let Err(e) = in_tx.send(chat_msg).await {
eprintln!("Failed to forward chat message: {:?}", e);
error!("Failed to forward chat message: {:?}", e);
break;
}
}
Err(e) => {
eprintln!("Error receiving chat stream message: {:?}", e);
error!("Error receiving chat stream message: {:?}", e);
break;
}
}
}
println!("Chat inbound stream closed");
debug!("Chat inbound stream closed");
});
Ok(Self {

View File

@@ -17,7 +17,6 @@ fn distance(x1: f64, y1: f64, x2: f64, y2: f64) -> u16 {
}
pub(crate) async fn handle_change_map_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -30,6 +29,7 @@ pub(crate) async fn handle_change_map_req(
let mut user_id = "".to_string();
let mut char_id = 0;
let mut client_id = 0;
let mut character_id_list: Vec<u32> = Vec::new();
let session_id;
if let Some(mut state) = connection_service.get_connection(&connection_id) {
@@ -37,6 +37,7 @@ pub(crate) async fn handle_change_map_req(
session_id = state.session_id.expect("Missing session id in connection state");
char_id = state.character_id.expect("Missing character id in connection state");
character_id_list = state.character_list.clone().expect("Missing character id list");
client_id = state.client_id;
}
let mut character_client = character_client.lock().await;
@@ -51,7 +52,7 @@ pub(crate) async fn handle_change_map_req(
let time_as_u16 = (now.hour() * 100 + now.minute()) as u16;
let data = SrvChangeMapReply {
object_index: character_id_list[char_id as usize] as u16,
object_index: client_id,
hp: stats.hp as u16,
mp: stats.mp as u16,
xp: stats.xp as u16,
@@ -66,14 +67,16 @@ pub(crate) async fn handle_change_map_req(
team_number: 10,
};
let response_packet = Packet::new(PacketType::PakwcChangeMapReply, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_mouse_cmd_req(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
@@ -84,14 +87,16 @@ pub(crate) async fn handle_mouse_cmd_req(
debug!("{:?}", request);
let mut char_id = 0;
let mut client_id = 0;
let mut character_id_list: Vec<u32> = Vec::new();
if let Some(mut state) = connection_service.get_connection(&connection_id) {
char_id = state.character_id.expect("Missing character id in connection state");
character_id_list = state.character_list.clone().expect("Missing character id list");
client_id = state.client_id;
}
let data = SrvMouseCmd {
id: character_id_list[char_id as usize] as u16,
id: client_id,
target_id: request.target_id,
distance: distance(520000 as f64, 520000 as f64, request.x as f64, request.y as f64),
x: request.x,
@@ -99,7 +104,88 @@ pub(crate) async fn handle_mouse_cmd_req(
z: request.z,
};
let response_packet = Packet::new(PacketType::PakwcMouseCmd, &data)?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_togggle_move_req(
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_toggle_move::*;
use crate::packets::srv_toggle_move::*;
let request = CliToggleMove::decode(packet.payload.as_slice())?;
debug!("{:?}", request);
let mut client_id = 0;
if let Some(mut state) = connection_service.get_connection(&connection_id) {
client_id = state.client_id;
}
let mut toggle_type = 0;;
{
use crate::packets::cli_toggle_move::ToggleMove;
match request.type_ {
ToggleMove::Run => { toggle_type = 2; }
ToggleMove::Sit => { toggle_type = 8; }
ToggleMove::Drive => { toggle_type = 9; }
}
}
use crate::packets::srv_toggle_move::ToggleMove;
let mut final_type = ToggleMove::Run;
{
match toggle_type {
0 => { final_type = ToggleMove::Run; }
1 => { final_type = ToggleMove::Sit; }
2 => { final_type = ToggleMove::Drive; }
_ => { final_type = ToggleMove::Run; }
}
}
let data = SrvToggleMove {
object_id: client_id,
type_: final_type,
run_speed: 0,
};
let response_packet = Packet::new(PacketType::PakwcToggleMove, &data)?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}
pub(crate) async fn handle_set_animation_req(
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_set_animation::*;
use crate::packets::srv_set_animation::*;
let request = CliSetAnimation::decode(packet.payload.as_slice())?;
debug!("{:?}", request);
let mut client_id = 0;
if let Some(mut state) = connection_service.get_connection(&connection_id) {
client_id = state.client_id;
}
let data = SrvSetAnimation {
id: request.id,
value: request.value,
object_id: client_id,
};
let response_packet = Packet::new(PacketType::PacwcSetAnimation, &data)?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
let writer_clone = state.writer.clone().unwrap();
let mut locked_stream = writer_clone.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}

View File

@@ -0,0 +1,61 @@
use std::collections::HashSet;
#[derive(Clone, Debug)]
pub struct IdManager {
free_ids: HashSet<u16>,
max_id: u16, // the first ID is 1
}
impl IdManager {
/// Creates a new IdManager with no free IDs and starting ID of 1.
pub fn new() -> Self {
Self {
free_ids: HashSet::new(),
max_id: 1,
}
}
/// Retrieves an available ID.
///
/// If any are available in the free_ids set, returns one of them.
/// Otherwise, returns a fresh ID and increments max_id.
pub fn get_free_id(&mut self) -> u16 {
if let Some(&id) = self.free_ids.iter().next() {
self.free_ids.remove(&id);
id
} else {
let id = self.max_id;
self.max_id += 1;
id
}
}
/// Releases an ID, making it available for reuse.
pub fn release_id(&mut self, id: u16) {
self.free_ids.insert(id);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_id_manager() {
let mut manager = IdManager::new();
let id1 = manager.get_free_id();
let id2 = manager.get_free_id();
assert_eq!(id1, 1);
assert_eq!(id2, 2);
// Release id1 and then get a free id which should return id1
manager.release_id(id1);
let id3 = manager.get_free_id();
assert_eq!(id3, id1);
// Next free id should be id3 (old id2 was already used)
let id4 = manager.get_free_id();
assert_eq!(id4, 3);
}
}

View File

@@ -5,6 +5,7 @@ pub mod connection_state;
pub mod metrics;
pub mod packet;
pub mod packet_type;
pub mod id_manager;
pub mod handlers {
pub mod chat_client;
}

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, Semaphore};
use tokio::{select, signal};
use tokio::{io, select, signal};
use tracing::Level;
use tracing::{debug, error, info, warn};
use tracing_subscriber::EnvFilter;
@@ -40,6 +40,7 @@ mod packets;
mod router;
mod types;
mod interceptors;
mod id_manager;
pub mod common {
tonic::include_proto!("common");
@@ -111,14 +112,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = buffer_pool.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
let stream = Arc::new(Mutex::new(socket));
let peer_addr = socket.peer_addr().unwrap();
let (mut reader, writer) = io::split(socket);
let writer = Arc::new(tokio::sync::Mutex::new(writer));
// Spawn a new task for each connection
tokio::spawn(async move {
let _permit = permit;
let connection_id = packet_router.connection_service.add_connection();
let connection_id = packet_router.connection_service.add_connection(writer);
if let Err(e) = packet_router
.handle_connection(stream, pool, connection_id.clone())
.handle_connection(reader, pool, connection_id.clone(), peer_addr.to_string())
.await
{
error!("Error handling connection: {}", e);

View File

@@ -2,7 +2,7 @@ use crate::metrics::PACKETS_SENT;
use crate::packet_type::PacketType;
use bincode::{Decode, Encode};
use std::error::Error;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncWriteExt, WriteHalf};
use tokio::net::TcpStream;
use tracing::{debug, error};
@@ -110,7 +110,7 @@ impl Packet {
}
}
pub async fn send_packet(stream: &mut TcpStream, packet: &Packet) -> Result<(), std::io::Error> {
pub async fn send_packet(stream: &mut WriteHalf<TcpStream>, packet: &Packet) -> Result<(), std::io::Error> {
let data = packet.to_raw();
debug!("Sending '{:#X}' bytes of data. {:?}", data.len(), data);
PACKETS_SENT.inc();

View File

@@ -8,7 +8,7 @@ use crate::packet::Packet;
use crate::packet_type::PacketType;
use std::error::Error;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, ReadHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tracing::{debug, warn};
@@ -23,30 +23,20 @@ pub struct PacketRouter {
impl PacketRouter {
pub async fn handle_connection(
&self,
stream: Arc<Mutex<TcpStream>>,
mut stream: ReadHalf<TcpStream>,
pool: Arc<BufferPool>,
connection_id: String,
peer_addr: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
ACTIVE_CONNECTIONS.inc();
while let Some(mut buffer) = pool.acquire().await {
// Read data into the buffer
let packet_size: usize;
{
let mut locked_stream = stream.lock().await;
locked_stream.read_exact(&mut buffer[..6]).await?;
// let mut header_handle = locked_stream.take(6);
// let n = header_handle.read(&mut buffer).await?;
// if n == 0 {
// break; // Connection closed
// }
stream.read_exact(&mut buffer[..6]).await?;
packet_size = u16::from_le_bytes(buffer[0..2].try_into()?) as usize;
if packet_size > 6 {
locked_stream.read_exact(&mut buffer[6..packet_size]).await?;
// let mut body_handle = locked_stream.take((packet_size - 6) as u64);
// let n = body_handle.read(&mut buffer[6..]).await?;
// if n == 0 {
// break; // Connection closed
// }
stream.read_exact(&mut buffer[6..packet_size]).await?;
}
}
@@ -57,8 +47,7 @@ impl PacketRouter {
match Packet::from_raw(&buffer[..packet_size]) {
Ok(packet) => {
debug!("Parsed Packet: {:?}", packet);
// Handle the parsed packet (route it, process it, etc.)
self.route_packet(stream.clone(), packet, connection_id.clone()).await?;
self.route_packet(packet, connection_id.clone()).await?;
}
Err(e) => warn!("Failed to parse packet: {}", e),
}
@@ -73,8 +62,7 @@ impl PacketRouter {
let mut auth_client = self.auth_client.lock().await;
auth_client.logout(&session_id).await?;
} else {
let mut locked_stream = stream.lock().await;
warn!("No session found for {}", locked_stream.peer_addr()?);
warn!("No session found for {}", peer_addr);
}
}
ACTIVE_CONNECTIONS.dec();
@@ -84,34 +72,36 @@ impl PacketRouter {
#[rustfmt::skip]
pub async fn route_packet(
&self,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
debug!("Routing packet: {:?}", packet);
match packet.packet_type {
// Generic Server Packets
PacketType::PakcsAlive => auth::handle_alive_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsAlive => auth::handle_alive_req( packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsAcceptReq => auth::handle_accept_req(packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
// Login Packets
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
PacketType::PakcsLoginTokenReq => auth::handle_login_req(packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsLogoutReq => auth::handle_logout_req(packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(packet, self.connection_service.clone(), connection_id).await,
// Character Packets
PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsCreateCharReq => character::handle_create_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsSelectCharReq => character::handle_select_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsCharListReq => character::handle_char_list_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsCreateCharReq => character::handle_create_char_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsSelectCharReq => character::handle_select_char_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
// World Packets
PacketType::PakcsChangeMapReq => world::handle_change_map_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(stream, packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsChangeMapReq => world::handle_change_map_req(packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsToggleMove => world::handle_togggle_move_req(packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsSetAnimation => world::handle_set_animation_req(packet, self.connection_service.clone(), connection_id).await,
// Chat Packets
PacketType::PakcsNormalChat => chat::handle_normal_chat(stream, packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsNormalChat => chat::handle_normal_chat(packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsShoutChat => chat::handle_shout_chat(packet, self.connection_service.clone(), connection_id).await,
// 1 => chat::handle_chat(packet).await?,
// 2 => movement::handle_movement(packet).await?,
_ => {

View File

@@ -23,4 +23,5 @@ message ChatMessage {
MessageType type = 2;
string message = 3;
string target_id = 4;
string sender = 5;
}