// client.rs
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;

use crate::client::Error::StreamError;
use crate::connection::{AudioChannel, AudioChannelSender, ControlChannel, ControlChannelSender};
use crate::db::{Db, User};
use crate::proto::mumble::{
    ChannelState, CodecVersion, CryptSetup, Ping, ServerConfig, ServerSync, UserRemove, UserState,
    Version,
};
use crate::protocol::{AudioData, AudioPacket, MumblePacket, MUMBLE_PROTOCOL_VERSION};
  13. pub struct Client {
  14. pub session_id: u32,
  15. inner_sender: Sender<InnerMessage>,
  16. handler_task: JoinHandle<()>,
  17. packet_task: JoinHandle<()>,
  18. audio_task: Option<JoinHandle<()>>,
  19. }
  20. pub struct Config {
  21. pub crypto_key: [u8; 16],
  22. pub server_nonce: [u8; 16],
  23. pub client_nonce: [u8; 16],
  24. pub alpha_codec_version: i32,
  25. pub beta_codec_version: i32,
  26. pub prefer_alpha: bool,
  27. pub opus_support: bool,
  28. pub welcome_text: String,
  29. pub max_bandwidth: u32,
  30. pub max_users: u32,
  31. pub allow_html: bool,
  32. pub max_message_length: u32,
  33. pub max_image_message_length: u32,
  34. }
  35. // from other connected users
  36. pub enum Message {
  37. UserConnected(u32),
  38. UserDisconnected(u32),
  39. UserTalking(AudioData),
  40. }
  41. // to other connected users
  42. pub enum ResponseMessage {
  43. Disconnected,
  44. Talking(AudioData),
  45. }
  46. pub enum Error {
  47. AuthenticationError,
  48. StreamError,
  49. WrongPacket,
  50. }
  51. struct Handler {
  52. session_id: u32,
  53. db: Arc<Db>,
  54. control_channel_sender: ControlChannelSender,
  55. audio_channel_sender: Option<AudioChannelSender>,
  56. response_sender: Sender<ResponseMessage>,
  57. is_audio_tunneling: bool,
  58. }
  59. enum InnerMessage {
  60. Message(Message),
  61. Packet(Box<MumblePacket>),
  62. Audio(AudioPacket),
  63. AudioChannel(AudioChannelSender),
  64. SelfDisconnected,
  65. }
  66. type Responder = Receiver<ResponseMessage>;
  67. impl Client {
  68. pub async fn establish_connection(
  69. db: Arc<Db>,
  70. mut control_channel: ControlChannel,
  71. config: Config,
  72. ) -> Result<(Self, Responder), Error> {
  73. match control_channel.receive().await? {
  74. MumblePacket::Version(version) => version,
  75. _ => return Err(Error::WrongPacket),
  76. };
  77. let mut auth = match control_channel.receive().await? {
  78. MumblePacket::Authenticate(auth) => auth,
  79. _ => return Err(Error::WrongPacket),
  80. };
  81. if !auth.has_username() {
  82. return Err(Error::AuthenticationError);
  83. }
  84. let session_id = db.add_new_user(auth.take_username()).await;
  85. let version = {
  86. let mut version = Version::new();
  87. version.set_version(MUMBLE_PROTOCOL_VERSION);
  88. MumblePacket::Version(version)
  89. };
  90. let crypt_setup = {
  91. let key = config.crypto_key;
  92. let server_nonce = config.server_nonce;
  93. let client_nonce = config.client_nonce;
  94. let mut crypt_setup = CryptSetup::new();
  95. crypt_setup.set_key(Vec::from(key));
  96. crypt_setup.set_server_nonce(Vec::from(server_nonce));
  97. crypt_setup.set_client_nonce(Vec::from(client_nonce));
  98. MumblePacket::CryptSetup(crypt_setup)
  99. };
  100. let codec_version = {
  101. let mut codec_version = CodecVersion::new();
  102. codec_version.set_alpha(config.alpha_codec_version);
  103. codec_version.set_beta(config.beta_codec_version);
  104. codec_version.set_prefer_alpha(config.prefer_alpha);
  105. codec_version.set_opus(config.opus_support);
  106. MumblePacket::CodecVersion(codec_version)
  107. };
  108. let channel_states: Vec<MumblePacket> = {
  109. db.get_channels()
  110. .await
  111. .into_iter()
  112. .map(|channel| {
  113. let mut channel_state = ChannelState::new();
  114. channel_state.set_channel_id(channel.id);
  115. channel_state.set_name(channel.name);
  116. MumblePacket::ChannelState(channel_state)
  117. })
  118. .collect()
  119. };
  120. let user_states: Vec<MumblePacket> = {
  121. db.get_connected_users()
  122. .await
  123. .into_iter()
  124. .map(|user| {
  125. let mut user_state = UserState::new();
  126. user_state.set_name(user.username);
  127. user_state.set_session(user.session_id);
  128. user_state.set_channel_id(user.channel_id);
  129. MumblePacket::UserState(user_state)
  130. })
  131. .collect()
  132. };
  133. let server_sync = {
  134. let mut server_sync = ServerSync::new();
  135. server_sync.set_session(session_id);
  136. server_sync.set_welcome_text(config.welcome_text);
  137. server_sync.set_max_bandwidth(config.max_bandwidth);
  138. MumblePacket::ServerSync(server_sync)
  139. };
  140. let server_config = {
  141. let mut server_config = ServerConfig::new();
  142. server_config.set_max_users(config.max_users);
  143. server_config.set_allow_html(config.allow_html);
  144. server_config.set_message_length(config.max_message_length);
  145. server_config.set_image_message_length(config.max_image_message_length);
  146. MumblePacket::ServerConfig(server_config)
  147. };
  148. control_channel.send(version).await?;
  149. control_channel.send(crypt_setup).await?;
  150. control_channel.send(codec_version).await?;
  151. for channel_state in channel_states {
  152. control_channel.send(channel_state).await?;
  153. }
  154. for user_state in user_states {
  155. control_channel.send(user_state).await?;
  156. }
  157. control_channel.send(server_sync).await?;
  158. control_channel.send(server_config).await?;
  159. let (client, response_receiver) = Client::new(control_channel, db, session_id).await;
  160. Ok((client, response_receiver))
  161. }
  162. pub async fn set_audio_channel(&mut self, audio_channel: AudioChannel) {
  163. let (mut receiver, sender) = audio_channel.split();
  164. let inner_sender = self.inner_sender.clone();
  165. self.audio_task = Some(tokio::spawn(async move {
  166. loop {
  167. match receiver.receive().await {
  168. Ok(packet) => {
  169. if inner_sender.try_send(InnerMessage::Audio(packet)).is_err() {
  170. return;
  171. }
  172. }
  173. Err(_) => return,
  174. }
  175. }
  176. }));
  177. self.inner_sender
  178. .send(InnerMessage::AudioChannel(sender))
  179. .await;
  180. }
  181. pub async fn send_message(&self, message: Message) {
  182. match message {
  183. Message::UserTalking(_) => {
  184. self.inner_sender.try_send(InnerMessage::Message(message));
  185. }
  186. _ => {
  187. self.inner_sender.send(InnerMessage::Message(message)).await;
  188. }
  189. }
  190. }
  191. async fn new(
  192. control_channel: ControlChannel,
  193. db: Arc<Db>,
  194. session_id: u32,
  195. ) -> (Client, Responder) {
  196. let (inner_sender, mut inner_receiver) = mpsc::channel(2);
  197. let (response_sender, response_receiver) = mpsc::channel(2);
  198. let (mut control_channel_receiver, control_channel_sender) = control_channel.split();
  199. let handler_task = tokio::spawn(async move {
  200. let mut handler = Handler {
  201. session_id,
  202. db,
  203. control_channel_sender,
  204. audio_channel_sender: None,
  205. response_sender,
  206. is_audio_tunneling: false,
  207. };
  208. loop {
  209. let message = match inner_receiver.recv().await {
  210. Some(msg) => msg,
  211. None => return,
  212. };
  213. match message {
  214. InnerMessage::Message(msg) => {
  215. let result = handler.handle_message(msg).await;
  216. if result.is_err() {
  217. return;
  218. }
  219. }
  220. InnerMessage::Packet(packet) => {
  221. let result = handler.handle_mumble_packet(*packet).await;
  222. if result.is_err() {
  223. return;
  224. }
  225. }
  226. InnerMessage::SelfDisconnected => {
  227. handler.self_disconnected().await;
  228. return;
  229. }
  230. InnerMessage::Audio(audio) => {
  231. let result = handler.handle_audio_packet(audio).await;
  232. if result.is_err() {
  233. return;
  234. }
  235. }
  236. InnerMessage::AudioChannel(sender) => {
  237. handler.audio_channel_sender = Some(sender)
  238. }
  239. }
  240. }
  241. });
  242. let sender = inner_sender.clone();
  243. let packet_task = tokio::spawn(async move {
  244. loop {
  245. match control_channel_receiver.receive().await {
  246. Ok(packet) => sender.send(InnerMessage::Packet(Box::from(packet))).await,
  247. Err(_) => {
  248. sender.send(InnerMessage::SelfDisconnected).await;
  249. return;
  250. }
  251. };
  252. }
  253. });
  254. return (
  255. Client {
  256. session_id,
  257. inner_sender,
  258. handler_task,
  259. packet_task,
  260. audio_task: None,
  261. },
  262. response_receiver,
  263. );
  264. }
  265. }
  266. impl Drop for Client {
  267. fn drop(&mut self) {
  268. self.handler_task.abort();
  269. self.packet_task.abort();
  270. if let Some(audio_task) = self.audio_task.as_ref() {
  271. audio_task.abort();
  272. }
  273. }
  274. }
  275. impl Handler {
  276. async fn handle_mumble_packet(&mut self, packet: MumblePacket) -> Result<(), Error> {
  277. match packet {
  278. MumblePacket::Ping(ping) => {
  279. if ping.has_timestamp() {
  280. let mut ping = Ping::new();
  281. ping.set_timestamp(ping.get_timestamp());
  282. self.control_channel_sender
  283. .send(MumblePacket::Ping(ping))
  284. .await?;
  285. }
  286. }
  287. MumblePacket::UdpTunnel(voice) => match voice {
  288. AudioPacket::Ping(_) => {
  289. self.control_channel_sender
  290. .send(MumblePacket::UdpTunnel(voice))
  291. .await?;
  292. }
  293. AudioPacket::AudioData(mut audio_data) => {
  294. audio_data.session_id = Some(self.session_id);
  295. self.response_sender
  296. .try_send(ResponseMessage::Talking(audio_data));
  297. }
  298. },
  299. _ => println!("unimplemented!"),
  300. }
  301. Ok(())
  302. }
  303. async fn handle_message(&mut self, message: Message) -> Result<(), Error> {
  304. match message {
  305. Message::UserConnected(session_id) => self.new_user_connected(session_id).await?,
  306. Message::UserDisconnected(session_id) => self.user_disconnected(session_id).await?,
  307. Message::UserTalking(audio_data) => self.user_talking(audio_data).await?,
  308. }
  309. Ok(())
  310. }
  311. async fn handle_audio_packet(&mut self, packet: AudioPacket) -> Result<(), Error> {
  312. match packet {
  313. AudioPacket::Ping(_) => {
  314. if !self.is_audio_tunneling && self.audio_channel_sender.is_some() {
  315. self.audio_channel_sender
  316. .as_mut()
  317. .unwrap()
  318. .send(packet)
  319. .await?;
  320. } else {
  321. self.control_channel_sender
  322. .send(MumblePacket::UdpTunnel(packet))
  323. .await?;
  324. }
  325. }
  326. AudioPacket::AudioData(mut audio_data) => {
  327. audio_data.session_id = Some(self.session_id);
  328. // It isn't critical to lost some audio packets
  329. self.response_sender
  330. .try_send(ResponseMessage::Talking(audio_data));
  331. }
  332. }
  333. Ok(())
  334. }
  335. async fn new_user_connected(&mut self, session_id: u32) -> Result<(), Error> {
  336. if let Some(user) = self.db.get_user_by_session_id(session_id).await {
  337. self.control_channel_sender
  338. .send(MumblePacket::from(user))
  339. .await?;
  340. }
  341. Ok(())
  342. }
  343. async fn user_disconnected(&mut self, session_id: u32) -> Result<(), Error> {
  344. let mut user_remove = UserRemove::new();
  345. user_remove.set_session(session_id);
  346. Ok(self
  347. .control_channel_sender
  348. .send(MumblePacket::UserRemove(user_remove))
  349. .await?)
  350. }
  351. async fn self_disconnected(&mut self) {
  352. self.db.remove_connected_user(self.session_id).await;
  353. self.response_sender
  354. .send(ResponseMessage::Disconnected)
  355. .await;
  356. }
  357. async fn user_talking(&mut self, audio_data: AudioData) -> Result<(), Error> {
  358. let audio_packet = AudioPacket::AudioData(audio_data);
  359. if !self.is_audio_tunneling && self.audio_channel_sender.is_some() {
  360. self.audio_channel_sender
  361. .as_mut()
  362. .unwrap()
  363. .send(audio_packet)
  364. .await?;
  365. } else {
  366. self.control_channel_sender
  367. .send(MumblePacket::UdpTunnel(audio_packet))
  368. .await?;
  369. }
  370. Ok(())
  371. }
  372. }
  373. impl From<User> for UserState {
  374. fn from(user: User) -> Self {
  375. let mut user_state = UserState::new();
  376. if let Some(id) = user.id {
  377. user_state.set_user_id(id)
  378. }
  379. user_state.set_name(user.username);
  380. user_state.set_channel_id(user.channel_id);
  381. user_state.set_session(user.session_id);
  382. user_state
  383. }
  384. }
  385. impl From<User> for MumblePacket {
  386. fn from(user: User) -> Self {
  387. MumblePacket::UserState(UserState::from(user))
  388. }
  389. }
  390. impl From<crate::protocol::Error> for Error {
  391. fn from(_: crate::protocol::Error) -> Self {
  392. StreamError
  393. }
  394. }
  395. impl From<crate::connection::Error> for Error {
  396. fn from(_: crate::connection::Error) -> Self {
  397. StreamError
  398. }
  399. }