From f92e9f6f07b1e3834c2ca58ce3510734819d08e4 Mon Sep 17 00:00:00 2001 From: Arne Dußin Date: Wed, 27 Jan 2021 14:01:50 +0100 Subject: Rework graf karto to fit the client/server structure --- src/net/server/connection.rs | 84 +++++++++++++++++++++++++++ src/net/server/connection_manager.rs | 109 +++++++++++++++++++++++++++++++++++ src/net/server/mod.rs | 6 ++ 3 files changed, 199 insertions(+) create mode 100644 src/net/server/connection.rs create mode 100644 src/net/server/connection_manager.rs create mode 100644 src/net/server/mod.rs (limited to 'src/net/server') diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs new file mode 100644 index 0000000..801eb4b --- /dev/null +++ b/src/net/server/connection.rs @@ -0,0 +1,84 @@ +//! A TCP-connection from a client to the server. + +use super::super::packet::{Packet, PacketRwError}; +use super::connection_manager::ConnId; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::net::TcpStream; +use std::sync::mpsc::Sender; +use std::thread::{self, JoinHandle}; + +/// Holds a stream to the client and manages receiving packets from said client +/// in an extra thread. +pub struct Connection { + stream: TcpStream, + rcv_thread_handle: JoinHandle<()>, +} + +impl Connection { + /// Start up the receiving thread after a client connection has been detected. + /// Will create a ghost disconnect packet in case the client disconnects. + pub(super) fn start_rcv
( + conn_id: ConnId, + stream: TcpStream, + packet_tx: Sender<(ConnId, Packet
)>, + ) -> Self + where + P: 'static + Debug + Send + DeserializeOwned + Serialize, + { + let mut stream_cl = stream + .try_clone() + .expect("Unable to clone TcpStream handle"); + let rcv_thread_handle = thread::spawn(move || { + let mut running = true; + while running { + // Read the newest packet from the stream. + let packet = match Packet::read_from_stream(&mut stream_cl) { + Ok(packet) => dbg!(packet), + Err(PacketRwError::Closed) => { + // Stop the thread after this packet. + running = false; + + /* Generate an internal disconnection packet, so the connection + * manager can call cleanup code if necessary. + */ + Packet::Disconnect + } + Err(err) => { + error!( + "Receiving packet failed. Connection `{}`. {:?}", + conn_id, err + ); + + // Ignore the received data. + continue; + } + }; + + /* Try sending the packet to the Connection manager. If it has already + * stopped and hung up on the channel, stop this receive thread as well. + */ + if packet_tx.send((conn_id, packet)).is_err() { + info!("Shutting down connection `{}`", conn_id); + running = false; + } + } + + info!("Packet receive thread has stopped running."); + }); + + Self { + stream, + rcv_thread_handle, + } + } + + /// Send a packet to the client via TCP. + pub(super) fn send
(&mut self, packet: &Packet
) -> Result<(), PacketRwError>
+ where
+ P: 'static + Debug + Send + DeserializeOwned + Serialize,
+ {
+ packet.write_to_stream(&mut self.stream)
+ }
+}
diff --git a/src/net/server/connection_manager.rs b/src/net/server/connection_manager.rs
new file mode 100644
index 0000000..c47aa32
--- /dev/null
+++ b/src/net/server/connection_manager.rs
@@ -0,0 +1,109 @@
+//! The main server module, managing connections from clients.
+
+use super::super::packet::Packet;
+use super::connection::Connection;
+use crate::stable_vec::StableVec;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Debug;
+use std::io;
+use std::net::{SocketAddr, TcpListener};
+use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::{Arc, RwLock};
+use std::thread;
+
+/// Type of id for the connections inside of the connection manager.
+pub type ConnId = usize;
+
+/// Manages incoming connections to the servers and packets received from them.
+pub struct ConnectionManager )>,
+ rx: Receiver<(ConnId, Packet )>,
+}
+
+impl<'de, P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager {
+ fn listen(
+ listener: TcpListener,
+ connections: Arc )>,
+ ) {
+ for stream in listener.incoming() {
+ info!("Incoming connection.");
+ let stream = match stream {
+ Ok(stream) => stream,
+ Err(err) => {
+ error!("Unable to accept client. {}", err);
+ continue;
+ }
+ };
+
+ let mut connections = connections.write().unwrap();
+ let id = connections.next_free();
+ connections
+ .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone()))
+ .expect("Unable to insert client at supposedly valid id");
+ info!("Client `{}` connected.", id);
+ }
+
+ error!("Closing listener. This should never happen");
+ }
+
+ /// Start listening for connections. Returns the manager for connections,
+ /// which then can be asked about the connectins status and to send packets to
+ /// any client connected.
+ pub fn start(addr: SocketAddr) -> Result