Compare commits

...

4 Commits

Author SHA256 Message Date
14c6aa485a - add: redis cache refresh function sets the ttl for a key
- update: session service refresh session function now just updates the ttl of the session instead of calling set
2025-03-09 17:08:56 -04:00
0dc69bcfcf - add: metrics exporting 2025-03-09 17:06:58 -04:00
c4d3da1f94 - fix: formatting for packet routing 2025-03-09 17:04:13 -04:00
90e4346f6d - add: rustfmt max_width set 2025-03-09 17:03:12 -04:00
15 changed files with 116 additions and 179 deletions

View File

@@ -1,8 +1,8 @@
use crate::auth::auth_service_server::AuthService;
use crate::auth::{
LoginRequest, LoginResponse, LogoutRequest, PasswordResetRequest, PasswordResetResponse,
RegisterRequest, RegisterResponse, ResetPasswordRequest, ResetPasswordResponse,
ValidateSessionRequest, ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse,
RefreshSessionResponse, RegisterRequest, RegisterResponse, ResetPasswordRequest,
ResetPasswordResponse, ValidateSessionRequest, ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse,
};
use crate::common::Empty;
use crate::database_client::{DatabaseClient, DatabaseClientTrait};
@@ -160,7 +160,7 @@ impl AuthService for MyAuthService {
async fn refresh_session(
&self,
request: Request<ValidateSessionRequest>,
) -> Result<Response<ValidateSessionResponse>, Status> {
) -> Result<Response<RefreshSessionResponse>, Status> {
let req = request.into_inner();
let response = self
.session_client
@@ -175,11 +175,11 @@ impl AuthService for MyAuthService {
Ok(res) => {
let res = res.into_inner();
debug!("Session valid: {:?}", res);
Ok(Response::new(ValidateSessionResponse { valid: true, session_id: res.session_id.to_string(), user_id: res.user_id.to_string() }))
Ok(Response::new(RefreshSessionResponse { valid: res.valid }))
}
Err(_) => {
debug!("Session invalid or not found");
Ok(Response::new(ValidateSessionResponse { valid: false, session_id: "".to_string(), user_id: "".to_string() }))
Ok(Response::new(RefreshSessionResponse { valid: false }))
}
}
}

View File

@@ -103,7 +103,8 @@
dockerfile: ./packet-service/Dockerfile
restart: on-failure
ports:
- "4000:4000"
- "29000:29000"
- "4001:4001"
env_file:
- ./packet-service/.env
- .env

View File

@@ -26,6 +26,7 @@ warp = "0.3.7"
dashmap = "6.1.0"
uuid = { version = "1.11.0", features = ["v4"] }
chrono = "0.4.39"
prometheus_exporter = "0.8.5"
[build-dependencies]
tonic-build = "0.12.3"

View File

@@ -21,5 +21,6 @@ RUN apk add --no-cache libssl3 libgcc
COPY --from=builder /usr/src/packet-service/target/release/packet-service /usr/local/bin/packet-service
EXPOSE 29000
EXPOSE 4001
CMD ["packet-service"]

View File

@@ -1,7 +1,7 @@
use crate::auth::auth_service_client::AuthServiceClient;
use crate::auth::{
LoginRequest, LoginResponse, LogoutRequest, ValidateSessionRequest, ValidateSessionResponse,
ValidateTokenRequest, ValidateTokenResponse,
LoginRequest, LoginResponse, LogoutRequest, RefreshSessionResponse, ValidateSessionRequest,
ValidateSessionResponse, ValidateTokenRequest, ValidateTokenResponse,
};
use crate::common::Empty;
use tonic::transport::Channel;
@@ -60,7 +60,7 @@ impl AuthClient {
pub async fn refresh_session(
&mut self,
session_id: &str,
) -> Result<ValidateSessionResponse, Box<dyn std::error::Error + Send + Sync>> {
) -> Result<RefreshSessionResponse, Box<dyn std::error::Error + Send + Sync>> {
let request = ValidateSessionRequest {
session_id: session_id.to_string(),
};

View File

@@ -6,6 +6,8 @@ use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED};
use crate::packet::Packet;
use crate::router::PacketRouter;
use dotenv::dotenv;
use prometheus::{self, Encoder, TextEncoder};
use prometheus_exporter;
use std::collections::HashMap;
use std::env;
use std::error::Error;
@@ -66,6 +68,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the gRPC server address
let addr = env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let port = env::var("PACKET_SERVICE_PORT").unwrap_or_else(|_| "4000".to_string());
let metrics_port = env::var("PACKET_METRICS_PORT").unwrap_or_else(|_| "4001".to_string());
let health_port = env::var("HEALTH_CHECK_PORT").unwrap_or_else(|_| "8082".to_string());
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
@@ -153,6 +156,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
let binding = format!("{}:{}", &addr, metrics_port);
prometheus_exporter::start(binding.parse().unwrap()).unwrap();
utils::signal_handler::wait_for_signal().await;
consul_registration::deregister_service(&consul_url, service_id.as_str())

View File

@@ -11,6 +11,12 @@ lazy_static! {
"Total number of packets received"
).unwrap();
// Counter to track the number of packets sent
pub static ref PACKETS_SENT: Counter = register_counter!(
"packets_sent_total",
"Total number of packets sent"
).unwrap();
// Gauge to track the number of active connections
pub static ref ACTIVE_CONNECTIONS: Gauge = register_gauge!(
"active_connections",

View File

@@ -1,3 +1,4 @@
use crate::metrics::PACKETS_SENT;
use crate::packet_type::PacketType;
use bincode::{Decode, Encode};
use std::error::Error;
@@ -112,6 +113,7 @@ impl Packet {
pub async fn send_packet(stream: &mut TcpStream, packet: &Packet) -> Result<(), std::io::Error> {
let data = packet.to_raw();
debug!("Sending '{:#X}' bytes of data. {:?}", data.len(), data);
PACKETS_SENT.inc();
stream.write_all(&data).await?;
Ok(())
}

View File

@@ -3,7 +3,7 @@ use crate::bufferpool::BufferPool;
use crate::character_client::CharacterClient;
use crate::connection_service::ConnectionService;
use crate::handlers::*;
use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED};
use crate::metrics::{ACTIVE_CONNECTIONS, PACKETS_RECEIVED, PACKET_PROCESSING_TIME};
use crate::packet::Packet;
use crate::packet_type::PacketType;
use std::error::Error;
@@ -46,6 +46,7 @@ impl PacketRouter {
PACKETS_RECEIVED.inc();
let timer = PACKET_PROCESSING_TIME.start_timer();
// Process the packet
match Packet::from_raw(&buffer[..packet_size]) {
Ok(packet) => {
@@ -56,6 +57,7 @@ impl PacketRouter {
}
Err(e) => warn!("Failed to parse packet: {}", e),
}
timer.stop_and_record();
pool.release(buffer).await;
}
@@ -73,6 +75,7 @@ impl PacketRouter {
Ok(())
}
#[rustfmt::skip]
pub async fn route_packet(
&self,
stream: &mut TcpStream,
@@ -82,122 +85,24 @@ impl PacketRouter {
debug!("Routing packet: {:?}", packet);
match packet.packet_type {
// Generic Server Packets
PacketType::PakcsAlive => {
auth::handle_alive_req(
stream,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsAlive => auth::handle_alive_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
PacketType::PakcsJoinServerTokenReq => {
auth::handle_join_server_req(
stream,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
// Login Packets
PacketType::PakcsLoginTokenReq => {
auth::handle_login_req(
stream,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
stream.peer_addr()?,
)
.await
}
PacketType::PakcsLogoutReq => {
auth::handle_logout_req(
stream,
packet,
self.auth_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsSrvSelectReq => {
auth::handle_server_select_req(
stream,
packet,
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id, stream.peer_addr()?).await,
PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, self.auth_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet, self.connection_service.clone(), connection_id).await,
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
// Character Packets
PacketType::PakcsCharListReq => {
character::handle_char_list_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsCreateCharReq => {
character::handle_create_char_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsDeleteCharReq => {
character::handle_delete_char_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsSelectCharReq => {
character::handle_select_char_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsCharListReq => character::handle_char_list_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsCreateCharReq => character::handle_create_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsDeleteCharReq => character::handle_delete_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsSelectCharReq => character::handle_select_char_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
// World Packets
PacketType::PakcsChangeMapReq => {
world::handle_change_map_req(
stream,
packet,
self.character_client.clone(),
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsMouseCmd => {
world::handle_mouse_cmd_req(
stream,
packet,
self.connection_service.clone(),
connection_id,
)
.await
}
PacketType::PakcsChangeMapReq => world::handle_change_map_req(stream, packet, self.character_client.clone(), self.connection_service.clone(), connection_id).await,
PacketType::PakcsMouseCmd => world::handle_mouse_cmd_req(stream, packet, self.connection_service.clone(), connection_id).await,
// 1 => chat::handle_chat(packet).await?,
// 2 => movement::handle_movement(packet).await?,

View File

@@ -9,7 +9,7 @@ service AuthService {
rpc Logout(LogoutRequest) returns (common.Empty);
rpc ValidateToken(ValidateTokenRequest) returns (ValidateTokenResponse);
rpc ValidateSession(ValidateSessionRequest) returns (ValidateSessionResponse);
rpc RefreshSession(ValidateSessionRequest) returns (ValidateSessionResponse);
rpc RefreshSession(ValidateSessionRequest) returns (RefreshSessionResponse);
rpc Register (RegisterRequest) returns (RegisterResponse);
rpc RequestPasswordReset (PasswordResetRequest) returns (PasswordResetResponse);
rpc ResetPassword (ResetPasswordRequest) returns (ResetPasswordResponse);
@@ -51,6 +51,10 @@ message ValidateSessionResponse {
string user_id = 3;
}
message RefreshSessionResponse {
bool valid = 1;
}
message RegisterRequest {
string username = 1;
string email = 2;

View File

@@ -7,7 +7,7 @@ import "common.proto";
service SessionService {
rpc CreateSession (CreateSessionRequest) returns (SessionResponse);
rpc GetSession (GetSessionRequest) returns (SessionResponse);
rpc RefreshSession (GetSessionRequest) returns (SessionResponse);
rpc RefreshSession (GetSessionRequest) returns (RefreshSessionResponse);
rpc DeleteSession (DeleteSessionRequest) returns (common.Empty);
}
@@ -36,3 +36,7 @@ message SessionResponse {
string ip_address = 6;
}
message RefreshSessionResponse {
bool valid = 1;
}

View File

@@ -6,6 +6,7 @@ service WorldService {
rpc GetCharacter(CharacterRequest) returns (CharacterResponse);
rpc ChangeMap(ChangeMapRequest) returns (ChangeMapResponse);
rpc MoveCharacter(CharacterMoveRequest) returns (CharacterMoveResponse);
rpc GetTargetHp(ObjectHpRequest) returns (ObjectHpResponse);
}
message CharacterRequest {

1
rustfmt.toml Normal file
View File

@@ -0,0 +1 @@
max_width = 120

View File

@@ -1,10 +1,11 @@
use crate::api::session_service_server::SessionService;
use crate::api::{CreateSessionRequest, DeleteSessionRequest, GetSessionRequest, SessionResponse};
use crate::api::{CreateSessionRequest, DeleteSessionRequest, GetSessionRequest, RefreshSessionResponse, SessionResponse};
use crate::common::Empty;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status};
use tracing::debug;
use utils::redis_cache::{Cache, RedisCache};
#[derive(Serialize, Deserialize, Debug)]
@@ -100,34 +101,25 @@ impl SessionService for SessionServiceImpl {
Ok(Response::new(Empty {}))
}
async fn refresh_session(
&self,
request: Request<GetSessionRequest>,
) -> Result<Response<SessionResponse>, Status> {
) -> Result<Response<RefreshSessionResponse>, Status> {
let req = request.into_inner();
let conn = self.redis.lock().await;
if let Some(session_data) = conn
.get::<String>(&req.session_id)
.await
.map_err(|_| Status::internal("Failed to fetch session from Redis"))?
{
let _ = conn.update(&req.session_id, Some(&session_data), Some(300));
let session: Session = serde_json::from_str(&session_data)
.map_err(|_| Status::internal("Failed to deserialize session"))?;
let response = SessionResponse {
session_id: req.session_id,
user_id: session.user_id,
username: session.username,
character_id: session.character_id,
login_time: session.login_time,
ip_address: session.ip_address,
match conn.refresh(&req.session_id, 300).await {
Ok(Response) => {
let response = RefreshSessionResponse {
valid: true
};
debug!("Session refreshed: {:?}", response);
Ok(Response::new(response))
} else {
Err(Status::not_found("Session not found"))
}
Err(e) => {
Err(Status::unknown(e.to_string()))
}
}
}
}

View File

@@ -25,6 +25,8 @@ pub trait Cache {
) -> Result<Option<T>, redis::RedisError>;
async fn delete(&mut self, key: &str) -> redis::RedisResult<()>;
async fn refresh(&self, key: &str, ttl: i64) -> redis::RedisResult<()>;
}
pub struct RedisCache {
@@ -137,4 +139,15 @@ impl Cache for RedisCache {
})?;
conn.del(key).await
}
async fn refresh(&self, key: &str, ttl: i64) -> redis::RedisResult<()> {
let mut conn = self.pool.get().await.map_err(|err| {
redis::RedisError::from((
redis::ErrorKind::IoError,
"Failed to get Redis connection",
format!("{:?}", err),
))
})?;
conn.expire(key, ttl).await
}
}