aboutsummaryrefslogtreecommitdiff
path: root/src/net/client/connection.rs
blob: 2941a0ad4751c707a8c2ad5765741d74bf00c3f3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//! A connection to a server using the TCP-Protocol.

use super::super::{Packet, PacketRwError};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::io;
use std::net::TcpStream;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::{cell::RefCell, net::Shutdown};

/// Client-side handle for one TCP connection to the server.
///
/// `P` is the payload type that is carried inside the packets exchanged with
/// the server.
pub struct Connection<P>
where
    P: 'static + Debug + Send + DeserializeOwned + Serialize,
{
    /// Socket used for sending packets and for shutting the connection down.
    /// Kept in a `RefCell` so that sending only needs `&self`.
    stream: RefCell<TcpStream>,
    /// Receiving end of the channel that the receive thread feeds with the
    /// packets it has read from the socket.
    packet_rx: Receiver<Packet<P>>,
    /// Flag polled by the receive thread; storing `false` makes it exit.
    running: Arc<AtomicBool>,
    /// Handle of the receive thread, taken out once the thread is joined.
    rcv_thread_handle: Option<JoinHandle<()>>,
}

impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Connection<P> {
    /// Create a new connection based on the given TCP-Stream. Will start up an
    /// extra thread on which it will receive packets, while the thread with the
    /// connection can send packets and query for packets that may have been
    /// received.
    pub fn new(stream: TcpStream) -> Self {
        let running = Arc::new(AtomicBool::new(true));

        let (tx, rx) = mpsc::channel();
        let mut stream_cl = stream
            .try_clone()
            .expect("Unable to create tcp stream for receiving packets.");
        let running_cl = running.clone();
        let rcv_thread_handle = thread::spawn(move || {
            // Set the read timeout, so that this thread can be closed gracefully.
            stream_cl
                .set_read_timeout(Some(Duration::from_millis(500)))
                .expect("Unable to set socket read timeout.");

            while running_cl.load(Ordering::Relaxed) {
                let packet = match Packet::read_from_stream(&mut stream_cl) {
                    Ok(p) => p,
                    Err(PacketRwError::Closed) => Packet::Disconnect,
                    Err(PacketRwError::IOError(err)) => {
                        if err.kind() == io::ErrorKind::WouldBlock {
                            // This error is thrown when the remote reaches the
                            // timeout duration and can thusly be ignored.
                            continue;
                        }

                        error!("Unable to read packet: {:?}", err);
                        continue;
                    }
                    Err(err) => {
                        error!("Unable to read packet. {:?}", err);
                        continue;
                    }
                };

                // Send the packet through the mspc channel.
                tx.send(packet).unwrap();
            }
        });

        Self {
            stream: RefCell::new(stream),
            packet_rx: rx,
            running,
            rcv_thread_handle: Some(rcv_thread_handle),
        }
    }

    /// Get the next packet in case one has been received. If no packet has been
    /// received since the latest packet has been checked last time, this will
    /// return none. If the client has been disconnected, this will also return
    /// `None`.
    pub fn next_packet(&self) -> Option<P> {
        match self.packet_rx.try_recv() {
            Ok(Packet::Disconnect) => {
                self.running.store(false, Ordering::Relaxed);
                info!("Server disconnected client.");
                None
            }
            Ok(Packet::Cargo(packet)) => Some(packet),
            Err(err) => {
                warn!("unable to receive packet: {}", err);
                None
            }
        }
    }

    /// Send a packet to the server.
    pub fn send(&self, packet: P) -> Result<(), PacketRwError> {
        Packet::Cargo(packet).write_to_stream(self.stream.borrow_mut().deref_mut())
    }

    /// Stop the client. No further packets will be received or be sent.
    pub fn stop(&mut self) -> Result<(), io::Error> {
        self.stream.borrow_mut().shutdown(Shutdown::Both)?;
        self.running.store(false, Ordering::Relaxed);
        if let Some(handle) = self.rcv_thread_handle.take() {
            handle
                .join()
                .expect("Packet receive thread closed unexpectedly.");
        }

        Ok(())
    }
}

impl<P: 'static + Debug + Send + DeserializeOwned + Serialize> Drop for Connection<P> {
    fn drop(&mut self) {
        self.stop().expect("Failed to stop the client properly");
    }
}