aboutsummaryrefslogtreecommitdiff
path: root/src/net/server/connection_manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/server/connection_manager.rs')
-rw-r--r--src/net/server/connection_manager.rs109
1 files changed, 109 insertions, 0 deletions
diff --git a/src/net/server/connection_manager.rs b/src/net/server/connection_manager.rs
new file mode 100644
index 0000000..b79ab2c
--- /dev/null
+++ b/src/net/server/connection_manager.rs
@@ -0,0 +1,109 @@
+//! The main server module, managing connections from clients.
+
+use super::super::packet::Packet;
+use super::connection::Connection;
+use crate::stable_vec::StableVec;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::fmt::Debug;
+use std::io;
+use std::net::{SocketAddr, TcpListener};
+use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::{Arc, RwLock};
+use std::thread;
+
+/// Type of id for the connections inside of the connection manager.
+pub type ConnId = usize;
+
+/// Manages incoming connections to the servers and packets received from them.
+pub struct ConnectionManager<P: 'static + Send + Debug + DeserializeOwned> {
+ connections: Arc<RwLock<StableVec<Connection>>>,
+ local_port: u16,
+ _tx: Sender<(ConnId, Packet<P>)>,
+ rx: Receiver<(ConnId, Packet<P>)>,
+}
+
+impl<'de, P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager<P> {
+ fn listen(
+ listener: TcpListener,
+ connections: Arc<RwLock<StableVec<Connection>>>,
+ packet_tx: Sender<(ConnId, Packet<P>)>,
+ ) {
+ for stream in listener.incoming() {
+ info!("Incoming connection.");
+ let stream = match stream {
+ Ok(stream) => stream,
+ Err(err) => {
+ error!("Unable to accept client. {}", err);
+ continue;
+ }
+ };
+
+ let mut connections = connections.write().unwrap();
+ let id = dbg!(connections.next_free());
+ connections
+ .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone()))
+ .expect("Unable to insert client at supposedly valid id");
+ info!("Client `{}` connected.", id);
+ }
+
+ error!("Closing listener. This should never happen");
+ }
+
+ /// Start listening for connections. Returns the manager for connections,
+ /// which then can be asked about the connectins status and to send packets to
+ /// any client connected.
+ pub fn start(addr: SocketAddr) -> Result<Self, io::Error> {
+ let listener = TcpListener::bind(addr)?;
+ let local_port = listener.local_addr()?.port();
+ let connections = Arc::new(RwLock::new(StableVec::new()));
+
+ let (tx, rx) = mpsc::channel();
+ let tx_cl = tx.clone();
+ let connections_cl = connections.clone();
+ thread::spawn(move || Self::listen(listener, connections_cl, tx_cl));
+
+ Ok(Self {
+ connections,
+ local_port,
+ _tx: tx,
+ rx,
+ })
+ }
+
+ /// Try to receive the next packet. If no packet was received, this returns `None`. If the client
+ /// was disconnected, this also returns `None`.
+ pub fn next_packet(&self) -> Option<(ConnId, P)> {
+ match self.rx.try_recv() {
+ Ok((conn_id, Packet::Disconnect)) => {
+ self.connections.write().unwrap().remove(conn_id);
+ self.next_packet()
+ }
+ Ok((conn_id, Packet::Cargo(packet))) => Some((conn_id, packet)),
+ Err(_err) => None,
+ }
+ }
+
+ /// Send a packet to all clients currently connected
+ pub fn broadcast(&self, packet: P) -> bool {
+ let mut one_failed = false;
+ let mut conns = self.connections.write().unwrap();
+ let packet = Packet::Cargo(packet);
+ for (ref id, ref mut conn) in conns.id_iter_mut() {
+ if let Err(err) = conn.send(&packet) {
+ error!(
+ "Broadcasting {:?} failed for client `{}`: {:?}",
+ packet, id, err
+ );
+ one_failed = true;
+ }
+ }
+
+ !one_failed
+ }
+
+ /// Get the local port this connection manager's listener is bound to.
+ pub fn port(&self) -> u16 {
+ self.local_port
+ }
+}