//! A connection to a server using the TCP-Protocol. use super::super::{Packet, PacketRwError}; use serde::de::DeserializeOwned; use serde::Serialize; use std::fmt::Debug; use std::io; use std::net::TcpStream; use std::ops::DerefMut; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{self, Receiver}; use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::time::Duration; use std::{cell::RefCell, net::Shutdown}; /// Represents a connection to the server. pub struct Connection { stream: RefCell, packet_rx: Receiver>, running: Arc, rcv_thread_handle: Option>, } impl Connection

{ /// Create a new connection based on the given TCP-Stream. Will start up an /// extra thread on which it will receive packets, while the thread with the /// connection can send packets and query for packets that may have been /// received. pub fn new(stream: TcpStream) -> Self { let running = Arc::new(AtomicBool::new(true)); let (tx, rx) = mpsc::channel(); let mut stream_cl = stream .try_clone() .expect("Unable to create tcp stream for receiving packets."); let running_cl = running.clone(); let rcv_thread_handle = thread::spawn(move || { // Set the read timeout, so that this thread can be closed gracefully. stream_cl .set_read_timeout(Some(Duration::from_millis(500))) .expect("Unable to set socket read timeout."); while running_cl.load(Ordering::Relaxed) { let packet = match Packet::read_from_stream(&mut stream_cl) { Ok(p) => p, Err(PacketRwError::Closed) => Packet::Disconnect, Err(PacketRwError::IOError(err)) => { if err.kind() == io::ErrorKind::WouldBlock { // This error is thrown when the remote reaches the // timeout duration and can thusly be ignored. continue; } error!("Unable to read packet: {:?}", err); continue; } Err(err) => { error!("Unable to read packet. {:?}", err); continue; } }; // Send the packet through the mspc channel. tx.send(packet).unwrap(); } }); Self { stream: RefCell::new(stream), packet_rx: rx, running, rcv_thread_handle: Some(rcv_thread_handle), } } /// Get the next packet in case one has been received. If no packet has been /// received since the latest packet has been checked last time, this will /// return none. If the client has been disconnected, this will also return /// `None`. pub fn next_packet(&self) -> Option

{ match self.packet_rx.try_recv() { Ok(Packet::Disconnect) => { self.running.store(false, Ordering::Relaxed); info!("Server disconnected client."); None } Ok(Packet::Cargo(packet)) => Some(packet), Err(err) => { warn!("unable to receive packet: {}", err); None } } } /// Send a packet to the server. pub fn send(&self, packet: P) -> Result<(), PacketRwError> { Packet::Cargo(packet).write_to_stream(self.stream.borrow_mut().deref_mut()) } /// Stop the client. No further packets will be received or be sent. pub fn stop(&mut self) -> Result<(), io::Error> { self.stream.borrow_mut().shutdown(Shutdown::Both)?; self.running.store(false, Ordering::Relaxed); if let Some(handle) = self.rcv_thread_handle.take() { handle .join() .expect("Packet receive thread closed unexpectedly."); } Ok(()) } } impl Drop for Connection

{ fn drop(&mut self) { self.stop().expect("Failed to stop the client properly"); } }