handler.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. use crate::protocol::connection::{AudioChannel, ControlChannel};
  2. use crate::server::client::client_worker::{ClientEvent, ServerEvent};
  3. use crate::storage::{Guest, SessionData, Storage};
  4. use log::error;
  5. use mumble_protocol::control::msgs::{
  6. Authenticate, ChannelState, CodecVersion, CryptSetup, Ping, ServerConfig, ServerSync,
  7. TextMessage, UserRemove, UserState, Version,
  8. };
  9. use mumble_protocol::control::ControlPacket;
  10. use mumble_protocol::voice::{Clientbound, Serverbound, VoicePacket};
  11. use ring::pbkdf2;
  12. use std::fmt;
  13. use std::io::Error;
  14. use std::marker::PhantomData;
  15. use std::num::NonZeroU32;
  16. use std::sync::Arc;
  17. use tokio::sync::mpsc::error::SendError;
  18. use tokio::sync::mpsc::Sender;
/// PBKDF2 variant (HMAC-SHA256) used when verifying a user's password
/// against the stored hash.
static PBKDF2_ALGORITHM: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256;
/// 16-byte key; `Config::crypto_key` of this type is sent to the client in
/// the `CryptSetup` packet.
type Key = [u8; 16];
/// 16-byte nonce; the client and server nonces of this type are sent to the
/// client in the `CryptSetup` packet.
type Nonce = [u8; 16];
/// Per-client protocol handler.
///
/// Performs the connection handshake, reacts to packets received from the
/// client and forwards events coming from the rest of the server to it.
pub struct Handler<C: ControlChannel, A: AudioChannel> {
    /// Shared server state: channels, guests, connected users and per-session
    /// data.
    storage: Arc<Storage>,
    /// Channel used to exchange control packets with this client.
    control_channel: Arc<C>,
    /// Dedicated voice channel; `None` until `set_audio_channel` is called.
    audio_channel: Option<Arc<A>>,
    /// Publishes this client's activity as `ClientEvent`s.
    event_sender: Sender<ClientEvent>,
    /// Settings used during the handshake and when validating client input.
    config: Config,
    /// Session id of the client served by this handler.
    session_id: u32,
    /// Value reported in the `resync` field of ping replies. It starts at 0
    /// and nothing in this file changes it afterwards.
    crypto_resyncs: u32,
}
/// Settings handed to a [`Handler`] at construction time.
pub struct Config {
    /// Key sent to the client in `CryptSetup`.
    pub crypto_key: Key,
    /// Server nonce sent to the client in `CryptSetup`.
    pub server_nonce: Nonce,
    /// Client nonce sent to the client in `CryptSetup`.
    pub client_nonce: Nonce,
    /// `alpha` value advertised in `CodecVersion`.
    pub alpha_codec_version: i32,
    /// `beta` value advertised in `CodecVersion`.
    pub beta_codec_version: i32,
    /// `prefer_alpha` flag advertised in `CodecVersion`.
    pub prefer_alpha: bool,
    /// `opus` flag advertised in `CodecVersion`.
    pub opus_support: bool,
    /// Welcome text sent in `ServerSync`.
    pub welcome_text: String,
    /// Maximum bandwidth sent in `ServerSync`.
    pub max_bandwidth: u32,
    /// Maximum user count sent in `ServerConfig`.
    pub max_users: u32,
    /// NOTE(review): not read anywhere in this file — presumably whether HTML
    /// is allowed in text messages; confirm against the other users of
    /// `Config`.
    pub allow_html: bool,
    /// Longest accepted text message. Sent in `ServerConfig` and enforced on
    /// incoming text messages, which are silently dropped when longer.
    pub max_message_length: u32,
    /// NOTE(review): not read anywhere in this file — presumably the limit
    /// for messages that contain images; confirm.
    pub max_image_message_length: u32,
    /// Longest username accepted during authentication.
    pub max_username_length: u32,
    /// NOTE(review): not read anywhere in this file; the client version check
    /// in the handshake is still a TODO.
    pub min_compatible_version: u32,
    /// NOTE(review): not read anywhere in this file; no server-password check
    /// is performed by the visible authentication code.
    pub server_password: Option<String>,
    /// NOTE(review): not read anywhere in this file; password verification
    /// uses the iteration count stored with each user instead.
    pub pbkdf2_iterations: NonZeroU32,
}
  50. pub enum HandlerError {
  51. IO(std::io::Error),
  52. EventReceiverClosed,
  53. }
  54. impl fmt::Display for HandlerError {
  55. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  56. match &self {
  57. HandlerError::IO(e) => {
  58. write!(f, "{}", e)
  59. }
  60. HandlerError::EventReceiverClosed => {
  61. write!(f, "Server event receiver have been dropped")
  62. }
  63. }
  64. }
  65. }
  66. pub enum ConnectionSetupError {
  67. IO(std::io::Error),
  68. Reject(Reject),
  69. WrongPacket,
  70. }
  71. impl fmt::Display for ConnectionSetupError {
  72. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  73. match &self {
  74. ConnectionSetupError::IO(e) => {
  75. write!(f, "{}", e)
  76. }
  77. ConnectionSetupError::Reject(r) => {
  78. write!(f, "Reject: {}", r)
  79. }
  80. ConnectionSetupError::WrongPacket => {
  81. write!(f, "Wrong packet")
  82. }
  83. }
  84. }
  85. }
  86. pub enum Reject {
  87. InvalidUsername,
  88. UsernameInUse,
  89. _WrongVersion,
  90. WrongUserPassword,
  91. _WrongServerPassword,
  92. _NoCertificate,
  93. }
  94. impl fmt::Display for Reject {
  95. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  96. match self {
  97. Reject::InvalidUsername => write!(f, "Invalid username"),
  98. Reject::UsernameInUse => write!(f, "Username already in use"),
  99. Reject::_WrongVersion => write!(f, "Wrong version"),
  100. Reject::WrongUserPassword => write!(f, "Wrong user password"),
  101. Reject::_WrongServerPassword => write!(f, "Wrong server password"),
  102. Reject::_NoCertificate => write!(f, "No TLS certificate"),
  103. }
  104. }
  105. }
  106. impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
  107. pub fn new(
  108. storage: Arc<Storage>,
  109. control_channel: Arc<C>,
  110. event_sender: Sender<ClientEvent>,
  111. session_id: u32,
  112. config: Config,
  113. ) -> Self {
  114. Handler {
  115. storage,
  116. control_channel,
  117. audio_channel: None,
  118. event_sender,
  119. session_id,
  120. crypto_resyncs: 0,
  121. config,
  122. }
  123. }
  124. }
  125. impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
  126. pub fn set_audio_channel(&mut self, channel: Arc<A>) {
  127. self.audio_channel = Some(channel);
  128. }
  129. pub async fn handle_new_connection(&self) -> Result<(), ConnectionSetupError> {
  130. match self.control_channel.receive().await? {
  131. ControlPacket::Version(_) => {
  132. // TODO check version
  133. }
  134. _ => return Err(ConnectionSetupError::WrongPacket),
  135. };
  136. // TODO
  137. let auth = match self.control_channel.receive().await? {
  138. ControlPacket::Authenticate(auth) => auth,
  139. _ => return Err(ConnectionSetupError::WrongPacket),
  140. };
  141. self.authenticate(*auth).await?;
  142. let mut version = Version::new();
  143. version.set_version(0x010300);
  144. let mut crypt_setup = CryptSetup::new();
  145. crypt_setup.set_key(Vec::from(self.config.crypto_key));
  146. crypt_setup.set_client_nonce(Vec::from(self.config.client_nonce));
  147. crypt_setup.set_server_nonce(Vec::from(self.config.server_nonce));
  148. let channel_states = self.storage.get_channels().into_iter().map(|channel| {
  149. let mut channel_state = ChannelState::new();
  150. channel_state.set_channel_id(channel.id);
  151. channel_state.set_name(channel.name);
  152. channel_state
  153. });
  154. let user_states: Vec<UserState> = self.get_user_states();
  155. let mut codec_version = CodecVersion::new();
  156. codec_version.set_alpha(self.config.alpha_codec_version);
  157. codec_version.set_beta(self.config.beta_codec_version);
  158. codec_version.set_prefer_alpha(self.config.prefer_alpha);
  159. codec_version.set_opus(self.config.opus_support);
  160. let mut server_sync = ServerSync::new();
  161. server_sync.set_session(self.session_id);
  162. server_sync.set_max_bandwidth(self.config.max_bandwidth);
  163. server_sync.set_welcome_text(self.config.welcome_text.clone());
  164. server_sync.set_permissions(u64::MAX);
  165. let mut server_config = ServerConfig::new();
  166. server_config.set_max_users(self.config.max_users);
  167. server_config.set_message_length(self.config.max_message_length);
  168. self.control_channel.send(version.into()).await?;
  169. self.control_channel.send(crypt_setup.into()).await?;
  170. for channel_state in channel_states {
  171. self.control_channel.send(channel_state.into()).await?;
  172. }
  173. for user_state in user_states.into_iter() {
  174. self.control_channel.send(user_state.into()).await?;
  175. }
  176. self.control_channel.send(codec_version.into()).await?;
  177. self.control_channel.send(server_sync.into()).await?;
  178. self.control_channel.send(server_config.into()).await?;
  179. Ok(())
  180. }
  181. pub async fn handle_server_event(&self, event: ServerEvent) -> Result<(), HandlerError> {
  182. match event {
  183. ServerEvent::Connected(session_id) => self.new_user_connected(session_id).await?,
  184. ServerEvent::StateChanged(state) => self.user_state_changed(state).await?,
  185. ServerEvent::Talking(voice_packet) => self.user_talking(voice_packet).await?,
  186. ServerEvent::Disconnected(session_id) => self.user_disconnected(session_id).await?,
  187. ServerEvent::TextMessage(message) => self.user_text_message(message).await?,
  188. }
  189. Ok(())
  190. }
  191. pub async fn handle_message(
  192. &self,
  193. packet: ControlPacket<Serverbound>,
  194. ) -> Result<(), HandlerError> {
  195. match packet {
  196. ControlPacket::Ping(ping) => self.control_ping(*ping).await?,
  197. ControlPacket::TextMessage(message) => self.text_message(*message).await?,
  198. ControlPacket::UserState(state) => self.user_state(*state).await?,
  199. ControlPacket::UDPTunnel(tunnel) => self.handle_audio_packet(*tunnel).await?,
  200. pkt => error!("No handler for {:?}", pkt),
  201. }
  202. Ok(())
  203. }
  204. pub async fn handle_audio_packet(
  205. &self,
  206. packet: VoicePacket<Serverbound>,
  207. ) -> Result<(), HandlerError> {
  208. match packet {
  209. VoicePacket::Ping { timestamp } => {
  210. if let Some(channel) = self.audio_channel.as_ref() {
  211. channel.send(VoicePacket::Ping { timestamp }).await?;
  212. }
  213. }
  214. VoicePacket::Audio {
  215. target,
  216. seq_num,
  217. payload,
  218. position_info,
  219. ..
  220. } => {
  221. let packet = VoicePacket::Audio {
  222. _dst: PhantomData,
  223. target,
  224. session_id: self.session_id,
  225. seq_num,
  226. payload,
  227. position_info,
  228. };
  229. self.event_sender.send(ClientEvent::Talking(packet)).await?;
  230. }
  231. }
  232. Ok(())
  233. }
  234. pub async fn self_disconnected(&self) -> Result<(), HandlerError> {
  235. self.storage.remove_by_session_id(self.session_id);
  236. self.event_sender.send(ClientEvent::Disconnected).await?;
  237. Ok(())
  238. }
  239. // Control packets
  240. async fn control_ping(&self, incoming: Ping) -> Result<(), HandlerError> {
  241. let mut ping = Ping::new();
  242. ping.set_timestamp(incoming.get_timestamp());
  243. if let Some(channel) = self.audio_channel.as_ref() {
  244. let stats = channel.get_stats();
  245. ping.set_good(stats.good);
  246. ping.set_late(stats.late);
  247. ping.set_lost(stats.lost);
  248. ping.set_resync(self.crypto_resyncs);
  249. }
  250. self.control_channel.send(ping.into()).await?;
  251. Ok(())
  252. }
  253. async fn text_message(&self, mut message: TextMessage) -> Result<(), HandlerError> {
  254. if self.config.max_message_length < message.get_message().len() as u32 {
  255. // TODO send the permission denied message
  256. return Ok(());
  257. }
  258. if !message.has_actor() {
  259. message.set_actor(self.session_id);
  260. }
  261. self.event_sender
  262. .send(ClientEvent::TextMessage(message))
  263. .await?;
  264. Ok(())
  265. }
  266. async fn user_state(&self, mut state: UserState) -> Result<(), HandlerError> {
  267. if !state.has_session() {
  268. state.set_session(self.session_id);
  269. }
  270. let session_data = SessionData {
  271. muted_by_admin: state.get_mute(),
  272. deafened_by_admin: state.get_deaf(),
  273. suppressed: false,
  274. self_mute: state.get_self_mute(),
  275. self_deaf: state.get_self_deaf(),
  276. priority_speaker: false,
  277. recording: false,
  278. };
  279. self.storage
  280. .update_session_data(self.session_id, session_data);
  281. self.event_sender
  282. .send(ClientEvent::StateChanged(state))
  283. .await?;
  284. Ok(())
  285. }
  286. // Server events
  287. async fn new_user_connected(&self, session_id: u32) -> Result<(), HandlerError> {
  288. if let Some(user) = self.storage.get_connected_user(session_id) {
  289. let mut user_state = UserState::new();
  290. user_state.set_session(session_id);
  291. user_state.set_name(user.username);
  292. user_state.set_channel_id(user.channel_id);
  293. self.control_channel.send(user_state.into()).await?;
  294. } else if let Some(guest) = self.storage.get_guest(session_id) {
  295. let mut user_state = UserState::new();
  296. user_state.set_session(session_id);
  297. user_state.set_name(guest.username);
  298. user_state.set_channel_id(guest.channel_id);
  299. self.control_channel.send(user_state.into()).await?;
  300. }
  301. Ok(())
  302. }
  303. async fn user_state_changed(&self, state: UserState) -> Result<(), HandlerError> {
  304. self.control_channel.send(state.into()).await?;
  305. Ok(())
  306. }
  307. async fn user_talking(
  308. &self,
  309. voice_packet: VoicePacket<Clientbound>,
  310. ) -> Result<(), HandlerError> {
  311. if let Some(data) = self.storage.get_session_data(self.session_id) {
  312. if data.self_deaf || data.deafened_by_admin {
  313. return Ok(());
  314. }
  315. }
  316. if let Some(channel) = self.audio_channel.as_ref() {
  317. channel.send(voice_packet).await?;
  318. } else {
  319. self.control_channel.send(voice_packet.into()).await?;
  320. }
  321. Ok(())
  322. }
  323. async fn user_disconnected(&self, session_id: u32) -> Result<(), HandlerError> {
  324. let mut user_remove = UserRemove::new();
  325. user_remove.set_session(session_id);
  326. Ok(self.control_channel.send(user_remove.into()).await?)
  327. }
  328. async fn user_text_message(&self, message: TextMessage) -> Result<(), HandlerError> {
  329. self.control_channel.send(message.into()).await?;
  330. Ok(())
  331. }
  332. // Utils
  333. async fn authenticate(&self, auth: Authenticate) -> Result<(), ConnectionSetupError> {
  334. if !auth.has_username() {
  335. return Err(ConnectionSetupError::Reject(Reject::InvalidUsername));
  336. }
  337. let username = auth.get_username();
  338. if !validate_username(&username, self.config.max_username_length as usize) {
  339. return Err(ConnectionSetupError::Reject(Reject::InvalidUsername));
  340. }
  341. if self.storage.username_in_connected(&username) {
  342. return Err(ConnectionSetupError::Reject(Reject::UsernameInUse));
  343. }
  344. let user = match self.storage.get_user_by_username(username.into()) {
  345. Some(user) => user,
  346. None => {
  347. self.storage
  348. .add_guest(Guest::new(username.into(), self.session_id, 0));
  349. return Ok(());
  350. }
  351. };
  352. if let (Some(stored_password_hash), Some(iterations), Some(salt)) = (
  353. &user.password_hash,
  354. user.pbkdf2_iterations,
  355. &user.password_salt,
  356. ) {
  357. if !auth.has_password() {
  358. return Err(ConnectionSetupError::Reject(Reject::WrongUserPassword));
  359. }
  360. let password = auth.get_password();
  361. pbkdf2::verify(
  362. PBKDF2_ALGORITHM,
  363. iterations,
  364. salt,
  365. password.as_bytes(),
  366. stored_password_hash,
  367. )
  368. .map_err(|_| ConnectionSetupError::Reject(Reject::WrongUserPassword))?;
  369. }
  370. self.storage.add_connected_user(user, self.session_id);
  371. Ok(())
  372. }
  373. fn get_user_states(&self) -> Vec<UserState> {
  374. let guests = self.storage.get_guests();
  375. let users = self.storage.get_connected_users();
  376. let mut states = Vec::with_capacity(guests.len() + users.len());
  377. for guest in guests {
  378. let mut state = UserState::new();
  379. state.set_session(guest.session_id);
  380. state.set_name(guest.username);
  381. state.set_channel_id(guest.channel_id);
  382. states.push(state);
  383. }
  384. for (session_id, user) in users {
  385. let mut state = UserState::new();
  386. state.set_session(session_id);
  387. state.set_name(user.username);
  388. state.set_channel_id(user.channel_id);
  389. states.push(state);
  390. }
  391. states
  392. }
  393. }
  394. fn validate_username(username: &str, max_username_length: usize) -> bool {
  395. !username.is_empty()
  396. && username.trim().len() == username.len()
  397. && username.len() <= max_username_length
  398. }
  399. impl From<Error> for HandlerError {
  400. fn from(err: Error) -> Self {
  401. HandlerError::IO(err)
  402. }
  403. }
  404. impl From<Error> for ConnectionSetupError {
  405. fn from(err: Error) -> Self {
  406. ConnectionSetupError::IO(err)
  407. }
  408. }
  409. impl From<SendError<ClientEvent>> for HandlerError {
  410. fn from(_: SendError<ClientEvent>) -> Self {
  411. HandlerError::EventReceiverClosed
  412. }
  413. }