diff --git a/Cargo.lock b/Cargo.lock index e2cde756..c712ac17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -482,7 +482,6 @@ dependencies = [ "ferrumc_utils", "lazy_static", "log", - "parking_lot", "serde", "thiserror", "tokio", @@ -513,7 +512,6 @@ dependencies = [ "lariv", "lazy_static", "log", - "parking_lot", "rand", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 762dd9bf..b780b016 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] # Async tokio = {version = "1.38.0", features = ["full"]} -parking_lot = "0.12.3" # Error handling anyhow = "1.0.86" diff --git a/src/crates/ferrumc_macros/src/lib.rs b/src/crates/ferrumc_macros/src/lib.rs index d0239a1f..d9720081 100644 --- a/src/crates/ferrumc_macros/src/lib.rs +++ b/src/crates/ferrumc_macros/src/lib.rs @@ -275,7 +275,7 @@ pub fn bake_packet_registry(input: TokenStream) -> TokenStream { let output = quote! { pub async fn handle_packet(packet_id: u8, conn_owned: &mut Arc>, cursor: &mut std::io::Cursor>) -> ferrumc_utils::prelude::Result<()> { let mut conn = conn_owned.write().await; - match (packet_id, conn_owned.read().await.state.as_str()) { + match (packet_id, conn.state.as_str()) { #(#match_arms)* _ => println!("No packet found for ID: 0x{:02X} in state: {}", packet_id, conn_owned.read().await.state.as_str()), } diff --git a/src/crates/ferrumc_net/Cargo.lock b/src/crates/ferrumc_net/Cargo.lock index f54e481f..d99dac6d 100644 --- a/src/crates/ferrumc_net/Cargo.lock +++ b/src/crates/ferrumc_net/Cargo.lock @@ -411,7 +411,6 @@ dependencies = [ "lariv", "lazy_static", "log", - "parking_lot", "rand", "serde", "serde_json", diff --git a/src/crates/ferrumc_net/Cargo.toml b/src/crates/ferrumc_net/Cargo.toml index 8324e7e2..648958c8 100644 --- a/src/crates/ferrumc_net/Cargo.toml +++ b/src/crates/ferrumc_net/Cargo.toml @@ -19,4 +19,3 @@ serde = { version = "1.0.117", features = ["derive"] } serde_json = "1.0.117" base64 = "0.22.1" console-subscriber = "0.3.0" -parking_lot = "0.12.3" diff --git a/src/crates/ferrumc_net/src/lib.rs b/src/crates/ferrumc_net/src/lib.rs index 6a17d766..293e6c70 100644 --- a/src/crates/ferrumc_net/src/lib.rs +++ b/src/crates/ferrumc_net/src/lib.rs @@ -15,7 +15,7 @@ use log::{debug, trace}; use rand::random; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, RwLockWriteGuard}; use crate::packets::{handle_packet}; @@ -81,6 +81,8 @@ pub struct Connection { pub state: State, // Metadata pub metadata: ConnectionMetadata, + // Whether to drop and clean up the connection + pub drop: bool, } #[derive(Debug, Default)] @@ -100,13 +102,15 @@ pub async fn handle_connection(socket: tokio::net::TcpStream) -> Result<()> { } let conn = Arc::new(RwLock::new( Connection { - id, - socket, - player_uuid: None, - state: State::Handshake, - metadata: ConnectionMetadata::default(), - })); - + id, + socket, + player_uuid: None, + state: State::Handshake, + metadata: ConnectionMetadata::default(), + drop: false, + })); + + // Add the connection to the connections list CONNECTIONS().connections.insert(id, conn); CONNECTIONS() .connection_count @@ -114,62 +118,68 @@ pub async fn handle_connection(socket: tokio::net::TcpStream) -> Result<()> { debug!("Connection established with id: {}. Current connection count: {}", id, CONNECTIONS().connection_count.load(atomic::Ordering::Relaxed)); - let mut conn_ref = CONNECTIONS().connections.view(&id, |_k, v| {v.clone()}).unwrap(); + // Get a reference to the connection + let mut conn_ref = CONNECTIONS().connections.view(&id, |_k, v| { v.clone() }).unwrap(); manage_conn(&mut conn_ref).await?; Ok(()) } - pub async fn manage_conn(conn: &mut Arc>) -> Result<()> { - debug!("Starting receiver for the same addr: {:?}", conn.read().await.socket.peer_addr()?); +pub async fn manage_conn(conn: &mut Arc>) -> Result<()> { + debug!("Starting receiver for the same addr: {:?}", conn.read().await.socket.peer_addr()?); + + loop { + // Get the length of the packet + let mut length_buffer = vec![0u8; 1]; + { + let mut conn_write = conn.write().await; + conn_write.socket.read_exact(&mut length_buffer).await?; + } - loop { - debug!("1"); - let mut length_buffer = vec![0u8; 1]; - { - let mut conn_write = conn.write().await; - conn_write.socket.read_exact(&mut length_buffer).await?; - } - debug!("2"); - let length = length_buffer[0] as usize; + let length = length_buffer[0] as usize; - let mut buffer = vec![0u8; length]; + // Get the rest of the packet + let mut buffer = vec![0u8; length]; + + { + let mut conn_write = conn.write().await; + conn_write.socket.read_exact(&mut buffer).await?; + } - { - let mut conn_write = conn.write().await; - conn_write.socket.read_exact(&mut buffer).await?; - } - debug!("3"); - let buffer = vec![length_buffer, buffer].concat(); + let buffer = vec![length_buffer, buffer].concat(); - let mut cursor = Cursor::new(buffer); + let mut cursor = Cursor::new(buffer); - let packet_length = read_varint(&mut cursor).await?; - let packet_id = read_varint(&mut cursor).await?; - debug!("4"); + // Get the packet length and id + let packet_length = read_varint(&mut cursor).await?; + let packet_id = read_varint(&mut cursor).await?; - trace!("Packet Length: {}", packet_length); - trace!("Packet ID: {}", packet_id); - debug!("5"); + trace!("Packet Length: {}", packet_length); + trace!("Packet ID: {}", packet_id); - handle_packet(packet_id.get_val() as u8, conn, &mut cursor).await?; + // Handle the packet + handle_packet(packet_id.get_val() as u8, conn, &mut cursor).await?; - debug!("6"); + // Check if we need to drop the connection + let do_drop = conn.read().await.drop; + let id = conn.read().await.id; - // TODO: Check if we need to drop the connection + // Drop the connection if needed + if do_drop { + drop_conn(id).await?; + conn.write().await.socket.shutdown().await?; + break; } - #[allow(unreachable_code)] - Ok(()) } + Ok(()) +} - #[allow(dead_code)] - async fn drop_conn(connection: &mut Connection) -> Result<()> { - trace!("Dropping connection with id: {}", connection.id); - let id = connection.id; - CONNECTIONS().connections.remove(&id); - CONNECTIONS().connection_count.fetch_sub(1, atomic::Ordering::Relaxed); - Ok(()) - } +async fn drop_conn(connection_id: u32) -> Result<()> { + debug!("Dropping connection with id: {}", connection_id); + CONNECTIONS().connections.remove(&connection_id); + CONNECTIONS().connection_count.fetch_sub(1, atomic::Ordering::Relaxed); + Ok(()) +} diff --git a/src/crates/ferrumc_net/src/packets/incoming/ping.rs b/src/crates/ferrumc_net/src/packets/incoming/ping.rs index 8216e8b6..d7e21773 100644 --- a/src/crates/ferrumc_net/src/packets/incoming/ping.rs +++ b/src/crates/ferrumc_net/src/packets/incoming/ping.rs @@ -23,6 +23,8 @@ impl IncomingPacket for Ping { }; let response = response.encode().await?; + + conn.drop = true; conn.socket.write_all(&response).await.map_err(|e| e.into()) }