diff options
| author | Arne Dußin | 2021-01-27 14:01:50 +0100 |
|---|---|---|
| committer | Arne Dußin | 2021-02-02 22:16:15 +0100 |
| commit | f92e9f6f07b1e3834c2ca58ce3510734819d08e4 (patch) | |
| tree | 20e3d3afce342a56ae98f6c20491482ccd2b5c6b /src/net/server | |
| parent | c60a6d07efb120724b308e29e8e70f27c87c952d (diff) | |
| download | graf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.tar.gz graf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.zip | |
Rework graf karto to fit the client/server structure
Diffstat (limited to 'src/net/server')
| -rw-r--r-- | src/net/server/connection.rs | 84 | ||||
| -rw-r--r-- | src/net/server/connection_manager.rs | 109 | ||||
| -rw-r--r-- | src/net/server/mod.rs | 6 |
3 files changed, 199 insertions, 0 deletions
diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs new file mode 100644 index 0000000..801eb4b --- /dev/null +++ b/src/net/server/connection.rs @@ -0,0 +1,84 @@ +//! A TCP-connection from a client to the server. + +use super::super::packet::{Packet, PacketRwError}; +use super::connection_manager::ConnId; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::net::TcpStream; +use std::sync::mpsc::Sender; +use std::thread::{self, JoinHandle}; + +/// Holds a stream to the client and manages receiving packets from said client +/// in an extra thread. +pub struct Connection { + stream: TcpStream, + rcv_thread_handle: JoinHandle<()>, +} + +impl Connection { + /// Start up the receiving thread after a client connection has been detected. + /// Will create a ghost disconnect packet in case the client disconnects. + pub(super) fn start_rcv<P>( + conn_id: ConnId, + stream: TcpStream, + packet_tx: Sender<(ConnId, Packet<P>)>, + ) -> Self + where + P: 'static + Debug + Send + DeserializeOwned + Serialize, + { + let mut stream_cl = stream + .try_clone() + .expect("Unable to clone TcpStream handle"); + let rcv_thread_handle = thread::spawn(move || { + let mut running = true; + while running { + // Read the newest packet from the stream. + let packet = match Packet::read_from_stream(&mut stream_cl) { + Ok(packet) => dbg!(packet), + Err(PacketRwError::Closed) => { + // Stop the thread after this packet. + running = false; + + /* Generate an internal disconnection packet, so the connection + * manager can call cleanup code if necessary. + */ + Packet::Disconnect + } + Err(err) => { + error!( + "Receiving packet failed. Connection `{}`. {:?}", + conn_id, err + ); + + // Ignore the received data. + continue; + } + }; + + /* Try sending the packet to the Connection manager. If it has already + * stopped and hung up on the channel, stop this receive thread as well. + */ + if packet_tx.send((conn_id, packet)).is_err() { + info!("Shutting down connection `{}`", conn_id); + running = false; + } + } + + info!("Packet receive thread has stopped running."); + }); + + Self { + stream, + rcv_thread_handle, + } + } + + /// Send a packet to the client via TCP. + pub(super) fn send<P>(&mut self, packet: &Packet<P>) -> Result<(), PacketRwError> + where + P: 'static + Debug + Send + DeserializeOwned + Serialize, + { + packet.write_to_stream(&mut self.stream) + } +} diff --git a/src/net/server/connection_manager.rs b/src/net/server/connection_manager.rs new file mode 100644 index 0000000..c47aa32 --- /dev/null +++ b/src/net/server/connection_manager.rs @@ -0,0 +1,109 @@ +//! The main server module, managing connections from clients. + +use super::super::packet::Packet; +use super::connection::Connection; +use crate::stable_vec::StableVec; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::io; +use std::net::{SocketAddr, TcpListener}; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, RwLock}; +use std::thread; + +/// Type of id for the connections inside of the connection manager. +pub type ConnId = usize; + +/// Manages incoming connections to the servers and packets received from them. +pub struct ConnectionManager<P: 'static + Send + Debug + DeserializeOwned> { + connections: Arc<RwLock<StableVec<Connection>>>, + local_port: u16, + _tx: Sender<(ConnId, Packet<P>)>, + rx: Receiver<(ConnId, Packet<P>)>, +} + +impl<'de, P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager<P> { + fn listen( + listener: TcpListener, + connections: Arc<RwLock<StableVec<Connection>>>, + packet_tx: Sender<(ConnId, Packet<P>)>, + ) { + for stream in listener.incoming() { + info!("Incoming connection."); + let stream = match stream { + Ok(stream) => stream, + Err(err) => { + error!("Unable to accept client. {}", err); + continue; + } + }; + + let mut connections = connections.write().unwrap(); + let id = connections.next_free(); + connections + .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone())) + .expect("Unable to insert client at supposedly valid id"); + info!("Client `{}` connected.", id); + } + + error!("Closing listener. This should never happen"); + } + + /// Start listening for connections. Returns the manager for connections, + /// which then can be asked about the connectins status and to send packets to + /// any client connected. + pub fn start(addr: SocketAddr) -> Result<Self, io::Error> { + let listener = TcpListener::bind(addr)?; + let local_port = listener.local_addr()?.port(); + let connections = Arc::new(RwLock::new(StableVec::new())); + + let (tx, rx) = mpsc::channel(); + let tx_cl = tx.clone(); + let connections_cl = connections.clone(); + thread::spawn(move || Self::listen(listener, connections_cl, tx_cl)); + + Ok(Self { + connections, + local_port, + _tx: tx, + rx, + }) + } + + /// Try to receive the next packet. If no packet was received, this returns `None`. If the client + /// was disconnected, this also returns `None`. + pub fn next_packet(&self) -> Option<(ConnId, P)> { + match self.rx.try_recv() { + Ok((conn_id, Packet::Disconnect)) => { + self.connections.write().unwrap().remove(conn_id); + self.next_packet() + } + Ok((conn_id, Packet::Cargo(packet))) => Some((conn_id, packet)), + Err(_err) => None, + } + } + + /// Send a packet to all clients currently connected + pub fn broadcast(&self, packet: P) -> bool { + let mut one_failed = false; + let mut conns = self.connections.write().unwrap(); + let packet = Packet::Cargo(packet); + for (ref id, ref mut conn) in conns.id_iter_mut() { + if let Err(err) = conn.send(&packet) { + error!( + "Broadcasting {:?} failed for client `{}`: {:?}", + packet, id, err + ); + one_failed = true; + } + } + + !one_failed + } + + /// Get the local port this connection manager's listener is bound to. + pub fn port(&self) -> u16 { + self.local_port + } +} diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs new file mode 100644 index 0000000..1f52ad5 --- /dev/null +++ b/src/net/server/mod.rs @@ -0,0 +1,6 @@ +//! Module containing network library server parts. + +pub mod connection; +pub mod connection_manager; + +pub use connection_manager::*; |
