From e5c961d1b4f87928fefa7f586ceebf01c1ab442945e261bde8ce6401629ea899 Mon Sep 17 00:00:00 2001 From: raven <7156279+RavenX8@users.noreply.github.com> Date: Mon, 9 Dec 2024 23:10:26 -0500 Subject: [PATCH] - add: utils library - add: packet-service to handle game client packets - fix: health check for database-service - fix: health check for auth-service --- Cargo.toml | 3 +- auth-service/Cargo.toml | 3 +- auth-service/src/main.rs | 11 +- database-service/Cargo.toml | 1 + database-service/src/consul_registration.rs | 59 --- database-service/src/main.rs | 11 +- packet-service/Cargo.toml | 11 + packet-service/build.rs | 9 + packet-service/src/auth_client.rs | 24 + packet-service/src/bufferpool.rs | 31 ++ packet-service/src/dataconsts.rs | 72 +++ packet-service/src/enums.rs | 114 +++++ packet-service/src/handlers/auth.rs | 90 ++++ packet-service/src/handlers/mod.rs | 2 + packet-service/src/handlers/null_string.rs | 43 ++ packet-service/src/main.rs | 138 +++++- packet-service/src/metrics.rs | 22 + packet-service/src/packet.rs | 100 +++- packet-service/src/packet_type.rs | 455 ++++++++++++++++++ packet-service/src/router.rs | 28 +- packet-service/src/types.rs | 38 ++ utils/Cargo.toml | 9 + .../src/consul_registration.rs | 0 utils/src/lib.rs | 2 + .../src/service_discovery.rs | 20 +- 25 files changed, 1176 insertions(+), 120 deletions(-) delete mode 100644 database-service/src/consul_registration.rs create mode 100644 packet-service/build.rs create mode 100644 packet-service/src/auth_client.rs create mode 100644 packet-service/src/bufferpool.rs create mode 100644 packet-service/src/dataconsts.rs create mode 100644 packet-service/src/enums.rs create mode 100644 packet-service/src/handlers/auth.rs create mode 100644 packet-service/src/handlers/mod.rs create mode 100644 packet-service/src/handlers/null_string.rs create mode 100644 packet-service/src/metrics.rs create mode 100644 packet-service/src/packet_type.rs create mode 100644 packet-service/src/types.rs create mode 100644 utils/Cargo.toml rename {auth-service => utils}/src/consul_registration.rs (100%) create mode 100644 utils/src/lib.rs rename {auth-service => utils}/src/service_discovery.rs (69%) diff --git a/Cargo.toml b/Cargo.toml index 7bdb19e..3fd53b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ "auth-service", "database-service", "packet-service", + "utils", ] -resolver = "2" \ No newline at end of file +resolver = "2" diff --git a/auth-service/Cargo.toml b/auth-service/Cargo.toml index 80eaa2c..89cafbc 100644 --- a/auth-service/Cargo.toml +++ b/auth-service/Cargo.toml @@ -16,7 +16,7 @@ serde = { version = "1.0", features = ["derive"] } dotenv = "0.15" tracing = "0.1" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "chrono"] } -prost = "0.13.3" +prost = "0.13.4" prost-types = "0.13.3" chrono = { version = "0.4.38", features = ["serde"] } async-trait = "0.1.83" @@ -24,6 +24,7 @@ mockall = "0.13.1" rand = "0.8.5" warp = "0.3.7" reqwest = { version = "0.12.9", features = ["json"] } +utils = { path = "../utils" } [build-dependencies] tonic-build = "0.12.3" diff --git a/auth-service/src/main.rs b/auth-service/src/main.rs index 39767ad..c4ccdcf 100644 --- a/auth-service/src/main.rs +++ b/auth-service/src/main.rs @@ -1,4 +1,3 @@ -use crate::service_discovery::get_service_address; use auth_service::auth::auth_service_server::AuthServiceServer; use auth_service::database_client::DatabaseClient; use auth_service::database_client::DatabaseClientTrait; @@ -11,9 +10,8 @@ use tokio::{select, signal}; use tonic::transport::Server; use tracing::{info, Level}; use warp::Filter; - -mod consul_registration; -mod service_discovery; +use utils::consul_registration; +use utils::service_discovery::get_service_address; #[tokio::main] async fn main() -> Result<(), Box> { @@ -27,13 +25,14 @@ async fn main() -> Result<(), Box> { // Set the gRPC server address let addr = env::var("AUTH_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); let port = env::var("AUTH_SERVICE_PORT").unwrap_or_else(|_| "50051".to_string()); + let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8081".to_string()); let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "auth-service".to_string()); let service_address = addr.as_str(); let service_port = port.clone(); - let health_check_url = format!("http://{}:{}/health", service_address, service_port); - let health_check_endpoint_addr = format!("{}:8081", service_address); + let health_check_url = format!("http://{}:{}/health", service_address, health_port); + let health_check_endpoint_addr = format!("{}:{}", service_address, health_port); let db_address = get_service_address(&consul_url, "database-service").await?; // Register service with Consul diff --git a/database-service/Cargo.toml b/database-service/Cargo.toml index 6c77e20..ce3a504 100644 --- a/database-service/Cargo.toml +++ b/database-service/Cargo.toml @@ -24,6 +24,7 @@ async-trait = "0.1.83" mockall = "0.13.1" reqwest = { version = "0.12.9", features = ["json"] } warp = "0.3.7" +utils = { path = "../utils" } [build-dependencies] tonic-build = "0.12.3" \ No newline at end of file diff --git a/database-service/src/consul_registration.rs b/database-service/src/consul_registration.rs deleted file mode 100644 index be77976..0000000 --- a/database-service/src/consul_registration.rs +++ /dev/null @@ -1,59 +0,0 @@ -use reqwest::Client; -use serde::Serialize; - -#[derive(Serialize)] -struct ConsulRegistration { - name: String, - address: String, - port: u16, - check: ConsulCheck, -} - -#[derive(Serialize)] -struct ConsulCheck { - http: String, - interval: String, -} - -pub async fn register_service( - consul_url: &str, - service_name: &str, - service_address: &str, - service_port: u16, - health_check_url: &str, -) -> Result<(), Box> { - let registration = ConsulRegistration { - name: service_name.to_string(), - address: service_address.to_string(), - port: service_port, - check: ConsulCheck { - http: health_check_url.to_string(), - interval: "10s".to_string(), // Health check interval - }, - }; - - let client = Client::new(); - let consul_register_url = format!("{}/v1/agent/service/register", consul_url); - - client - .put(&consul_register_url) - .json(®istration) - .send() - .await? - .error_for_status()?; // Ensure response is successful - - Ok(()) -} - -pub async fn deregister_service(consul_url: &str, service_name: &str) -> Result<(), Box> { - let client = Client::new(); - let consul_deregister_url = format!("{}/v1/agent/service/deregister/{}", consul_url, service_name); - - client - .put(&consul_deregister_url) - .send() - .await? - .error_for_status()?; // Ensure response is successful - - Ok(()) -} \ No newline at end of file diff --git a/database-service/src/main.rs b/database-service/src/main.rs index ead41dc..eceff8d 100644 --- a/database-service/src/main.rs +++ b/database-service/src/main.rs @@ -9,12 +9,11 @@ use std::env; use std::net::ToSocketAddrs; use std::str::FromStr; use std::sync::Arc; - use tokio::{select, signal}; +use tokio::{select, signal}; use tonic::transport::Server; use tracing::{info, Level}; use warp::Filter; - -mod consul_registration; +use utils::consul_registration; #[tokio::main] async fn main() -> Result<(), Box> { @@ -25,6 +24,8 @@ async fn main() -> Result<(), Box> { let addr = env::var("DATABASE_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); let port = env::var("DATABASE_SERVICE_PORT").unwrap_or_else(|_| "50052".to_string()); + let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8080".to_string()); + let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); @@ -32,8 +33,8 @@ async fn main() -> Result<(), Box> { let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "database-service".to_string()); let service_address = addr.as_str(); let service_port = port.clone(); - let health_check_url = format!("http://{}:{}/health", service_address, service_port); - let health_check_endpoint_addr = format!("{}:8080", service_address); + let health_check_url = format!("http://{}:{}/health", service_address, health_port); + let health_check_endpoint_addr = format!("{}:{}", service_address, health_port); // Register service with Consul consul_registration::register_service( diff --git a/packet-service/Cargo.toml b/packet-service/Cargo.toml index 19c595c..eb58931 100644 --- a/packet-service/Cargo.toml +++ b/packet-service/Cargo.toml @@ -14,4 +14,15 @@ serde = { version = "1.0", features = ["derive"] } bytes = { version = "1.8.0", features = ["std", "serde"] } tracing = "0.1" tracing-subscriber = "0.3.18" +bincode = { version = "2.0.0-rc.3", features = ["derive", "serde"] } +thiserror = "2.0.3" +lazy_static = "1.5.0" +prometheus = "0.13.4" +hyper = { version = "1.5.1", features = ["server"] } +tonic = "0.12.3" +prost = "0.13.4" +utils = { path = "../utils" } +warp = "0.3.7" +[build-dependencies] +tonic-build = "0.12.3" diff --git a/packet-service/build.rs b/packet-service/build.rs new file mode 100644 index 0000000..4123838 --- /dev/null +++ b/packet-service/build.rs @@ -0,0 +1,9 @@ +fn main() { + // gRPC Client code + tonic_build::configure() + .build_server(false) // Generate gRPC client code + .compile_well_known_types(true) + .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") + .compile_protos(&["../proto/auth.proto"], &["../proto"]) + .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); +} diff --git a/packet-service/src/auth_client.rs b/packet-service/src/auth_client.rs new file mode 100644 index 0000000..8838ac5 --- /dev/null +++ b/packet-service/src/auth_client.rs @@ -0,0 +1,24 @@ +use crate::auth::auth_service_client::AuthServiceClient; +use crate::auth::{LoginRequest, LoginResponse}; +use tonic::transport::Channel; + +pub struct AuthClient { + client: AuthServiceClient, +} + +impl AuthClient { + pub async fn connect(endpoint: &str) -> Result> { + let client = AuthServiceClient::connect(endpoint.to_string()).await?; + Ok(AuthClient { client }) + } + + pub async fn login(&mut self, username: &str, password: &str) -> Result> { + let request = LoginRequest { + username: username.to_string(), + password: password.to_string(), + }; + + let response = self.client.login(request).await?; + Ok(response.into_inner()) + } +} diff --git a/packet-service/src/bufferpool.rs b/packet-service/src/bufferpool.rs new file mode 100644 index 0000000..c4d7420 --- /dev/null +++ b/packet-service/src/bufferpool.rs @@ -0,0 +1,31 @@ +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::{Semaphore, Mutex}; + +const MAX_PACKET_SIZE: usize = 1024; + +pub struct BufferPool { + buffers: Mutex>>, + sem: Semaphore, +} + +impl BufferPool { + pub fn new(pool_size: usize) -> Arc { + Arc::new(Self { + buffers: Mutex::new((0..pool_size).map(|_| vec![0u8; MAX_PACKET_SIZE]).collect()), + sem: Semaphore::new(pool_size), + }) + } + + pub async fn acquire(&self) -> Option> { + let _ = self.sem.acquire().await.ok()?; + let mut buffers = self.buffers.lock().await; + buffers.pop_front() + } + + pub async fn release(&self, buffer: Vec) { + let mut buffers = self.buffers.lock().await; + buffers.push_back(buffer); + self.sem.add_permits(1); + } +} diff --git a/packet-service/src/dataconsts.rs b/packet-service/src/dataconsts.rs new file mode 100644 index 0000000..2f76731 --- /dev/null +++ b/packet-service/src/dataconsts.rs @@ -0,0 +1,72 @@ +use crate::enums::{EquippedPosition, BulletType, RidingItem}; +pub(crate) const MIN_SELL_TYPE: u32 = 1; +pub(crate) const MAX_SELL_TYPE: u32 = 11; + +pub(crate) const MAX_STAT: u32 = 300; +pub(crate) const MAX_STACK: u32 = 999; + +pub(crate) const MAX_UNION_COUNT: u32 = 10; +pub(crate) const MAX_BUFF_STATUS: u32 = 40; +pub(crate) const MAX_SKILL_COUNT: u32 = 120; +pub(crate) const MAX_HOTBAR_ITEMS: u32 = 32; + +pub(crate) const MAX_DAMAGE: u32 = 99_999_999; +pub(crate) const WALK_SPEED: u32 = 200; +pub(crate) const BASE_MOVE_SPEED: u32 = 425; + +pub(crate) const DAMAGE_ACTION_IMMEDIATE: u32 = 0x02; +pub(crate) const DAMAGE_ACTION_HIT: u32 = 0x04; +pub(crate) const DAMAGE_ACTION_CRITICAL: u32 = 0x08; +pub(crate) const DAMAGE_ACTION_DEAD: u32 = 0x10; + +pub(crate) const MAX_CONDITIONS_EPISODE: u32 = 5; +pub(crate) const MAX_CONDITIONS_JOB: u32 = 3; +pub(crate) const MAX_CONDITIONS_PLANET: u32 = 7; +pub(crate) const MAX_CONDITIONS_UNION: u32 = 10; +pub(crate) const MAX_QUESTS: u32 = 10; +pub(crate) const MAX_SWITCHES: u32 = 16; + +pub(crate) const MAX_QUEST_SWITCHES: u32 = 32; +pub(crate) const MAX_QUEST_VARS: u32 = 10; +pub(crate) const MAX_QUEST_ITEMS: u32 = 6; + +pub(crate) const TAB_SIZE: u8 = 30; + +pub(crate) const DROP_RANGE: f32 = 50.0; +pub(crate) const MAX_VISIBLE_ITEMS: u32 = 8; +pub(crate) const MAX_INVENTORY: u32 = 120; + +// Assuming BulletType, RidingItem, and EquippedPosition enums are already defined as shown previously +pub(crate) const MAX_ITEMS: u32 = MAX_INVENTORY + + BulletType::MaxBulletTypes as u32 + + RidingItem::MaxRidingItems as u32 + + EquippedPosition::MaxEquipItems as u32; + +pub(crate) const FIRST_BULLET_SLOT: u32 = MAX_INVENTORY + EquippedPosition::MaxEquipItems as u32; + +pub(crate) const MAX_STATUS_EFFECTS: u32 = 40; + +pub(crate) const MAX_WISHLIST: u32 = 30; + +// Classes +pub(crate) const CLASS_VISITOR: u32 = 0; +pub(crate) const CLASS_SOLDIER_111: u32 = 111; +pub(crate) const CLASS_SOLDIER_121: u32 = 121; +pub(crate) const CLASS_SOLDIER_122: u32 = 122; +pub(crate) const CLASS_SOLDIER_131: u32 = 131; +pub(crate) const CLASS_SOLDIER_132: u32 = 132; +pub(crate) const CLASS_MAGICIAN_211: u32 = 211; +pub(crate) const CLASS_MAGICIAN_221: u32 = 221; +pub(crate) const CLASS_MAGICIAN_222: u32 = 222; +pub(crate) const CLASS_MAGICIAN_231: u32 = 231; +pub(crate) const CLASS_MAGICIAN_232: u32 = 232; +pub(crate) const CLASS_MIXER_311: u32 = 311; +pub(crate) const CLASS_MIXER_321: u32 = 321; +pub(crate) const CLASS_MIXER_322: u32 = 322; +pub(crate) const CLASS_MIXER_331: u32 = 331; +pub(crate) const CLASS_MIXER_332: u32 = 332; +pub(crate) const CLASS_MERCHANT_411: u32 = 411; +pub(crate) const CLASS_MERCHANT_421: u32 = 421; +pub(crate) const CLASS_MERCHANT_422: u32 = 422; +pub(crate) const CLASS_MERCHANT_431: u32 = 431; +pub(crate) const CLASS_MERCHANT_432: u32 = 432; diff --git a/packet-service/src/enums.rs b/packet-service/src/enums.rs new file mode 100644 index 0000000..c519f20 --- /dev/null +++ b/packet-service/src/enums.rs @@ -0,0 +1,114 @@ +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum BulletType { + Arrow = 0, + Bullet = 1, + Throw = 2, + MaxBulletTypes, +} + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum RidingItem { + Body = 0, + Engine = 1, + Legs, + Option, // weapon or back seat + Arms, + MaxRidingItems, +} + +#[repr(u16)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ItemSubType { + Rring = 171, + Nnecklace, + Earring, + OneHSword = 211, + OneHBlunt, + TwoHSword = 221, + Spear = 222, + TwoHAxe = 223, + Bow = 231, + Gun, + Launcher, + Staff = 241, + Wand, + Katar = 251, + DualWield, + Xbow = 271, +} + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum EquippedPosition { + Goggles = 1, + Helmet = 2, + Armor, + Backpack, + Gauntlet, + Boots, + WeaponR, + WeaponL, + Necklace, + Ring, + Earing, + MaxEquipItems, +} + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum MoveMode { + Walk = 0, + Run = 1, + Drive = 2, + RideOn = 4, +} + +#[repr(u16)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum Command { + Stop = 0, + Move = 1, + Attack = 2, + Die = 3, + Pickup = 4, + Skill2Self = 6, + Skill2Obj = 7, + Skill2Pos = 8, + Runaway = 0x8009, + Sit = 10, +} + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ItemType { + None = 0, + ItemGoggles = 1, + ItemHelmet = 2, + ItemArmor = 3, + ItemGauntlet = 4, + ItemBoots = 5, + ItemBackpack = 6, + ItemRing = 7, + ItemWeaponR = 8, + ItemWeaponL = 9, + ItemConsumable = 10, + ItemEtcGem = 11, + ItemEtc = 12, + ItemEtc2 = 13, + ItemRiding = 14, + Zuly = 0x1F, +} + +mod party_req { + #[repr(u8)] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum Request { + Make = 0, + Join = 1, + Left, + ChangeOwner, + Kick = 0x81, + } +} diff --git a/packet-service/src/handlers/auth.rs b/packet-service/src/handlers/auth.rs new file mode 100644 index 0000000..9c03271 --- /dev/null +++ b/packet-service/src/handlers/auth.rs @@ -0,0 +1,90 @@ +use std::collections::HashMap; +use tonic::{Code, Status}; +use std::error::Error; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio::sync::Mutex; +use tracing::{debug, error, info, warn}; +use crate::auth_client::AuthClient; +use crate::packet::{send_packet, Packet, PacketPayload}; +use crate::packet_type::PacketType; +use crate::packets::cli_accept_req::CliAcceptReq; +use crate::packets::cli_join_server_req::CliJoinServerReq; +use crate::packets::cli_login_req::CliLoginReq; +use crate::packets::{srv_accept_reply, srv_login_reply}; +use crate::packets::srv_accept_reply::SrvAcceptReply; +use crate::packets::srv_login_reply::SrvLoginReply; + +pub(crate) async fn handle_accept_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { + let data = CliAcceptReq::decode(packet.payload.as_slice()); + debug!("{:?}", data); + + // We need to do reply to this packet + let data = SrvAcceptReply { result: srv_accept_reply::Result::Accepted, rand_value: 0 }; + let response_packet = Packet::new(PacketType::PakssAcceptReply, &data)?; + + debug!("{:?}", response_packet); + send_packet(stream, &response_packet).await?; + Ok(()) +} + +pub(crate) async fn handle_join_server_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { + let data = CliJoinServerReq::decode(packet.payload.as_slice()); + debug!("{:?}", data); + Ok(()) +} + +pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc>) -> Result<(), Box> { + debug!("decoding packet payload of size {}", packet.payload.as_slice().len()); + let data = CliLoginReq::decode(packet.payload.as_slice())?; + debug!("{:?}", data); + + let mut auth_client = auth_client.lock().await; + match auth_client.login(&data.username.0, &data.password.password).await { + Ok(response) => { + debug!("successfully logged in"); + let data = SrvLoginReply { result: srv_login_reply::Result::Ok, right: 0, type_: 0, servers_info: Vec::new() }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + } + Err(status) => { + if let Some(tonic_status) = status.downcast_ref::() { + match tonic_status.code() { + Code::Unauthenticated => { + info!("Login failed: Invalid credentials"); + + let data = SrvLoginReply { result: srv_login_reply::Result::UnknownAccount, right: 0, type_: 0, servers_info: Vec::new() }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + } + Code::Unavailable => { + warn!("Login failed: Service is unavailable"); + let data = SrvLoginReply { result: srv_login_reply::Result::Failed, right: 0, type_: 0, servers_info: Vec::new() }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + } + _ => { + error!("Unexpected error: {}", tonic_status.message()); + let data = SrvLoginReply { result: srv_login_reply::Result::Failed, right: 0, type_: 0, servers_info: Vec::new() }; + let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?; + send_packet(stream, &response_packet).await?; + } + } + } + } + } + + Ok(()) +} + +pub(crate) async fn handle_server_select_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { + let data = CliJoinServerReq::decode(packet.payload.as_slice()); + debug!("{:?}", data); + Ok(()) +} + +pub(crate) async fn handle_channel_list_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box> { + let data = CliJoinServerReq::decode(packet.payload.as_slice()); + debug!("{:?}", data); + Ok(()) +} \ No newline at end of file diff --git a/packet-service/src/handlers/mod.rs b/packet-service/src/handlers/mod.rs new file mode 100644 index 0000000..3d88be8 --- /dev/null +++ b/packet-service/src/handlers/mod.rs @@ -0,0 +1,2 @@ +pub mod auth; +pub mod null_string; \ No newline at end of file diff --git a/packet-service/src/handlers/null_string.rs b/packet-service/src/handlers/null_string.rs new file mode 100644 index 0000000..4d9420e --- /dev/null +++ b/packet-service/src/handlers/null_string.rs @@ -0,0 +1,43 @@ +use bincode::{Decode, Encode, de::Decoder, enc::Encoder, error::{DecodeError, EncodeError}}; +use std::str; +use bincode::de::read::Reader; +use bincode::enc::write::Writer; + +#[derive(Debug)] +pub struct NullTerminatedString(pub String); + +impl NullTerminatedString { + pub fn new(string: &str) -> Self { + NullTerminatedString(string.into()) + } +} + +impl Encode for NullTerminatedString { + fn encode(&self, encoder: &mut E) -> Result<(), EncodeError> { + let bytes = self.0.as_bytes(); + encoder.writer().write(bytes)?; // Write the string bytes + encoder.writer().write(&[0])?; // Add the null terminator + Ok(()) + } +} + +impl Decode for NullTerminatedString { + fn decode(decoder: &mut D) -> Result { + let mut buffer = Vec::new(); + let mut byte = [0u8; 1]; + + // Read until the null terminator + while decoder.reader().read(&mut byte).is_ok() { + if byte[0] == 0 { + break; // Null terminator found + } + buffer.push(byte[0]); + } + + let string = str::from_utf8(&buffer) + .map_err(|e| DecodeError::OtherString(format!("Invalid UTF-8: {}", e)))? + .to_string(); + + Ok(NullTerminatedString(string)) + } +} diff --git a/packet-service/src/main.rs b/packet-service/src/main.rs index 850b81c..d2f1f99 100644 --- a/packet-service/src/main.rs +++ b/packet-service/src/main.rs @@ -1,13 +1,67 @@ use std::env; +use std::error::Error; +use std::net::ToSocketAddrs; use std::str::FromStr; +use std::sync::Arc; use dotenv::dotenv; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt}; -use tracing::{info, error}; +use tokio::{select, signal}; +use tokio::sync::{Mutex, Semaphore}; +use tracing::{info, error, debug, warn}; use tracing::Level; +use utils::consul_registration; +use utils::service_discovery::get_service_address; +use warp::Filter; +use crate::auth_client::AuthClient; +use crate::bufferpool::BufferPool; +use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED}; +use crate::packet::Packet; +mod packet_type; mod packet; mod router; +mod packets; +mod enums; +mod dataconsts; +mod types; +mod handlers; +mod bufferpool; +mod metrics; +mod auth_client; +pub mod auth { + tonic::include_proto!("auth"); // Path matches the package name in auth.proto +} + +const BUFFER_POOL_SIZE: usize = 1000; +const MAX_CONCURRENT_CONNECTIONS: usize = 100; + + +async fn handle_connection(stream: &mut TcpStream, pool: Arc, auth_client: Arc>) -> Result<(), Box> { + ACTIVE_CONNECTIONS.inc(); + while let Some(mut buffer) = pool.acquire().await { + // Read data into the buffer + let n = stream.read(&mut buffer).await?; + if n == 0 { + break; // Connection closed + } + PACKETS_RECEIVED.inc(); + + // Process the packet + match Packet::from_raw(&buffer[..n]) { + Ok(packet) => { + debug!("Parsed Packet: {:?}", packet); + // Handle the parsed packet (route it, process it, etc.) + router::route_packet(stream, packet, auth_client.clone()).await?; + } + Err(e) => warn!("Failed to parse packet: {}", e), + } + + pool.release(buffer).await; + } + ACTIVE_CONNECTIONS.dec(); + Ok(()) +} #[tokio::main] async fn main() -> Result<(), Box> { @@ -16,25 +70,73 @@ async fn main() -> Result<(), Box> { .with_max_level(Level::from_str(&env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string())).unwrap_or_else(|_| Level::INFO)) .init(); - let listener = TcpListener::bind("127.0.0.1:4000").await?; - info!("Packet service listening on 127.0.0.1:4000"); + // Set the gRPC server address + let addr = env::var("PACKET_SERVICE_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string()); + let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string()); - loop { - let (mut socket, addr) = listener.accept().await?; - info!("New connection from {}", addr); + let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); + let service_name = env::var("SERVICE_NAME").unwrap_or_else(|_| "packet-service".to_string()); + let service_address = addr.as_str(); + let service_port = port.clone(); + let health_check_url = format!("http://{}:{}/health", service_address, health_port); + let health_check_endpoint_addr = format!("{}:{}", service_address, health_port); + let auth_address = get_service_address(&consul_url, "auth-service").await?; - tokio::spawn(async move { - let mut buffer = vec![0u8; 1024]; + // Register service with Consul + consul_registration::register_service( + &consul_url, + service_name.as_str(), + service_address, + service_port.parse().unwrap_or(50052), + &health_check_url, + ) + .await?; - match socket.read(&mut buffer).await { - Ok(n) if n > 0 => { - if let Err(e) = router::route_packet(&buffer[..n]).await { - error!("Failed to route packet: {}", e); - } + // Start health-check endpoint + let health_route = warp::path!("health") + .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); + + tokio::spawn(warp::serve(health_route).run(health_check_endpoint_addr.to_socket_addrs()?.next().unwrap())); + + let auth_url = format!("http://{}:{}", auth_address.Address, auth_address.Port); + let auth_client = Arc::new(Mutex::new(AuthClient::connect(&auth_url).await?)); + + let full_addr = format!("{}:{}", &addr, port); + // let address = full_addr.parse().expect("Invalid address"); + + + + tokio::spawn(async move { + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CONNECTIONS)); + let listener = TcpListener::bind(full_addr.clone()).await.unwrap(); + let buffer_pool = BufferPool::new(BUFFER_POOL_SIZE); + + info!("Packet service listening on {}", full_addr); + + loop { + let (mut socket, addr) = listener.accept().await.unwrap(); + let auth_client = auth_client.clone(); + info!("New connection from {}", addr); + + let pool = buffer_pool.clone(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + + // Spawn a new task for each connection + tokio::spawn(async move { + let _permit = permit; + if let Err(e) = handle_connection(&mut socket, pool, auth_client).await { + error!("Error handling connection: {}", e); } - Ok(_) => info!("Connection closed by {}", addr), - Err(e) => error!("Failed to read from socket: {}", e), - } - }); + }); + } + }); + + select! { + _ = signal::ctrl_c() => {}, } + + consul_registration::deregister_service(&consul_url, service_name.as_str()).await.expect(""); + info!("service {} deregistered", service_name); + Ok(()) } diff --git a/packet-service/src/metrics.rs b/packet-service/src/metrics.rs new file mode 100644 index 0000000..5ea483c --- /dev/null +++ b/packet-service/src/metrics.rs @@ -0,0 +1,22 @@ +use prometheus::{Encoder, TextEncoder, Counter, Gauge, Histogram, register_counter, register_gauge, register_histogram}; +use lazy_static::lazy_static; + +lazy_static! { + // Counter to track the number of packets received + pub static ref PACKETS_RECEIVED: Counter = register_counter!( + "packets_received_total", + "Total number of packets received" + ).unwrap(); + + // Gauge to track the number of active connections + pub static ref ACTIVE_CONNECTIONS: Gauge = register_gauge!( + "active_connections", + "Number of currently active connections" + ).unwrap(); + + // Histogram to track packet processing latency + pub static ref PACKET_PROCESSING_TIME: Histogram = register_histogram!( + "packet_processing_time_seconds", + "Histogram of packet processing times" + ).unwrap(); +} diff --git a/packet-service/src/packet.rs b/packet-service/src/packet.rs index 59ffe7e..e1ef946 100644 --- a/packet-service/src/packet.rs +++ b/packet-service/src/packet.rs @@ -1,31 +1,103 @@ -use serde::{Serialize, Deserialize}; +use bincode::{Encode, Decode}; use std::error::Error; -use tracing::debug; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; +use tracing::{debug, error}; +use crate::packet_type::PacketType; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] pub struct Packet { pub packet_size: u16, - pub packet_type: u16, + pub packet_type: PacketType, pub packet_crc: u16, pub payload: Vec, } +pub trait PacketPayload: Encode + Decode { + fn encode(&self) -> Result, Box> { + let config = bincode::config::standard() + .with_fixed_int_encoding(); + Ok(bincode::encode_to_vec(self, config)?) + } + + fn decode(data: &[u8]) -> Result> + where + Self: Sized, + { + let config = bincode::config::standard() + .with_fixed_int_encoding(); + Ok(bincode::decode_from_slice(data, config)?.0) + } +} + impl Packet { - pub fn parse(data: &[u8]) -> Result> { + pub fn new(packet_type: PacketType, payload: &T) -> Result> { + let encoded_payload = ::encode(payload)?; + let packet_size = (6 + encoded_payload.len()) as u16; + // let packet_crc = crc::crc16::checksum_x25(&encoded_payload); + let packet_crc = 0; + + Ok(Self { + packet_size, + packet_type, + packet_crc, + payload: encoded_payload, + }) + } + + pub fn from_raw(data: &[u8]) -> Result> { if data.len() < 6 { return Err("Invalid packet: Too short".into()); } - let packet_size = u16::from_be_bytes(data[0..2].try_into()?); - let packet_type = u16::from_be_bytes(data[2..4].try_into()?); - let packet_crc = u16::from_be_bytes(data[4..6].try_into()?); + // Extract packet fields + let packet_size = u16::from_le_bytes(data[0..2].try_into()?); + let raw_packet_type = u16::from_le_bytes(data[2..4].try_into()?); + let packet_crc = u16::from_le_bytes(data[4..6].try_into()?); + + // Validate the size + if packet_size as usize != data.len() { + error!("Invalid packet: Size mismatch, expected: {}, actual: {}", packet_size, data.len()); + return Err("Invalid packet: Size mismatch".into()); + } + + debug!("size: {:#X}, raw_type: {:#X}, crc: {:#X}", packet_size, raw_packet_type, packet_crc); + + // Convert raw packet type into `PacketType` enum + let packet_type = PacketType::try_from(raw_packet_type)?; + + // Extract the payload let payload = data[6..].to_vec(); - debug!("Packet Size: {}", packet_size); - debug!("Packet Type: {}", packet_type); - debug!("Packet CRC: {}", packet_crc); - debug!("Payload: {:?}", String::from_utf8(payload.clone())?); - - Ok(Packet { packet_size, packet_type, packet_crc, payload }) + Ok(Self { + packet_size, + packet_type, + packet_crc, + payload, + }) + } + + /// Serialize the `Packet` back to raw bytes + pub fn to_raw(&self) -> Vec { + let mut raw = Vec::with_capacity(self.packet_size as usize); + + // Serialize fields + raw.extend(&self.packet_size.to_le_bytes()); + raw.extend(&(self.packet_type as u16).to_le_bytes()); + raw.extend(&self.packet_crc.to_le_bytes()); + raw.extend(&self.payload); + + raw + } + + pub fn parse(&self) -> Result> { + ::decode(&self.payload) } } + +pub async fn send_packet(stream: &mut TcpStream, packet: &Packet) -> Result<(), std::io::Error> { + let data = packet.to_raw(); + debug!("Sending data: {:?}", data); + stream.write_all(&data).await?; + Ok(()) +} diff --git a/packet-service/src/packet_type.rs b/packet-service/src/packet_type.rs new file mode 100644 index 0000000..b875371 --- /dev/null +++ b/packet-service/src/packet_type.rs @@ -0,0 +1,455 @@ +use std::convert::TryFrom; +use thiserror::Error; + +#[repr(u16)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PacketType { + PakcsAlive = 0x0500, + PakssError = 0x0501, + PakssAnnounceText = 0x0502, + PakswAnnounceChat = 0x0503, + PakcsAcceptReq = 0x0504, + PakcsChannelListReq = 0x0505, + PaklcChannelListReply = 0x0506, + PakcsLogoutReq = 0x0507, + PakwcLogoutReply = 0x0508, + PakcsLoginReq = 0x0509, + PaklcLoginReply = 0x050A, + PakgcLoginReply = 0x050B, + PakcsSrvSelectReq = 0x050C, + PaklcSrvSelectReply = 0x050D, + PakcsJoinServerReq = 0x050E, + PakscJoinServerReply = 0x050F, + PakwcGmCommand = 0x0510, + PakwcGlobalVars = 0x0511, + PakwcGlobalFlags = 0x0512, + PakccSwitchServer = 0x0513, + PakcsCharListReq = 0x0514, + PakccCharListReply = 0x0515, + PakcsCreateCharReq = 0x0516, + PakccCreateCharReply = 0x0517, + PakcsDeleteCharReq = 0x0518, + PakccDeleteCharReply = 0x0519, + PakcsSelectCharReq = 0x051A, + PakwcSelectCharReply = 0x051B, + PakwcInventoryData = 0x051C, + PakwcSetMoneyAndItem = 0x051D, + PakwcSetItem = 0x051E, + PakwcServerData = 0x051F, + PakwcQuestData = 0x0520, + PakcsChangeCharReq = 0x0521, + PakccChanCharReply = 0x0522, + PakwcSetMoney = 0x0523, + PakwcQuestRewardMoney = 0x0524, + PakwcQuestRewardItem = 0x0525, + PakwcQuestRewardAddValue = 0x0526, + PakwcQuestRewardSetValue = 0x0527, + PakcsCancelLogout = 0x0528, + PakwcQuestUpdate = 0x0529, + PakwcWishList = 0x052A, + PakcsQuestDataReq = 0x052B, + PakwcQuestDataReply = 0x052C, + PakwcNpcEvent = 0x052D, + PakwcGmCommandCode = 0x052E, + PakcsChangeMapReq = 0x052F, + PakwcChangeMapReply = 0x0530, + PakwcInitData = 0x0531, + PakcsReviveReq = 0x0532, + PakwcReviveReply = 0x0533, + PakcsReviveSetPos = 0x0534, + PakcsSetServerVarReq = 0x0535, + PakwcSetServerVarReply = 0x0536, + PakcsCharInfoReq = 0x0537, + PakwcCharInfoReply = 0x0538, + PakcsSetWeightReq = 0x0539, + PakwcSetWeight = 0x053A, + PakwcSetPosition = 0x053B, + PakcsStopMoving = 0x053C, + PakwcStopMoving = 0x053D, + PakwcUpdateNpc = 0x053E, + PakcsSummonCmd = 0x053F, + PakwcSummonCmd = 0x0540, + PakcsSetAnimation = 0x0541, + PacwcSetAnimation = 0x0542, + PakcsToggleMove = 0x0543, + PakwcToggleMove = 0x0544, + PakcsNormalChat = 0x0545, + PakwcNormalChat = 0x0546, + PakcsWhisperChat = 0x0547, + PakwcWhisperChat = 0x0548, + PakcsShoutChat = 0x0549, + PakwcShoutChat = 0x054A, + PakcsPartyChat = 0x054B, + PakwcPartyChat = 0x054C, + PakcsClanChat = 0x054D, + PakwcClanChat = 0x054E, + PakcsAlliedChat = 0x054F, + PakwcAlliedChat = 0x0550, + PakcsAlliedShoutChat = 0x0551, + PakwcAlliedShoutChat = 0x0552, + PakwcEventStatus = 0x0553, + PakwcNpcChar = 0x0554, + PakwcMobChar = 0x0555, + PakwcPlayerChar = 0x0556, + PakwcRemoveObject = 0x0557, + PakcsSetPosition = 0x0558, + PakcsStop = 0x0559, + PakwcStop = 0x055A, + PakwcMove = 0x055B, + PakcsAttack = 0x055C, + PakwcAttack = 0x055D, + PakcsDamage = 0x055E, + PakwcDamage = 0x055F, + PakcsMouseCmd = 0x0560, + PakwcMouseCmd = 0x0561, + PakwcSetExp = 0x0562, + PakwcLevelup = 0x0563, + PakcsHpReq = 0x0564, + PakwcHpReply = 0x0565, + PakwcSetHpAndMp = 0x0566, + PakcsStoreTradeReq = 0x0567, + PakwcStoreTradeReply = 0x0568, + PakcsUseItem = 0x0569, + PakwcUseItem = 0x056A, + PakcsDropItem = 0x056B, + PakcsEquipItem = 0x056C, + PakwcEquipItem = 0x056D, + PakwcDropItem = 0x056E, + PakcsPickupItemReq = 0x056F, + PakwcPickupItemReply = 0x0570, + PakcsTeleportReq = 0x0571, + PakwcTeleportReply = 0x0572, + PakcsStatAddReq = 0x0573, + PakwcStatAddReply = 0x0574, + PakcsHotbarSetIconReq = 0x0575, + PakwcHotbarSetIconReply = 0x0576, + PakcsEquipProjectile = 0x0577, + PakwcEquipProjectile = 0x0578, + PakwcChangeSkin = 0x0579, + PakcsBankListReq = 0x057A, + PakwcBankListReply = 0x057B, + PakcsBankMoveItem = 0x057C, + PakwcBankMoveItem = 0x057D, + PakcsCraftReq = 0x057E, + PakwcCraftReply = 0x057F, + PakwcSkillLearn = 0x0580, + PakcsSkillLevelReq = 0x0581, + PakwcSkillLevelReply = 0x0582, + PakcsSkillCastSelf = 0x0583, + PakwcSkillCastSelf = 0x0584, + PakcsSkillCastTarget = 0x0585, + PakwcSkillCastTarget = 0x0586, + PakcsSkillCastPosition = 0x0587, + PakwcSkillCastPosition = 0x0588, + PakwcSkillEffect = 0x0589, + PakwcSkillDamage = 0x058A, + PakwcClearStatus = 0x058B, + PakwcSpeedChanged = 0x058C, + PakwcSkillFinish = 0x058D, + PakcsAppraisalReq = 0x058E, + PakwcAppraisalReply = 0x058F, + PakwcSkillStart = 0x0590, + PakcsCraftEnhanceReq = 0x0591, + PakwcCraftEnhanceReply = 0x0592, + PakwcSkillCancel = 0x0593, + PakcsWishlistAdd = 0x0594, + PakcsTrade = 0x0595, + PakwcTrade = 0x0596, + PakcsTradeItem = 0x0597, + PakwcTradeItem = 0x0598, + PakcsShopOpen = 0x0599, + PakwcShopOpen = 0x059A, + PakcsShopClose = 0x059B, + PakwcShopClose = 0x059C, + PakcsShopListReq = 0x059D, + PakwcShopListReply = 0x059E, + PakcsShopBuyReq = 0x059F, + PakcsShopSellReq = 0x05A0, + PakcsShopBuysellReply = 0x05A1, + PakcsEquipItemRide = 0x05A2, + PakwcEquipItemRide = 0x05A3, + PakcsRepairUseItem = 0x05A4, + PakcsRepairNpc = 0x05A5, + PakwcSetItemLife = 0x05A6, + PakcsPartyReq = 0x05A7, + PakwcPartyReq = 0x05A8, + PakcsPartyReply = 0x05A9, + PakwcPartyReply = 0x05AA, + PakwcPartyMember = 0x05AB, + PakwcPartyItem = 0x05AC, + PakwcPartyLevelexp = 0x05AD, + PakwcPartyMemberUpdate = 0x05AE, + PakwcEventAdd = 0x05AF, + PakcsPartyRule = 0x05B0, + PakwcPartyRule = 0x05B1, + PakcsCraftStatus = 0x05B2, + PakwcCraftStatus = 0x05B3, + PakcsBankMoveMoney = 0x05B4, + PakwcBankMoveMoney = 0x05B5, + PakwcNpcShow = 0x05B6, + PakwcFairy = 0x05B7, + PakcsRideRequest = 0x05B8, + PakwcRideRequest = 0x05B9, + PakwcBillingMessage = 0x05BA, + PakwcBillingMessageExt = 0x05BB, + PakcsClanCommand = 0x05BC, + PakccClanCommand = 0x05BD, + PakcsMessenger = 0x05BE, + PakccMessenger = 0x05BF, + PakcsMessengerChat = 0x05C0, + PakccMessengerChat = 0x05C1, + PakcsChatroom = 0x05C2, + PakccChatroom = 0x05C3, + PakcsChatroomMessage = 0x05C4, + PakccChatroomMessage = 0x05C5, + PakcsMemo = 0x05C6, + PakccMemo = 0x05C7, + PakcsClanIconSet = 0x05C8, + PakcsClanIconReq = 0x05C9, + PakccClanIconReply = 0x05CA, + PakcsClanIconTimestamp = 0x05CB, + PakccClanIconTimestamp = 0x05CC, + PakwcRideStateChange = 0x05CD, + PawkcCharStateChange = 0x05CE, + PakcsScreenShotTimeReq = 0x05CF, + PakscScreenShotTimeReply = 0x05D0, + PakwcUpdateName = 0x05D1, + PakssAcceptReply = 0x05D2, + Epacketmax = 0x05D3, + Stress = 0x6F6D, +} + +#[derive(Debug, Error)] +pub enum PacketError { + #[error("Unknown packet type: {0}")] + UnknownPacket(u16), +} + +impl TryFrom for PacketType { + type Error = PacketError; + + fn try_from(value: u16) -> Result { + match value { + 0x0500 => Ok(PacketType::PakcsAlive), + 0x0501 => Ok(PacketType::PakssError), + 0x0502 => Ok(PacketType::PakssAnnounceText), + 0x0503 => Ok(PacketType::PakswAnnounceChat), + 0x0504 => Ok(PacketType::PakcsAcceptReq), + 0x0505 => Ok(PacketType::PakcsChannelListReq), + 0x0506 => Ok(PacketType::PaklcChannelListReply), + 0x0507 => Ok(PacketType::PakcsLogoutReq), + 0x0508 => Ok(PacketType::PakwcLogoutReply), + 0x0509 => Ok(PacketType::PakcsLoginReq), + 0x050A => Ok(PacketType::PaklcLoginReply), + 0x050B => Ok(PacketType::PakgcLoginReply), + 0x050C => Ok(PacketType::PakcsSrvSelectReq), + 0x050D => Ok(PacketType::PaklcSrvSelectReply), + 0x050E => Ok(PacketType::PakcsJoinServerReq), + 0x050F => Ok(PacketType::PakscJoinServerReply), + 0x0510 => Ok(PacketType::PakwcGmCommand), + 0x0511 => Ok(PacketType::PakwcGlobalVars), + 0x0512 => Ok(PacketType::PakwcGlobalFlags), + 0x0513 => Ok(PacketType::PakccSwitchServer), + 0x0514 => Ok(PacketType::PakcsCharListReq), + 0x0515 => Ok(PacketType::PakccCharListReply), + 0x0516 => Ok(PacketType::PakcsCreateCharReq), + 0x0517 => Ok(PacketType::PakccCreateCharReply), + 0x0518 => Ok(PacketType::PakcsDeleteCharReq), + 0x0519 => Ok(PacketType::PakccDeleteCharReply), + 0x051A => Ok(PacketType::PakcsSelectCharReq), + 0x051B => Ok(PacketType::PakwcSelectCharReply), + 0x051C => Ok(PacketType::PakwcInventoryData), + 0x051D => Ok(PacketType::PakwcSetMoneyAndItem), + 0x051E => Ok(PacketType::PakwcSetItem), + 0x051F => Ok(PacketType::PakwcServerData), + 0x0520 => Ok(PacketType::PakwcQuestData), + 0x0521 => Ok(PacketType::PakcsChangeCharReq), + 0x0522 => Ok(PacketType::PakccChanCharReply), + 0x0523 => Ok(PacketType::PakwcSetMoney), + 0x0524 => Ok(PacketType::PakwcQuestRewardMoney), + 0x0525 => Ok(PacketType::PakwcQuestRewardItem), + 0x0526 => Ok(PacketType::PakwcQuestRewardAddValue), + 0x0527 => Ok(PacketType::PakwcQuestRewardSetValue), + 0x0528 => Ok(PacketType::PakcsCancelLogout), + 0x0529 => Ok(PacketType::PakwcQuestUpdate), + 0x052A => Ok(PacketType::PakwcWishList), + 0x052B => Ok(PacketType::PakcsQuestDataReq), + 0x052C => Ok(PacketType::PakwcQuestDataReply), + 0x052D => Ok(PacketType::PakwcNpcEvent), + 0x052E => Ok(PacketType::PakwcGmCommandCode), + 0x052F => Ok(PacketType::PakcsChangeMapReq), + 0x0530 => Ok(PacketType::PakwcChangeMapReply), + 0x0531 => Ok(PacketType::PakwcInitData), + 0x0532 => Ok(PacketType::PakcsReviveReq), + 0x0533 => Ok(PacketType::PakwcReviveReply), + 0x0534 => Ok(PacketType::PakcsReviveSetPos), + 0x0535 => Ok(PacketType::PakcsSetServerVarReq), + 0x0536 => Ok(PacketType::PakwcSetServerVarReply), + 0x0537 => Ok(PacketType::PakcsCharInfoReq), + 0x0538 => Ok(PacketType::PakwcCharInfoReply), + 0x0539 => Ok(PacketType::PakcsSetWeightReq), + 0x053A => Ok(PacketType::PakwcSetWeight), + 0x053B => Ok(PacketType::PakwcSetPosition), + 0x053C => Ok(PacketType::PakcsStopMoving), + 0x053D => Ok(PacketType::PakwcStopMoving), + 0x053E => Ok(PacketType::PakwcUpdateNpc), + 0x053F => Ok(PacketType::PakcsSummonCmd), + 0x0540 => Ok(PacketType::PakwcSummonCmd), + 0x0541 => Ok(PacketType::PakcsSetAnimation), + 0x0542 => Ok(PacketType::PacwcSetAnimation), + 0x0543 => Ok(PacketType::PakcsToggleMove), + 0x0544 => Ok(PacketType::PakwcToggleMove), + 0x0545 => Ok(PacketType::PakcsNormalChat), + 0x0546 => Ok(PacketType::PakwcNormalChat), + 0x0547 => Ok(PacketType::PakcsWhisperChat), + 0x0548 => Ok(PacketType::PakwcWhisperChat), + 0x0549 => Ok(PacketType::PakcsShoutChat), + 0x054A => Ok(PacketType::PakwcShoutChat), + 0x054B => Ok(PacketType::PakcsPartyChat), + 0x054C => Ok(PacketType::PakwcPartyChat), + 0x054D => Ok(PacketType::PakcsClanChat), + 0x054E => Ok(PacketType::PakwcClanChat), + 0x054F => Ok(PacketType::PakcsAlliedChat), + 0x0550 => Ok(PacketType::PakwcAlliedChat), + 0x0551 => Ok(PacketType::PakcsAlliedShoutChat), + 0x0552 => Ok(PacketType::PakwcAlliedShoutChat), + 0x0553 => Ok(PacketType::PakwcEventStatus), + 0x0554 => Ok(PacketType::PakwcNpcChar), + 0x0555 => Ok(PacketType::PakwcMobChar), + 0x0556 => Ok(PacketType::PakwcPlayerChar), + 0x0557 => Ok(PacketType::PakwcRemoveObject), + 0x0558 => Ok(PacketType::PakcsSetPosition), + 0x0559 => Ok(PacketType::PakcsStop), + 0x055A => Ok(PacketType::PakwcStop), + 0x055B => Ok(PacketType::PakwcMove), + 0x055C => Ok(PacketType::PakcsAttack), + 0x055D => Ok(PacketType::PakwcAttack), + 0x055E => Ok(PacketType::PakcsDamage), + 0x055F => Ok(PacketType::PakwcDamage), + 0x0560 => Ok(PacketType::PakcsMouseCmd), + 0x0561 => Ok(PacketType::PakwcMouseCmd), + 0x0562 => Ok(PacketType::PakwcSetExp), + 0x0563 => Ok(PacketType::PakwcLevelup), + 0x0564 => Ok(PacketType::PakcsHpReq), + 0x0565 => Ok(PacketType::PakwcHpReply), + 0x0566 => Ok(PacketType::PakwcSetHpAndMp), + 0x0567 => Ok(PacketType::PakcsStoreTradeReq), + 0x0568 => Ok(PacketType::PakwcStoreTradeReply), + 0x0569 => Ok(PacketType::PakcsUseItem), + 0x056A => Ok(PacketType::PakwcUseItem), + 0x056B => Ok(PacketType::PakcsDropItem), + 0x056C => Ok(PacketType::PakcsEquipItem), + 0x056D => Ok(PacketType::PakwcEquipItem), + 0x056E => Ok(PacketType::PakwcDropItem), + 0x056F => Ok(PacketType::PakcsPickupItemReq), + 0x0570 => Ok(PacketType::PakwcPickupItemReply), + 0x0571 => Ok(PacketType::PakcsTeleportReq), + 0x0572 => Ok(PacketType::PakwcTeleportReply), + 0x0573 => Ok(PacketType::PakcsStatAddReq), + 0x0574 => Ok(PacketType::PakwcStatAddReply), + 0x0575 => Ok(PacketType::PakcsHotbarSetIconReq), + 0x0576 => Ok(PacketType::PakwcHotbarSetIconReply), + 0x0577 => Ok(PacketType::PakcsEquipProjectile), + 0x0578 => Ok(PacketType::PakwcEquipProjectile), + 0x0579 => Ok(PacketType::PakwcChangeSkin), + 0x057A => Ok(PacketType::PakcsBankListReq), + 0x057B => Ok(PacketType::PakwcBankListReply), + 0x057C => Ok(PacketType::PakcsBankMoveItem), + 0x057D => Ok(PacketType::PakwcBankMoveItem), + 0x057E => Ok(PacketType::PakcsCraftReq), + 0x057F => Ok(PacketType::PakwcCraftReply), + 0x0580 => Ok(PacketType::PakwcSkillLearn), + 0x0581 => Ok(PacketType::PakcsSkillLevelReq), + 0x0582 => Ok(PacketType::PakwcSkillLevelReply), + 0x0583 => Ok(PacketType::PakcsSkillCastSelf), + 0x0584 => Ok(PacketType::PakwcSkillCastSelf), + 0x0585 => Ok(PacketType::PakcsSkillCastTarget), + 0x0586 => Ok(PacketType::PakwcSkillCastTarget), + 0x0587 => Ok(PacketType::PakcsSkillCastPosition), + 0x0588 => Ok(PacketType::PakwcSkillCastPosition), + 0x0589 => Ok(PacketType::PakwcSkillEffect), + 0x058A => Ok(PacketType::PakwcSkillDamage), + 0x058B => Ok(PacketType::PakwcClearStatus), + 0x058C => Ok(PacketType::PakwcSpeedChanged), + 0x058D => Ok(PacketType::PakwcSkillFinish), + 0x058E => Ok(PacketType::PakcsAppraisalReq), + 0x058F => Ok(PacketType::PakwcAppraisalReply), + 0x0590 => Ok(PacketType::PakwcSkillStart), + 0x0591 => Ok(PacketType::PakcsCraftEnhanceReq), + 0x0592 => Ok(PacketType::PakwcCraftEnhanceReply), + 0x0593 => Ok(PacketType::PakwcSkillCancel), + 0x0594 => Ok(PacketType::PakcsWishlistAdd), + 0x0595 => Ok(PacketType::PakcsTrade), + 0x0596 => Ok(PacketType::PakwcTrade), + 0x0597 => Ok(PacketType::PakcsTradeItem), + 0x0598 => Ok(PacketType::PakwcTradeItem), + 0x0599 => Ok(PacketType::PakcsShopOpen), + 0x059A => Ok(PacketType::PakwcShopOpen), + 0x059B => Ok(PacketType::PakcsShopClose), + 0x059C => Ok(PacketType::PakwcShopClose), + 0x059D => Ok(PacketType::PakcsShopListReq), + 0x059E => Ok(PacketType::PakwcShopListReply), + 0x059F => Ok(PacketType::PakcsShopBuyReq), + 0x05A0 => Ok(PacketType::PakcsShopSellReq), + 0x05A1 => Ok(PacketType::PakcsShopBuysellReply), + 0x05A2 => Ok(PacketType::PakcsEquipItemRide), + 0x05A3 => Ok(PacketType::PakwcEquipItemRide), + 0x05A4 => Ok(PacketType::PakcsRepairUseItem), + 0x05A5 => Ok(PacketType::PakcsRepairNpc), + 0x05A6 => Ok(PacketType::PakwcSetItemLife), + 0x05A7 => Ok(PacketType::PakcsPartyReq), + 0x05A8 => Ok(PacketType::PakwcPartyReq), + 0x05A9 => Ok(PacketType::PakcsPartyReply), + 0x05AA => Ok(PacketType::PakwcPartyReply), + 0x05AB => Ok(PacketType::PakwcPartyMember), + 0x05AC => Ok(PacketType::PakwcPartyItem), + 0x05AD => Ok(PacketType::PakwcPartyLevelexp), + 0x05AE => Ok(PacketType::PakwcPartyMemberUpdate), + 0x05AF => Ok(PacketType::PakwcEventAdd), + 0x05B0 => Ok(PacketType::PakcsPartyRule), + 0x05B1 => Ok(PacketType::PakwcPartyRule), + 0x05B2 => Ok(PacketType::PakcsCraftStatus), + 0x05B3 => Ok(PacketType::PakwcCraftStatus), + 0x05B4 => Ok(PacketType::PakcsBankMoveMoney), + 0x05B5 => Ok(PacketType::PakwcBankMoveMoney), + 0x05B6 => Ok(PacketType::PakwcNpcShow), + 0x05B7 => Ok(PacketType::PakwcFairy), + 0x05B8 => Ok(PacketType::PakcsRideRequest), + 0x05B9 => Ok(PacketType::PakwcRideRequest), + 0x05BA => Ok(PacketType::PakwcBillingMessage), + 0x05BB => Ok(PacketType::PakwcBillingMessageExt), + 0x05BC => Ok(PacketType::PakcsClanCommand), + 0x05BD => Ok(PacketType::PakccClanCommand), + 0x05BE => Ok(PacketType::PakcsMessenger), + 0x05BF => Ok(PacketType::PakccMessenger), + 0x05C0 => Ok(PacketType::PakcsMessengerChat), + 0x05C1 => Ok(PacketType::PakccMessengerChat), + 0x05C2 => Ok(PacketType::PakcsChatroom), + 0x05C3 => Ok(PacketType::PakccChatroom), + 0x05C4 => Ok(PacketType::PakcsChatroomMessage), + 0x05C5 => Ok(PacketType::PakccChatroomMessage), + 0x05C6 => Ok(PacketType::PakcsMemo), + 0x05C7 => Ok(PacketType::PakccMemo), + 0x05C8 => Ok(PacketType::PakcsClanIconSet), + 0x05C9 => Ok(PacketType::PakcsClanIconReq), + 0x05CA => Ok(PacketType::PakccClanIconReply), + 0x05CB => Ok(PacketType::PakcsClanIconTimestamp), + 0x05CC => Ok(PacketType::PakccClanIconTimestamp), + 0x05CD => Ok(PacketType::PakwcRideStateChange), + 0x05CE => Ok(PacketType::PawkcCharStateChange), + 0x05CF => Ok(PacketType::PakcsScreenShotTimeReq), + 0x05D0 => Ok(PacketType::PakscScreenShotTimeReply), + 0x05D1 => Ok(PacketType::PakwcUpdateName), + 0x05D2 => Ok(PacketType::PakssAcceptReply), + 0x05D3 => Ok(PacketType::Epacketmax), + 0x6F6D => Ok(PacketType::Stress), + _ => Err(PacketError::UnknownPacket(value)), + } + } +} + +impl From for u16 { + fn from(packet_type: PacketType) -> Self { + packet_type as u16 + } +} diff --git a/packet-service/src/router.rs b/packet-service/src/router.rs index 6e5fe88..f7cd7ec 100644 --- a/packet-service/src/router.rs +++ b/packet-service/src/router.rs @@ -1,13 +1,29 @@ -use crate::packet::Packet; -// use crate::handlers::{chat, movement}; +use crate::packet::{Packet}; +use crate::handlers::{auth}; +use crate::packet_type::PacketType; use std::error::Error; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio::sync::Mutex; +use tracing::{debug, warn}; +use crate::auth_client::AuthClient; -pub async fn route_packet(data: &[u8]) -> Result<(), Box> { - let packet = Packet::parse(data)?; - +pub async fn route_packet(stream: &mut TcpStream, packet: Packet, auth_client: Arc>) -> Result<(), Box> { + debug!("Routing packet: {:?}", packet); match packet.packet_type { + PacketType::PakcsAlive => Ok(()), + PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await, + PacketType::PakcsJoinServerReq => auth::handle_join_server_req(stream, packet).await, + // Login Stuff + PacketType::PakcsLoginReq => auth::handle_login_req(stream, packet, auth_client).await, + PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet).await, + PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await, + // 1 => chat::handle_chat(packet).await?, // 2 => movement::handle_movement(packet).await?, - _ => Err("Unknown packet type".into()), + _ => { + warn!("Unhandled packet type: {:?}", packet.packet_type); + Ok(()) + }, } } diff --git a/packet-service/src/types.rs b/packet-service/src/types.rs new file mode 100644 index 0000000..5c0899d --- /dev/null +++ b/packet-service/src/types.rs @@ -0,0 +1,38 @@ +use std::time::{Duration}; +use bincode::{Encode, Decode}; + +// `HotbarItem` structure converted to Rust. +#[derive(Clone, Copy)] +struct HotbarItem { + data: HotbarItemData, +} + +// Definition for `data` union in `HotbarItem`. +#[derive(Clone, Copy)] +union HotbarItemData { + item: u16, + components: HotbarItemComponents, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode)] +#[repr(C)] +struct HotbarItemComponents { + type_: u8, + slot_id: u16, +} + +// `Skill` structure converted to Rust. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode)] +struct Skill { + id: u16, + level: u8, +} + +// `StatusEffect` structure converted to Rust. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode)] +struct StatusEffect { + expired: Duration, + value: u16, + unknown: u16, + dt: Duration, +} diff --git a/utils/Cargo.toml b/utils/Cargo.toml new file mode 100644 index 0000000..551d88b --- /dev/null +++ b/utils/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "utils" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +reqwest = { version = "0.12.9", features = ["json"] } +tracing = "0.1" diff --git a/auth-service/src/consul_registration.rs b/utils/src/consul_registration.rs similarity index 100% rename from auth-service/src/consul_registration.rs rename to utils/src/consul_registration.rs diff --git a/utils/src/lib.rs b/utils/src/lib.rs new file mode 100644 index 0000000..299f031 --- /dev/null +++ b/utils/src/lib.rs @@ -0,0 +1,2 @@ +pub mod consul_registration; +pub mod service_discovery; diff --git a/auth-service/src/service_discovery.rs b/utils/src/service_discovery.rs similarity index 69% rename from auth-service/src/service_discovery.rs rename to utils/src/service_discovery.rs index 47f933f..09ab7f2 100644 --- a/auth-service/src/service_discovery.rs +++ b/utils/src/service_discovery.rs @@ -20,16 +20,16 @@ struct Weights { #[derive(Deserialize)] pub struct Service { - pub(crate) ID: String, - pub(crate) Service: String, - pub(crate) Tags: Vec, - pub(crate) Port: u16, - pub(crate) Address: String, - pub(crate) TaggedAddresses: TaggedAddresses, - pub(crate) Weights: Weights, - pub(crate) EnableTagOverride: bool, - pub(crate) ContentHash: String, - pub(crate) Datacenter: String, + pub ID: String, + pub Service: String, + pub Tags: Vec, + pub Port: u16, + pub Address: String, + pub TaggedAddresses: TaggedAddresses, + pub Weights: Weights, + pub EnableTagOverride: bool, + pub ContentHash: String, + pub Datacenter: String, } pub async fn get_service_address(consul_url: &str, service_name: &str) -> Result<(Service), Box> {