416 lines
18 KiB
Rust
416 lines
18 KiB
Rust
use crate::auth_client::AuthClient;
|
|
use crate::connection_service::ConnectionService;
|
|
use crate::packet::{send_packet, Packet, PacketPayload};
|
|
use crate::packet_type::PacketType;
|
|
use crate::packets::*;
|
|
use std::collections::HashMap;
|
|
use std::env;
|
|
use std::error::Error;
|
|
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
use tokio::io::AsyncWriteExt;
|
|
use tokio::net::TcpStream;
|
|
use tokio::sync::Mutex;
|
|
use tonic::{Code, Status};
|
|
use tracing::{debug, error, info, warn};
|
|
use utils::null_string::NullTerminatedString;
|
|
use utils::service_discovery;
|
|
use utils::service_discovery::{get_kube_service_endpoints_by_dns, get_service_info};
|
|
use crate::handlers::chat::create_chat_client_handler;
|
|
use crate::handlers::chat_client::ChatClientHandler;
|
|
|
|
pub(crate) async fn handle_alive_req(
|
|
packet: Packet,
|
|
auth_client: Arc<Mutex<AuthClient>>,
|
|
connection_service: Arc<ConnectionService>,
|
|
connection_id: String,
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
if let Some(mut state) = connection_service.get_connection(&connection_id) {
|
|
let session_id = state.session_id.clone().unwrap();
|
|
debug!("Attempting to refresh session {}", session_id);
|
|
let mut auth_client = auth_client.lock().await;
|
|
let session = auth_client.refresh_session(&session_id).await?;
|
|
if (!session.valid) {
|
|
warn!("Invalid session ID: {}", session_id);
|
|
return Err("Session not valid".into());
|
|
}
|
|
Ok(())
|
|
} else {
|
|
Err("Unable to find connection state".into())
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn handle_accept_req(
|
|
packet: Packet,
|
|
connection_service: Arc<ConnectionService>,
|
|
connection_id: String,
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
use crate::packets::srv_accept_reply::SrvAcceptReply;
|
|
let data = SrvAcceptReply {
|
|
result: srv_accept_reply::Result::Accepted,
|
|
rand_value: 0,
|
|
};
|
|
let response_packet = Packet::new(PacketType::PakssAcceptReply, &data)?;
|
|
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn handle_join_server_req(
|
|
packet: Packet,
|
|
auth_client: Arc<Mutex<AuthClient>>,
|
|
connection_service: Arc<ConnectionService>,
|
|
connection_id: String,
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
use crate::packets::cli_join_server_token_req::CliJoinServerTokenReq;
|
|
use crate::packets::srv_join_server_reply::SrvJoinServerReply;
|
|
let request = CliJoinServerTokenReq::decode(packet.payload.as_slice());
|
|
debug!("{:?}", request);
|
|
|
|
if let Some(mut state) = connection_service.get_connection(&connection_id) {
|
|
let session_id = state.session_id.clone().unwrap();
|
|
let mut auth_client = auth_client.lock().await;
|
|
let session = auth_client.validate_session(&session_id).await?;
|
|
if (!session.valid) {
|
|
warn!("Invalid session ID: {}", session_id);
|
|
|
|
let data = SrvJoinServerReply {
|
|
result: srv_join_server_reply::Result::Failed,
|
|
id: 0,
|
|
pay_flag: 0,
|
|
};
|
|
let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
return Err("Session not valid".into());
|
|
}
|
|
|
|
let local_id = state.local_id.clone();
|
|
let data = SrvJoinServerReply {
|
|
result: srv_join_server_reply::Result::Ok,
|
|
id: local_id as u32,
|
|
pay_flag: 0,
|
|
};
|
|
let response_packet = Packet::new(PacketType::PakscJoinServerReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
Ok(())
|
|
} else {
|
|
Err("Unable to find connection state".into())
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn handle_logout_req(
|
|
packet: Packet,
|
|
auth_client: Arc<Mutex<AuthClient>>,
|
|
connection_service: Arc<ConnectionService>,
|
|
connection_id: String,
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
use crate::packets::srv_logout_reply::SrvLogoutReply;
|
|
if let Some(mut state) = connection_service.get_connection(&connection_id) {
|
|
let session_id = state.session_id.clone().unwrap();
|
|
let mut auth_client = auth_client.lock().await;
|
|
auth_client.logout(&session_id).await?;
|
|
|
|
let data = SrvLogoutReply { wait_time: 1 };
|
|
let response_packet = Packet::new(PacketType::PakwcLogoutReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
locked_stream.shutdown().await?;
|
|
}
|
|
Ok(())
|
|
} else {
|
|
Err("Unable to find connection state".into())
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn handle_login_req(
|
|
packet: Packet,
|
|
auth_client: Arc<Mutex<AuthClient>>,
|
|
connection_service: Arc<ConnectionService>,
|
|
connection_id: String,
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
use crate::packets::cli_login_token_req::CliLoginTokenReq;
|
|
use crate::packets::srv_login_reply::{ServerInfo, SrvLoginReply};
|
|
debug!("decoding packet payload of size {}", packet.payload.as_slice().len());
|
|
let data = CliLoginTokenReq::decode(packet.payload.as_slice())?;
|
|
debug!("{:?}", data);
|
|
|
|
let mut auth_client = auth_client.lock().await;
|
|
match auth_client.validate_session(&data.token.0).await {
|
|
Ok(response) => {
|
|
debug!("Response: {:?}", response);
|
|
if response.valid == false {
|
|
info!("Login failed: Invalid credentials");
|
|
|
|
let data = SrvLoginReply {
|
|
result: srv_login_reply::Result::UnknownAccount,
|
|
right: 0,
|
|
type_: 0,
|
|
servers_info: Vec::new(),
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
} else {
|
|
debug!("Successfully logged in");
|
|
|
|
let mut local_id = 0;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
state.user_id = Some(response.user_id);
|
|
state.session_id = Some(response.session_id.clone());
|
|
local_id = connection_service.next_id();
|
|
state.local_id = local_id;
|
|
}
|
|
|
|
let chat_url = format!(
|
|
"http://{}",
|
|
get_kube_service_endpoints_by_dns("chat-service", "tcp", "chat-service")
|
|
.await
|
|
.expect("Failed to get chat service endpoints")
|
|
.get(0)
|
|
.unwrap()
|
|
);
|
|
|
|
let handler = ChatClientHandler::new(chat_url, local_id.to_string(), response.session_id.clone()).await?;
|
|
let chat_handler = Arc::new(handler);
|
|
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
create_chat_client_handler(writer_clone, chat_handler.clone()).await?;
|
|
state.chat_handler = Some(chat_handler);
|
|
}
|
|
|
|
let mut id = 0;
|
|
let mut server_info: Vec<ServerInfo> = Vec::new();
|
|
match get_service_info("default", "character-service").await {
|
|
Ok(service_info) => {
|
|
if let Some(annotations) = service_info.annotations {
|
|
let mut server_name = "".to_string();
|
|
let mut is_test = false;
|
|
for (key, value) in annotations {
|
|
match key.as_str() {
|
|
"name" => {
|
|
server_name = value;
|
|
}
|
|
"tags" => {}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
if is_test {
|
|
server_name = format!("@{}", server_name);
|
|
} else {
|
|
server_name = format!(" {}", server_name);
|
|
}
|
|
|
|
server_info.push(ServerInfo {
|
|
test: u8::from(is_test),
|
|
name: NullTerminatedString::new(&server_name),
|
|
id,
|
|
});
|
|
|
|
let data = SrvLoginReply {
|
|
result: srv_login_reply::Result::Ok,
|
|
right: 0,
|
|
type_: 0,
|
|
servers_info: server_info,
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
}
|
|
}
|
|
Err(err) => {
|
|
debug!("Error getting service info: {}", err);
|
|
let data = SrvLoginReply {
|
|
result: srv_login_reply::Result::Failed,
|
|
right: 0,
|
|
type_: 0,
|
|
servers_info: Vec::new(),
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Err(status) => {
|
|
if let Some(tonic_status) = status.downcast_ref::<Status>() {
|
|
match tonic_status.code() {
|
|
Code::Unauthenticated => {
|
|
info!("Login failed: Invalid credentials");
|
|
|
|
let data = SrvLoginReply {
|
|
result: srv_login_reply::Result::UnknownAccount,
|
|
right: 0,
|
|
type_: 0,
|
|
servers_info: Vec::new(),
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
}
|
|
Code::Unavailable => {
|
|
warn!("Login failed: Service is unavailable");
|
|
let data = SrvLoginReply {
|
|
result: srv_login_reply::Result::Failed,
|
|
right: 0,
|
|
type_: 0,
|
|
servers_info: Vec::new(),
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
}
|
|
_ => {
|
|
error!("Unexpected error: {}", tonic_status.message());
|
|
let data = SrvLoginReply {
|
|
result: srv_login_reply::Result::Failed,
|
|
right: 0,
|
|
type_: 0,
|
|
servers_info: Vec::new(),
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcLoginReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn handle_server_select_req(
|
|
packet: Packet,
|
|
connection_service: Arc<ConnectionService>,
|
|
connection_id: String,
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
use crate::packets::cli_srv_select_req::CliSrvSelectReq;
|
|
use crate::packets::srv_srv_select_reply::SrvSrvSelectReply;
|
|
let request = CliSrvSelectReq::decode(packet.payload.as_slice())?;
|
|
debug!("{:?}", request);
|
|
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
state
|
|
.additional_data
|
|
.insert("server".to_string(), request.server_id.to_string());
|
|
state
|
|
.additional_data
|
|
.insert("channel".to_string(), request.channel_id.to_string());
|
|
}
|
|
|
|
let data = SrvSrvSelectReply {
|
|
result: srv_srv_select_reply::Result::Ok,
|
|
session_id: 0, // Client should already have this value
|
|
crypt_val: 0, // This is only for the old encryption
|
|
ip: NullTerminatedString::new(""), // If this is empty, the client should stay connected (requires client change)
|
|
port: 0, // See comment about ip above
|
|
};
|
|
|
|
let response_packet = Packet::new(PacketType::PaklcSrvSelectReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn handle_channel_list_req(
|
|
packet: Packet,
|
|
connection_service: Arc<ConnectionService>,
|
|
connection_id: String,
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
use crate::packets::cli_channel_list_req::CliChannelListReq;
|
|
use crate::packets::srv_channel_list_reply::{ChannelInfo, SrvChannelListReply};
|
|
let request = CliChannelListReq::decode(packet.payload.as_slice());
|
|
debug!("{:?}", request);
|
|
|
|
let mut id = 1;
|
|
let mut channel_info: Vec<ChannelInfo> = Vec::new();
|
|
match get_service_info("default", "world-service").await {
|
|
Ok(service_info) => {
|
|
if let Some(annotations) = service_info.annotations {
|
|
let mut server_name = "".to_string();
|
|
for (key, value) in annotations {
|
|
match key.as_str() {
|
|
"name" => {
|
|
server_name = value;
|
|
}
|
|
"tags" => {}
|
|
_ => {}
|
|
}
|
|
}
|
|
channel_info.push(ChannelInfo {
|
|
id: id,
|
|
low_age: 0,
|
|
high_age: 0,
|
|
capacity: 0,
|
|
name: NullTerminatedString::new(&server_name),
|
|
});
|
|
id = id + 1;
|
|
|
|
let data = SrvChannelListReply {
|
|
id: request?.server_id,
|
|
channels: channel_info,
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
}
|
|
}
|
|
Err(err) => {
|
|
let data = SrvChannelListReply {
|
|
id: request?.server_id,
|
|
channels: Vec::new(),
|
|
};
|
|
let response_packet = Packet::new(PacketType::PaklcChannelListReply, &data)?;
|
|
if let Some(mut state) = connection_service.get_connection_mut(&connection_id) {
|
|
let writer_clone = state.writer.clone().unwrap();
|
|
let mut locked_stream = writer_clone.lock().await;
|
|
send_packet(&mut locked_stream, &response_packet).await?;
|
|
}
|
|
return Ok(());
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|