aboutsummaryrefslogtreecommitdiff
path: root/src/net
diff options
context:
space:
mode:
authorArne Dußin2021-01-27 14:01:50 +0100
committerArne Dußin2021-02-02 22:16:15 +0100
commitf92e9f6f07b1e3834c2ca58ce3510734819d08e4 (patch)
tree20e3d3afce342a56ae98f6c20491482ccd2b5c6b /src/net
parentc60a6d07efb120724b308e29e8e70f27c87c952d (diff)
downloadgraf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.tar.gz
graf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.zip
Rework graf karto to fit the client/server structure
Diffstat (limited to 'src/net')
-rw-r--r--src/net/cargo.rs33
-rw-r--r--src/net/client/connection.rs119
-rw-r--r--src/net/client/mod.rs4
-rw-r--r--src/net/mod.rs11
-rw-r--r--src/net/packet.rs73
-rw-r--r--src/net/server/connection.rs84
-rw-r--r--src/net/server/connection_manager.rs109
-rw-r--r--src/net/server/mod.rs6
8 files changed, 439 insertions, 0 deletions
diff --git a/src/net/cargo.rs b/src/net/cargo.rs
new file mode 100644
index 0000000..b05944c
--- /dev/null
+++ b/src/net/cargo.rs
@@ -0,0 +1,33 @@
+//! The cargo is information actually concerning the inner workings of graf karto,
+//! as opposed to the inner workings of the network library.
+
+use crate::world::{Icon, Room, Wall, World};
+use nalgebra::Matrix3;
+use serde::{Deserialize, Serialize};
+
+/// Packets sent oven the network will carry this cargo to inform on what the client needs or the
+/// server wants.
+#[derive(Debug, Deserialize, Serialize)]
+pub enum Cargo {
+ /// Client -> Server: Request to add an icon to the map
+ AddIcon(Icon),
+ /// Client -> Server: Request to add a room to the map
+ AddRoom(Room),
+ /// Client -> Server: Request to add a wall to the map
+ AddWall(Wall),
+ /// Client <-> Server: Update the info of the icon with the given id.
+ UpdateIcon((usize, Icon)),
+ /// Client <-> Server: Update the info of the room with the given id.
+ UpdateRoom((usize, Room)),
+ /// Client <-> Server: Update the info of the wall with the given id.
+ UpdateWall((usize, Wall)),
+ /// Client -> Server: Request to apply the given matrix to the item with the provided id.
+ /// If the matrix cannot be applied to an item with the given id, it will do nothing.
+ ApplyMatrix((usize, Matrix3<f64>)),
+ /// Client <-> Remove the item with the given id.
+ Remove(usize),
+ /// Server -> Client: Add all of the data additively to the map
+ AddMapData(World),
+ /// Server -> Client: Clear the current map data of the client and replace it with this.
+ UpdateMapData(World),
+}
diff --git a/src/net/client/connection.rs b/src/net/client/connection.rs
new file mode 100644
index 0000000..2941a0a
--- /dev/null
+++ b/src/net/client/connection.rs
@@ -0,0 +1,119 @@
+//! A connection to a server using the TCP-Protocol.
+
+use super::super::{Packet, PacketRwError};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Debug;
+use std::io;
+use std::net::TcpStream;
+use std::ops::DerefMut;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::{self, Receiver};
+use std::sync::Arc;
+use std::thread::{self, JoinHandle};
+use std::time::Duration;
+use std::{cell::RefCell, net::Shutdown};
+
+/// Represents a connection to the server.
+pub struct Connection<P: 'static + Debug + Send + DeserializeOwned + Serialize> {
+ stream: RefCell<TcpStream>,
+ packet_rx: Receiver<Packet<P>>,
+ running: Arc<AtomicBool>,
+ rcv_thread_handle: Option<JoinHandle<()>>,
+}
+
+impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Connection<P> {
+ /// Create a new connection based on the given TCP-Stream. Will start up an
+ /// extra thread on which it will receive packets, while the thread with the
+ /// connection can send packets and query for packets that may have been
+ /// received.
+ pub fn new(stream: TcpStream) -> Self {
+ let running = Arc::new(AtomicBool::new(true));
+
+ let (tx, rx) = mpsc::channel();
+ let mut stream_cl = stream
+ .try_clone()
+ .expect("Unable to create tcp stream for receiving packets.");
+ let running_cl = running.clone();
+ let rcv_thread_handle = thread::spawn(move || {
+ // Set the read timeout, so that this thread can be closed gracefully.
+ stream_cl
+ .set_read_timeout(Some(Duration::from_millis(500)))
+ .expect("Unable to set socket read timeout.");
+
+ while running_cl.load(Ordering::Relaxed) {
+ let packet = match Packet::read_from_stream(&mut stream_cl) {
+ Ok(p) => p,
+ Err(PacketRwError::Closed) => Packet::Disconnect,
+ Err(PacketRwError::IOError(err)) => {
+ if err.kind() == io::ErrorKind::WouldBlock {
+ // This error is thrown when the remote reaches the
+ // timeout duration and can thusly be ignored.
+ continue;
+ }
+
+ error!("Unable to read packet: {:?}", err);
+ continue;
+ }
+ Err(err) => {
+ error!("Unable to read packet. {:?}", err);
+ continue;
+ }
+ };
+
+ // Send the packet through the mspc channel.
+ tx.send(packet).unwrap();
+ }
+ });
+
+ Self {
+ stream: RefCell::new(stream),
+ packet_rx: rx,
+ running,
+ rcv_thread_handle: Some(rcv_thread_handle),
+ }
+ }
+
+ /// Get the next packet in case one has been received. If no packet has been
+ /// received since the latest packet has been checked last time, this will
+ /// return none. If the client has been disconnected, this will also return
+ /// `None`.
+ pub fn next_packet(&self) -> Option<P> {
+ match self.packet_rx.try_recv() {
+ Ok(Packet::Disconnect) => {
+ self.running.store(false, Ordering::Relaxed);
+ info!("Server disconnected client.");
+ None
+ }
+ Ok(Packet::Cargo(packet)) => Some(packet),
+ Err(err) => {
+ warn!("unable to receive packet: {}", err);
+ None
+ }
+ }
+ }
+
+ /// Send a packet to the server.
+ pub fn send(&self, packet: P) -> Result<(), PacketRwError> {
+ Packet::Cargo(packet).write_to_stream(self.stream.borrow_mut().deref_mut())
+ }
+
+ /// Stop the client. No further packets will be received or be sent.
+ pub fn stop(&mut self) -> Result<(), io::Error> {
+ self.stream.borrow_mut().shutdown(Shutdown::Both)?;
+ self.running.store(false, Ordering::Relaxed);
+ if let Some(handle) = self.rcv_thread_handle.take() {
+ handle
+ .join()
+ .expect("Packet receive thread closed unexpectedly.");
+ }
+
+ Ok(())
+ }
+}
+
+impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Drop for Connection<P> {
+ fn drop(&mut self) {
+ self.stop().expect("Failed to stop the client properly");
+ }
+}
diff --git a/src/net/client/mod.rs b/src/net/client/mod.rs
new file mode 100644
index 0000000..d5db3e8
--- /dev/null
+++ b/src/net/client/mod.rs
@@ -0,0 +1,4 @@
+//! Networking items used solely by the client.
+
+pub mod connection;
+pub use connection::*;
diff --git a/src/net/mod.rs b/src/net/mod.rs
new file mode 100644
index 0000000..eb68d1d
--- /dev/null
+++ b/src/net/mod.rs
@@ -0,0 +1,11 @@
+//! Network module containing a simple client/server library.
+
+pub mod cargo;
+pub mod client;
+pub(self) mod packet;
+pub mod server;
+
+pub use cargo::*;
+pub use client::*;
+pub(self) use packet::*;
+pub use server::*;
diff --git a/src/net/packet.rs b/src/net/packet.rs
new file mode 100644
index 0000000..2d97504
--- /dev/null
+++ b/src/net/packet.rs
@@ -0,0 +1,73 @@
+use serde::de::DeserializeOwned;
+use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
+use std::io;
+use std::mem;
+
+#[derive(Debug, Deserialize, Serialize)]
+pub(super) enum Packet<P: 'static + Send + Debug> {
+ Cargo(P),
+ Disconnect,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum PacketRwError {
+ #[error("packet could not be properly deserialised: {0}")]
+ DeserialiseError(bincode::Error),
+ #[error("packet could not be properly serialised: {0}")]
+ SerialiseError(bincode::Error),
+ #[error("unable to read packet from stream: {0}")]
+ IOError(io::Error),
+ #[error("connection was closed from the remote end")]
+ Closed,
+}
+
+impl<P: 'static + Send + Debug + DeserializeOwned + Serialize> Packet<P> {
+ pub fn write_to_stream(&self, stream: &mut impl io::Write) -> Result<(), PacketRwError> {
+ let data: Vec<u8> =
+ bincode::serialize(&self).map_err(|err| PacketRwError::SerialiseError(err))?;
+
+ // Write head with packet length
+ assert!(data.len() as u64 <= u32::MAX as u64);
+ let len = data.len() as u32;
+ let len = bincode::serialize(&len).map_err(|err| PacketRwError::SerialiseError(err))?;
+ stream
+ .write_all(&len)
+ .map_err(|err| PacketRwError::IOError(err))?;
+
+ // Write the data of the packet and pray all errors are caught.
+ Ok(stream
+ .write_all(&data)
+ .map_err(|err| PacketRwError::IOError(err))?)
+ }
+
+ pub fn read_from_stream(stream: &mut impl io::Read) -> Result<Self, PacketRwError> {
+ // Read packet head which informs us of the length.
+ let mut len = vec![0; mem::size_of::<u32>()];
+ stream.read_exact(&mut len).map_err(|err| {
+ if err.kind() == io::ErrorKind::UnexpectedEof {
+ PacketRwError::Closed
+ } else {
+ PacketRwError::IOError(err)
+ }
+ })?;
+ let len: u32 = bincode::deserialize(&len)
+ .expect("Unable to deserialise length of packet. Stream is corrupted.");
+
+ // Read all data from the packet according to the length.
+ let mut data = vec![0; len as usize];
+ match stream.read_exact(&mut data) {
+ Ok(()) => {
+ let res: Result<Self, bincode::Error> = bincode::deserialize(&data);
+ Ok(res.map_err(|err| PacketRwError::DeserialiseError(err))?)
+ }
+ Err(err) => {
+ if err.kind() == io::ErrorKind::UnexpectedEof {
+ Err(PacketRwError::Closed)
+ } else {
+ Err(PacketRwError::IOError(err))
+ }
+ }
+ }
+ }
+}
diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs
new file mode 100644
index 0000000..801eb4b
--- /dev/null
+++ b/src/net/server/connection.rs
@@ -0,0 +1,84 @@
+//! A TCP-connection from a client to the server.
+
+use super::super::packet::{Packet, PacketRwError};
+use super::connection_manager::ConnId;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Debug;
+use std::net::TcpStream;
+use std::sync::mpsc::Sender;
+use std::thread::{self, JoinHandle};
+
+/// Holds a stream to the client and manages receiving packets from said client
+/// in an extra thread.
+pub struct Connection {
+ stream: TcpStream,
+ rcv_thread_handle: JoinHandle<()>,
+}
+
+impl Connection {
+ /// Start up the receiving thread after a client connection has been detected.
+ /// Will create a ghost disconnect packet in case the client disconnects.
+ pub(super) fn start_rcv<P>(
+ conn_id: ConnId,
+ stream: TcpStream,
+ packet_tx: Sender<(ConnId, Packet<P>)>,
+ ) -> Self
+ where
+ P: 'static + Debug + Send + DeserializeOwned + Serialize,
+ {
+ let mut stream_cl = stream
+ .try_clone()
+ .expect("Unable to clone TcpStream handle");
+ let rcv_thread_handle = thread::spawn(move || {
+ let mut running = true;
+ while running {
+ // Read the newest packet from the stream.
+ let packet = match Packet::read_from_stream(&mut stream_cl) {
+ Ok(packet) => dbg!(packet),
+ Err(PacketRwError::Closed) => {
+ // Stop the thread after this packet.
+ running = false;
+
+ /* Generate an internal disconnection packet, so the connection
+ * manager can call cleanup code if necessary.
+ */
+ Packet::Disconnect
+ }
+ Err(err) => {
+ error!(
+ "Receiving packet failed. Connection `{}`. {:?}",
+ conn_id, err
+ );
+
+ // Ignore the received data.
+ continue;
+ }
+ };
+
+ /* Try sending the packet to the Connection manager. If it has already
+ * stopped and hung up on the channel, stop this receive thread as well.
+ */
+ if packet_tx.send((conn_id, packet)).is_err() {
+ info!("Shutting down connection `{}`", conn_id);
+ running = false;
+ }
+ }
+
+ info!("Packet receive thread has stopped running.");
+ });
+
+ Self {
+ stream,
+ rcv_thread_handle,
+ }
+ }
+
+ /// Send a packet to the client via TCP.
+ pub(super) fn send<P>(&mut self, packet: &Packet<P>) -> Result<(), PacketRwError>
+ where
+ P: 'static + Debug + Send + DeserializeOwned + Serialize,
+ {
+ packet.write_to_stream(&mut self.stream)
+ }
+}
diff --git a/src/net/server/connection_manager.rs b/src/net/server/connection_manager.rs
new file mode 100644
index 0000000..c47aa32
--- /dev/null
+++ b/src/net/server/connection_manager.rs
@@ -0,0 +1,109 @@
+//! The main server module, managing connections from clients.
+
+use super::super::packet::Packet;
+use super::connection::Connection;
+use crate::stable_vec::StableVec;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Debug;
+use std::io;
+use std::net::{SocketAddr, TcpListener};
+use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::{Arc, RwLock};
+use std::thread;
+
+/// Type of id for the connections inside of the connection manager.
+pub type ConnId = usize;
+
+/// Manages incoming connections to the servers and packets received from them.
+pub struct ConnectionManager<P: 'static + Send + Debug + DeserializeOwned> {
+ connections: Arc<RwLock<StableVec<Connection>>>,
+ local_port: u16,
+ _tx: Sender<(ConnId, Packet<P>)>,
+ rx: Receiver<(ConnId, Packet<P>)>,
+}
+
+impl<'de, P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager<P> {
+ fn listen(
+ listener: TcpListener,
+ connections: Arc<RwLock<StableVec<Connection>>>,
+ packet_tx: Sender<(ConnId, Packet<P>)>,
+ ) {
+ for stream in listener.incoming() {
+ info!("Incoming connection.");
+ let stream = match stream {
+ Ok(stream) => stream,
+ Err(err) => {
+ error!("Unable to accept client. {}", err);
+ continue;
+ }
+ };
+
+ let mut connections = connections.write().unwrap();
+ let id = connections.next_free();
+ connections
+ .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone()))
+ .expect("Unable to insert client at supposedly valid id");
+ info!("Client `{}` connected.", id);
+ }
+
+ error!("Closing listener. This should never happen");
+ }
+
+ /// Start listening for connections. Returns the manager for connections,
+ /// which then can be asked about the connectins status and to send packets to
+ /// any client connected.
+ pub fn start(addr: SocketAddr) -> Result<Self, io::Error> {
+ let listener = TcpListener::bind(addr)?;
+ let local_port = listener.local_addr()?.port();
+ let connections = Arc::new(RwLock::new(StableVec::new()));
+
+ let (tx, rx) = mpsc::channel();
+ let tx_cl = tx.clone();
+ let connections_cl = connections.clone();
+ thread::spawn(move || Self::listen(listener, connections_cl, tx_cl));
+
+ Ok(Self {
+ connections,
+ local_port,
+ _tx: tx,
+ rx,
+ })
+ }
+
+ /// Try to receive the next packet. If no packet was received, this returns `None`. If the client
+ /// was disconnected, this also returns `None`.
+ pub fn next_packet(&self) -> Option<(ConnId, P)> {
+ match self.rx.try_recv() {
+ Ok((conn_id, Packet::Disconnect)) => {
+ self.connections.write().unwrap().remove(conn_id);
+ self.next_packet()
+ }
+ Ok((conn_id, Packet::Cargo(packet))) => Some((conn_id, packet)),
+ Err(_err) => None,
+ }
+ }
+
+ /// Send a packet to all clients currently connected
+ pub fn broadcast(&self, packet: P) -> bool {
+ let mut one_failed = false;
+ let mut conns = self.connections.write().unwrap();
+ let packet = Packet::Cargo(packet);
+ for (ref id, ref mut conn) in conns.id_iter_mut() {
+ if let Err(err) = conn.send(&packet) {
+ error!(
+ "Broadcasting {:?} failed for client `{}`: {:?}",
+ packet, id, err
+ );
+ one_failed = true;
+ }
+ }
+
+ !one_failed
+ }
+
+ /// Get the local port this connection manager's listener is bound to.
+ pub fn port(&self) -> u16 {
+ self.local_port
+ }
+}
diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs
new file mode 100644
index 0000000..1f52ad5
--- /dev/null
+++ b/src/net/server/mod.rs
@@ -0,0 +1,6 @@
+//! Module containing network library server parts.
+
+pub mod connection;
+pub mod connection_manager;
+
+pub use connection_manager::*;