use crate::protocol::connection::{AudioChannel, ControlChannel}; use crate::server::client::client_worker::{ClientEvent, ServerEvent}; use crate::storage::{Guest, SessionData, Storage}; use log::error; use mumble_protocol::control::msgs::{ Authenticate, ChannelState, CodecVersion, CryptSetup, Ping, ServerConfig, ServerSync, TextMessage, UserRemove, UserState, Version, }; use mumble_protocol::control::ControlPacket; use mumble_protocol::voice::{Clientbound, Serverbound, VoicePacket}; use ring::pbkdf2; use std::fmt; use std::io::Error; use std::marker::PhantomData; use std::num::NonZeroU32; use std::sync::Arc; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; static PBKDF2_ALGORITHM: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256; type Key = [u8; 16]; type Nonce = [u8; 16]; pub struct Handler { storage: Arc, control_channel: Arc, audio_channel: Option>, event_sender: Sender, config: Config, session_id: u32, crypto_resyncs: u32, } pub struct Config { pub crypto_key: Key, pub server_nonce: Nonce, pub client_nonce: Nonce, pub alpha_codec_version: i32, pub beta_codec_version: i32, pub prefer_alpha: bool, pub opus_support: bool, pub welcome_text: String, pub max_bandwidth: u32, pub max_users: u32, pub allow_html: bool, pub max_message_length: u32, pub max_image_message_length: u32, pub max_username_length: u32, pub min_compatible_version: u32, pub server_password: Option, pub pbkdf2_iterations: NonZeroU32, } pub enum HandlerError { IO(std::io::Error), EventReceiverClosed, } impl fmt::Display for HandlerError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match &self { HandlerError::IO(e) => { write!(f, "{}", e) } HandlerError::EventReceiverClosed => { write!(f, "Server event receiver have been dropped") } } } } pub enum ConnectionSetupError { IO(std::io::Error), Reject(Reject), WrongPacket, } impl fmt::Display for ConnectionSetupError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match &self { ConnectionSetupError::IO(e) => { write!(f, "{}", e) } ConnectionSetupError::Reject(r) => { write!(f, "Reject: {}", r) } ConnectionSetupError::WrongPacket => { write!(f, "Wrong packet") } } } } pub enum Reject { InvalidUsername, UsernameInUse, _WrongVersion, WrongUserPassword, _WrongServerPassword, _NoCertificate, } impl fmt::Display for Reject { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Reject::InvalidUsername => write!(f, "Invalid username"), Reject::UsernameInUse => write!(f, "Username already in use"), Reject::_WrongVersion => write!(f, "Wrong version"), Reject::WrongUserPassword => write!(f, "Wrong user password"), Reject::_WrongServerPassword => write!(f, "Wrong server password"), Reject::_NoCertificate => write!(f, "No TLS certificate"), } } } impl Handler { pub fn new( storage: Arc, control_channel: Arc, event_sender: Sender, session_id: u32, config: Config, ) -> Self { Handler { storage, control_channel, audio_channel: None, event_sender, session_id, crypto_resyncs: 0, config, } } } impl Handler { pub fn set_audio_channel(&mut self, channel: Arc) { self.audio_channel = Some(channel); } pub async fn handle_new_connection(&self) -> Result<(), ConnectionSetupError> { match self.control_channel.receive().await? { ControlPacket::Version(_) => { // TODO check version } _ => return Err(ConnectionSetupError::WrongPacket), }; // TODO let auth = match self.control_channel.receive().await? { ControlPacket::Authenticate(auth) => auth, _ => return Err(ConnectionSetupError::WrongPacket), }; self.authenticate(*auth).await?; let mut version = Version::new(); version.set_version(0x010300); let mut crypt_setup = CryptSetup::new(); crypt_setup.set_key(Vec::from(self.config.crypto_key)); crypt_setup.set_client_nonce(Vec::from(self.config.client_nonce)); crypt_setup.set_server_nonce(Vec::from(self.config.server_nonce)); let channel_states = self.storage.get_channels().into_iter().map(|channel| { let mut channel_state = ChannelState::new(); channel_state.set_channel_id(channel.id); channel_state.set_name(channel.name); channel_state }); let user_states: Vec = self.get_user_states(); let mut codec_version = CodecVersion::new(); codec_version.set_alpha(self.config.alpha_codec_version); codec_version.set_beta(self.config.beta_codec_version); codec_version.set_prefer_alpha(self.config.prefer_alpha); codec_version.set_opus(self.config.opus_support); let mut server_sync = ServerSync::new(); server_sync.set_session(self.session_id); server_sync.set_max_bandwidth(self.config.max_bandwidth); server_sync.set_welcome_text(self.config.welcome_text.clone()); server_sync.set_permissions(u64::MAX); let mut server_config = ServerConfig::new(); server_config.set_max_users(self.config.max_users); server_config.set_message_length(self.config.max_message_length); self.control_channel.send(version.into()).await?; self.control_channel.send(crypt_setup.into()).await?; for channel_state in channel_states { self.control_channel.send(channel_state.into()).await?; } for user_state in user_states.into_iter() { self.control_channel.send(user_state.into()).await?; } self.control_channel.send(codec_version.into()).await?; self.control_channel.send(server_sync.into()).await?; self.control_channel.send(server_config.into()).await?; Ok(()) } pub async fn handle_server_event(&self, event: ServerEvent) -> Result<(), HandlerError> { match event { ServerEvent::Connected(session_id) => self.new_user_connected(session_id).await?, ServerEvent::StateChanged(state) => self.user_state_changed(state).await?, ServerEvent::Talking(voice_packet) => self.user_talking(voice_packet).await?, ServerEvent::Disconnected(session_id) => self.user_disconnected(session_id).await?, ServerEvent::TextMessage(message) => self.user_text_message(message).await?, } Ok(()) } pub async fn handle_message( &self, packet: ControlPacket, ) -> Result<(), HandlerError> { match packet { ControlPacket::Ping(ping) => self.control_ping(*ping).await?, ControlPacket::TextMessage(message) => self.text_message(*message).await?, ControlPacket::UserState(state) => self.user_state(*state).await?, ControlPacket::UDPTunnel(tunnel) => self.handle_audio_packet(*tunnel).await?, pkt => error!("No handler for {:?}", pkt), } Ok(()) } pub async fn handle_audio_packet( &self, packet: VoicePacket, ) -> Result<(), HandlerError> { match packet { VoicePacket::Ping { timestamp } => { if let Some(channel) = self.audio_channel.as_ref() { channel.send(VoicePacket::Ping { timestamp }).await?; } } VoicePacket::Audio { target, seq_num, payload, position_info, .. } => { let packet = VoicePacket::Audio { _dst: PhantomData, target, session_id: self.session_id, seq_num, payload, position_info, }; self.event_sender.send(ClientEvent::Talking(packet)).await?; } } Ok(()) } pub async fn self_disconnected(&self) -> Result<(), HandlerError> { self.storage.remove_by_session_id(self.session_id); self.event_sender.send(ClientEvent::Disconnected).await?; Ok(()) } // Control packets async fn control_ping(&self, incoming: Ping) -> Result<(), HandlerError> { let mut ping = Ping::new(); ping.set_timestamp(incoming.get_timestamp()); if let Some(channel) = self.audio_channel.as_ref() { let stats = channel.get_stats(); ping.set_good(stats.good); ping.set_late(stats.late); ping.set_lost(stats.lost); ping.set_resync(self.crypto_resyncs); } self.control_channel.send(ping.into()).await?; Ok(()) } async fn text_message(&self, mut message: TextMessage) -> Result<(), HandlerError> { if self.config.max_message_length < message.get_message().len() as u32 { // TODO send the permission denied message return Ok(()); } if !message.has_actor() { message.set_actor(self.session_id); } self.event_sender .send(ClientEvent::TextMessage(message)) .await?; Ok(()) } async fn user_state(&self, mut state: UserState) -> Result<(), HandlerError> { if !state.has_session() { state.set_session(self.session_id); } let session_data = SessionData { muted_by_admin: state.get_mute(), deafened_by_admin: state.get_deaf(), suppressed: false, self_mute: state.get_self_mute(), self_deaf: state.get_self_deaf(), priority_speaker: false, recording: false, }; self.storage .update_session_data(self.session_id, session_data); self.event_sender .send(ClientEvent::StateChanged(state)) .await?; Ok(()) } // Server events async fn new_user_connected(&self, session_id: u32) -> Result<(), HandlerError> { if let Some(user) = self.storage.get_connected_user(session_id) { let mut user_state = UserState::new(); user_state.set_session(session_id); user_state.set_name(user.username); user_state.set_channel_id(user.channel_id); self.control_channel.send(user_state.into()).await?; } else if let Some(guest) = self.storage.get_guest(session_id) { let mut user_state = UserState::new(); user_state.set_session(session_id); user_state.set_name(guest.username); user_state.set_channel_id(guest.channel_id); self.control_channel.send(user_state.into()).await?; } Ok(()) } async fn user_state_changed(&self, state: UserState) -> Result<(), HandlerError> { self.control_channel.send(state.into()).await?; Ok(()) } async fn user_talking( &self, voice_packet: VoicePacket, ) -> Result<(), HandlerError> { if let Some(data) = self.storage.get_session_data(self.session_id) { if data.self_deaf || data.deafened_by_admin { return Ok(()); } } if let Some(channel) = self.audio_channel.as_ref() { channel.send(voice_packet).await?; } else { self.control_channel.send(voice_packet.into()).await?; } Ok(()) } async fn user_disconnected(&self, session_id: u32) -> Result<(), HandlerError> { let mut user_remove = UserRemove::new(); user_remove.set_session(session_id); Ok(self.control_channel.send(user_remove.into()).await?) } async fn user_text_message(&self, message: TextMessage) -> Result<(), HandlerError> { self.control_channel.send(message.into()).await?; Ok(()) } // Utils async fn authenticate(&self, auth: Authenticate) -> Result<(), ConnectionSetupError> { if !auth.has_username() { return Err(ConnectionSetupError::Reject(Reject::InvalidUsername)); } let username = auth.get_username(); if !validate_username(&username, self.config.max_username_length as usize) { return Err(ConnectionSetupError::Reject(Reject::InvalidUsername)); } if self.storage.username_in_connected(&username) { return Err(ConnectionSetupError::Reject(Reject::UsernameInUse)); } let user = match self.storage.get_user_by_username(username.into()) { Some(user) => user, None => { self.storage .add_guest(Guest::new(username.into(), self.session_id, 0)); return Ok(()); } }; if let (Some(stored_password_hash), Some(iterations), Some(salt)) = ( &user.password_hash, user.pbkdf2_iterations, &user.password_salt, ) { if !auth.has_password() { return Err(ConnectionSetupError::Reject(Reject::WrongUserPassword)); } let password = auth.get_password(); pbkdf2::verify( PBKDF2_ALGORITHM, iterations, salt, password.as_bytes(), stored_password_hash, ) .map_err(|_| ConnectionSetupError::Reject(Reject::WrongUserPassword))?; } self.storage.add_connected_user(user, self.session_id); Ok(()) } fn get_user_states(&self) -> Vec { let guests = self.storage.get_guests(); let users = self.storage.get_connected_users(); let mut states = Vec::with_capacity(guests.len() + users.len()); for guest in guests { let mut state = UserState::new(); state.set_session(guest.session_id); state.set_name(guest.username); state.set_channel_id(guest.channel_id); states.push(state); } for (session_id, user) in users { let mut state = UserState::new(); state.set_session(session_id); state.set_name(user.username); state.set_channel_id(user.channel_id); states.push(state); } states } } fn validate_username(username: &str, max_username_length: usize) -> bool { !username.is_empty() && username.trim().len() == username.len() && username.len() <= max_username_length } impl From for HandlerError { fn from(err: Error) -> Self { HandlerError::IO(err) } } impl From for ConnectionSetupError { fn from(err: Error) -> Self { ConnectionSetupError::IO(err) } } impl From> for HandlerError { fn from(_: SendError) -> Self { HandlerError::EventReceiverClosed } }