// connection.rs (19 KB)
  1. use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
  2. use protobuf::{Message, ProtobufError};
  3. use crate::proto::mumble::{Version, Authenticate, Ping, Reject, ServerSync,
  4. ChannelRemove, ChannelState, UserRemove, UserState, BanList,
  5. TextMessage, PermissionDenied, ACL as Acl, QueryUsers, CryptSetup,
  6. ContextActionModify, ContextAction, UserList, VoiceTarget, PermissionQuery,
  7. CodecVersion, UserStats, RequestBlob, ServerConfig, SuggestConfig};
  // Packet type identifiers. Each value is the big-endian `u16` that prefixes a framed
  // packet on the control connection and selects how the payload is decoded.

  // Connection setup.
  const VERSION: u16 = 0;
  const UDP_TUNNEL: u16 = 1;
  const AUTHENTICATE: u16 = 2;
  const PING: u16 = 3;
  const REJECT: u16 = 4;
  const SERVER_SYNC: u16 = 5;

  // Channel and user state.
  const CHANNEL_REMOVE: u16 = 6;
  const CHANNEL_STATE: u16 = 7;
  const USER_REMOVE: u16 = 8;
  const USER_STATE: u16 = 9;
  const BAN_LIST: u16 = 10;
  const TEXT_MESSAGE: u16 = 11;
  const PERMISSION_DENIED: u16 = 12;
  const ACL: u16 = 13;
  const QUERY_USERS: u16 = 14;
  const CRYPT_SETUP: u16 = 15;

  // Context actions, listings and queries.
  const CONTEXT_ACTION_MODIFY: u16 = 16;
  const CONTEXT_ACTION: u16 = 17;
  const USER_LIST: u16 = 18;
  const VOICE_TARGET: u16 = 19;
  const PERMISSION_QUERY: u16 = 20;
  const CODEC_VERSION: u16 = 21;
  const USER_STATS: u16 = 22;
  const REQUEST_BLOB: u16 = 23;
  const SERVER_CONFIG: u16 = 24;
  const SUGGEST_CONFIG: u16 = 25;

  // Upper bound, in bytes, applied while accumulating legacy-codec audio frames.
  const MAX_AUDIO_PACKET_SIZE: usize = 1020;
  35. pub enum MumblePacket {
  36. Version(Version),
  37. UdpTunnel(VoicePacket),
  38. Authenticate(Authenticate),
  39. Ping(Ping),
  40. Reject(Reject),
  41. ServerSync(ServerSync),
  42. ChannelRemove(ChannelRemove),
  43. ChannelState(ChannelState),
  44. UserRemove(UserRemove),
  45. UserState(UserState),
  46. BanList(BanList),
  47. TextMessage(TextMessage),
  48. PermissionDenied(PermissionDenied),
  49. Acl(Acl),
  50. QueryUsers(QueryUsers),
  51. CryptSetup(CryptSetup),
  52. ContextActionModify(ContextActionModify),
  53. ContextAction(ContextAction),
  54. UserList(UserList),
  55. VoiceTarget(VoiceTarget),
  56. PermissionQuery(PermissionQuery),
  57. CodecVersion(CodecVersion),
  58. UserStats(UserStats),
  59. RequestBlob(RequestBlob),
  60. ServerConfig(ServerConfig),
  61. SuggestConfig(SuggestConfig)
  62. }
  63. pub enum VoicePacket {
  64. Ping(VoicePing),
  65. AudioData(AudioData)
  66. }
  /// Errors produced while reading or writing packets.
  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  pub enum Error {
      /// The packet type field did not match any known packet type.
      UnknownPacketType,
      /// The underlying stream failed.
      ConnectionError,
      /// The received data could not be decoded.
      ParsingError,
  }

  impl std::fmt::Display for Error {
      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
          let description = match self {
              Error::UnknownPacketType => "unknown packet type",
              Error::ConnectionError => "connection error",
              Error::ParsingError => "parsing error",
          };
          f.write_str(description)
      }
  }

  // Allows the error to be boxed or propagated through `Box<dyn std::error::Error>`.
  impl std::error::Error for Error {}
  /// Reads and writes framed [`MumblePacket`]s over the wrapped stream.
  pub struct MumblePacketStream<S> {
      /// Transport the packets are read from and written to.
      stream: S,
  }
  /// Wrapper around a stream intended to carry [`VoicePacket`]s directly.
  pub struct VoicePacketStream<S> {
      /// Transport the voice packets are read from and written to.
      stream: S,
  }
  /// Payload of a voice-channel ping packet.
  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  pub struct VoicePing {
      /// Timestamp carried by the ping, encoded on the wire as a varint.
      timestamp: u64,
  }
  81. pub struct AudioData {
  82. codec: Codecs,
  83. target: u8,
  84. session_id: Option<u64>,
  85. sequence_number: u64,
  86. audio_payload: Vec<u8>,
  87. positional_info: Option<[f32; 3]>
  88. }
  /// Audio codecs that a voice packet can carry.
  ///
  /// The codec is transmitted as the packet type in the voice header byte:
  /// 0 = CELT Alpha, 2 = Speex, 3 = CELT Beta, 4 = Opus (type 1 is the voice ping).
  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  enum Codecs {
      CeltAlpha,
      Speex,
      CeltBeta,
      Opus,
  }
  95. impl<S> MumblePacketStream<S>
  96. where
  97. S: AsyncRead + AsyncWrite + Unpin + Send,
  98. {
  99. pub fn new(stream: S) -> Self {
  100. MumblePacketStream { stream }
  101. }
  102. pub async fn read(&mut self) -> Result<MumblePacket, Error> {
  103. let packet_type = self.stream.read_u16().await?;
  104. let payload_length = self.stream.read_u32().await?;
  105. if packet_type == UDP_TUNNEL {
  106. return Ok(MumblePacket::UdpTunnel(self.read_voice_packet().await?))
  107. }
  108. let payload = self.read_payload(payload_length).await?;
  109. match packet_type {
  110. VERSION => Ok(MumblePacket::Version(Version::parse_from_bytes(&payload)?)),
  111. AUTHENTICATE => Ok(MumblePacket::Authenticate(Authenticate::parse_from_bytes(&payload)?)),
  112. PING => Ok(MumblePacket::Ping(Ping::parse_from_bytes(&payload)?)),
  113. REJECT => Ok(MumblePacket::Reject(Reject::parse_from_bytes(&payload)?)),
  114. SERVER_SYNC => Ok(MumblePacket::ServerSync(ServerSync::parse_from_bytes(&payload)?)),
  115. CHANNEL_REMOVE => Ok(MumblePacket::ChannelRemove(ChannelRemove::parse_from_bytes(&payload)?)),
  116. CHANNEL_STATE => Ok(MumblePacket::ChannelState(ChannelState::parse_from_bytes(&payload)?)),
  117. USER_REMOVE => Ok(MumblePacket::UserRemove(UserRemove::parse_from_bytes(&payload)?)),
  118. USER_STATE => Ok(MumblePacket::UserState(UserState::parse_from_bytes(&payload)?)),
  119. BAN_LIST => Ok(MumblePacket::BanList(BanList::parse_from_bytes(&payload)?)),
  120. TEXT_MESSAGE => Ok(MumblePacket::TextMessage(TextMessage::parse_from_bytes(&payload)?)),
  121. PERMISSION_DENIED => Ok(MumblePacket::PermissionDenied(PermissionDenied::parse_from_bytes(&payload)?)),
  122. ACL => Ok(MumblePacket::Acl(Acl::parse_from_bytes(&payload)?)),
  123. QUERY_USERS => Ok(MumblePacket::QueryUsers(QueryUsers::parse_from_bytes(&payload)?)),
  124. CRYPT_SETUP => Ok(MumblePacket::CryptSetup(CryptSetup::parse_from_bytes(&payload)?)),
  125. CONTEXT_ACTION_MODIFY => Ok(MumblePacket::ContextActionModify(ContextActionModify::parse_from_bytes(&payload)?)),
  126. CONTEXT_ACTION => Ok(MumblePacket::ContextAction(ContextAction::parse_from_bytes(&payload)?)),
  127. USER_LIST => Ok(MumblePacket::UserList(UserList::parse_from_bytes(&payload)?)),
  128. VOICE_TARGET => Ok(MumblePacket::VoiceTarget(VoiceTarget::parse_from_bytes(&payload)?)),
  129. PERMISSION_QUERY => Ok(MumblePacket::PermissionQuery(PermissionQuery::parse_from_bytes(&payload)?)),
  130. CODEC_VERSION => Ok(MumblePacket::CodecVersion(CodecVersion::parse_from_bytes(&payload)?)),
  131. USER_STATS => Ok(MumblePacket::UserStats(UserStats::parse_from_bytes(&payload)?)),
  132. REQUEST_BLOB => Ok(MumblePacket::RequestBlob(RequestBlob::parse_from_bytes(&payload)?)),
  133. SERVER_CONFIG => Ok(MumblePacket::ServerConfig(ServerConfig::parse_from_bytes(&payload)?)),
  134. SUGGEST_CONFIG => Ok(MumblePacket::SuggestConfig(SuggestConfig::parse_from_bytes(&payload)?)),
  135. _ => Err(Error::UnknownPacketType)
  136. }
  137. }
  138. pub async fn write(&mut self, packet: MumblePacket) -> Result<(), Error> {
  139. match packet {
  140. MumblePacket::UdpTunnel(value) => {
  141. let bytes = Self::serialize_voice_packet(value);
  142. self.stream.write_u16(UDP_TUNNEL).await?;
  143. self.stream.write_u32(bytes.len() as u32).await?;
  144. self.stream.write_all(&bytes).await?;
  145. }
  146. MumblePacket::Version(value) => self.write_protobuf_packet(value, VERSION).await?,
  147. MumblePacket::Authenticate(value) => self.write_protobuf_packet(value, AUTHENTICATE).await?,
  148. MumblePacket::Ping(value) => self.write_protobuf_packet(value, PING).await?,
  149. MumblePacket::Reject(value) => self.write_protobuf_packet(value, REJECT).await?,
  150. MumblePacket::ServerSync(value) => self.write_protobuf_packet(value, SERVER_SYNC).await?,
  151. MumblePacket::ChannelRemove(value) => self.write_protobuf_packet(value, CHANNEL_REMOVE).await?,
  152. MumblePacket::ChannelState(value) => self.write_protobuf_packet(value, CHANNEL_STATE).await?,
  153. MumblePacket::UserRemove(value) => self.write_protobuf_packet(value, USER_REMOVE).await?,
  154. MumblePacket::UserState(value) => self.write_protobuf_packet(value, USER_STATE).await?,
  155. MumblePacket::BanList(value) => self.write_protobuf_packet(value, BAN_LIST).await?,
  156. MumblePacket::TextMessage(value) => self.write_protobuf_packet(value, TEXT_MESSAGE).await?,
  157. MumblePacket::PermissionDenied(value) => self.write_protobuf_packet(value, PERMISSION_DENIED).await?,
  158. MumblePacket::Acl(value) => self.write_protobuf_packet(value, ACL).await?,
  159. MumblePacket::QueryUsers(value) => self.write_protobuf_packet(value, QUERY_USERS).await?,
  160. MumblePacket::CryptSetup(value) => self.write_protobuf_packet(value, CRYPT_SETUP).await?,
  161. MumblePacket::ContextActionModify(value) => self.write_protobuf_packet(value, CONTEXT_ACTION_MODIFY).await?,
  162. MumblePacket::ContextAction(value) => self.write_protobuf_packet(value, CONTEXT_ACTION).await?,
  163. MumblePacket::UserList(value) => self.write_protobuf_packet(value, USER_LIST).await?,
  164. MumblePacket::VoiceTarget(value) => self.write_protobuf_packet(value, VOICE_TARGET).await?,
  165. MumblePacket::PermissionQuery(value) => self.write_protobuf_packet(value, PERMISSION_QUERY).await?,
  166. MumblePacket::CodecVersion(value) => self.write_protobuf_packet(value, CODEC_VERSION).await?,
  167. MumblePacket::UserStats(value) => self.write_protobuf_packet(value, USER_STATS).await?,
  168. MumblePacket::RequestBlob(value) => self.write_protobuf_packet(value, REQUEST_BLOB).await?,
  169. MumblePacket::ServerConfig(value) => self.write_protobuf_packet(value, SERVER_CONFIG).await?,
  170. MumblePacket::SuggestConfig(value) => self.write_protobuf_packet(value, SUGGEST_CONFIG).await?,
  171. }
  172. Ok(())
  173. }
  174. async fn read_payload(&mut self, payload_length: u32) -> tokio::io::Result<Vec<u8>> {
  175. let mut payload = vec![0; payload_length as usize];
  176. self.stream.read_exact(&mut payload).await?;
  177. Ok(payload)
  178. }
  179. async fn read_varint(&mut self) -> Result<u64, Error> { //TODO negative number decode
  180. let header = self.stream.read_u8().await?;
  181. //7-bit number
  182. if (header & 0b1000_0000) == 0b0000_0000 {
  183. return Ok(header as u64);
  184. }
  185. //14-bit number
  186. if (header & 0b1100_0000) == 0b1000_0000 {
  187. let first_number_byte = header ^ 0b1000_0000;
  188. return Ok(
  189. ((first_number_byte as u64) << 8) |
  190. (self.stream.read_u8().await? as u64)
  191. );
  192. }
  193. //21-bit number
  194. if (header & 0b1110_0000) == 0b1100_0000 {
  195. let first_number_byte = header ^ 0b1100_0000;
  196. return Ok(
  197. ((first_number_byte as u64) << 16) |
  198. ((self.stream.read_u8().await? as u64) << 8 ) |
  199. (self.stream.read_u8().await? as u64)
  200. );
  201. }
  202. //28-bit number
  203. if (header & 0b1111_0000) == 0b1110_0000 {
  204. let first_number_byte = header ^ 0b1110_0000;
  205. return Ok(
  206. ((first_number_byte as u64) << 24) |
  207. ((self.stream.read_u8().await? as u64) << 16) |
  208. ((self.stream.read_u8().await? as u64) << 8 ) |
  209. (self.stream.read_u8().await? as u64)
  210. );
  211. }
  212. //32-bit number
  213. if (header & 0b1111_1100) == 0b1111_0000 {
  214. return Ok(self.stream.read_u32().await? as u64);
  215. }
  216. //64-bit number
  217. if (header & 0b1111_1100) == 0b1111_0100 {
  218. return Ok(self.stream.read_u64().await?);
  219. }
  220. Err(Error::ParsingError)
  221. }
  222. async fn read_voice_packet(&mut self) -> Result<VoicePacket, Error> {
  223. let header = self.stream.read_u8().await?;
  224. let (audio_packet_type, target) = Self::decode_header(header);
  225. if audio_packet_type == 1 {
  226. let timestamp = self.read_varint().await?;
  227. return Ok(VoicePacket::Ping(VoicePing {
  228. timestamp
  229. }));
  230. }
  231. let codec = match audio_packet_type {
  232. 0 => Codecs::CeltAlpha,
  233. 2 => Codecs::Speex,
  234. 3 => Codecs::CeltBeta,
  235. 4 => Codecs::Opus,
  236. _ => return Err(Error::ParsingError)
  237. };
  238. let sequence_number = self.read_varint().await?;
  239. let audio_payload = self.read_audio_payload(&codec).await?;
  240. Ok(VoicePacket::AudioData(AudioData {
  241. codec,
  242. target,
  243. session_id: None,
  244. sequence_number,
  245. audio_payload,
  246. positional_info: None //TODO
  247. }))
  248. }
  249. async fn read_audio_payload(&mut self, codec_type: &Codecs) -> Result<Vec<u8>, Error> {
  250. match codec_type {
  251. Codecs::CeltAlpha | Codecs::Speex | Codecs::CeltBeta => {
  252. let mut payload = vec![];
  253. loop {
  254. let header = self.stream.read_u8().await?;
  255. let continuation_bit = header & 0b1000_0000;
  256. let length = header & 0b0111_1111;
  257. payload.push(header);
  258. if length == 0 {
  259. payload.push(0);
  260. break;
  261. }
  262. for _ in 0..length {
  263. payload.push(self.stream.read_u8().await?)
  264. }
  265. if continuation_bit == 0 {
  266. break;
  267. }
  268. if payload.len() > MAX_AUDIO_PACKET_SIZE {
  269. return Err(Error::ParsingError);
  270. }
  271. }
  272. Ok(payload)
  273. }
  274. Codecs::Opus => {
  275. let mut payload = vec![];
  276. let header = self.read_varint().await?;
  277. let length = header & 0x1fff;
  278. payload.append(&mut Self::encode_varint(header));
  279. for _ in 0..length {
  280. payload.push(self.stream.read_u8().await?)
  281. }
  282. Ok(payload)
  283. }
  284. }
  285. }
  286. async fn write_protobuf_packet<T> (&mut self, packet: T, packet_type: u16) -> Result<(), Error>
  287. where T: Message
  288. {
  289. let bytes = packet.write_to_bytes()?;
  290. self.stream.write_u16(packet_type).await?;
  291. self.stream.write_u32(bytes.len() as u32).await?;
  292. self.stream.write_all(&bytes).await?;
  293. Ok(())
  294. }
  /// Splits a voice header byte into `(packet_type, target)`.
  ///
  /// The packet type is the upper three bits, the target the lower five bits.
  fn decode_header(header: u8) -> (u8, u8) {
      (header >> 5, header & 0x1F)
  }
  /// Builds a voice header byte from a packet type and a target.
  ///
  /// The packet type occupies the upper three bits and the target the lower five bits.
  /// The target is masked to five bits so an out-of-range value cannot alter the packet
  /// type; bits of `packet_type` above the third are shifted out of the byte.
  fn encode_header(packet_type: u8, target: u8) -> u8 {
      (packet_type << 5) | (target & 0b0001_1111)
  }
  /// Encodes `number` as a variable-length integer, using the shortest encoding that fits.
  ///
  /// * below `0x80` - one byte
  /// * below `0x4000` - prefix `10`, two bytes
  /// * below `0x20_0000` - prefix `110`, three bytes
  /// * below `0x1000_0000` - prefix `1110`, four bytes
  /// * below `0x1_0000_0000` - marker `0xF0` followed by a big-endian `u32`
  /// * otherwise - marker `0xF4` followed by a big-endian `u64`
  fn encode_varint(number: u64) -> Vec<u8> { //TODO negative number encode
      // Big-endian bytes of the value; index 7 is the least significant byte.
      let bytes = number.to_be_bytes();
      match number {
          0..=0x7F => vec![bytes[7]],
          0x80..=0x3FFF => vec![bytes[6] | 0x80, bytes[7]],
          0x4000..=0x1F_FFFF => vec![bytes[5] | 0xC0, bytes[6], bytes[7]],
          0x20_0000..=0x0FFF_FFFF => vec![bytes[4] | 0xE0, bytes[5], bytes[6], bytes[7]],
          0x1000_0000..=0xFFFF_FFFF => {
              let mut encoded = vec![0xF0];
              encoded.extend_from_slice(&bytes[4..]);
              encoded
          }
          _ => {
              let mut encoded = vec![0xF4];
              encoded.extend_from_slice(&bytes);
              encoded
          }
      }
  }
  344. fn serialize_voice_packet(packet: VoicePacket) -> Vec<u8> {
  345. let mut result = vec![];
  346. match packet {
  347. VoicePacket::Ping(value) => {
  348. result.push(0b0010_0000);
  349. let mut varint = Self::encode_varint(value.timestamp);
  350. result.append(&mut varint);
  351. }
  352. VoicePacket::AudioData(mut value) => {
  353. let packet_type = match value.codec {
  354. Codecs::CeltAlpha => 0b0000_0000,
  355. Codecs::Speex => 0b0100_0000,
  356. Codecs::CeltBeta => 0b0110_0000,
  357. Codecs::Opus => 0b1000_0000,
  358. };
  359. let header = Self::encode_header(packet_type, value.target);
  360. result.push(header);
  361. if let Some(session_id) = value.session_id {
  362. let mut session_id = Self::encode_varint(session_id);
  363. result.append(&mut session_id);
  364. }
  365. let mut sequence_number = Self::encode_varint(value.sequence_number);
  366. result.append(&mut sequence_number);
  367. result.append(&mut value.audio_payload);
  368. if let Some(position_info) = value.positional_info {
  369. result.extend_from_slice(&position_info[0].to_be_bytes());
  370. result.extend_from_slice(&position_info[1].to_be_bytes());
  371. result.extend_from_slice(&position_info[2].to_be_bytes());
  372. }
  373. }
  374. }
  375. result
  376. }
  377. }
  378. impl <S> VoicePacketStream<S>
  379. where
  380. S: AsyncRead + AsyncWrite + Unpin + Send,
  381. {
  382. pub fn new(stream: S) -> Self {
  383. VoicePacketStream { stream }
  384. }
  385. pub async fn read(&mut self) -> Result<VoicePacket, Error> {
  386. unimplemented!() //TODO
  387. }
  388. pub async fn write(&mut self, packet: VoicePacket) -> Result<(), Error> {
  389. unimplemented!() //TODO
  390. }
  391. }
  392. impl From<std::io::Error> for Error {
  393. fn from(_: std::io::Error) -> Self {
  394. Error::ConnectionError
  395. }
  396. }
  397. impl From<ProtobufError> for Error {
  398. fn from(error: ProtobufError) -> Self {
  399. match error {
  400. ProtobufError::IoError(_) | ProtobufError::WireError(_) => Error::ConnectionError,
  401. ProtobufError::Utf8(_) | ProtobufError::MessageNotInitialized { .. } => Error::ParsingError
  402. }
  403. }
  404. }
  #[cfg(test)]
  mod tests {
      use super::*;
      use tokio::net::TcpStream;

      /// Concrete instantiation used only to reach the associated helper functions.
      type Stream = MumblePacketStream<TcpStream>;

      #[test]
      fn test_decode_header() {
          // (header byte, expected (packet type, target))
          let cases = [
              (0b0100_1000u8, (2u8, 8u8)),
              (0b0111_1111, (3, 31)),
              (0b1000_0000, (4, 0)),
          ];
          for (header, expected) in cases.iter() {
              assert_eq!(Stream::decode_header(*header), *expected);
          }
      }

      #[test]
      fn test_encode_header() {
          // ((packet type, target), expected header byte)
          let cases = [
              ((2u8, 8u8), 0b0100_1000u8),
              ((3, 31), 0b0111_1111),
              ((4, 0), 0b1000_0000),
          ];
          for ((packet_type, target), expected) in cases.iter() {
              assert_eq!(Stream::encode_header(*packet_type, *target), *expected);
          }
      }

      #[test]
      fn test_encode_varint() {
          // One positive sample per encoding width: (value, expected bytes).
          let cases: Vec<(u64, Vec<u8>)> = vec![
              (0x8, vec![0b0000_1000]),
              (0x2203, vec![0b1010_0010, 0b0000_0011]),
              (0x140000, vec![0b1101_0100, 0b0000_0000, 0b0000_0000]),
              (
                  0xc402001,
                  vec![0b1110_1100, 0b0100_0000, 0b0010_0000, 0b0000_0001],
              ),
              (
                  0xc0000001,
                  vec![0b1111_0000, 0b1100_0000, 0b0000_0000, 0b0000_0000, 0b0000_0001],
              ),
              (
                  0xc000000100000010,
                  vec![
                      0b1111_0100, 0b1100_0000, 0b0000_0000, 0b0000_0000, 0b0000_0001,
                      0b0000_0000, 0b0000_0000, 0b0000_0000, 0b0001_0000,
                  ],
              ),
          ];
          for (number, expected) in cases.iter() {
              assert_eq!(&Stream::encode_varint(*number), expected);
          }
      }
  }