aboutsummaryrefslogtreecommitdiff
path: root/src/net/client/connection.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/client/connection.rs')
-rw-r--r--src/net/client/connection.rs119
1 files changed, 119 insertions, 0 deletions
diff --git a/src/net/client/connection.rs b/src/net/client/connection.rs
new file mode 100644
index 0000000..2941a0a
--- /dev/null
+++ b/src/net/client/connection.rs
@@ -0,0 +1,119 @@
+//! A connection to a server using the TCP-Protocol.
+
+use super::super::{Packet, PacketRwError};
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Debug;
+use std::io;
+use std::net::TcpStream;
+use std::ops::DerefMut;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::{self, Receiver};
+use std::sync::Arc;
+use std::thread::{self, JoinHandle};
+use std::time::Duration;
+use std::{cell::RefCell, net::Shutdown};
+
+/// Represents a connection to the server.
+pub struct Connection<P: 'static + Debug + Send + DeserializeOwned + Serialize> {
+ stream: RefCell<TcpStream>,
+ packet_rx: Receiver<Packet<P>>,
+ running: Arc<AtomicBool>,
+ rcv_thread_handle: Option<JoinHandle<()>>,
+}
+
+impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Connection<P> {
+ /// Create a new connection based on the given TCP-Stream. Will start up an
+ /// extra thread on which it will receive packets, while the thread with the
+ /// connection can send packets and query for packets that may have been
+ /// received.
+ pub fn new(stream: TcpStream) -> Self {
+ let running = Arc::new(AtomicBool::new(true));
+
+ let (tx, rx) = mpsc::channel();
+ let mut stream_cl = stream
+ .try_clone()
+ .expect("Unable to create tcp stream for receiving packets.");
+ let running_cl = running.clone();
+ let rcv_thread_handle = thread::spawn(move || {
+ // Set the read timeout, so that this thread can be closed gracefully.
+ stream_cl
+ .set_read_timeout(Some(Duration::from_millis(500)))
+ .expect("Unable to set socket read timeout.");
+
+ while running_cl.load(Ordering::Relaxed) {
+ let packet = match Packet::read_from_stream(&mut stream_cl) {
+ Ok(p) => p,
+ Err(PacketRwError::Closed) => Packet::Disconnect,
+ Err(PacketRwError::IOError(err)) => {
+ if err.kind() == io::ErrorKind::WouldBlock {
+ // This error is thrown when the remote reaches the
+ // timeout duration and can thusly be ignored.
+ continue;
+ }
+
+ error!("Unable to read packet: {:?}", err);
+ continue;
+ }
+ Err(err) => {
+ error!("Unable to read packet. {:?}", err);
+ continue;
+ }
+ };
+
+ // Send the packet through the mspc channel.
+ tx.send(packet).unwrap();
+ }
+ });
+
+ Self {
+ stream: RefCell::new(stream),
+ packet_rx: rx,
+ running,
+ rcv_thread_handle: Some(rcv_thread_handle),
+ }
+ }
+
+ /// Get the next packet in case one has been received. If no packet has been
+ /// received since the latest packet has been checked last time, this will
+ /// return none. If the client has been disconnected, this will also return
+ /// `None`.
+ pub fn next_packet(&self) -> Option<P> {
+ match self.packet_rx.try_recv() {
+ Ok(Packet::Disconnect) => {
+ self.running.store(false, Ordering::Relaxed);
+ info!("Server disconnected client.");
+ None
+ }
+ Ok(Packet::Cargo(packet)) => Some(packet),
+ Err(err) => {
+ warn!("unable to receive packet: {}", err);
+ None
+ }
+ }
+ }
+
+ /// Send a packet to the server.
+ pub fn send(&self, packet: P) -> Result<(), PacketRwError> {
+ Packet::Cargo(packet).write_to_stream(self.stream.borrow_mut().deref_mut())
+ }
+
+ /// Stop the client. No further packets will be received or be sent.
+ pub fn stop(&mut self) -> Result<(), io::Error> {
+ self.stream.borrow_mut().shutdown(Shutdown::Both)?;
+ self.running.store(false, Ordering::Relaxed);
+ if let Some(handle) = self.rcv_thread_handle.take() {
+ handle
+ .join()
+ .expect("Packet receive thread closed unexpectedly.");
+ }
+
+ Ok(())
+ }
+}
+
+impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Drop for Connection<P> {
+ fn drop(&mut self) {
+ self.stop().expect("Failed to stop the client properly");
+ }
+}