- add: logout grpc function
- add: logout packet handler - add: connection state and service for storing connection data - add: session service calls to auth-service - fix: compile error on database service due to moved redis cache
This commit is contained in:
@@ -23,6 +23,8 @@ tonic = "0.12.3"
|
||||
prost = "0.13.4"
|
||||
utils = { path = "../utils" }
|
||||
warp = "0.3.7"
|
||||
dashmap = "6.1.0"
|
||||
uuid = { version = "1.11.0", features = ["v4"] }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.12.3"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::auth::auth_service_client::AuthServiceClient;
|
||||
use crate::auth::{LoginRequest, LoginResponse, ValidateTokenRequest, ValidateTokenResponse};
|
||||
use crate::auth::{Empty, LoginRequest, LoginResponse, LogoutRequest, ValidateTokenRequest, ValidateTokenResponse};
|
||||
use tonic::transport::Channel;
|
||||
|
||||
pub struct AuthClient {
|
||||
@@ -12,10 +12,11 @@ impl AuthClient {
|
||||
Ok(AuthClient { client })
|
||||
}
|
||||
|
||||
pub async fn login(&mut self, username: &str, password: &str) -> Result<LoginResponse, Box<dyn std::error::Error + Send + Sync>> {
|
||||
pub async fn login(&mut self, username: &str, password: &str, ip_address: &str) -> Result<LoginResponse, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let request = LoginRequest {
|
||||
username: username.to_string(),
|
||||
password: password.to_string(),
|
||||
ip_address: ip_address.to_string(),
|
||||
};
|
||||
|
||||
let response = self.client.login(request).await?;
|
||||
@@ -24,10 +25,19 @@ impl AuthClient {
|
||||
|
||||
pub async fn login_token(&mut self, token: &str) -> Result<ValidateTokenResponse, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let request = ValidateTokenRequest {
|
||||
token: token.to_string(),
|
||||
token: token.to_string()
|
||||
};
|
||||
|
||||
let response = self.client.validate_token(request).await?;
|
||||
Ok(response.into_inner())
|
||||
}
|
||||
|
||||
pub async fn logout(&mut self, session_id: &str) -> Result<Empty, Box<dyn std::error::Error + Send + Sync>> {
|
||||
let request = LogoutRequest {
|
||||
session_id: session_id.to_string(),
|
||||
};
|
||||
|
||||
let response = self.client.logout(request).await?;
|
||||
Ok(response.into_inner())
|
||||
}
|
||||
}
|
||||
|
||||
30
packet-service/src/connection_service.rs
Normal file
30
packet-service/src/connection_service.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use dashmap::DashMap;
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
use crate::connection_state::ConnectionState;
|
||||
|
||||
pub struct ConnectionService {
|
||||
pub connections: Arc<DashMap<String, ConnectionState>>, // Map connection ID to state
|
||||
}
|
||||
|
||||
impl ConnectionService {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
connections: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_connection(&self) -> String {
|
||||
let connection_id = Uuid::new_v4().to_string();
|
||||
self.connections.insert(connection_id.clone(), ConnectionState::new());
|
||||
connection_id
|
||||
}
|
||||
|
||||
pub fn get_connection(&self, connection_id: &str) -> Option<ConnectionState> {
|
||||
self.connections.get(connection_id).map(|entry| entry.clone())
|
||||
}
|
||||
|
||||
pub fn remove_connection(&self, connection_id: &str) {
|
||||
self.connections.remove(connection_id);
|
||||
}
|
||||
}
|
||||
20
packet-service/src/connection_state.rs
Normal file
20
packet-service/src/connection_state.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ConnectionState {
|
||||
pub user_id: Option<i32>,
|
||||
pub session_id: Option<String>,
|
||||
pub character_id: Option<i32>,
|
||||
pub additional_data: HashMap<String, String>, // Flexible data storage
|
||||
}
|
||||
|
||||
impl ConnectionState {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
user_id: None,
|
||||
session_id: None,
|
||||
character_id: None,
|
||||
additional_data: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -15,17 +15,17 @@ use crate::packets::*;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
use tonic::{Code, Status};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use utils::service_discovery;
|
||||
use crate::connection_service::ConnectionService;
|
||||
use crate::packets::cli_logout_req::CliLogoutReq;
|
||||
|
||||
pub(crate) async fn handle_accept_req(stream: &mut TcpStream, packet: Packet) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
// let request = CliAcceptReq::decode(packet.payload.as_slice());
|
||||
|
||||
// We need to do reply to this packet
|
||||
let data = SrvAcceptReply { result: srv_accept_reply::Result::Accepted, rand_value: 0 };
|
||||
let response_packet = Packet::new(PacketType::PakssAcceptReply, &data)?;
|
||||
|
||||
@@ -39,7 +39,19 @@ pub(crate) async fn handle_join_server_req(stream: &mut TcpStream, packet: Packe
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc<Mutex<AuthClient>>) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
pub(crate) async fn handle_logout_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc<Mutex<AuthClient>>, connection_service: Arc<ConnectionService>, connection_id: String) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let request = CliLogoutReq::decode(packet.payload.as_slice());
|
||||
let mut auth_client = auth_client.lock().await;
|
||||
if let Some(mut state) = connection_service.get_connection(&connection_id) {
|
||||
let session_id = state.session_id.clone().unwrap();
|
||||
auth_client.logout(&session_id).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err("Unable to find connection state".into())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, auth_client: Arc<Mutex<AuthClient>>, connection_service: Arc<ConnectionService>, connection_id: String, addr: SocketAddr) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
debug!("decoding packet payload of size {}", packet.payload.as_slice().len());
|
||||
let data = CliLoginTokenReq::decode(packet.payload.as_slice())?;
|
||||
debug!("{:?}", data);
|
||||
@@ -56,6 +68,11 @@ pub(crate) async fn handle_login_req(stream: &mut TcpStream, packet: Packet, aut
|
||||
} else {
|
||||
debug!("Successfully logged in");
|
||||
|
||||
if let Some(mut state) = connection_service.get_connection(&connection_id) {
|
||||
state.user_id = Some(response.user_id.parse().unwrap());
|
||||
// auth_client.logout(&session_id).await?;
|
||||
}
|
||||
|
||||
let consul_url = env::var("CONSUL_URL").unwrap_or_else(|_| "http://127.0.0.1:8500".to_string());
|
||||
let servers = service_discovery::get_service_address(&consul_url, "character-service").await.unwrap_or_else(|err| {
|
||||
warn!(err);
|
||||
@@ -125,10 +142,10 @@ pub(crate) async fn handle_server_select_req(stream: &mut TcpStream, packet: Pac
|
||||
|
||||
let data = SrvSrvSelectReply {
|
||||
result: srv_srv_select_reply::Result::Failed,
|
||||
session_id: 0,
|
||||
crypt_val: 0,
|
||||
ip: NullTerminatedString::new(""),
|
||||
port: 0,
|
||||
session_id: 0, // Client should already have this value
|
||||
crypt_val: 0, // This is only for the old encryption
|
||||
ip: NullTerminatedString::new(""), // If this is empty, the client should stay connected (requires client change)
|
||||
port: 0, // See comment about ip above
|
||||
};
|
||||
|
||||
let response_packet = Packet::new(PacketType::PaklcSrvSelectReply, &data)?;
|
||||
|
||||
@@ -18,6 +18,7 @@ use tracing::{debug, error, info, warn};
|
||||
use utils::consul_registration;
|
||||
use utils::service_discovery::get_service_address;
|
||||
use warp::Filter;
|
||||
use crate::connection_service::ConnectionService;
|
||||
|
||||
mod packet_type;
|
||||
mod packet;
|
||||
@@ -30,6 +31,9 @@ mod handlers;
|
||||
mod bufferpool;
|
||||
mod metrics;
|
||||
mod auth_client;
|
||||
mod connection_state;
|
||||
mod connection_service;
|
||||
|
||||
pub mod auth {
|
||||
tonic::include_proto!("auth"); // Path matches the package name in auth.proto
|
||||
}
|
||||
@@ -38,7 +42,7 @@ const BUFFER_POOL_SIZE: usize = 1000;
|
||||
const MAX_CONCURRENT_CONNECTIONS: usize = 100;
|
||||
|
||||
|
||||
async fn handle_connection(stream: &mut TcpStream, pool: Arc<BufferPool>, auth_client: Arc<Mutex<AuthClient>>) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
async fn handle_connection(stream: &mut TcpStream, pool: Arc<BufferPool>, auth_client: Arc<Mutex<AuthClient>>, connection_service: Arc<ConnectionService>, connection_id: String) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
ACTIVE_CONNECTIONS.inc();
|
||||
while let Some(mut buffer) = pool.acquire().await {
|
||||
// Read data into the buffer
|
||||
@@ -53,13 +57,19 @@ async fn handle_connection(stream: &mut TcpStream, pool: Arc<BufferPool>, auth_c
|
||||
Ok(packet) => {
|
||||
debug!("Parsed Packet: {:?}", packet);
|
||||
// Handle the parsed packet (route it, process it, etc.)
|
||||
router::route_packet(stream, packet, auth_client.clone()).await?;
|
||||
router::route_packet(stream, packet, auth_client.clone(), connection_service.clone(), connection_id.clone()).await?;
|
||||
}
|
||||
Err(e) => warn!("Failed to parse packet: {}", e),
|
||||
}
|
||||
|
||||
pool.release(buffer).await;
|
||||
}
|
||||
|
||||
if let Some(state) = connection_service.get_connection(&connection_id) {
|
||||
let session_id = state.session_id.unwrap();
|
||||
let mut auth_client = auth_client.lock().await;
|
||||
auth_client.logout(&session_id).await?;
|
||||
}
|
||||
ACTIVE_CONNECTIONS.dec();
|
||||
Ok(())
|
||||
}
|
||||
@@ -113,12 +123,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CONNECTIONS));
|
||||
let listener = TcpListener::bind(full_addr.clone()).await.unwrap();
|
||||
let buffer_pool = BufferPool::new(BUFFER_POOL_SIZE);
|
||||
let connection_service = Arc::new(ConnectionService::new());
|
||||
|
||||
info!("Packet service listening on {}", full_addr);
|
||||
|
||||
loop {
|
||||
let (mut socket, addr) = listener.accept().await.unwrap();
|
||||
let auth_client = auth_client.clone();
|
||||
let connection_service = connection_service.clone();
|
||||
info!("New connection from {}", addr);
|
||||
|
||||
let pool = buffer_pool.clone();
|
||||
@@ -127,9 +139,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Spawn a new task for each connection
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit;
|
||||
if let Err(e) = handle_connection(&mut socket, pool, auth_client).await {
|
||||
let connection_id = connection_service.add_connection();
|
||||
if let Err(e) = handle_connection(&mut socket, pool, auth_client, connection_service.clone(), connection_id.clone()).await {
|
||||
error!("Error handling connection: {}", e);
|
||||
}
|
||||
connection_service.remove_connection(&connection_id);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -7,15 +7,17 @@ use std::sync::Arc;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, warn};
|
||||
use crate::connection_service::ConnectionService;
|
||||
|
||||
pub async fn route_packet(stream: &mut TcpStream, packet: Packet, auth_client: Arc<Mutex<AuthClient>>) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
pub async fn route_packet(stream: &mut TcpStream, packet: Packet, auth_client: Arc<Mutex<AuthClient>>, connection_service: Arc<ConnectionService>, connection_id: String) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
debug!("Routing packet: {:?}", packet);
|
||||
match packet.packet_type {
|
||||
PacketType::PakcsAlive => Ok(()),
|
||||
PacketType::PakcsAcceptReq => auth::handle_accept_req(stream, packet).await,
|
||||
PacketType::PakcsJoinServerTokenReq => auth::handle_join_server_req(stream, packet).await,
|
||||
// Login Stuff
|
||||
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, auth_client).await,
|
||||
PacketType::PakcsLoginTokenReq => auth::handle_login_req(stream, packet, auth_client, connection_service, connection_id, stream.peer_addr()?).await,
|
||||
PacketType::PakcsLogoutReq => auth::handle_logout_req(stream, packet, auth_client, connection_service, connection_id).await,
|
||||
PacketType::PakcsSrvSelectReq => auth::handle_server_select_req(stream, packet).await,
|
||||
PacketType::PakcsChannelListReq => auth::handle_channel_list_req(stream, packet).await,
|
||||
|
||||
|
||||
Reference in New Issue
Block a user