Compare commits

...

4 Commits

Author SHA256 Message Date
14c6aa485a - add: redis cache refresh function sets the ttl for a key
- update: session service refresh session function now just updates the ttl of the session instead of calling set
2025-03-09 17:08:56 -04:00
0dc69bcfcf - add: metrics exporting 2025-03-09 17:06:58 -04:00
c4d3da1f94 - fix: formatting for packet routing 2025-03-09 17:04:13 -04:00
90e4346f6d - add: rustfmt max_width set 2025-03-09 17:03:12 -04:00
15 changed files with 116 additions and 179 deletions

View File

@@ -1,8 +1,8 @@
use crate::auth::auth_service_server::AuthService; use crate::auth::auth_service_server::AuthService;
use crate::auth::{ use crate::auth::{
LoginRequest, LoginResponse, LogoutRequest, PasswordResetRequest, PasswordResetResponse, LoginRequest, LoginResponse, LogoutRequest, PasswordResetRequest, PasswordResetResponse,
RegisterRequest, RegisterResponse, ResetPasswordRequest, ResetPasswordResponse, RefreshSessionResponse, RegisterRequest, RegisterResponse, ResetPasswordRequest,
ValidateSessionRequest, ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse, ResetPasswordResponse, ValidateSessionRequest, ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse,
}; };
use crate::common::Empty; use crate::common::Empty;
use crate::database_client::{DatabaseClient, DatabaseClientTrait}; use crate::database_client::{DatabaseClient, DatabaseClientTrait};
@@ -160,7 +160,7 @@ impl AuthService for MyAuthService {
async fn refresh_session( async fn refresh_session(
&self, &self,
request: Request<ValidateSessionRequest>, request: Request<ValidateSessionRequest>,
) -> Result<Response<ValidateSessionResponse>, Status> { ) -> Result<Response<RefreshSessionResponse>, Status> {
let req = request.into_inner(); let req = request.into_inner();
let response = self let response = self
.session_client .session_client
@@ -175,11 +175,11 @@ impl AuthService for MyAuthService {
Ok(res) => { Ok(res) => {
let res = res.into_inner(); let res = res.into_inner();
debug!("Session valid: {:?}", res); debug!("Session valid: {:?}", res);
Ok(Response::new(ValidateSessionResponse { valid: true, session_id: res.session_id.to_string(), user_id: res.user_id.to_string() })) Ok(Response::new(RefreshSessionResponse { valid: res.valid }))
} }
Err(_) => { Err(_) => {
debug!("Session invalid or not found"); debug!("Session invalid or not found");
Ok(Response::new(ValidateSessionResponse { valid: false, session_id: "".to_string(), user_id: "".to_string() })) Ok(Response::new(RefreshSessionResponse { valid: false }))
} }
} }
} }

View File

@@ -103,7 +103,8 @@
dockerfile: ./packet-service/Dockerfile dockerfile: ./packet-service/Dockerfile
restart: on-failure restart: on-failure
ports: ports:
- "4000:4000" - "29000:29000"
- "4001:4001"
env_file: env_file:
- ./packet-service/.env - ./packet-service/.env
- .env - .env

View File

@@ -26,6 +26,7 @@ warp = "0.3.7"
dashmap = "6.1.0" dashmap = "6.1.0"
uuid = { version = "1.11.0", features = ["v4"] } uuid = { version = "1.11.0", features = ["v4"] }
chrono = "0.4.39" chrono = "0.4.39"
prometheus_exporter = "0.8.5"
[build-dependencies] [build-dependencies]
tonic-build = "0.12.3" tonic-build = "0.12.3"

View File

@@ -21,5 +21,6 @@ RUN apk add --no-cache libssl3 libgcc
COPY --from=builder /usr/src/packet-service/target/release/packet-service /usr/local/bin/packet-service COPY --from=builder /usr/src/packet-service/target/release/packet-service /usr/local/bin/packet-service
EXPOSE 29000 EXPOSE 29000
EXPOSE 4001
CMD ["packet-service"] CMD ["packet-service"]

