More work.

Added chat service
Updated packet service to pass the tcp stream around in a Arc type.
Updated character position data to not require multiplying the coords
Added more debug logs
Added an interceptor for gRPC comms with the chat server
Updated build and push script for the chat server changes
This commit is contained in:
2025-06-06 17:52:29 -04:00
parent 85d41c0239
commit ad6ba2c8e6
32 changed files with 787 additions and 92 deletions

View File

@@ -36,6 +36,8 @@ dashmap = "6.1.0"
uuid = { version = "1.11.0", features = ["v4"] }
chrono = "0.4.39"
prometheus_exporter = "0.8.5"
futures = "0.3.31"
tokio-stream = "0.1.17"
[build-dependencies]
tonic-build = "0.12.3"

View File

@@ -7,6 +7,7 @@ fn main() {
.compile_protos(
&[
"../proto/auth.proto",
"../proto/chat.proto",
"../proto/character.proto",
"../proto/character_common.proto",
],

View File

@@ -1,12 +1,17 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::fmt;
#[derive(Clone, Debug)]
use crate::handlers::chat_client::ChatClientHandler;
#[derive(Clone)]
pub struct ConnectionState {
pub user_id: Option<String>,
pub session_id: Option<String>,
pub character_id: Option<i8>,
pub character_list: Option<Vec<u32>>,
pub additional_data: HashMap<String, String>, // Flexible data storage
pub chat_handler: Option<Arc<ChatClientHandler>>,
}
impl ConnectionState {
@@ -17,6 +22,20 @@ impl ConnectionState {
character_id: None,
character_list: None,
additional_data: HashMap::new(),
chat_handler: None,
}
}
}
impl fmt::Debug for ConnectionState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnectionState")
.field("user_id", &self.user_id)
.field("session_id", &self.session_id)
.field("character_id", &self.character_id)
.field("character_list", &self.character_list)
.field("additional_data", &self.additional_data)
.field("chat_handler", &self.chat_handler.as_ref().map(|_| "<chat handler>"))
.finish()
}
}

View File

