From f92e9f6f07b1e3834c2ca58ce3510734819d08e4 Mon Sep 17 00:00:00 2001 From: Arne Dußin Date: Wed, 27 Jan 2021 14:01:50 +0100 Subject: Rework graf karto to fit the client/server structure --- src/net/client/connection.rs | 119 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 src/net/client/connection.rs (limited to 'src/net/client/connection.rs') diff --git a/src/net/client/connection.rs b/src/net/client/connection.rs new file mode 100644 index 0000000..2941a0a --- /dev/null +++ b/src/net/client/connection.rs @@ -0,0 +1,119 @@ +//! A connection to a server using the TCP-Protocol. + +use super::super::{Packet, PacketRwError}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use std::io; +use std::net::TcpStream; +use std::ops::DerefMut; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Receiver}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; +use std::{cell::RefCell, net::Shutdown}; + +/// Represents a connection to the server. +pub struct Connection { + stream: RefCell, + packet_rx: Receiver>, + running: Arc, + rcv_thread_handle: Option>, +} + +impl Connection

{ + /// Create a new connection based on the given TCP-Stream. Will start up an + /// extra thread on which it will receive packets, while the thread with the + /// connection can send packets and query for packets that may have been + /// received. + pub fn new(stream: TcpStream) -> Self { + let running = Arc::new(AtomicBool::new(true)); + + let (tx, rx) = mpsc::channel(); + let mut stream_cl = stream + .try_clone() + .expect("Unable to create tcp stream for receiving packets."); + let running_cl = running.clone(); + let rcv_thread_handle = thread::spawn(move || { + // Set the read timeout, so that this thread can be closed gracefully. + stream_cl + .set_read_timeout(Some(Duration::from_millis(500))) + .expect("Unable to set socket read timeout."); + + while running_cl.load(Ordering::Relaxed) { + let packet = match Packet::read_from_stream(&mut stream_cl) { + Ok(p) => p, + Err(PacketRwError::Closed) => Packet::Disconnect, + Err(PacketRwError::IOError(err)) => { + if err.kind() == io::ErrorKind::WouldBlock { + // This error is thrown when the remote reaches the + // timeout duration and can thusly be ignored. + continue; + } + + error!("Unable to read packet: {:?}", err); + continue; + } + Err(err) => { + error!("Unable to read packet. {:?}", err); + continue; + } + }; + + // Send the packet through the mspc channel. + tx.send(packet).unwrap(); + } + }); + + Self { + stream: RefCell::new(stream), + packet_rx: rx, + running, + rcv_thread_handle: Some(rcv_thread_handle), + } + } + + /// Get the next packet in case one has been received. If no packet has been + /// received since the latest packet has been checked last time, this will + /// return none. If the client has been disconnected, this will also return + /// `None`. + pub fn next_packet(&self) -> Option

{ + match self.packet_rx.try_recv() { + Ok(Packet::Disconnect) => { + self.running.store(false, Ordering::Relaxed); + info!("Server disconnected client."); + None + } + Ok(Packet::Cargo(packet)) => Some(packet), + Err(err) => { + warn!("unable to receive packet: {}", err); + None + } + } + } + + /// Send a packet to the server. + pub fn send(&self, packet: P) -> Result<(), PacketRwError> { + Packet::Cargo(packet).write_to_stream(self.stream.borrow_mut().deref_mut()) + } + + /// Stop the client. No further packets will be received or be sent. + pub fn stop(&mut self) -> Result<(), io::Error> { + self.stream.borrow_mut().shutdown(Shutdown::Both)?; + self.running.store(false, Ordering::Relaxed); + if let Some(handle) = self.rcv_thread_handle.take() { + handle + .join() + .expect("Packet receive thread closed unexpectedly."); + } + + Ok(()) + } +} + +impl Drop for Connection

{ + fn drop(&mut self) { + self.stop().expect("Failed to stop the client properly"); + } +} -- cgit v1.2.3-70-g09d2