View File

@@ -1,7 +1,7 @@
use crate::auth::auth_service_client::AuthServiceClient; use crate::auth::auth_service_client::AuthServiceClient;
use crate::auth::{ use crate::auth::{
LoginRequest, LoginResponse, LogoutRequest, ValidateSessionRequest, ValidateSessionResponse, LoginRequest, LoginResponse, LogoutRequest, RefreshSessionResponse, ValidateSessionRequest,
ValidateTokenRequest, ValidateTokenResponse, ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse,
}; };
use crate::common::Empty; use crate::common::Empty;
use tonic::transport::Channel; use tonic::transport::Channel;
@@ -60,7 +60,7 @@ impl AuthClient {
pub async fn refresh_session( pub async fn refresh_session(
&mut self, &mut self,
session_id: &str, session_id: &str,
) -> Result<ValidateSessionResponse, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<RefreshSessionResponse, Box<dyn std::error::Error + Send + Sync>> {
let request = ValidateSessionRequest { let request = ValidateSessionRequest {
session_id: session_id.to_string(), session_id: session_id.to_string(),
}; };

View File

@@ -6,6 +6,8 @@ use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED};
use crate::packet::Packet; use crate::packet::Packet;
use crate::router::PacketRouter; use crate::router::PacketRouter;
use dotenv::dotenv; use dotenv::dotenv;
use prometheus::{self, Encoder, TextEncoder};
use prometheus_exporter;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::error::Error; use std::error::Error;
@@ -66,6 +68,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the gRPC server address // Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string()); let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string());
let metrics_port = env::var("PACKET_METRICS_PORT").unwrap_or_else(|_| "4001".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string()); let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string()); let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
@@ -93,7 +96,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
meta, meta,
&health_check_url, &health_check_url,
) )
.await?; .await?;
// Start health-check endpoint // Start health-check endpoint
consul_registration::start_health_check(addr.as_str()).await?; consul_registration::start_health_check(addr.as_str()).await?;
@@ -153,6 +156,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
}); });
let binding = format!("{}:{}", &addr, metrics_port);
prometheus_exporter::start(binding.parse().unwrap()).unwrap();
utils::signal_handler::wait_for_signal().await; utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str()) consul_registration::deregister_service(&consul_url, service_id.as_str())

View File

