diff options
| author | Arne Dußin | 2021-01-27 14:01:50 +0100 |
|---|---|---|
| committer | Arne Dußin | 2021-02-02 22:16:15 +0100 |
| commit | f92e9f6f07b1e3834c2ca58ce3510734819d08e4 (patch) | |
| tree | 20e3d3afce342a56ae98f6c20491482ccd2b5c6b /src/net | |
| parent | c60a6d07efb120724b308e29e8e70f27c87c952d (diff) | |
| download | graf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.tar.gz graf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.zip | |
Rework graf karto to fit the client/server structure
Diffstat (limited to 'src/net')
| -rw-r--r-- | src/net/cargo.rs | 33 | ||||
| -rw-r--r-- | src/net/client/connection.rs | 119 | ||||
| -rw-r--r-- | src/net/client/mod.rs | 4 | ||||
| -rw-r--r-- | src/net/mod.rs | 11 | ||||
| -rw-r--r-- | src/net/packet.rs | 73 | ||||
| -rw-r--r-- | src/net/server/connection.rs | 84 | ||||
| -rw-r--r-- | src/net/server/connection_manager.rs | 109 | ||||
| -rw-r--r-- | src/net/server/mod.rs | 6 |
8 files changed, 439 insertions, 0 deletions
diff --git a/src/net/cargo.rs b/src/net/cargo.rs new file mode 100644 index 0000000..b05944c --- /dev/null +++ b/src/net/cargo.rs @@ -0,0 +1,33 @@ +//! The cargo is information actually concerning the inner workings of graf karto, +//! as opposed to the inner workings of the network library. + +use crate::world::{Icon, Room, Wall, World}; +use nalgebra::Matrix3; +use serde::{Deserialize, Serialize}; + +/// Packets sent oven the network will carry this cargo to inform on what the client needs or the +/// server wants. +#[derive(Debug, Deserialize, Serialize)] +pub enum Cargo { + /// Client -> Server: Request to add an icon to the map + AddIcon(Icon), + /// Client -> Server: Request to add a room to the map + AddRoom(Room), + /// Client -> Server: Request to add a wall to the map + AddWall(Wall), + /// Client <-> Server: Update the info of the icon with the given id. + UpdateIcon((usize, Icon)), + /// Client <-> Server: Update the info of the room with the given id. + UpdateRoom((usize, Room)), + /// Client <-> Server: Update the info of the wall with the given id. + UpdateWall((usize, Wall)), + /// Client -> Server: Request to apply the given matrix to the item with the provided id. + /// If the matrix cannot be applied to an item with the given id, it will do nothing. + ApplyMatrix((usize, Matrix3<f64>)), + /// Client <-> Remove the item with the given id. + Remove(usize), + /// Server -> Client: Add all of the data additively to the map + AddMapData(World), + /// Server -> Client: Clear the current map data of the client and replace it with this. + UpdateMapData(World), +} diff --git a/src/net/client/connection.rs b/src/net/client/connection.rs new file mode 100644 index 0000000..2941a0a --- /dev/null +++ b/src/net/client/connection.rs @@ -0,0 +1,119 @@ +//! A connection to a server using the TCP-Protocol. + +use super::super::{Packet, PacketRwError}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::io; +use std::net::TcpStream; +use std::ops::DerefMut; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Receiver}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; +use std::{cell::RefCell, net::Shutdown}; + +/// Represents a connection to the server. +pub struct Connection<P: 'static + Debug + Send + DeserializeOwned + Serialize> { + stream: RefCell<TcpStream>, + packet_rx: Receiver<Packet<P>>, + running: Arc<AtomicBool>, + rcv_thread_handle: Option<JoinHandle<()>>, +} + +impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Connection<P> { + /// Create a new connection based on the given TCP-Stream. Will start up an + /// extra thread on which it will receive packets, while the thread with the + /// connection can send packets and query for packets that may have been + /// received. + pub fn new(stream: TcpStream) -> Self { + let running = Arc::new(AtomicBool::new(true)); + + let (tx, rx) = mpsc::channel(); + let mut stream_cl = stream + .try_clone() + .expect("Unable to create tcp stream for receiving packets."); + let running_cl = running.clone(); + let rcv_thread_handle = thread::spawn(move || { + // Set the read timeout, so that this thread can be closed gracefully. + stream_cl + .set_read_timeout(Some(Duration::from_millis(500))) + .expect("Unable to set socket read timeout."); + + while running_cl.load(Ordering::Relaxed) { + let packet = match Packet::read_from_stream(&mut stream_cl) { + Ok(p) => p, + Err(PacketRwError::Closed) => Packet::Disconnect, + Err(PacketRwError::IOError(err)) => { + if err.kind() == io::ErrorKind::WouldBlock { + // This error is thrown when the remote reaches the + // timeout duration and can thusly be ignored. + continue; + } + + error!("Unable to read packet: {:?}", err); + continue; + } + Err(err) => { + error!("Unable to read packet. {:?}", err); + continue; + } + }; + + // Send the packet through the mspc channel. + tx.send(packet).unwrap(); + } + }); + + Self { + stream: RefCell::new(stream), + packet_rx: rx, + running, + rcv_thread_handle: Some(rcv_thread_handle), + } + } + + /// Get the next packet in case one has been received. If no packet has been + /// received since the latest packet has been checked last time, this will + /// return none. If the client has been disconnected, this will also return + /// `None`. + pub fn next_packet(&self) -> Option<P> { + match self.packet_rx.try_recv() { + Ok(Packet::Disconnect) => { + self.running.store(false, Ordering::Relaxed); + info!("Server disconnected client."); + None + } + Ok(Packet::Cargo(packet)) => Some(packet), + Err(err) => { + warn!("unable to receive packet: {}", err); + None + } + } + } + + /// Send a packet to the server. + pub fn send(&self, packet: P) -> Result<(), PacketRwError> { + Packet::Cargo(packet).write_to_stream(self.stream.borrow_mut().deref_mut()) + } + + /// Stop the client. No further packets will be received or be sent. + pub fn stop(&mut self) -> Result<(), io::Error> { + self.stream.borrow_mut().shutdown(Shutdown::Both)?; + self.running.store(false, Ordering::Relaxed); + if let Some(handle) = self.rcv_thread_handle.take() { + handle + .join() + .expect("Packet receive thread closed unexpectedly."); + } + + Ok(()) + } +} + +impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Drop for Connection<P> { + fn drop(&mut self) { + self.stop().expect("Failed to stop the client properly"); + } +} diff --git a/src/net/client/mod.rs b/src/net/client/mod.rs new file mode 100644 index 0000000..d5db3e8 --- /dev/null +++ b/src/net/client/mod.rs @@ -0,0 +1,4 @@ +//! Networking items used solely by the client. + +pub mod connection; +pub use connection::*; diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 0000000..eb68d1d --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,11 @@ +//! Network module containing a simple client/server library. + +pub mod cargo; +pub mod client; +pub(self) mod packet; +pub mod server; + +pub use cargo::*; +pub use client::*; +pub(self) use packet::*; +pub use server::*; diff --git a/src/net/packet.rs b/src/net/packet.rs new file mode 100644 index 0000000..2d97504 --- /dev/null +++ b/src/net/packet.rs @@ -0,0 +1,73 @@ +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::io; +use std::mem; + +#[derive(Debug, Deserialize, Serialize)] +pub(super) enum Packet<P: 'static + Send + Debug> { + Cargo(P), + Disconnect, +} + +#[derive(Debug, thiserror::Error)] +pub enum PacketRwError { + #[error("packet could not be properly deserialised: {0}")] + DeserialiseError(bincode::Error), + #[error("packet could not be properly serialised: {0}")] + SerialiseError(bincode::Error), + #[error("unable to read packet from stream: {0}")] + IOError(io::Error), + #[error("connection was closed from the remote end")] + Closed, +} + +impl<P: 'static + Send + Debug + DeserializeOwned + Serialize> Packet<P> { + pub fn write_to_stream(&self, stream: &mut impl io::Write) -> Result<(), PacketRwError> { + let data: Vec<u8> = + bincode::serialize(&self).map_err(|err| PacketRwError::SerialiseError(err))?; + + // Write head with packet length + assert!(data.len() as u64 <= u32::MAX as u64); + let len = data.len() as u32; + let len = bincode::serialize(&len).map_err(|err| PacketRwError::SerialiseError(err))?; + stream + .write_all(&len) + .map_err(|err| PacketRwError::IOError(err))?; + + // Write the data of the packet and pray all errors are caught. + Ok(stream + .write_all(&data) + .map_err(|err| PacketRwError::IOError(err))?) + } + + pub fn read_from_stream(stream: &mut impl io::Read) -> Result<Self, PacketRwError> { + // Read packet head which informs us of the length. + let mut len = vec![0; mem::size_of::<u32>()]; + stream.read_exact(&mut len).map_err(|err| { + if err.kind() == io::ErrorKind::UnexpectedEof { + PacketRwError::Closed + } else { + PacketRwError::IOError(err) + } + })?; + let len: u32 = bincode::deserialize(&len) + .expect("Unable to deserialise length of packet. Stream is corrupted."); + + // Read all data from the packet according to the length. + let mut data = vec![0; len as usize]; + match stream.read_exact(&mut data) { + Ok(()) => { + let res: Result<Self, bincode::Error> = bincode::deserialize(&data); + Ok(res.map_err(|err| PacketRwError::DeserialiseError(err))?) + } + Err(err) => { + if err.kind() == io::ErrorKind::UnexpectedEof { + Err(PacketRwError::Closed) + } else { + Err(PacketRwError::IOError(err)) + } + } + } + } +} diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs new file mode 100644 index 0000000..801eb4b --- /dev/null +++ b/src/net/server/connection.rs @@ -0,0 +1,84 @@ +//! A TCP-connection from a client to the server. + +use super::super::packet::{Packet, PacketRwError}; +use super::connection_manager::ConnId; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::net::TcpStream; +use std::sync::mpsc::Sender; +use std::thread::{self, JoinHandle}; + +/// Holds a stream to the client and manages receiving packets from said client +/// in an extra thread. +pub struct Connection { + stream: TcpStream, + rcv_thread_handle: JoinHandle<()>, +} + +impl Connection { + /// Start up the receiving thread after a client connection has been detected. + /// Will create a ghost disconnect packet in case the client disconnects. + pub(super) fn start_rcv<P>( + conn_id: ConnId, + stream: TcpStream, + packet_tx: Sender<(ConnId, Packet<P>)>, + ) -> Self + where + P: 'static + Debug + Send + DeserializeOwned + Serialize, + { + let mut stream_cl = stream + .try_clone() + .expect("Unable to clone TcpStream handle"); + let rcv_thread_handle = thread::spawn(move || { + let mut running = true; + while running { + // Read the newest packet from the stream. + let packet = match Packet::read_from_stream(&mut stream_cl) { + Ok(packet) => dbg!(packet), + Err(PacketRwError::Closed) => { + // Stop the thread after this packet. + running = false; + + /* Generate an internal disconnection packet, so the connection + * manager can call cleanup code if necessary. + */ + Packet::Disconnect + } + Err(err) => { + error!( + "Receiving packet failed. Connection `{}`. {:?}", + conn_id, err + ); + + // Ignore the received data. + continue; + } + }; + + /* Try sending the packet to the Connection manager. If it has already + * stopped and hung up on the channel, stop this receive thread as well. + */ + if packet_tx.send((conn_id, packet)).is_err() { + info!("Shutting down connection `{}`", conn_id); + running = false; + } + } + + info!("Packet receive thread has stopped running."); + }); + + Self { + stream, + rcv_thread_handle, + } + } + + /// Send a packet to the client via TCP. + pub(super) fn send<P>(&mut self, packet: &Packet<P>) -> Result<(), PacketRwError> + where + P: 'static + Debug + Send + DeserializeOwned + Serialize, + { + packet.write_to_stream(&mut self.stream) + } +} diff --git a/src/net/server/connection_manager.rs b/src/net/server/connection_manager.rs new file mode 100644 index 0000000..c47aa32 --- /dev/null +++ b/src/net/server/connection_manager.rs @@ -0,0 +1,109 @@ +//! The main server module, managing connections from clients. + +use super::super::packet::Packet; +use super::connection::Connection; +use crate::stable_vec::StableVec; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::io; +use std::net::{SocketAddr, TcpListener}; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, RwLock}; +use std::thread; + +/// Type of id for the connections inside of the connection manager. +pub type ConnId = usize; + +/// Manages incoming connections to the servers and packets received from them. +pub struct ConnectionManager<P: 'static + Send + Debug + DeserializeOwned> { + connections: Arc<RwLock<StableVec<Connection>>>, + local_port: u16, + _tx: Sender<(ConnId, Packet<P>)>, + rx: Receiver<(ConnId, Packet<P>)>, +} + +impl<'de, P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager<P> { + fn listen( + listener: TcpListener, + connections: Arc<RwLock<StableVec<Connection>>>, + packet_tx: Sender<(ConnId, Packet<P>)>, + ) { + for stream in listener.incoming() { + info!("Incoming connection."); + let stream = match stream { + Ok(stream) => stream, + Err(err) => { + error!("Unable to accept client. {}", err); + continue; + } + }; + + let mut connections = connections.write().unwrap(); + let id = connections.next_free(); + connections + .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone())) + .expect("Unable to insert client at supposedly valid id"); + info!("Client `{}` connected.", id); + } + + error!("Closing listener. This should never happen"); + } + + /// Start listening for connections. Returns the manager for connections, + /// which then can be asked about the connectins status and to send packets to + /// any client connected. + pub fn start(addr: SocketAddr) -> Result<Self, io::Error> { + let listener = TcpListener::bind(addr)?; + let local_port = listener.local_addr()?.port(); + let connections = Arc::new(RwLock::new(StableVec::new())); + + let (tx, rx) = mpsc::channel(); + let tx_cl = tx.clone(); + let connections_cl = connections.clone(); + thread::spawn(move || Self::listen(listener, connections_cl, tx_cl)); + + Ok(Self { + connections, + local_port, + _tx: tx, + rx, + }) + } + + /// Try to receive the next packet. If no packet was received, this returns `None`. If the client + /// was disconnected, this also returns `None`. + pub fn next_packet(&self) -> Option<(ConnId, P)> { + match self.rx.try_recv() { + Ok((conn_id, Packet::Disconnect)) => { + self.connections.write().unwrap().remove(conn_id); + self.next_packet() + } + Ok((conn_id, Packet::Cargo(packet))) => Some((conn_id, packet)), + Err(_err) => None, + } + } + + /// Send a packet to all clients currently connected + pub fn broadcast(&self, packet: P) -> bool { + let mut one_failed = false; + let mut conns = self.connections.write().unwrap(); + let packet = Packet::Cargo(packet); + for (ref id, ref mut conn) in conns.id_iter_mut() { + if let Err(err) = conn.send(&packet) { + error!( + "Broadcasting {:?} failed for client `{}`: {:?}", + packet, id, err + ); + one_failed = true; + } + } + + !one_failed + } + + /// Get the local port this connection manager's listener is bound to. + pub fn port(&self) -> u16 { + self.local_port + } +} diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs new file mode 100644 index 0000000..1f52ad5 --- /dev/null +++ b/src/net/server/mod.rs @@ -0,0 +1,6 @@ +//! Module containing network library server parts. + +pub mod connection; +pub mod connection_manager; + +pub use connection_manager::*; |
