handler.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. use crate::protocol::connection::{AudioChannel, ControlChannel};
  2. use crate::server::client::client_worker::{ClientEvent, ServerEvent};
  3. use crate::storage::{Guest, SessionData, Storage};
  4. use log::error;
  5. use mumble_protocol::control::msgs::{
  6. Authenticate, ChannelState, CodecVersion, CryptSetup, Ping, ServerConfig, ServerSync,
  7. TextMessage, UserRemove, UserState, Version,
  8. };
  9. use mumble_protocol::control::ControlPacket;
  10. use mumble_protocol::voice::{Clientbound, Serverbound, VoicePacket};
  11. use ring::pbkdf2;
  12. use std::fmt;
  13. use std::io::Error;
  14. use std::marker::PhantomData;
  15. use std::num::NonZeroU32;
  16. use std::sync::Arc;
  17. use tokio::sync::mpsc::error::SendError;
  18. use tokio::sync::mpsc::Sender;
  19. static PBKDF2_ALGORITHM: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256;
  20. type Key = [u8; 16];
  21. type Nonce = [u8; 16];
  22. pub struct Handler<C: ControlChannel, A: AudioChannel> {
  23. storage: Arc<Storage>,
  24. control_channel: Arc<C>,
  25. audio_channel: Option<Arc<A>>,
  26. event_sender: Sender<ClientEvent>,
  27. config: Config,
  28. session_id: u32,
  29. crypto_resyncs: u32,
  30. }
  31. pub struct Config {
  32. pub crypto_key: Key,
  33. pub server_nonce: Nonce,
  34. pub client_nonce: Nonce,
  35. pub alpha_codec_version: i32,
  36. pub beta_codec_version: i32,
  37. pub prefer_alpha: bool,
  38. pub opus_support: bool,
  39. pub welcome_text: String,
  40. pub max_bandwidth: u32,
  41. pub max_users: u32,
  42. pub allow_html: bool,
  43. pub max_message_length: u32,
  44. pub max_image_message_length: u32,
  45. pub max_username_length: u32,
  46. pub min_compatible_version: u32,
  47. pub server_password: Option<String>,
  48. pub pbkdf2_iterations: NonZeroU32,
  49. }
  50. pub enum HandlerError {
  51. IO(std::io::Error),
  52. EventReceiverClosed,
  53. }
  54. pub enum ConnectionSetupError {
  55. IO(std::io::Error),
  56. Reject(Reject),
  57. WrongPacket,
  58. }
  59. impl fmt::Display for ConnectionSetupError {
  60. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  61. match &self {
  62. ConnectionSetupError::IO(e) => {
  63. write!(f, "{}", e)
  64. }
  65. ConnectionSetupError::Reject(r) => {
  66. write!(f, "Reject: {}", r)
  67. }
  68. ConnectionSetupError::WrongPacket => {
  69. write!(f, "Wrong packet")
  70. }
  71. }
  72. }
  73. }
  74. pub enum Reject {
  75. InvalidUsername,
  76. UsernameInUse,
  77. _WrongVersion,
  78. WrongUserPassword,
  79. _WrongServerPassword,
  80. _NoCertificate,
  81. }
  82. impl fmt::Display for Reject {
  83. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  84. match self {
  85. Reject::InvalidUsername => write!(f, "Invalid username"),
  86. Reject::UsernameInUse => write!(f, "Username already in use"),
  87. Reject::_WrongVersion => write!(f, "Wrong version"),
  88. Reject::WrongUserPassword => write!(f, "Wrong user password"),
  89. Reject::_WrongServerPassword => write!(f, "Wrong server password"),
  90. Reject::_NoCertificate => write!(f, "No TLS certificate"),
  91. }
  92. }
  93. }
  94. impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
  95. pub fn new(
  96. storage: Arc<Storage>,
  97. control_channel: Arc<C>,
  98. event_sender: Sender<ClientEvent>,
  99. session_id: u32,
  100. config: Config,
  101. ) -> Self {
  102. Handler {
  103. storage,
  104. control_channel,
  105. audio_channel: None,
  106. event_sender,
  107. session_id,
  108. crypto_resyncs: 0,
  109. config,
  110. }
  111. }
  112. }
  113. impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
  114. pub fn set_audio_channel(&mut self, channel: Arc<A>) {
  115. self.audio_channel = Some(channel);
  116. }
  117. pub async fn handle_new_connection(&self) -> Result<(), ConnectionSetupError> {
  118. match self.control_channel.receive().await? {
  119. ControlPacket::Version(_) => {
  120. // TODO check version
  121. }
  122. _ => return Err(ConnectionSetupError::WrongPacket),
  123. };
  124. // TODO
  125. let auth = match self.control_channel.receive().await? {
  126. ControlPacket::Authenticate(auth) => auth,
  127. _ => return Err(ConnectionSetupError::WrongPacket),
  128. };
  129. self.authenticate(*auth).await?;
  130. let mut version = Version::new();
  131. version.set_version(0x010300);
  132. let mut crypt_setup = CryptSetup::new();
  133. crypt_setup.set_key(Vec::from(self.config.crypto_key));
  134. crypt_setup.set_client_nonce(Vec::from(self.config.client_nonce));
  135. crypt_setup.set_server_nonce(Vec::from(self.config.server_nonce));
  136. let channel_states = self.storage.get_channels().into_iter().map(|channel| {
  137. let mut channel_state = ChannelState::new();
  138. channel_state.set_channel_id(channel.id);
  139. channel_state.set_name(channel.name);
  140. channel_state
  141. });
  142. let user_states: Vec<UserState> = self.get_user_states();
  143. let mut codec_version = CodecVersion::new();
  144. codec_version.set_alpha(self.config.alpha_codec_version);
  145. codec_version.set_beta(self.config.beta_codec_version);
  146. codec_version.set_prefer_alpha(self.config.prefer_alpha);
  147. codec_version.set_opus(self.config.opus_support);
  148. let mut server_sync = ServerSync::new();
  149. server_sync.set_session(self.session_id);
  150. server_sync.set_max_bandwidth(self.config.max_bandwidth);
  151. server_sync.set_welcome_text(self.config.welcome_text.clone());
  152. server_sync.set_permissions(u64::MAX);
  153. let mut server_config = ServerConfig::new();
  154. server_config.set_max_users(self.config.max_users);
  155. server_config.set_message_length(self.config.max_message_length);
  156. self.control_channel.send(version.into()).await?;
  157. self.control_channel.send(crypt_setup.into()).await?;
  158. for channel_state in channel_states {
  159. self.control_channel.send(channel_state.into()).await?;
  160. }
  161. for user_state in user_states.into_iter() {
  162. self.control_channel.send(user_state.into()).await?;
  163. }
  164. self.control_channel.send(codec_version.into()).await?;
  165. self.control_channel.send(server_sync.into()).await?;
  166. self.control_channel.send(server_config.into()).await?;
  167. Ok(())
  168. }
  169. pub async fn handle_server_event(&self, event: ServerEvent) -> Result<(), HandlerError> {
  170. match event {
  171. ServerEvent::Connected(session_id) => self.new_user_connected(session_id).await?,
  172. ServerEvent::StateChanged(state) => self.user_state_changed(state).await?,
  173. ServerEvent::Talking(voice_packet) => self.user_talking(voice_packet).await?,
  174. ServerEvent::Disconnected(session_id) => self.user_disconnected(session_id).await?,
  175. ServerEvent::TextMessage(message) => self.user_text_message(message).await?,
  176. }
  177. Ok(())
  178. }
  179. pub async fn handle_message(
  180. &self,
  181. packet: ControlPacket<Serverbound>,
  182. ) -> Result<(), HandlerError> {
  183. match packet {
  184. ControlPacket::Ping(ping) => self.control_ping(*ping).await?,
  185. ControlPacket::TextMessage(message) => self.text_message(*message).await?,
  186. ControlPacket::UserState(state) => self.user_state(*state).await?,
  187. ControlPacket::UDPTunnel(tunnel) => self.handle_audio_packet(*tunnel).await?,
  188. pkt => error!("No handler for {:?}", pkt),
  189. }
  190. Ok(())
  191. }
  192. pub async fn handle_audio_packet(
  193. &self,
  194. packet: VoicePacket<Serverbound>,
  195. ) -> Result<(), HandlerError> {
  196. match packet {
  197. VoicePacket::Ping { timestamp } => {
  198. if let Some(channel) = self.audio_channel.as_ref() {
  199. channel.send(VoicePacket::Ping { timestamp }).await?;
  200. }
  201. }
  202. VoicePacket::Audio {
  203. target,
  204. seq_num,
  205. payload,
  206. position_info,
  207. ..
  208. } => {
  209. let packet = VoicePacket::Audio {
  210. _dst: PhantomData,
  211. target,
  212. session_id: self.session_id,
  213. seq_num,
  214. payload,
  215. position_info,
  216. };
  217. self.event_sender.send(ClientEvent::Talking(packet)).await?;
  218. }
  219. }
  220. Ok(())
  221. }
  222. pub async fn self_disconnected(&self) -> Result<(), HandlerError> {
  223. self.storage.remove_by_session_id(self.session_id);
  224. self.event_sender.send(ClientEvent::Disconnected).await?;
  225. Ok(())
  226. }
  227. // Control packets
  228. async fn control_ping(&self, incoming: Ping) -> Result<(), HandlerError> {
  229. let mut ping = Ping::new();
  230. ping.set_timestamp(incoming.get_timestamp());
  231. if let Some(channel) = self.audio_channel.as_ref() {
  232. let stats = channel.get_stats();
  233. ping.set_good(stats.good);
  234. ping.set_late(stats.late);
  235. ping.set_lost(stats.lost);
  236. ping.set_resync(self.crypto_resyncs);
  237. }
  238. self.control_channel.send(ping.into()).await?;
  239. Ok(())
  240. }
  241. async fn text_message(&self, mut message: TextMessage) -> Result<(), HandlerError> {
  242. if self.config.max_message_length < message.get_message().len() as u32 {
  243. // TODO send the permission denied message
  244. return Ok(());
  245. }
  246. if !message.has_actor() {
  247. message.set_actor(self.session_id);
  248. }
  249. self.event_sender
  250. .send(ClientEvent::TextMessage(message))
  251. .await?;
  252. Ok(())
  253. }
  254. async fn user_state(&self, mut state: UserState) -> Result<(), HandlerError> {
  255. if !state.has_session() {
  256. state.set_session(self.session_id);
  257. }
  258. let session_data = SessionData {
  259. muted_by_admin: state.get_mute(),
  260. deafened_by_admin: state.get_deaf(),
  261. suppressed: false,
  262. self_mute: state.get_self_mute(),
  263. self_deaf: state.get_self_deaf(),
  264. priority_speaker: false,
  265. recording: false,
  266. };
  267. self.storage
  268. .update_session_data(self.session_id, session_data);
  269. self.event_sender
  270. .send(ClientEvent::StateChanged(state))
  271. .await?;
  272. Ok(())
  273. }
  274. // Server events
  275. async fn new_user_connected(&self, session_id: u32) -> Result<(), HandlerError> {
  276. if let Some(user) = self.storage.get_connected_user(session_id) {
  277. let mut user_state = UserState::new();
  278. user_state.set_session(session_id);
  279. user_state.set_name(user.username);
  280. user_state.set_channel_id(user.channel_id);
  281. self.control_channel.send(user_state.into()).await?;
  282. } else if let Some(guest) = self.storage.get_guest(session_id) {
  283. let mut user_state = UserState::new();
  284. user_state.set_session(session_id);
  285. user_state.set_name(guest.username);
  286. user_state.set_channel_id(guest.channel_id);
  287. self.control_channel.send(user_state.into()).await?;
  288. }
  289. Ok(())
  290. }
  291. async fn user_state_changed(&self, state: UserState) -> Result<(), HandlerError> {
  292. self.control_channel.send(state.into()).await?;
  293. Ok(())
  294. }
  295. async fn user_talking(
  296. &self,
  297. voice_packet: VoicePacket<Clientbound>,
  298. ) -> Result<(), HandlerError> {
  299. if let Some(data) = self.storage.get_session_data(self.session_id) {
  300. if data.self_deaf || data.deafened_by_admin {
  301. return Ok(());
  302. }
  303. }
  304. if let Some(channel) = self.audio_channel.as_ref() {
  305. channel.send(voice_packet).await?;
  306. } else {
  307. self.control_channel.send(voice_packet.into()).await?;
  308. }
  309. Ok(())
  310. }
  311. async fn user_disconnected(&self, session_id: u32) -> Result<(), HandlerError> {
  312. let mut user_remove = UserRemove::new();
  313. user_remove.set_session(session_id);
  314. Ok(self.control_channel.send(user_remove.into()).await?)
  315. }
  316. async fn user_text_message(&self, message: TextMessage) -> Result<(), HandlerError> {
  317. self.control_channel.send(message.into()).await?;
  318. Ok(())
  319. }
  320. // Utils
  321. async fn authenticate(&self, auth: Authenticate) -> Result<(), ConnectionSetupError> {
  322. if !auth.has_username() {
  323. return Err(ConnectionSetupError::Reject(Reject::InvalidUsername));
  324. }
  325. let username = auth.get_username();
  326. if !validate_username(&username, self.config.max_username_length as usize) {
  327. return Err(ConnectionSetupError::Reject(Reject::InvalidUsername));
  328. }
  329. if self.storage.username_in_connected(&username) {
  330. return Err(ConnectionSetupError::Reject(Reject::UsernameInUse));
  331. }
  332. let user = match self.storage.get_user_by_username(username.into()) {
  333. Some(user) => user,
  334. None => {
  335. self.storage
  336. .add_guest(Guest::new(username.into(), self.session_id, 0));
  337. return Ok(());
  338. }
  339. };
  340. if let (Some(stored_password_hash), Some(iterations), Some(salt)) = (
  341. &user.password_hash,
  342. user.pbkdf2_iterations,
  343. &user.password_salt,
  344. ) {
  345. if !auth.has_password() {
  346. return Err(ConnectionSetupError::Reject(Reject::WrongUserPassword));
  347. }
  348. let password = auth.get_password();
  349. pbkdf2::verify(
  350. PBKDF2_ALGORITHM,
  351. iterations,
  352. salt,
  353. password.as_bytes(),
  354. stored_password_hash,
  355. )
  356. .map_err(|_| ConnectionSetupError::Reject(Reject::WrongUserPassword))?;
  357. }
  358. self.storage.add_connected_user(user, self.session_id);
  359. Ok(())
  360. }
  361. fn get_user_states(&self) -> Vec<UserState> {
  362. let guests = self.storage.get_guests();
  363. let users = self.storage.get_connected_users();
  364. let mut states = Vec::with_capacity(guests.len() + users.len());
  365. for guest in guests {
  366. let mut state = UserState::new();
  367. state.set_session(guest.session_id);
  368. state.set_name(guest.username);
  369. state.set_channel_id(guest.channel_id);
  370. states.push(state);
  371. }
  372. for (session_id, user) in users {
  373. let mut state = UserState::new();
  374. state.set_session(session_id);
  375. state.set_name(user.username);
  376. state.set_channel_id(user.channel_id);
  377. states.push(state);
  378. }
  379. states
  380. }
  381. }
  382. fn validate_username(username: &str, max_username_length: usize) -> bool {
  383. !username.is_empty()
  384. && username.trim().len() == username.len()
  385. && username.len() <= max_username_length
  386. }
  387. impl From<Error> for HandlerError {
  388. fn from(err: Error) -> Self {
  389. HandlerError::IO(err)
  390. }
  391. }
  392. impl From<Error> for ConnectionSetupError {
  393. fn from(err: Error) -> Self {
  394. ConnectionSetupError::IO(err)
  395. }
  396. }
  397. impl From<SendError<ClientEvent>> for HandlerError {
  398. fn from(_: SendError<ClientEvent>) -> Self {
  399. HandlerError::EventReceiverClosed
  400. }
  401. }