@@ -11,6 +11,12 @@ lazy_static! {
"Total number of packets received" "Total number of packets received"
).unwrap(); ).unwrap();
// Counter to track the number of packets sent
pub static ref PACKETS_SENT: Counter = register_counter!(
"packets_sent_total",
"Total number of packets sent"
).unwrap();
// Gauge to track the number of active connections // Gauge to track the number of active connections
pub static ref ACTIVE_CONNECTIONS: Gauge = register_gauge!( pub static ref ACTIVE_CONNECTIONS: Gauge = register_gauge!(
"active_connections", "active_connections",

View File

@@ -1,3 +1,4 @@
use crate::metrics::PACKETS_SENT;
use crate::packet_type::PacketType; use crate::packet_type::PacketType;
use bincode::{Decode, Encode}; use bincode::{Decode, Encode};
use std::error::Error; use std::error::Error;
@@ -112,6 +113,7 @@ impl Packet {
pub async fn send_packet(stream: &mut TcpStream, packet: &Packet) -> Result<(), std::io::Error> { pub async fn send_packet(stream: &mut TcpStream, packet: &Packet) -> Result<(), std::io::Error> {
let data = packet.to_raw(); let data = packet.to_raw();
debug!("Sending '{:#X}' bytes of data. {:?}", data.len(), data); debug!("Sending '{:#X}' bytes of data. {:?}", data.len(), data);
PACKETS_SENT.inc();
stream.write_all(&data).await?; stream.write_all(&data).await?;
Ok(()) Ok(())
} }

View File

@@ -3,7 +3,7 @@ use crate::bufferpool::BufferPool;
use crate::character_client::CharacterClient; use crate::character_client::CharacterClient;
use crate::connection_service::ConnectionService; use crate::connection_service::ConnectionService;
use crate::handlers::*; use crate::handlers::*;
use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED}; use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED, PACKET_PROCESSING_TIME};
use crate::packet::Packet; use crate::packet::Packet;
use crate::packet_type::PacketType; use crate::packet_type::PacketType;
use std::error::Error; use std::error::Error;
@@ -46,6 +46,7 @@ impl PacketRouter {
PACKETS_RECEIVED.inc(); PACKETS_RECEIVED.inc();
let timer = PACKET_PROCESSING_TIME.start_timer();
// Process the packet // Process the packet
match Packet::from_raw(&buffer[..packet_size]) { match Packet::from_raw(&buffer[..packet_size]) {
Ok(packet) => { Ok(packet) => {
@@ -56,6 +57,7 @@ impl PacketRouter {
} }
Err(e) => warn!("Failed to parse packet: {}", e), Err(e) => warn!("Failed to parse packet: {}", e),
} }
timer.stop_and_record();
pool.release(buffer).await; pool.release(buffer).await;
} }
@@ -73,6 +75,7 @@ impl PacketRouter {
Ok(()) Ok(())
} }
#[rustfmt::skip]
pub async fn route_packet( pub async fn route_packet(
&self, &self,
stream: &mut TcpStream, stream: &mut TcpStream,
@@ -82,122 +85,24 @@ impl PacketRouter {
debug!("Routing packet: {:?}", packet); debug!("Routing packet: {:?}", packet);
match packet.packet_type { match packet.packet_type {
// Generic Server Packets // Generic Server Packets
PacketType::PakcsAlive => { PacketType::PakcsAlive => auth::handle_alive_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
auth::handle_alive_req(
stream,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await, PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
PacketType::PakcsJoinServerTokenReq => { PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
auth::handle_join_server_req(
stream,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
// Login Packets // Login Packets
PacketType::PakcsLoginTokenReq => { PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id, stream.peer_addr()?).await,
auth::handle_login_req( PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
stream, PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, self.connection_service.clone(), connection_id).await,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
stream.peer_addr()?,
)
.await
}
PacketType::PakcsLogoutReq => {
auth::handle_logout_req(
stream,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsSrvSelectReq => {
auth::handle_server_select_req(
stream,
packet,
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await, PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
// Character Packets // Character Packets
PacketType::PakcsCharListReq => { PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
character::handle_char_list_req( PacketType::PakcsCreateCharReq => character::handle_create_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
stream, PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
packet, PacketType::PakcsSelectCharReq => character::handle_select_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsCreateCharReq => {
character::handle_create_char_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsDeleteCharReq => {
character::handle_delete_char_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsSelectCharReq => {
character::handle_select_char_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
// World Packets // World Packets
PacketType::PakcsChangeMapReq => { PacketType::PakcsChangeMapReq => world::handle_change_map_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
world::handle_change_map_req( PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(stream, packet, self.connection_service.clone(), connection_id).await,
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsMouseCmd => {
world::handle_mouse_cmd_req(
stream,
packet,
self.connection_service.clone(),
connection_id,
)
.await
}
// 1 => chat::handle_chat(packet).await?, // 1 => chat::handle_chat(packet).await?,
// 2 => movement::handle_movement(packet).await?, // 2 => movement::handle_movement(packet).await?,

View File

@@ -9,7 +9,7 @@ service AuthService {
rpc Logout(LogoutRequest) returns (common.Empty); rpc Logout(LogoutRequest) returns (common.Empty);
rpc ValidateToken(ValidateTokenRequest) returns (ValidateTokenResponse); rpc ValidateToken(ValidateTokenRequest) returns (ValidateTokenResponse);
rpc ValidateSession(ValidateSessionRequest) returns (ValidateSessionResponse); rpc ValidateSession(ValidateSessionRequest) returns (ValidateSessionResponse);
rpc RefreshSession(ValidateSessionRequest) returns (ValidateSessionResponse); rpc RefreshSession(ValidateSessionRequest) returns (RefreshSessionResponse);
rpc Register (RegisterRequest) returns (RegisterResponse); rpc Register (RegisterRequest) returns (RegisterResponse);
rpc RequestPasswordReset (PasswordResetRequest) returns (PasswordResetResponse); rpc RequestPasswordReset (PasswordResetRequest) returns (PasswordResetResponse);
rpc ResetPassword (ResetPasswordRequest) returns (ResetPasswordResponse); rpc ResetPassword (ResetPasswordRequest) returns (ResetPasswordResponse);
@@ -51,6 +51,10 @@ message ValidateSessionResponse {
string user_id = 3; string user_id = 3;
} }
message RefreshSessionResponse {
bool valid = 1;
}
message RegisterRequest { message RegisterRequest {
string username = 1; string username = 1;
string email = 2; string email = 2;

View File

@@ -7,7 +7,7 @@ import "common.proto";
service SessionService { service SessionService {
rpc CreateSession (CreateSessionRequest) returns (SessionResponse); rpc CreateSession (CreateSessionRequest) returns (SessionResponse);
rpc GetSession (GetSessionRequest) returns (SessionResponse); rpc GetSession (GetSessionRequest) returns (SessionResponse);
rpc RefreshSession (GetSessionRequest) returns (SessionResponse); rpc RefreshSession (GetSessionRequest) returns (RefreshSessionResponse);
rpc DeleteSession (DeleteSessionRequest) returns (common.Empty); rpc DeleteSession (DeleteSessionRequest) returns (common.Empty);
} }
@@ -36,3 +36,7 @@ message SessionResponse {
string ip_address = 6; string ip_address = 6;
} }
message RefreshSessionResponse {
bool valid = 1;
}

View File

@@ -3,65 +3,66 @@ syntax = "proto3";
package world; package world;
service WorldService { service WorldService {
rpc GetCharacter(CharacterRequest) returns (CharacterResponse); rpc GetCharacter(CharacterRequest) returns (CharacterResponse);
rpc ChangeMap(ChangeMapRequest) returns (ChangeMapResponse); rpc ChangeMap(ChangeMapRequest) returns (ChangeMapResponse);
rpc MoveCharacter(CharacterMoveRequest) returns (CharacterMoveResponse); rpc MoveCharacter(CharacterMoveRequest) returns (CharacterMoveResponse);
rpc GetTargetHp(ObjectHpRequest) returns (ObjectHpResponse);
} }
message CharacterRequest { message CharacterRequest {
string token = 1; string token = 1;
string user_id = 2; string user_id = 2;
string char_id = 3; string char_id = 3;
string session_id = 4; string session_id = 4;
} }
message CharacterResponse { message CharacterResponse {
int32 count = 1; int32 count = 1;
} }
message CharacterMoveRequest { message CharacterMoveRequest {
string session_id = 1; string session_id = 1;
uint32 target_id = 2; uint32 target_id = 2;
float x = 3; float x = 3;
float y = 4; float y = 4;
float z = 5; float z = 5;
} }
message CharacterMoveResponse { message CharacterMoveResponse {
int32 id = 1; int32 id = 1;
int32 target_id = 2; int32 target_id = 2;
int32 distance = 3; int32 distance = 3;
float x = 4; float x = 4;
float y = 5; float y = 5;
float z = 6; float z = 6;
} }
message ChangeMapRequest { message ChangeMapRequest {
int32 id = 1; int32 id = 1;
float x = 2; float x = 2;
float y = 3; float y = 3;
} }
message ChangeMapResponse { message ChangeMapResponse {
int32 id = 1; int32 id = 1;
int32 map_id = 2; int32 map_id = 2;
float x = 3; float x = 3;
float y = 4; float y = 4;
int32 move_mode = 5; int32 move_mode = 5;
int32 ride_mode = 6; int32 ride_mode = 6;
} }
message AttackRequest { message AttackRequest {
string session_id = 1; string session_id = 1;
uint32 target_id = 2; uint32 target_id = 2;
} }
message ObjectHpRequest { message ObjectHpRequest {
string session_id = 1; string session_id = 1;
uint32 target_id = 2; uint32 target_id = 2;
} }
message ObjectHpResponse { message ObjectHpResponse {
uint32 target_id = 1; uint32 target_id = 1;
int32 hp = 2; int32 hp = 2;
} }

1
rustfmt.toml Normal file
View File

@@ -0,0 +1 @@
max_width = 120

View File

@@ -1,10 +1,11 @@
use crate::api::session_service_server::SessionService; use crate::api::session_service_server::SessionService;
use crate::api::{CreateSessionRequest, DeleteSessionRequest, GetSessionRequest, SessionResponse}; use crate::api::{CreateSessionRequest, DeleteSessionRequest, GetSessionRequest, RefreshSessionResponse, SessionResponse};
use crate::common::Empty; use crate::common::Empty;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status};
use tracing::debug;
use utils::redis_cache::{Cache, RedisCache}; use utils::redis_cache::{Cache, RedisCache};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@@ -100,34 +101,25 @@ impl SessionService for SessionServiceImpl {
Ok(Response::new(Empty {})) Ok(Response::new(Empty {}))
} }
async fn refresh_session( async fn refresh_session(
&self, &self,
request: Request<GetSessionRequest>, request: Request<GetSessionRequest>,
) -> Result<Response<SessionResponse>, Status> { ) -> Result<Response<RefreshSessionResponse>, Status> {
let req = request.into_inner(); let req = request.into_inner();
let conn = self.redis.lock().await; let conn = self.redis.lock().await;
match conn.refresh(&req.session_id, 300).await {
Ok(Response) => {
let response = RefreshSessionResponse {
valid: true
};
if let Some(session_data) = conn debug!("Session refreshed: {:?}", response);
.get::<String>(&req.session_id) Ok(Response::new(response))
.await }
.map_err(|_| Status::internal("Failed to fetch session from Redis"))? Err(e) => {
{ Err(Status::unknown(e.to_string()))
let _ = conn.update(&req.session_id, Some(&session_data), Some(300)); }
let session: Session = serde_json::from_str(&session_data)
.map_err(|_| Status::internal("Failed to deserialize session"))?;
let response = SessionResponse {
session_id: req.session_id,
user_id: session.user_id,
username: session.username,
character_id: session.character_id,
login_time: session.login_time,
ip_address: session.ip_address,
};
Ok(Response::new(response))
} else {
Err(Status::not_found("Session not found"))
} }
} }
} }

View File

@@ -25,6 +25,8 @@ pub trait Cache {
) -> Result<Option<T>, redis::RedisError>; ) -> Result<Option<T>, redis::RedisError>;
async fn delete(&mut self, key: &str) -> redis::RedisResult<()>; async fn delete(&mut self, key: &str) -> redis::RedisResult<()>;
async fn refresh(&self, key: &str, ttl: i64) -> redis::RedisResult<()>;
} }
pub struct RedisCache { pub struct RedisCache {
@@ -137,4 +139,15 @@ impl Cache for RedisCache {
})?; })?;
conn.del(key).await conn.del(key).await
} }
async fn refresh(&self, key: &str, ttl: i64) -> redis::RedisResult<()> {
let mut conn = self.pool.get().await.map_err(|err| {
redis::RedisError::from((
redis::ErrorKind::IoError,
"Failed to get Redis connection",
format!("{:?}", err),
))
})?;
conn.expire(key, ttl).await
}
} }