diff options
| author | Arne Dußin | 2021-01-27 14:01:50 +0100 |
|---|---|---|
| committer | Arne Dußin | 2021-02-02 22:16:15 +0100 |
| commit | f92e9f6f07b1e3834c2ca58ce3510734819d08e4 (patch) | |
| tree | 20e3d3afce342a56ae98f6c20491482ccd2b5c6b /src/net/client | |
| parent | c60a6d07efb120724b308e29e8e70f27c87c952d (diff) | |
| download | graf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.tar.gz graf_karto-f92e9f6f07b1e3834c2ca58ce3510734819d08e4.zip | |
Rework graf karto to fit the client/server structure
Diffstat (limited to 'src/net/client')
| -rw-r--r-- | src/net/client/connection.rs | 119 | ||||
| -rw-r--r-- | src/net/client/mod.rs | 4 |
2 files changed, 123 insertions, 0 deletions
diff --git a/src/net/client/connection.rs b/src/net/client/connection.rs new file mode 100644 index 0000000..2941a0a --- /dev/null +++ b/src/net/client/connection.rs @@ -0,0 +1,119 @@ +//! A connection to a server using the TCP-Protocol. + +use super::super::{Packet, PacketRwError}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::io; +use std::net::TcpStream; +use std::ops::DerefMut; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Receiver}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; +use std::{cell::RefCell, net::Shutdown}; + +/// Represents a connection to the server. +pub struct Connection<P: 'static + Debug + Send + DeserializeOwned + Serialize> { + stream: RefCell<TcpStream>, + packet_rx: Receiver<Packet<P>>, + running: Arc<AtomicBool>, + rcv_thread_handle: Option<JoinHandle<()>>, +} + +impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Connection<P> { + /// Create a new connection based on the given TCP-Stream. Will start up an + /// extra thread on which it will receive packets, while the thread with the + /// connection can send packets and query for packets that may have been + /// received. + pub fn new(stream: TcpStream) -> Self { + let running = Arc::new(AtomicBool::new(true)); + + let (tx, rx) = mpsc::channel(); + let mut stream_cl = stream + .try_clone() + .expect("Unable to create tcp stream for receiving packets."); + let running_cl = running.clone(); + let rcv_thread_handle = thread::spawn(move || { + // Set the read timeout, so that this thread can be closed gracefully. + stream_cl + .set_read_timeout(Some(Duration::from_millis(500))) + .expect("Unable to set socket read timeout."); + + while running_cl.load(Ordering::Relaxed) { + let packet = match Packet::read_from_stream(&mut stream_cl) { + Ok(p) => p, + Err(PacketRwError::Closed) => Packet::Disconnect, + Err(PacketRwError::IOError(err)) => { + if err.kind() == io::ErrorKind::WouldBlock { + // This error is thrown when the remote reaches the + // timeout duration and can thusly be ignored. + continue; + } + + error!("Unable to read packet: {:?}", err); + continue; + } + Err(err) => { + error!("Unable to read packet. {:?}", err); + continue; + } + }; + + // Send the packet through the mspc channel. + tx.send(packet).unwrap(); + } + }); + + Self { + stream: RefCell::new(stream), + packet_rx: rx, + running, + rcv_thread_handle: Some(rcv_thread_handle), + } + } + + /// Get the next packet in case one has been received. If no packet has been + /// received since the latest packet has been checked last time, this will + /// return none. If the client has been disconnected, this will also return + /// `None`. + pub fn next_packet(&self) -> Option<P> { + match self.packet_rx.try_recv() { + Ok(Packet::Disconnect) => { + self.running.store(false, Ordering::Relaxed); + info!("Server disconnected client."); + None + } + Ok(Packet::Cargo(packet)) => Some(packet), + Err(err) => { + warn!("unable to receive packet: {}", err); + None + } + } + } + + /// Send a packet to the server. + pub fn send(&self, packet: P) -> Result<(), PacketRwError> { + Packet::Cargo(packet).write_to_stream(self.stream.borrow_mut().deref_mut()) + } + + /// Stop the client. No further packets will be received or be sent. + pub fn stop(&mut self) -> Result<(), io::Error> { + self.stream.borrow_mut().shutdown(Shutdown::Both)?; + self.running.store(false, Ordering::Relaxed); + if let Some(handle) = self.rcv_thread_handle.take() { + handle + .join() + .expect("Packet receive thread closed unexpectedly."); + } + + Ok(()) + } +} + +impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Drop for Connection<P> { + fn drop(&mut self) { + self.stop().expect("Failed to stop the client properly"); + } +} diff --git a/src/net/client/mod.rs b/src/net/client/mod.rs new file mode 100644 index 0000000..d5db3e8 --- /dev/null +++ b/src/net/client/mod.rs @@ -0,0 +1,4 @@ +//! Networking items used solely by the client. + +pub mod connection; +pub use connection::*; |
