1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
//! The main server module, managing connections from clients.
use super::super::packet::Packet;
use super::connection::Connection;
use crate::stable_vec::StableVec;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::io;
use std::net::{SocketAddr, TcpListener};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
/// Identifier of a connection inside the connection manager.
///
/// A `ConnId` is the slot under which a `Connection` is stored in the
/// manager's `StableVec`. It accompanies every packet handed out by
/// `next_packet` so the caller can tell which client sent it.
pub type ConnId = usize;
/// Server-side registry of client connections and the queue of packets
/// received from them.
///
/// `P` is the payload type carried inside `Packet::Cargo`.
pub struct ConnectionManager<P>
where
    P: 'static + Send + Debug + DeserializeOwned,
{
    /// Currently registered connections, indexed by `ConnId`. The listener
    /// thread holds a clone of this `Arc` and inserts new clients into it.
    connections: Arc<RwLock<StableVec<Connection>>>,
    /// Port the listener is actually bound to.
    local_port: u16,
    /// Sending half of the packet channel. Never read through this field;
    /// presumably kept so the channel always has at least one live sender.
    _tx: Sender<(ConnId, Packet<P>)>,
    /// Receiving half of the packet channel, polled by `next_packet`.
    rx: Receiver<(ConnId, Packet<P>)>,
}
impl<P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager<P> {
    /// Accept loop executed on the listener thread.
    ///
    /// Each successfully accepted stream is registered under the next free
    /// id and passed to `Connection::start_rcv` together with a clone of
    /// `packet_tx`. Accept errors are logged and the loop keeps going.
    ///
    /// # Panics
    /// Panics if the connections lock is poisoned, or if inserting at the id
    /// that was just reported as free fails.
    fn listen(
        listener: TcpListener,
        connections: Arc<RwLock<StableVec<Connection>>>,
        packet_tx: Sender<(ConnId, Packet<P>)>,
    ) {
        for stream in listener.incoming() {
            info!("Incoming connection.");
            let stream = match stream {
                Ok(stream) => stream,
                Err(err) => {
                    error!("Unable to accept client. {}", err);
                    continue;
                }
            };
            // Hold the write lock across id lookup and insertion so the id
            // cannot be taken in between.
            let mut connections = connections.write().unwrap();
            let id = connections.next_free();
            connections
                .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone()))
                .expect("Unable to insert client at supposedly valid id");
            info!("Client `{}` connected.", id);
        }
        error!("Closing listener. This should never happen");
    }

    /// Start listening for connections on `addr`.
    ///
    /// Binds a TCP listener, spawns a detached thread that accepts clients,
    /// and returns the manager, which can then be polled for received
    /// packets and used to send packets to every connected client.
    ///
    /// # Errors
    /// Returns the underlying `io::Error` if binding to `addr` or querying
    /// the bound local address fails.
    pub fn start(addr: SocketAddr) -> Result<Self, io::Error> {
        let listener = TcpListener::bind(addr)?;
        let local_port = listener.local_addr()?.port();
        let connections = Arc::new(RwLock::new(StableVec::new()));
        let (tx, rx) = mpsc::channel();

        // The listener thread gets its own handles to the shared state.
        let listener_tx = tx.clone();
        let listener_connections = Arc::clone(&connections);
        thread::spawn(move || Self::listen(listener, listener_connections, listener_tx));

        Ok(Self {
            connections,
            local_port,
            _tx: tx,
            rx,
        })
    }

    /// Try to receive the next packet without blocking.
    ///
    /// Disconnect notifications are handled internally: the corresponding
    /// connection is removed and polling continues with the next queued
    /// item. Returns `Some((id, payload))` for the first cargo packet found,
    /// or `None` once nothing more can be received right now.
    ///
    /// # Panics
    /// Panics if the connections lock is poisoned.
    pub fn next_packet(&self) -> Option<(ConnId, P)> {
        // Iterate instead of recursing so that a long run of queued
        // disconnects cannot grow the call stack.
        loop {
            match self.rx.try_recv() {
                Ok((conn_id, Packet::Disconnect)) => {
                    self.connections.write().unwrap().remove(conn_id);
                }
                Ok((conn_id, Packet::Cargo(packet))) => return Some((conn_id, packet)),
                Err(_err) => return None,
            }
        }
    }

    /// Send a packet to all clients currently connected.
    ///
    /// A failed send is logged and does not stop delivery to the remaining
    /// clients. Returns `true` if no send failed (including when there are
    /// no clients), `false` otherwise.
    ///
    /// # Panics
    /// Panics if the connections lock is poisoned.
    pub fn broadcast(&self, packet: P) -> bool {
        let mut all_sent = true;
        let mut conns = self.connections.write().unwrap();
        let packet = Packet::Cargo(packet);
        for (ref id, ref mut conn) in conns.id_iter_mut() {
            if let Err(err) = conn.send(&packet) {
                error!(
                    "Broadcasting {:?} failed for client `{}`: {:?}",
                    packet, id, err
                );
                all_sent = false;
            }
        }
        all_sent
    }

    /// Get the local port this connection manager's listener is bound to.
    pub fn port(&self) -> u16 {
        self.local_port
    }
}
|