aboutsummaryrefslogtreecommitdiff
path: root/src/net/server/connection_manager.rs
blob: c47aa32f6d8febe31730fe85fc62a589561dbf94 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//! The main server module, managing connections from clients.

use super::super::packet::Packet;
use super::connection::Connection;
use crate::stable_vec::StableVec;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::io;
use std::net::{SocketAddr, TcpListener};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;

/// Type of id for the connections inside of the connection manager.
///
/// An id is handed out when a client is accepted and accompanies every packet
/// returned by the manager, so callers can tell which client sent it.
pub type ConnId = usize;

/// Server-side hub that tracks every connected client and collects the
/// packets they send into a single queue.
pub struct ConnectionManager<P>
where
    P: 'static + Send + Debug + DeserializeOwned,
{
    /// All currently registered client connections, shared with the listener thread.
    connections: Arc<RwLock<StableVec<Connection>>>,
    /// Port the listening socket is bound to.
    local_port: u16,
    /// Sending half of the packet channel. Never read in this file; presumably
    /// held so the channel stays open while no client is connected.
    _tx: Sender<(ConnId, Packet<P>)>,
    /// Receiving half of the packet channel, fed with `(sender id, packet)` pairs.
    rx: Receiver<(ConnId, Packet<P>)>,
}

impl<P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager<P> {
    /// Accept loop that runs on the listener thread.
    ///
    /// Every successfully accepted stream is registered under the next free id
    /// and handed the sending half of the packet channel. Failed accepts are
    /// logged and skipped.
    ///
    /// # Panics
    ///
    /// Panics if the connection lock is poisoned or if inserting at the id
    /// reported as free fails.
    fn listen(
        listener: TcpListener,
        connections: Arc<RwLock<StableVec<Connection>>>,
        packet_tx: Sender<(ConnId, Packet<P>)>,
    ) {
        for stream in listener.incoming() {
            info!("Incoming connection.");
            let stream = match stream {
                Ok(stream) => stream,
                Err(err) => {
                    error!("Unable to accept client. {}", err);
                    continue;
                }
            };

            // The write lock is held across id allocation and insertion so that
            // the id cannot be taken by anyone else in between.
            let mut connections = connections.write().unwrap();
            let id = connections.next_free();
            connections
                .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone()))
                .expect("Unable to insert client at supposedly valid id");
            info!("Client `{}` connected.", id);
        }

        // `TcpListener::incoming` never yields `None`, so reaching this point
        // indicates something went badly wrong.
        error!("Closing listener. This should never happen");
    }

    /// Start listening for connections. Returns the manager for connections,
    /// which then can be asked about the connections' status and to send packets
    /// to any client connected.
    ///
    /// Binds a listener to `addr` and spawns a background thread that accepts
    /// incoming clients.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if binding to `addr` fails or the local
    /// address of the bound listener cannot be determined.
    pub fn start(addr: SocketAddr) -> Result<Self, io::Error> {
        let listener = TcpListener::bind(addr)?;
        let local_port = listener.local_addr()?.port();
        let connections = Arc::new(RwLock::new(StableVec::new()));

        let (tx, rx) = mpsc::channel();
        let tx_cl = tx.clone();
        let connections_cl = Arc::clone(&connections);
        thread::spawn(move || Self::listen(listener, connections_cl, tx_cl));

        Ok(Self {
            connections,
            local_port,
            _tx: tx,
            rx,
        })
    }

    /// Try to receive the next packet without blocking.
    ///
    /// Disconnect notifications found in the queue are consumed on the way: the
    /// corresponding connection is removed and polling continues with the next
    /// queued item. Returns the sender's id together with the payload of the
    /// first cargo packet found, or `None` if no packet is available.
    ///
    /// # Panics
    ///
    /// Panics if the connection lock is poisoned.
    pub fn next_packet(&self) -> Option<(ConnId, P)> {
        // A loop instead of recursion: a burst of queued disconnects must not
        // grow the stack, as tail calls are not guaranteed to be optimised.
        loop {
            match self.rx.try_recv() {
                Ok((conn_id, Packet::Disconnect)) => {
                    self.connections.write().unwrap().remove(conn_id);
                }
                Ok((conn_id, Packet::Cargo(packet))) => return Some((conn_id, packet)),
                Err(_) => return None,
            }
        }
    }

    /// Send a packet to all clients currently connected.
    ///
    /// Sending is attempted for every client even if an earlier one fails; each
    /// failure is logged. Returns `true` if no send failed (including the case
    /// where no client is connected) and `false` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the connection lock is poisoned.
    pub fn broadcast(&self, packet: P) -> bool {
        let packet = Packet::Cargo(packet);
        let mut conns = self.connections.write().unwrap();
        let mut all_sent = true;
        for (id, conn) in conns.id_iter_mut() {
            if let Err(err) = conn.send(&packet) {
                error!(
                    "Broadcasting {:?} failed for client `{}`: {:?}",
                    packet, id, err
                );
                all_sent = false;
            }
        }

        all_sent
    }

    /// Get the local port this connection manager's listener is bound to.
    pub fn port(&self) -> u16 {
        self.local_port
    }
}