@@ -2,16 +2,6 @@ use crate::auth_client::AuthClient;
use crate::connection_service::ConnectionService;
use crate::packet::{send_packet, Packet, PacketPayload};
use crate::packet_type::PacketType;
use crate::packets::cli_channel_list_req::CliChannelListReq;
use crate::packets::cli_join_server_token_req::CliJoinServerTokenReq;
use crate::packets::cli_login_token_req::CliLoginTokenReq;
use crate::packets::cli_srv_select_req::CliSrvSelectReq;
use crate::packets::srv_accept_reply::SrvAcceptReply;
use crate::packets::srv_channel_list_reply::{ChannelInfo, SrvChannelListReply};
use crate::packets::srv_join_server_reply::SrvJoinServerReply;
use crate::packets::srv_login_reply::{ServerInfo, SrvLoginReply};
use crate::packets::srv_logout_reply::SrvLogoutReply;
use crate::packets::srv_srv_select_reply::SrvSrvSelectReply;
use crate::packets::*;
use std::collections::HashMap;
use std::env;
@@ -26,9 +16,11 @@ use tracing::{debug, error, info, warn};
use utils::null_string::NullTerminatedString;
use utils::service_discovery;
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_info};
use crate::handlers::chat::create_chat_client_handler;
use crate::handlers::chat_client::ChatClientHandler;
pub(crate) async fn handle_alive_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
@@ -50,26 +42,30 @@ pub(crate) async fn handle_alive_req(
}
pub(crate) async fn handle_accept_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::srv_accept_reply::SrvAcceptReply;
let data = SrvAcceptReply {
result: srv_accept_reply::Result::Accepted,
rand_value: 0,
};
let response_packet = Packet::new(PacketType::PakssAcceptReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}
pub(crate) async fn handle_join_server_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_join_server_token_req::CliJoinServerTokenReq;
use crate::packets::srv_join_server_reply::SrvJoinServerReply;
let request = CliJoinServerTokenReq::decode(packet.payload.as_slice());
debug!("{:?}", request);
@@ -86,7 +82,8 @@ pub(crate) async fn handle_join_server_req(
pay_flag: 0,
};
let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
return Err("Session not valid".into());
}
@@ -96,7 +93,8 @@ pub(crate) async fn handle_join_server_req(
pay_flag: 0,
};
let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
} else {
Err("Unable to find connection state".into())
@@ -104,12 +102,13 @@ pub(crate) async fn handle_join_server_req(
}
pub(crate) async fn handle_logout_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::srv_logout_reply::SrvLogoutReply;
if let Some(mut state) = connection_service.get_connection(&connection_id) {
let session_id = state.session_id.clone().unwrap();
let mut auth_client = auth_client.lock().await;
@@ -117,8 +116,9 @@ pub(crate) async fn handle_logout_req(
let data = SrvLogoutReply { wait_time: 1 };
let response_packet = Packet::new(PacketType::PakwcLogoutReply, &data)?;
send_packet(stream, &response_packet).await?;
stream.shutdown().await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
locked_stream.shutdown().await?;
Ok(())
} else {
Err("Unable to find connection state".into())
@@ -126,13 +126,14 @@ pub(crate) async fn handle_logout_req(
}
pub(crate) async fn handle_login_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
auth_client: Arc<Mutex<AuthClient>>,
connection_service: Arc<ConnectionService>,
connection_id: String,
addr: SocketAddr,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_login_token_req::CliLoginTokenReq;
use crate::packets::srv_login_reply::{ServerInfo, SrvLoginReply};
debug!("decoding packet payload of size {}", packet.payload.as_slice().len());
let data = CliLoginTokenReq::decode(packet.payload.as_slice())?;
debug!("{:?}", data);
@@ -140,6 +141,7 @@ pub(crate) async fn handle_login_req(
let mut auth_client = auth_client.lock().await;
match auth_client.validate_session(&data.token.0).await {
Ok(response) => {
debug!("Response: {:?}", response);
if response.valid == false {
info!("Login failed: Invalid credentials");
@@ -150,14 +152,32 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
} else {
debug!("Successfully logged in");
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
debug!("Response: {:?}", response);
state.user_id = Some(response.user_id);
state.session_id = Some(response.session_id);
state.session_id = Some(response.session_id.clone());
}
let chat_url = format!(
"http://{}",
get_kube_service_endpoints_by_dns("chat-service", "tcp", "chat-service")
.await
.expect("Failed to get chat service endpoints")
.get(0)
.unwrap()
);
let handler = ChatClientHandler::new(chat_url, connection_id.clone(), response.session_id.clone()).await?;
let chat_handler = Arc::new(handler);
create_chat_client_handler(stream.clone(), chat_handler.clone()).await?;
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
state.chat_handler = Some(chat_handler);
}
let mut id = 0;
@@ -196,7 +216,8 @@ pub(crate) async fn handle_login_req(
servers_info: server_info,
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
Err(err) => {
@@ -208,7 +229,8 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
return Ok(());
}
}
@@ -227,7 +249,8 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Code::Unavailable => {
warn!("Login failed: Service is unavailable");
@@ -238,7 +261,8 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
_ => {
error!("Unexpected error: {}", tonic_status.message());
@@ -249,7 +273,8 @@ pub(crate) async fn handle_login_req(
servers_info: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
}
@@ -260,11 +285,13 @@ pub(crate) async fn handle_login_req(
}
pub(crate) async fn handle_server_select_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_srv_select_req::CliSrvSelectReq;
use crate::packets::srv_srv_select_reply::SrvSrvSelectReply;
let request = CliSrvSelectReq::decode(packet.payload.as_slice())?;
debug!("{:?}", request);
@@ -286,14 +313,17 @@ pub(crate) async fn handle_server_select_req(
};
let response_packet = Packet::new(PacketType::PaklcSrvSelectReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}
pub(crate) async fn handle_channel_list_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_channel_list_req::CliChannelListReq;
use crate::packets::srv_channel_list_reply::{ChannelInfo, SrvChannelListReply};
let request = CliChannelListReq::decode(packet.payload.as_slice());
debug!("{:?}", request);
@@ -326,7 +356,8 @@ pub(crate) async fn handle_channel_list_req(
channels: channel_info,
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
}
Err(err) => {
@@ -335,7 +366,8 @@ pub(crate) async fn handle_channel_list_req(
channels: Vec::new(),
};
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
return Ok(());
}
}

View File

@@ -17,7 +17,7 @@ use tonic::{Code, Status};
use tracing::{debug, error, info, warn};
use utils::null_string::NullTerminatedString;
fn string_to_u32(s: &str) -> u32 {
pub(crate) fn string_to_u32(s: &str) -> u32 {
let mut hasher = DefaultHasher::new();
s.hash(&mut hasher);
// Convert the 64-bit hash to a 32-bit number.
@@ -68,7 +68,7 @@ pub(crate) fn convert_type_to_body_part(slot: i32) -> ItemType {
}
pub(crate) async fn handle_char_list_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -91,6 +91,8 @@ pub(crate) async fn handle_char_list_req(
let character_list = character_client.get_character_list(&user_id).await?;
let mut characters = vec![];
let mut character_id_list: Vec<u32> = Vec::new();
// Build the visible inventory
for character in character_list.characters {
let mut item_list: [EquippedItem; (MAX_VISIBLE_ITEMS as usize)] =
core::array::from_fn(|i| EquippedItem::default());
@@ -128,13 +130,14 @@ pub(crate) async fn handle_char_list_req(
let data = SrvCharListReply { characters };
let response_packet = Packet::new(PacketType::PakccCharListReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}
pub(crate) async fn handle_create_char_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -176,13 +179,14 @@ pub(crate) async fn handle_create_char_req(
let data = SrvCreateCharReply { result, platininum: 0 };
let response_packet = Packet::new(PacketType::PakccCreateCharReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}
pub(crate) async fn handle_delete_char_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -218,13 +222,14 @@ pub(crate) async fn handle_delete_char_req(
name: character_name,
};
let response_packet = Packet::new(PacketType::PakccDeleteCharReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}
pub(crate) async fn handle_select_char_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -255,7 +260,10 @@ pub(crate) async fn handle_select_char_req(
ip: NullTerminatedString("".to_string()),
};
let response_packet = Packet::new(PacketType::PakccSwitchServer, &data)?;
send_packet(stream, &response_packet).await?;
{
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
let mut character_client = character_client.lock().await;
let character_data = character_client
@@ -275,12 +283,14 @@ pub(crate) async fn handle_select_char_req(
core::array::from_fn(|i| EquippedItem::default());
let mut inventory: [srv_inventory_data::Item; (MAX_ITEMS as usize)] =
core::array::from_fn(|i| srv_inventory_data::Item::default());
// Build the character learned skill list
let mut skill_list: [u16; (MAX_SKILL_COUNT as usize)] = [0u16; MAX_SKILL_COUNT as usize];
for index in 0..skills.len() {
skill_list[index] = skills[index].id as u16;
}
// Build the character inventory list
for item in items {
if item.slot < MAX_VISIBLE_ITEMS as i32 {
let slot = convert_type_to_body_part(item.slot) as isize - 2;
@@ -318,8 +328,8 @@ pub(crate) async fn handle_select_char_req(
let data = SrvSelectCharReply {
race: looks.race as u8,
map: position.map_id as u16,
x: position.x * 100.0,
y: position.y * 100.0,
x: position.x,
y: position.y,
spawn: position.spawn_id as u16,
body_face: looks.face as u32,
body_hair: looks.hair as u32,
@@ -362,7 +372,10 @@ pub(crate) async fn handle_select_char_req(
name,
};
let response_packet = Packet::new(PacketType::PakwcSelectCharReply, &data)?;
send_packet(stream, &response_packet).await?;
{
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
// here we build the inventory
let data = SrvInventoryData {
@@ -370,7 +383,10 @@ pub(crate) async fn handle_select_char_req(
items: inventory,
};
let response_packet = Packet::new(PacketType::PakwcInventoryData, &data)?;
send_packet(stream, &response_packet).await?;
{
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
// Now we need to build the Quest data
let mut quests: [srv_quest_data::Quest; (MAX_QUESTS as usize)] =
@@ -388,15 +404,21 @@ pub(crate) async fn handle_select_char_req(
wishlist,
};
let response_packet = Packet::new(PacketType::PakwcQuestData, &data)?;
send_packet(stream, &response_packet).await?;
// Send the billing message (we don't actually use this so we just send the defaults to allow)
{
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
// Send the billing message (we don't use this, so we just send the defaults to allow)
let data = SrvBillingMessage {
function_type: 0x1001,
pay_flag: 2,
};
let response_packet = Packet::new(PacketType::PakwcBillingMessage, &data)?;
send_packet(stream, &response_packet).await?;
{
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
}
Ok(())
}

View File

@@ -0,0 +1,136 @@
use crate::character_client::CharacterClient;
use crate::connection_service::ConnectionService;
use crate::packet::{send_packet, Packet, PacketPayload};
use crate::packet_type::PacketType;
use chrono::{Local, Timelike};
use std::error::Error;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tracing::{debug, error};
use utils::null_string::NullTerminatedString;
use utils::service_discovery::get_kube_service_endpoints_by_dns;
use crate::handlers::chat_client::chat::{ChatMessage, MessageType};
use crate::handlers::chat_client::ChatClientHandler;
pub async fn create_chat_client_handler(
stream_for_task: Arc<Mutex<TcpStream>>,
task_chat_handler: Arc<ChatClientHandler>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::srv_normal_chat::SrvNormalChat;
use crate::packets::srv_shout_chat::SrvShoutChat;
use crate::packets::srv_party_chat::SrvPartyChat;
use crate::packets::srv_whisper_chat::SrvWhisperChat;
use crate::packets::srv_clan_chat::SrvClanChat;
use crate::packets::srv_allied_chat::SrvAlliedChat;
tokio::spawn({
async move {
debug!("Spawning chat handler task");
loop {
let mut rx = task_chat_handler.inbound_rx.lock().await;
while let Some(chat_msg) = rx.recv().await {
debug!("Packet-Service received chat message: {} (client_id: {}, type {})", chat_msg.message, chat_msg.client_id, chat_msg.r#type);
debug!("Locking stream");
let mut locked_stream = stream_for_task.lock().await;
debug!("Locked stream");
match chat_msg.r#type {
1 => {
// Normal Chat
let data = SrvNormalChat {
char_id: chat_msg.client_id.parse().unwrap_or(696),
message: NullTerminatedString(chat_msg.message),
};
// Send the packet to the client
let response_packet = Packet::new(PacketType::PakwcNormalChat, &data);
debug!("Attempting to send normal chat to client");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send normal chat: {:?}", e);
}
}
2 => {
// Shout Chat
let data = SrvShoutChat {
sender: Default::default(),
message: NullTerminatedString(chat_msg.message),
};
let response_packet = Packet::new(PacketType::PakwcShoutChat, &data);
debug!("Attempting to send shout chat to client");
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send shout chat: {:?}", e);
}
}
3 => {
// Party Chat
}
4 => {
// Whisper Chat
}
5 => {
// Clan Chat
}
6 => {
// Allied Chat
}
_ => {
// Normal Chat
let data = SrvNormalChat {
char_id: 0,
message: NullTerminatedString(chat_msg.message),
};
// Send the packet to the client
let response_packet = Packet::new(PacketType::PakwcNormalChat, &data);
if let Err(e) = send_packet(&mut locked_stream, &response_packet.unwrap()).await
{
error!("unable to send normal chat: {:?}", e);
}
}
}
}
}
debug!("Chat handler task exiting");
}
});
Ok(())
}
pub(crate) async fn handle_normal_chat(
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::packets::cli_normal_chat::*;
use crate::packets::srv_normal_chat::*;
let request = CliNormalChat::decode(packet.payload.as_slice())?;
debug!("{:?}", request);
if let Some(mut state) = connection_service.get_connection(&connection_id) {
let user_id = state.user_id.clone().expect("Missing user id in connection state");
let message = ChatMessage {
client_id: crate::handlers::character::string_to_u32(&user_id).to_string(),
r#type: MessageType::Normal as i32,
message: request.message.clone().0,
target_id: "".to_string(),
};
state.chat_handler.unwrap().send_message(message).await;
}
// We're not sending here because we should get a message back from the chat service
// let data = SrvNormalChat {
// char_id: 0,
// message: request.message,
// };
// let response_packet = Packet::new(PacketType::PakwcNormalChat, &data)?;
// let mut locked_stream = stream.lock().await;
// send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}

View File

@@ -0,0 +1,91 @@
use tonic::{Request, transport::Channel};
use futures::StreamExt;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use std::error::Error;
pub mod chat {
tonic::include_proto!("chat");
}
use chat::chat_service_client::ChatServiceClient;
use chat::ChatMessage;
use crate::interceptors::auth_interceptor::AuthInterceptor;
/// ChatClientHandler encapsulates the bidirectional chat stream.
/// In addition to providing an API to send messages, it also spawns a
/// background task which forwards incoming chat messages through an inbound channel.
pub struct ChatClientHandler {
outbound_tx: mpsc::Sender<ChatMessage>,
/// Inbound messages from the chat service are sent here.
pub inbound_rx: Mutex<mpsc::Receiver<ChatMessage>>,
}
impl ChatClientHandler {
/// Creates and returns a new ChatClientHandler.
///
/// * `chat_url` - Full URL of the Chat Service (for example, "http://127.0.0.1:50051")
/// * `client_id` - The authenticated client ID to be injected into each request.
/// * `session_id` - The authenticated session token to be injected into each request.
pub async fn new(
chat_url: String,
client_id: String,
session_id: String,
) -> Result<Self, Box<dyn Error + Send + Sync>> {
// Create a channel to the Chat Service.
let channel = Channel::from_shared(chat_url)?.connect().await
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?;
let interceptor = AuthInterceptor { client_id, session_id };
// Create ChatService client with interceptor.
let mut chat_client = ChatServiceClient::with_interceptor(channel, interceptor);
// Create an mpsc channel for outbound messages.
let (out_tx, out_rx) = mpsc::channel(32);
let outbound_stream = ReceiverStream::new(out_rx);
// This channel will be used to forward inbound messages to the packet-service.
let (in_tx, in_rx) = mpsc::channel(32);
// Establish the bidirectional chat stream.
let request = Request::new(outbound_stream);
let mut response = chat_client.chat_stream(request).await
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)?.into_inner();
// Spawn a task to continuously receive messages from the Chat Service.
// Each received message is sent through the 'in_tx' channel.
tokio::spawn(async move {
while let Some(result) = response.next().await {
match result {
Ok(chat_msg) => {
// You might translate or process the chat_msg here,
// then forward it to your packet-service logic.
if let Err(e) = in_tx.send(chat_msg).await {
eprintln!("Failed to forward chat message: {:?}", e);
break;
}
}
Err(e) => {
eprintln!("Error receiving chat stream message: {:?}", e);
break;
}
}
}
println!("Chat inbound stream closed");
});
Ok(Self {
outbound_tx: out_tx,
inbound_rx: Mutex::new(in_rx),
})
}
/// Sends a chat message to the Chat Service.
pub async fn send_message(
&self,
message: ChatMessage,
) -> Result<(), Box<dyn Error + Send + Sync>> {
self.outbound_tx.send(message).await?;
Ok(())
}
}

View File

@@ -1,3 +1,5 @@
pub mod auth;
pub mod character;
pub mod world;
pub mod chat;
pub mod chat_client;

View File

@@ -7,7 +7,9 @@ use std::error::Error;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tracing::debug;
use utils::service_discovery::get_kube_service_endpoints_by_dns;
fn distance(x1: f64, y1: f64, x2: f64, y2: f64) -> u16 {
let dist = ((x2 - x1).powi(2) + (y2 - y1).powi(2)).sqrt();
@@ -15,7 +17,7 @@ fn distance(x1: f64, y1: f64, x2: f64, y2: f64) -> u16 {
}
pub(crate) async fn handle_change_map_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
character_client: Arc<Mutex<CharacterClient>>,
connection_service: Arc<ConnectionService>,
@@ -64,13 +66,14 @@ pub(crate) async fn handle_change_map_req(
team_number: 10,
};
let response_packet = Packet::new(PacketType::PakwcChangeMapReply, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}
pub(crate) async fn handle_mouse_cmd_req(
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_service: Arc<ConnectionService>,
connection_id: String,
@@ -96,6 +99,7 @@ pub(crate) async fn handle_mouse_cmd_req(
z: request.z,
};
let response_packet = Packet::new(PacketType::PakwcMouseCmd, &data)?;
send_packet(stream, &response_packet).await?;
let mut locked_stream = stream.lock().await;
send_packet(&mut locked_stream, &response_packet).await?;
Ok(())
}

View File

@@ -0,0 +1,20 @@
use tonic::{Request, Status, service::Interceptor};
#[derive(Clone, Debug)]
pub struct AuthInterceptor {
pub client_id: String,
pub session_id: String,
}
impl Interceptor for AuthInterceptor {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
// Attach the authenticated client ID into the metadata.
request
.metadata_mut()
.insert("x-client-id", self.client_id.parse().unwrap());
request
.metadata_mut()
.insert("x-session-id", self.session_id.parse().unwrap());
Ok(request)
}
}

View File

@@ -0,0 +1 @@
pub mod auth_interceptor;

View File

@@ -5,3 +5,7 @@ pub mod connection_state;
pub mod metrics;
pub mod packet;
pub mod packet_type;
pub mod handlers {
pub mod chat_client;
}
pub mod interceptors;

View File

@@ -39,18 +39,19 @@ mod packet_type;
mod packets;
mod router;
mod types;
mod interceptors;
pub mod common {
tonic::include_proto!("common");
}
pub mod auth {
tonic::include_proto!("auth"); // Path matches the package name in auth.proto
tonic::include_proto!("auth");
}
pub mod character_common {
tonic::include_proto!("character_common"); // Path matches the package name in auth.proto
tonic::include_proto!("character_common");
}
pub mod character {
tonic::include_proto!("character"); // Path matches the package name in auth.proto
tonic::include_proto!("character");
}
const BUFFER_POOL_SIZE: usize = 1000;
@@ -110,13 +111,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = buffer_pool.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
let stream = Arc::new(Mutex::new(socket));
// Spawn a new task for each connection
tokio::spawn(async move {
let _permit = permit;
let connection_id = packet_router.connection_service.add_connection();
if let Err(e) = packet_router
.handle_connection(&mut socket, pool, connection_id.clone())
.handle_connection(stream, pool, connection_id.clone())
.await
{
error!("Error handling connection: {}", e);

View File

@@ -23,24 +23,30 @@ pub struct PacketRouter {
impl PacketRouter {
pub async fn handle_connection(
&self,
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
pool: Arc<BufferPool>,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
ACTIVE_CONNECTIONS.inc();
while let Some(mut buffer) = pool.acquire().await {
// Read data into the buffer
let mut header_handle = stream.take(6);
let n = header_handle.read(&mut buffer).await?;
if n == 0 {
break; // Connection closed
}
let packet_size = u16::from_le_bytes(buffer[0..2].try_into()?) as usize;
if packet_size > 6 {
let mut body_handle = stream.take((packet_size - 6) as u64);
let n = body_handle.read(&mut buffer[6..]).await?;
if n == 0 {
break; // Connection closed
let packet_size: usize;
{
let mut locked_stream = stream.lock().await;
locked_stream.read_exact(&mut buffer[..6]).await?;
// let mut header_handle = locked_stream.take(6);
// let n = header_handle.read(&mut buffer).await?;
// if n == 0 {
// break; // Connection closed
// }
packet_size = u16::from_le_bytes(buffer[0..2].try_into()?) as usize;
if packet_size > 6 {
locked_stream.read_exact(&mut buffer[6..packet_size]).await?;
// let mut body_handle = locked_stream.take((packet_size - 6) as u64);
// let n = body_handle.read(&mut buffer[6..]).await?;
// if n == 0 {
// break; // Connection closed
// }
}
}
@@ -52,7 +58,7 @@ impl PacketRouter {
Ok(packet) => {
debug!("Parsed Packet: {:?}", packet);
// Handle the parsed packet (route it, process it, etc.)
self.route_packet(stream, packet, connection_id.clone()).await?;
self.route_packet(stream.clone(), packet, connection_id.clone()).await?;
}
Err(e) => warn!("Failed to parse packet: {}", e),
}
@@ -67,7 +73,8 @@ impl PacketRouter {
let mut auth_client = self.auth_client.lock().await;
auth_client.logout(&session_id).await?;
} else {
warn!("No session found for {}", stream.peer_addr()?);
let mut locked_stream = stream.lock().await;
warn!("No session found for {}", locked_stream.peer_addr()?);
}
}
ACTIVE_CONNECTIONS.dec();
@@ -77,7 +84,7 @@ impl PacketRouter {
#[rustfmt::skip]
pub async fn route_packet(
&self,
stream: &mut TcpStream,
stream: Arc<Mutex<TcpStream>>,
packet: Packet,
connection_id: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -88,7 +95,7 @@ impl PacketRouter {
PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
// Login Packets
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id, stream.peer_addr()?).await,
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
@@ -103,6 +110,8 @@ impl PacketRouter {
PacketType::PakcsChangeMapReq => world::handle_change_map_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(stream, packet, self.connection_service.clone(), connection_id).await,
// Chat Packets
PacketType::PakcsNormalChat => chat::handle_normal_chat(stream, packet, self.connection_service.clone(), connection_id).await,
// 1 => chat::handle_chat(packet).await?,
// 2 => movement::handle_movement(packet).await?,
_ => {