server.rs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. use std::collections::HashMap;
  2. use std::net::{IpAddr, SocketAddr};
  3. use std::sync::Arc;
  4. use tokio::net::{TcpListener, TcpStream, UdpSocket};
  5. use tokio::sync::{mpsc, Mutex, RwLock};
  6. use tokio_rustls::rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
  7. use tokio_rustls::{TlsAcceptor, TlsStream};
  8. use crate::client::{Client, Message, ResponseMessage};
  9. use crate::connection::{AudioChannel, ControlChannel};
  10. use crate::crypto::Ocb2Aes128Crypto;
  11. use crate::db::Db;
  12. use crate::protocol::AudioData;
  13. use rand::prelude::StdRng;
  14. use rand::{Rng, SeedableRng};
  15. use tokio::sync::mpsc::{Receiver, Sender};
  16. pub const MAX_UDP_DATAGRAM_SIZE: usize = 1024;
  17. pub struct Config {
  18. pub ip_address: IpAddr,
  19. pub port: u16,
  20. pub certificate: Certificate,
  21. pub private_key: PrivateKey,
  22. pub path_to_db_file: String,
  23. }
  24. pub struct Server {
  25. config: Config,
  26. db: Arc<Db>,
  27. clients: RwLock<HashMap<SessionId, Client>>,
  28. waiting_for_audio_channel: Mutex<Vec<(SessionId, IpAddr, Ocb2Aes128Crypto)>>,
  29. address_to_channel: RwLock<HashMap<SocketAddr, Sender<Vec<u8>>>>,
  30. }
  31. type SessionId = u32;
  32. impl Server {
  33. pub fn new(config: Config) -> Arc<Self> {
  34. let path_to_db_file = config.path_to_db_file.clone();
  35. Arc::new(Server {
  36. config,
  37. clients: RwLock::new(HashMap::new()),
  38. db: Arc::new(Db::open(&path_to_db_file)),
  39. waiting_for_audio_channel: Mutex::new(vec![]),
  40. address_to_channel: RwLock::new(HashMap::new()),
  41. })
  42. }
  43. pub async fn run(self: Arc<Self>) {
  44. let mut tls_config = ServerConfig::new(NoClientAuth::new());
  45. tls_config
  46. .set_single_cert(
  47. vec![self.config.certificate.clone()],
  48. self.config.private_key.clone(),
  49. )
  50. .expect("Invalid private key");
  51. let socket_address = SocketAddr::new(self.config.ip_address, self.config.port);
  52. let tls_acceptor = TlsAcceptor::from(Arc::new(tls_config));
  53. let tcp_listener = TcpListener::bind(socket_address).await.unwrap();
  54. let udp_socket = UdpSocket::bind(socket_address).await.unwrap();
  55. Arc::clone(&self).run_udp_task(udp_socket).await;
  56. Arc::clone(&self)
  57. .listen_for_new_connections(tcp_listener, tls_acceptor)
  58. .await;
  59. }
  60. async fn run_udp_task(self: Arc<Self>, socket: UdpSocket) {
  61. let socket = Arc::new(socket);
  62. tokio::spawn(async move {
  63. let mut buf = [0; MAX_UDP_DATAGRAM_SIZE];
  64. loop {
  65. if let Ok((len, socket_address)) = socket.recv_from(&mut buf).await {
  66. if !Arc::clone(&self)
  67. .send_to_audio_channel(&buf[..len], &socket_address)
  68. .await
  69. {
  70. // TODO Move to a separate task
  71. Arc::clone(&self)
  72. .match_address_to_channel(
  73. &buf[..len],
  74. socket_address,
  75. Arc::clone(&socket),
  76. )
  77. .await;
  78. }
  79. }
  80. }
  81. });
  82. }
  83. async fn send_to_audio_channel(self: &Arc<Self>, buf: &[u8], address: &SocketAddr) -> bool {
  84. let connected = self.address_to_channel.read().await;
  85. if let Some(sender) = connected.get(address) {
  86. sender.try_send(Vec::from(buf));
  87. return true;
  88. }
  89. false
  90. }
  91. async fn match_address_to_channel(
  92. self: &Arc<Self>,
  93. buf: &[u8],
  94. address: SocketAddr,
  95. udp_socket: Arc<UdpSocket>,
  96. ) {
  97. let mut waiting = self.waiting_for_audio_channel.lock().await;
  98. let index = match waiting
  99. .iter_mut()
  100. .position(|(_, ip, crypto)| &address.ip() == ip && crypto.decrypt(buf).is_ok())
  101. {
  102. Some(index) => index,
  103. None => return,
  104. };
  105. let (session_id, _, crypto) = waiting.remove(index);
  106. drop(waiting);
  107. let (sender, receiver) = mpsc::channel(1);
  108. let mut clients = self.clients.write().await;
  109. if let Some(client) = clients.get_mut(&session_id) {
  110. let audio_channel = AudioChannel::new(receiver, udp_socket, crypto, address);
  111. client.set_audio_channel(audio_channel).await;
  112. }
  113. drop(clients);
  114. let mut address_to_channel = self.address_to_channel.write().await;
  115. address_to_channel.insert(address, sender);
  116. }
  117. async fn listen_for_new_connections(
  118. self: Arc<Self>,
  119. listener: TcpListener,
  120. acceptor: TlsAcceptor,
  121. ) {
  122. loop {
  123. let (stream, _) = match listener.accept().await {
  124. Ok(stream) => stream,
  125. Err(_) => continue,
  126. };
  127. let acceptor = acceptor.clone();
  128. let server = Arc::clone(&self);
  129. tokio::spawn(async move {
  130. let stream = acceptor.accept(stream).await;
  131. if let Ok(stream) = stream {
  132. server.process_new_connection(TlsStream::from(stream)).await;
  133. }
  134. });
  135. }
  136. }
  137. async fn process_new_connection(self: Arc<Self>, stream: TlsStream<TcpStream>) {
  138. let (session_id, mut responder) = match self.new_client(stream).await {
  139. Ok(id) => id,
  140. Err(_) => unimplemented!(),
  141. };
  142. loop {
  143. let message = match responder.recv().await {
  144. Some(msg) => msg,
  145. None => return,
  146. };
  147. match message {
  148. ResponseMessage::Disconnected => {
  149. self.client_disconnected(session_id).await;
  150. return;
  151. }
  152. ResponseMessage::Talking(audio_data) => {
  153. self.client_talking(session_id, audio_data).await;
  154. }
  155. }
  156. }
  157. }
  158. async fn client_disconnected(&self, session_id: SessionId) {
  159. let mut clients = self.clients.write().await;
  160. clients.remove(&session_id);
  161. for client in clients.values() {
  162. client
  163. .send_message(Message::UserDisconnected(session_id))
  164. .await;
  165. }
  166. drop(clients);
  167. //TODO optimize
  168. let mut waiting = self.waiting_for_audio_channel.lock().await;
  169. if let Some(index) = waiting.iter().position(|(id, _, _)| session_id == *id) {
  170. waiting.remove(index);
  171. } else {
  172. drop(waiting);
  173. let mut address_to_channel = self.address_to_channel.write().await;
  174. if let Some(key) = address_to_channel
  175. .keys()
  176. .find(|key| address_to_channel.get(key).unwrap().is_closed())
  177. .cloned()
  178. {
  179. address_to_channel.remove(&key);
  180. }
  181. }
  182. }
  183. async fn client_talking(&self, session_id: SessionId, audio: AudioData) {
  184. let clients = self.clients.read().await;
  185. for client in clients
  186. .values()
  187. .filter(|client| client.session_id != session_id)
  188. {
  189. client
  190. .send_message(Message::UserTalking(audio.clone()))
  191. .await;
  192. }
  193. }
  194. async fn new_client(
  195. self: &Arc<Self>,
  196. stream: TlsStream<TcpStream>,
  197. ) -> Result<(SessionId, Receiver<ResponseMessage>), crate::client::Error> {
  198. let ip = stream.get_ref().0.peer_addr().unwrap().ip();
  199. let config = self.create_client_config();
  200. let crypto =
  201. Ocb2Aes128Crypto::new(config.crypto_key, config.server_nonce, config.client_nonce);
  202. let (client, receiver) =
  203. Client::establish_connection(Arc::clone(&self.db), ControlChannel::new(stream), config)
  204. .await?;
  205. let session_id = client.session_id;
  206. let mut clients = self.clients.write().await;
  207. for client in clients.values() {
  208. client
  209. .send_message(Message::UserConnected(session_id))
  210. .await;
  211. }
  212. clients.insert(session_id, client);
  213. drop(clients);
  214. let mut waiting = self.waiting_for_audio_channel.lock().await;
  215. waiting.push((session_id, ip, crypto));
  216. drop(waiting);
  217. Ok((session_id, receiver))
  218. }
  219. fn create_client_config(&self) -> crate::client::Config {
  220. let crypto_key = self.generate_key();
  221. let server_nonce = self.generate_key();
  222. let client_nonce = self.generate_key();
  223. crate::client::Config {
  224. crypto_key,
  225. server_nonce,
  226. client_nonce,
  227. alpha_codec_version: 0,
  228. beta_codec_version: 0,
  229. prefer_alpha: true,
  230. opus_support: true,
  231. welcome_text: "Welcome".to_string(),
  232. max_bandwidth: 128000,
  233. max_users: 10,
  234. allow_html: true,
  235. max_message_length: 512,
  236. max_image_message_length: 100000,
  237. }
  238. }
  239. fn generate_key(&self) -> [u8; 16] {
  240. let mut buffer = [0; 16];
  241. let mut rng = StdRng::from_entropy();
  242. rng.fill(&mut buffer);
  243. buffer
  244. }
  245. }