123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- use crate::protocol::connection::{AudioChannel, ControlChannel};
- use crate::server::client::client_worker::{ClientEvent, ServerEvent};
- use crate::storage::{Guest, SessionData, Storage};
- use log::error;
- use mumble_protocol::control::msgs::{
- Authenticate, ChannelState, CodecVersion, CryptSetup, Ping, ServerConfig, ServerSync,
- TextMessage, UserRemove, UserState, Version,
- };
- use mumble_protocol::control::ControlPacket;
- use mumble_protocol::voice::{Clientbound, Serverbound, VoicePacket};
- use ring::pbkdf2;
- use std::fmt;
- use std::io::Error;
- use std::marker::PhantomData;
- use std::num::NonZeroU32;
- use std::sync::Arc;
- use tokio::sync::mpsc::error::SendError;
- use tokio::sync::mpsc::Sender;
- static PBKDF2_ALGORITHM: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256;
- type Key = [u8; 16];
- type Nonce = [u8; 16];
- pub struct Handler<C: ControlChannel, A: AudioChannel> {
- storage: Arc<Storage>,
- control_channel: Arc<C>,
- audio_channel: Option<Arc<A>>,
- event_sender: Sender<ClientEvent>,
- config: Config,
- session_id: u32,
- crypto_resyncs: u32,
- }
- pub struct Config {
- pub crypto_key: Key,
- pub server_nonce: Nonce,
- pub client_nonce: Nonce,
- pub alpha_codec_version: i32,
- pub beta_codec_version: i32,
- pub prefer_alpha: bool,
- pub opus_support: bool,
- pub welcome_text: String,
- pub max_bandwidth: u32,
- pub max_users: u32,
- pub allow_html: bool,
- pub max_message_length: u32,
- pub max_image_message_length: u32,
- pub max_username_length: u32,
- pub min_compatible_version: u32,
- pub server_password: Option<String>,
- pub pbkdf2_iterations: NonZeroU32,
- }
- pub enum HandlerError {
- IO(std::io::Error),
- EventReceiverClosed,
- }
- pub enum ConnectionSetupError {
- IO(std::io::Error),
- Reject(Reject),
- WrongPacket,
- }
- impl fmt::Display for ConnectionSetupError {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match &self {
- ConnectionSetupError::IO(e) => {
- write!(f, "{}", e)
- }
- ConnectionSetupError::Reject(r) => {
- write!(f, "Reject: {}", r)
- }
- ConnectionSetupError::WrongPacket => {
- write!(f, "Wrong packet")
- }
- }
- }
- }
- pub enum Reject {
- InvalidUsername,
- UsernameInUse,
- _WrongVersion,
- WrongUserPassword,
- _WrongServerPassword,
- _NoCertificate,
- }
- impl fmt::Display for Reject {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- Reject::InvalidUsername => write!(f, "Invalid username"),
- Reject::UsernameInUse => write!(f, "Username already in use"),
- Reject::_WrongVersion => write!(f, "Wrong version"),
- Reject::WrongUserPassword => write!(f, "Wrong user password"),
- Reject::_WrongServerPassword => write!(f, "Wrong server password"),
- Reject::_NoCertificate => write!(f, "No TLS certificate"),
- }
- }
- }
- impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
- pub fn new(
- storage: Arc<Storage>,
- control_channel: Arc<C>,
- event_sender: Sender<ClientEvent>,
- session_id: u32,
- config: Config,
- ) -> Self {
- Handler {
- storage,
- control_channel,
- audio_channel: None,
- event_sender,
- session_id,
- crypto_resyncs: 0,
- config,
- }
- }
- }
- impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
- pub fn set_audio_channel(&mut self, channel: Arc<A>) {
- self.audio_channel = Some(channel);
- }
- pub async fn handle_new_connection(&self) -> Result<(), ConnectionSetupError> {
- match self.control_channel.receive().await? {
- ControlPacket::Version(_) => {
- // TODO check version
- }
- _ => return Err(ConnectionSetupError::WrongPacket),
- };
- // TODO
- let auth = match self.control_channel.receive().await? {
- ControlPacket::Authenticate(auth) => auth,
- _ => return Err(ConnectionSetupError::WrongPacket),
- };
- self.authenticate(*auth).await?;
- let mut version = Version::new();
- version.set_version(0x010300);
- let mut crypt_setup = CryptSetup::new();
- crypt_setup.set_key(Vec::from(self.config.crypto_key));
- crypt_setup.set_client_nonce(Vec::from(self.config.client_nonce));
- crypt_setup.set_server_nonce(Vec::from(self.config.server_nonce));
- let channel_states = self.storage.get_channels().into_iter().map(|channel| {
- let mut channel_state = ChannelState::new();
- channel_state.set_channel_id(channel.id);
- channel_state.set_name(channel.name);
- channel_state
- });
- let user_states: Vec<UserState> = self.get_user_states();
- let mut codec_version = CodecVersion::new();
- codec_version.set_alpha(self.config.alpha_codec_version);
- codec_version.set_beta(self.config.beta_codec_version);
- codec_version.set_prefer_alpha(self.config.prefer_alpha);
- codec_version.set_opus(self.config.opus_support);
- let mut server_sync = ServerSync::new();
- server_sync.set_session(self.session_id);
- server_sync.set_max_bandwidth(self.config.max_bandwidth);
- server_sync.set_welcome_text(self.config.welcome_text.clone());
- server_sync.set_permissions(u64::MAX);
- let mut server_config = ServerConfig::new();
- server_config.set_max_users(self.config.max_users);
- server_config.set_message_length(self.config.max_message_length);
- self.control_channel.send(version.into()).await?;
- self.control_channel.send(crypt_setup.into()).await?;
- for channel_state in channel_states {
- self.control_channel.send(channel_state.into()).await?;
- }
- for user_state in user_states.into_iter() {
- self.control_channel.send(user_state.into()).await?;
- }
- self.control_channel.send(codec_version.into()).await?;
- self.control_channel.send(server_sync.into()).await?;
- self.control_channel.send(server_config.into()).await?;
- Ok(())
- }
- pub async fn handle_server_event(&self, event: ServerEvent) -> Result<(), HandlerError> {
- match event {
- ServerEvent::Connected(session_id) => self.new_user_connected(session_id).await?,
- ServerEvent::StateChanged(state) => self.user_state_changed(state).await?,
- ServerEvent::Talking(voice_packet) => self.user_talking(voice_packet).await?,
- ServerEvent::Disconnected(session_id) => self.user_disconnected(session_id).await?,
- ServerEvent::TextMessage(message) => self.user_text_message(message).await?,
- }
- Ok(())
- }
- pub async fn handle_message(
- &self,
- packet: ControlPacket<Serverbound>,
- ) -> Result<(), HandlerError> {
- match packet {
- ControlPacket::Ping(ping) => self.control_ping(*ping).await?,
- ControlPacket::TextMessage(message) => self.text_message(*message).await?,
- ControlPacket::UserState(state) => self.user_state(*state).await?,
- ControlPacket::UDPTunnel(tunnel) => self.handle_audio_packet(*tunnel).await?,
- pkt => error!("No handler for {:?}", pkt),
- }
- Ok(())
- }
- pub async fn handle_audio_packet(
- &self,
- packet: VoicePacket<Serverbound>,
- ) -> Result<(), HandlerError> {
- match packet {
- VoicePacket::Ping { timestamp } => {
- if let Some(channel) = self.audio_channel.as_ref() {
- channel.send(VoicePacket::Ping { timestamp }).await?;
- }
- }
- VoicePacket::Audio {
- target,
- seq_num,
- payload,
- position_info,
- ..
- } => {
- let packet = VoicePacket::Audio {
- _dst: PhantomData,
- target,
- session_id: self.session_id,
- seq_num,
- payload,
- position_info,
- };
- self.event_sender.send(ClientEvent::Talking(packet)).await?;
- }
- }
- Ok(())
- }
- pub async fn self_disconnected(&self) -> Result<(), HandlerError> {
- self.storage.remove_by_session_id(self.session_id);
- self.event_sender.send(ClientEvent::Disconnected).await?;
- Ok(())
- }
- // Control packets
- async fn control_ping(&self, incoming: Ping) -> Result<(), HandlerError> {
- let mut ping = Ping::new();
- ping.set_timestamp(incoming.get_timestamp());
- if let Some(channel) = self.audio_channel.as_ref() {
- let stats = channel.get_stats();
- ping.set_good(stats.good);
- ping.set_late(stats.late);
- ping.set_lost(stats.lost);
- ping.set_resync(self.crypto_resyncs);
- }
- self.control_channel.send(ping.into()).await?;
- Ok(())
- }
- async fn text_message(&self, mut message: TextMessage) -> Result<(), HandlerError> {
- if self.config.max_message_length < message.get_message().len() as u32 {
- // TODO send the permission denied message
- return Ok(());
- }
- if !message.has_actor() {
- message.set_actor(self.session_id);
- }
- self.event_sender
- .send(ClientEvent::TextMessage(message))
- .await?;
- Ok(())
- }
- async fn user_state(&self, mut state: UserState) -> Result<(), HandlerError> {
- if !state.has_session() {
- state.set_session(self.session_id);
- }
- let session_data = SessionData {
- muted_by_admin: state.get_mute(),
- deafened_by_admin: state.get_deaf(),
- suppressed: false,
- self_mute: state.get_self_mute(),
- self_deaf: state.get_self_deaf(),
- priority_speaker: false,
- recording: false,
- };
- self.storage
- .update_session_data(self.session_id, session_data);
- self.event_sender
- .send(ClientEvent::StateChanged(state))
- .await?;
- Ok(())
- }
- // Server events
- async fn new_user_connected(&self, session_id: u32) -> Result<(), HandlerError> {
- if let Some(user) = self.storage.get_connected_user(session_id) {
- let mut user_state = UserState::new();
- user_state.set_session(session_id);
- user_state.set_name(user.username);
- user_state.set_channel_id(user.channel_id);
- self.control_channel.send(user_state.into()).await?;
- } else if let Some(guest) = self.storage.get_guest(session_id) {
- let mut user_state = UserState::new();
- user_state.set_session(session_id);
- user_state.set_name(guest.username);
- user_state.set_channel_id(guest.channel_id);
- self.control_channel.send(user_state.into()).await?;
- }
- Ok(())
- }
- async fn user_state_changed(&self, state: UserState) -> Result<(), HandlerError> {
- self.control_channel.send(state.into()).await?;
- Ok(())
- }
- async fn user_talking(
- &self,
- voice_packet: VoicePacket<Clientbound>,
- ) -> Result<(), HandlerError> {
- if let Some(data) = self.storage.get_session_data(self.session_id) {
- if data.self_deaf || data.deafened_by_admin {
- return Ok(());
- }
- }
- if let Some(channel) = self.audio_channel.as_ref() {
- channel.send(voice_packet).await?;
- } else {
- self.control_channel.send(voice_packet.into()).await?;
- }
- Ok(())
- }
- async fn user_disconnected(&self, session_id: u32) -> Result<(), HandlerError> {
- let mut user_remove = UserRemove::new();
- user_remove.set_session(session_id);
- Ok(self.control_channel.send(user_remove.into()).await?)
- }
- async fn user_text_message(&self, message: TextMessage) -> Result<(), HandlerError> {
- self.control_channel.send(message.into()).await?;
- Ok(())
- }
- // Utils
- async fn authenticate(&self, auth: Authenticate) -> Result<(), ConnectionSetupError> {
- if !auth.has_username() {
- return Err(ConnectionSetupError::Reject(Reject::InvalidUsername));
- }
- let username = auth.get_username();
- if !validate_username(&username, self.config.max_username_length as usize) {
- return Err(ConnectionSetupError::Reject(Reject::InvalidUsername));
- }
- if self.storage.username_in_connected(&username) {
- return Err(ConnectionSetupError::Reject(Reject::UsernameInUse));
- }
- let user = match self.storage.get_user_by_username(username.into()) {
- Some(user) => user,
- None => {
- self.storage
- .add_guest(Guest::new(username.into(), self.session_id, 0));
- return Ok(());
- }
- };
- if let (Some(stored_password_hash), Some(iterations), Some(salt)) = (
- &user.password_hash,
- user.pbkdf2_iterations,
- &user.password_salt,
- ) {
- if !auth.has_password() {
- return Err(ConnectionSetupError::Reject(Reject::WrongUserPassword));
- }
- let password = auth.get_password();
- pbkdf2::verify(
- PBKDF2_ALGORITHM,
- iterations,
- salt,
- password.as_bytes(),
- stored_password_hash,
- )
- .map_err(|_| ConnectionSetupError::Reject(Reject::WrongUserPassword))?;
- }
- self.storage.add_connected_user(user, self.session_id);
- Ok(())
- }
- fn get_user_states(&self) -> Vec<UserState> {
- let guests = self.storage.get_guests();
- let users = self.storage.get_connected_users();
- let mut states = Vec::with_capacity(guests.len() + users.len());
- for guest in guests {
- let mut state = UserState::new();
- state.set_session(guest.session_id);
- state.set_name(guest.username);
- state.set_channel_id(guest.channel_id);
- states.push(state);
- }
- for (session_id, user) in users {
- let mut state = UserState::new();
- state.set_session(session_id);
- state.set_name(user.username);
- state.set_channel_id(user.channel_id);
- states.push(state);
- }
- states
- }
- }
- fn validate_username(username: &str, max_username_length: usize) -> bool {
- !username.is_empty()
- && username.trim().len() == username.len()
- && username.len() <= max_username_length
- }
- impl From<Error> for HandlerError {
- fn from(err: Error) -> Self {
- HandlerError::IO(err)
- }
- }
- impl From<Error> for ConnectionSetupError {
- fn from(err: Error) -> Self {
- ConnectionSetupError::IO(err)
- }
- }
- impl From<SendError<ClientEvent>> for HandlerError {
- fn from(_: SendError<ClientEvent>) -> Self {
- HandlerError::EventReceiverClosed
- }
- }
|