//! The main server module, managing connections from clients. use super::super::packet::Packet; use super::connection::Connection; use crate::stable_vec::StableVec; use serde::de::DeserializeOwned; use serde::Serialize; use std::fmt::Debug; use std::io; use std::net::{SocketAddr, TcpListener}; use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::thread; /// Type of id for the connections inside of the connection manager. pub type ConnId = usize; /// Manages incoming connections to the servers and packets received from them. pub struct ConnectionManager { connections: Arc>>, local_port: u16, _tx: Sender<(ConnId, Packet

)>, rx: Receiver<(ConnId, Packet

)>, } impl<'de, P: 'static + Send + Debug + DeserializeOwned + Serialize> ConnectionManager

{ fn listen( listener: TcpListener, connections: Arc>>, packet_tx: Sender<(ConnId, Packet

)>, ) { for stream in listener.incoming() { info!("Incoming connection."); let stream = match stream { Ok(stream) => stream, Err(err) => { error!("Unable to accept client. {}", err); continue; } }; let mut connections = connections.write().unwrap(); let id = connections.next_free(); connections .try_insert(id, Connection::start_rcv(id, stream, packet_tx.clone())) .expect("Unable to insert client at supposedly valid id"); info!("Client `{}` connected.", id); } error!("Closing listener. This should never happen"); } /// Start listening for connections. Returns the manager for connections, /// which then can be asked about the connectins status and to send packets to /// any client connected. pub fn start(addr: SocketAddr) -> Result { let listener = TcpListener::bind(addr)?; let local_port = listener.local_addr()?.port(); let connections = Arc::new(RwLock::new(StableVec::new())); let (tx, rx) = mpsc::channel(); let tx_cl = tx.clone(); let connections_cl = connections.clone(); thread::spawn(move || Self::listen(listener, connections_cl, tx_cl)); Ok(Self { connections, local_port, _tx: tx, rx, }) } /// Try to receive the next packet. If no packet was received, this returns `None`. If the client /// was disconnected, this also returns `None`. pub fn next_packet(&self) -> Option<(ConnId, P)> { match self.rx.try_recv() { Ok((conn_id, Packet::Disconnect)) => { self.connections.write().unwrap().remove(conn_id); self.next_packet() } Ok((conn_id, Packet::Cargo(packet))) => Some((conn_id, packet)), Err(_err) => None, } } /// Send a packet to all clients currently connected pub fn broadcast(&self, packet: P) -> bool { let mut one_failed = false; let mut conns = self.connections.write().unwrap(); let packet = Packet::Cargo(packet); for (ref id, ref mut conn) in conns.id_iter_mut() { if let Err(err) = conn.send(&packet) { error!( "Broadcasting {:?} failed for client `{}`: {:?}", packet, id, err ); one_failed = true; } } !one_failed } /// Get the local port this connection manager's listener is bound to. pub fn port(&self) -> u16 { self.local_port } }