Sergey Chushin 3 lat temu
rodzic
commit
96125d8dcd
8 zmienionych plików z 307 dodań i 184 usunięć
  1. 1 1
      build.rs
  2. 32 25
      src/client.rs
  3. 28 12
      src/connection.rs
  4. 20 12
      src/db.rs
  5. 39 34
      src/main.rs
  6. 1 1
      src/proto/mod.rs
  7. 176 89
      src/protocol.rs
  8. 10 10
      src/server.rs

+ 1 - 1
build.rs

@@ -5,4 +5,4 @@ fn main() {
         .include("src/proto")
         .run()
         .expect("protoc");
-}
+}

+ 32 - 25
src/client.rs

@@ -8,7 +8,7 @@ use tokio::task::JoinHandle;
 use crate::connection::Connection;
 use crate::db::{Db, User};
 use crate::proto::mumble::{Ping, UserRemove, UserState};
-use crate::protocol::{MumblePacket, MumblePacketWriter, VoicePacket, AudioData};
+use crate::protocol::{AudioData, MumblePacket, MumblePacketWriter, VoicePacket};
 
 pub enum Message {
     UserConnected(u32),
@@ -49,8 +49,8 @@ type ResponseReceiver = UnboundedReceiver<ResponseMessage>;
 
 impl Client {
     pub async fn new<S>(connection: Connection<S>, db: Arc<Db>) -> (Client, ResponseReceiver)
-        where
-            S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
+    where
+        S: 'static + AsyncRead + AsyncWrite + Unpin + Send,
     {
         let (sender, mut receiver) = mpsc::unbounded_channel();
         let (response_sender, response_receiver) = mpsc::unbounded_channel();
@@ -105,12 +105,15 @@ impl Client {
             }
         });
 
-        return (Client {
-            session_id,
-            inner_sender,
-            handler_task,
-            packet_task,
-        }, response_receiver);
+        return (
+            Client {
+                session_id,
+                inner_sender,
+                handler_task,
+                packet_task,
+            },
+            response_receiver,
+        );
     }
 
     pub fn post_message(&self, message: Message) {
@@ -126,8 +129,8 @@ impl Drop for Client {
 }
 
 impl<W> Handler<W>
-    where
-        W: AsyncWrite + Unpin + Send,
+where
+    W: AsyncWrite + Unpin + Send,
 {
     async fn handle_packet(&mut self, packet: MumblePacket) -> Result<(), Error> {
         match packet {
@@ -138,18 +141,17 @@ impl<W> Handler<W>
                     self.writer.write(MumblePacket::Ping(ping)).await?;
                 }
             }
-            MumblePacket::UdpTunnel(voice) => {
-                match voice {
-                    VoicePacket::Ping(_) => {
-                        self.writer.write(MumblePacket::UdpTunnel(voice)).await;
-                    }
-                    VoicePacket::AudioData(mut audio_data) => {
-                        audio_data.session_id = Some(self.session_id);
-                        self.response_sender.send(ResponseMessage::Talking(audio_data));
-                    }
+            MumblePacket::UdpTunnel(voice) => match voice {
+                VoicePacket::Ping(_) => {
+                    self.writer.write(MumblePacket::UdpTunnel(voice)).await;
                 }
-            }
-            _ => println!("unimplemented!")
+                VoicePacket::AudioData(mut audio_data) => {
+                    audio_data.session_id = Some(self.session_id);
+                    self.response_sender
+                        .send(ResponseMessage::Talking(audio_data));
+                }
+            },
+            _ => println!("unimplemented!"),
         }
         Ok(())
     }
@@ -174,7 +176,10 @@ impl<W> Handler<W>
     async fn user_disconnected(&mut self, session_id: u32) -> Result<(), Error> {
         let mut user_remove = UserRemove::new();
         user_remove.set_session(session_id);
-        Ok(self.writer.write(MumblePacket::UserRemove(user_remove)).await?)
+        Ok(self
+            .writer
+            .write(MumblePacket::UserRemove(user_remove))
+            .await?)
     }
 
     async fn self_disconnected(&mut self) {
@@ -183,7 +188,10 @@ impl<W> Handler<W>
     }
 
     async fn user_talking(&mut self, audio_data: AudioData) -> Result<(), Error> {
-        Ok(self.writer.write(MumblePacket::UdpTunnel(VoicePacket::AudioData(audio_data))).await?)
+        Ok(self
+            .writer
+            .write(MumblePacket::UdpTunnel(VoicePacket::AudioData(audio_data)))
+            .await?)
     }
 }
 
@@ -211,4 +219,3 @@ impl From<crate::protocol::Error> for Error {
         Error::StreamError(err)
     }
 }
-

+ 28 - 12
src/connection.rs

@@ -3,8 +3,12 @@ use std::sync::Arc;
 use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
 
 use crate::db::Db;
-use crate::proto::mumble::{ChannelState, CodecVersion, PermissionQuery, ServerConfig, ServerSync, UserState, Version};
-use crate::protocol::{MUMBLE_PROTOCOL_VERSION, MumblePacket, MumblePacketReader, MumblePacketWriter};
+use crate::proto::mumble::{
+    ChannelState, CodecVersion, PermissionQuery, ServerConfig, ServerSync, UserState, Version,
+};
+use crate::protocol::{
+    MumblePacket, MumblePacketReader, MumblePacketWriter, MUMBLE_PROTOCOL_VERSION,
+};
 
 pub struct Connection<S> {
     pub reader: MumblePacketReader<ReadHalf<S>>,
@@ -24,16 +28,20 @@ pub enum Error {
 }
 
 impl<S> Connection<S>
-    where
-        S: AsyncRead + AsyncWrite + Unpin + Send,
+where
+    S: AsyncRead + AsyncWrite + Unpin + Send,
 {
-    pub async fn setup_connection(db: Arc<Db>, stream: S, config: ConnectionConfig) -> Result<Connection<S>, Error> {
+    pub async fn setup_connection(
+        db: Arc<Db>,
+        stream: S,
+        config: ConnectionConfig,
+    ) -> Result<Connection<S>, Error> {
         let (mut reader, mut writer) = crate::protocol::new(stream);
 
         //Version exchange
         let _ = match reader.read().await? {
             MumblePacket::Version(version) => version,
-            _ => return Err(Error::ConnectionSetupError)
+            _ => return Err(Error::ConnectionSetupError),
         };
         let mut version = Version::new();
         version.set_version(MUMBLE_PROTOCOL_VERSION);
@@ -42,7 +50,7 @@ impl<S> Connection<S>
         //Authentication
         let mut auth = match reader.read().await? {
             MumblePacket::Authenticate(auth) => auth,
-            _ => return Err(Error::ConnectionSetupError)
+            _ => return Err(Error::ConnectionSetupError),
         };
         if !auth.has_username() {
             return Err(Error::AuthenticationError);
@@ -57,7 +65,9 @@ impl<S> Connection<S>
         codec_version.set_beta(0);
         codec_version.set_prefer_alpha(true);
         codec_version.set_opus(true);
-        writer.write(MumblePacket::CodecVersion(codec_version)).await?;
+        writer
+            .write(MumblePacket::CodecVersion(codec_version))
+            .await?;
 
         //Channel state
         let channels = db.get_channels().await;
@@ -65,14 +75,18 @@ impl<S> Connection<S>
             let mut channel_state = ChannelState::new();
             channel_state.set_channel_id(channel.id);
             channel_state.set_name(channel.name);
-            writer.write(MumblePacket::ChannelState(channel_state)).await?;
+            writer
+                .write(MumblePacket::ChannelState(channel_state))
+                .await?;
         }
 
         //PermissionQuery
         let mut permission_query = PermissionQuery::new();
         permission_query.set_permissions(134743822);
         permission_query.set_channel_id(0);
-        writer.write(MumblePacket::PermissionQuery(permission_query)).await?;
+        writer
+            .write(MumblePacket::PermissionQuery(permission_query))
+            .await?;
 
         //User states
         let connected_users = db.get_connected_users().await;
@@ -98,7 +112,9 @@ impl<S> Connection<S>
         server_config.set_allow_html(true);
         server_config.set_message_length(5000);
         server_config.set_image_message_length(131072);
-        writer.write(MumblePacket::ServerConfig(server_config)).await?;
+        writer
+            .write(MumblePacket::ServerConfig(server_config))
+            .await?;
 
         Ok(Connection {
             reader,
@@ -112,4 +128,4 @@ impl From<crate::protocol::Error> for Error {
     fn from(err: crate::protocol::Error) -> Self {
         Error::StreamError(err)
     }
-}
+}

+ 20 - 12
src/db.rs

@@ -46,11 +46,14 @@ impl Db {
         let root_channel = bincode::serialize(&Channel {
             id: 0,
             name: "Root".to_string(),
-        }).unwrap();
-        channels.compare_and_swap(
-            ROOT_CHANNEL_ID.to_be_bytes(),
-            Option::<&[u8]>::None,
-            Some(root_channel))
+        })
+        .unwrap();
+        channels
+            .compare_and_swap(
+                ROOT_CHANNEL_ID.to_be_bytes(),
+                Option::<&[u8]>::None,
+                Some(root_channel),
+            )
             .unwrap();
 
         Db {
@@ -64,17 +67,22 @@ impl Db {
     pub async fn add_new_user(&self, username: String) -> u32 {
         let mut connected_users = self.connected_users.write().await;
         let session_id = connected_users.len() as u32;
-        connected_users.insert(session_id, User {
-            id: None,
-            username,
-            channel_id: ROOT_CHANNEL_ID,
+        connected_users.insert(
             session_id,
-        });
+            User {
+                id: None,
+                username,
+                channel_id: ROOT_CHANNEL_ID,
+                session_id,
+            },
+        );
         session_id
     }
 
     pub async fn get_channels(&self) -> Vec<Channel> {
-        self.channels.iter().values()
+        self.channels
+            .iter()
+            .values()
             .map(|channel| bincode::deserialize(&channel.unwrap()).unwrap())
             .collect()
     }
@@ -96,4 +104,4 @@ impl Db {
         let mut connected_users = self.connected_users.write().await;
         connected_users.remove(&session_id);
     }
-}
+}

+ 39 - 34
src/main.rs

@@ -3,43 +3,51 @@ use std::io::BufReader;
 
 use clap::{App, Arg};
 use tokio::runtime::Builder;
-use tokio_rustls::rustls::{Certificate, internal::pemfile, PrivateKey};
+use tokio_rustls::rustls::{internal::pemfile, Certificate, PrivateKey};
 
-mod server;
-mod proto;
-mod protocol;
-mod connection;
-mod db;
 mod client;
+mod connection;
 mod crypto;
+mod db;
+mod proto;
+mod protocol;
+mod server;
 
 fn main() {
     let matches = App::new("Rumble")
         .version("0.0.1")
         .about("Rumble is a mumble server written in Rust.")
-        .arg(Arg::with_name("ip")
-            .long("ip")
-            .default_value("0.0.0.0")
-            .takes_value(true)
-            .help("Specific IP or hostname to bind to"))
-        .arg(Arg::with_name("port")
-            .long("port")
-            .short("p")
-            .default_value("64738")
-            .takes_value(true)
-            .help("Port to use"))
-        .arg(Arg::with_name("certificate")
-            .long("cert_file")
-            .short("c")
-            .takes_value(true)
-            .required(true)
-            .help("Path to a ssl certificate"))
-        .arg(Arg::with_name("private key")
-            .long("private_key")
-            .short("k")
-            .takes_value(true)
-            .required(true)
-            .help("Path to a ssl keyfile"))
+        .arg(
+            Arg::with_name("ip")
+                .long("ip")
+                .default_value("0.0.0.0")
+                .takes_value(true)
+                .help("Specific IP or hostname to bind to"),
+        )
+        .arg(
+            Arg::with_name("port")
+                .long("port")
+                .short("p")
+                .default_value("64738")
+                .takes_value(true)
+                .help("Port to use"),
+        )
+        .arg(
+            Arg::with_name("certificate")
+                .long("cert_file")
+                .short("c")
+                .takes_value(true)
+                .required(true)
+                .help("Path to a ssl certificate"),
+        )
+        .arg(
+            Arg::with_name("private key")
+                .long("private_key")
+                .short("k")
+                .takes_value(true)
+                .required(true)
+                .help("Path to a ssl keyfile"),
+        )
         .get_matches();
 
     let ip = matches.value_of("ip").unwrap();
@@ -56,10 +64,7 @@ fn main() {
         path_to_db_file: path,
     };
 
-    let tokio_rt = Builder::new_multi_thread()
-        .enable_all()
-        .build()
-        .unwrap();
+    let tokio_rt = Builder::new_multi_thread().enable_all().build().unwrap();
     tokio_rt.block_on(async {
         server::run(config).await.unwrap();
     });
@@ -73,4 +78,4 @@ fn read_certificate(path: &str) -> Certificate {
 fn read_private_key(path: &str) -> PrivateKey {
     let mut file = BufReader::new(File::open(path).unwrap());
     pemfile::pkcs8_private_keys(&mut file).unwrap().remove(0)
-}
+}

+ 1 - 1
src/proto/mod.rs

@@ -1 +1 @@
-pub mod mumble;
+pub mod mumble;

+ 176 - 89
src/protocol.rs

@@ -1,11 +1,12 @@
 use protobuf::{Message, ProtobufError};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
 
-use crate::proto::mumble::{ACL as Acl, Authenticate, BanList, ChannelRemove, ChannelState,
-                           CodecVersion, ContextAction, ContextActionModify, CryptSetup, PermissionDenied,
-                           PermissionQuery, Ping, QueryUsers, Reject, RequestBlob,
-                           ServerConfig, ServerSync, SuggestConfig, TextMessage, UserList,
-                           UserRemove, UserState, UserStats, Version, VoiceTarget};
+use crate::proto::mumble::{
+    Authenticate, BanList, ChannelRemove, ChannelState, CodecVersion, ContextAction,
+    ContextActionModify, CryptSetup, PermissionDenied, PermissionQuery, Ping, QueryUsers, Reject,
+    RequestBlob, ServerConfig, ServerSync, SuggestConfig, TextMessage, UserList, UserRemove,
+    UserState, UserStats, Version, VoiceTarget, ACL as Acl,
+};
 
 pub const MUMBLE_PROTOCOL_VERSION: u32 = 0b0000_0001_0011_0100;
 
@@ -86,7 +87,7 @@ pub struct MumblePacketWriter<W> {
 }
 
 pub struct VoicePing {
-    bytes: Vec<u8>
+    bytes: Vec<u8>,
 }
 
 #[derive(Clone)]
@@ -95,22 +96,28 @@ pub struct AudioData {
     bytes: Vec<u8>,
 }
 
-pub fn new<S>(stream: S) -> (MumblePacketReader<ReadHalf<S>>, MumblePacketWriter<WriteHalf<S>>)
-    where
-        S: AsyncRead + AsyncWrite + Unpin + Send,
+pub fn new<S>(
+    stream: S,
+) -> (
+    MumblePacketReader<ReadHalf<S>>,
+    MumblePacketWriter<WriteHalf<S>>,
+)
+where
+    S: AsyncRead + AsyncWrite + Unpin + Send,
 {
     let (reader, writer) = tokio::io::split(stream);
-    (MumblePacketReader::new(reader), MumblePacketWriter::new(writer))
+    (
+        MumblePacketReader::new(reader),
+        MumblePacketWriter::new(writer),
+    )
 }
 
 impl<R> MumblePacketReader<R>
-    where
-        R: AsyncRead + Unpin + Send,
+where
+    R: AsyncRead + Unpin + Send,
 {
     pub fn new(reader: R) -> Self {
-        MumblePacketReader {
-            reader
-        }
+        MumblePacketReader { reader }
     }
 
     pub async fn read(&mut self) -> Result<MumblePacket, Error> {
@@ -120,36 +127,79 @@ impl<R> MumblePacketReader<R>
 
         match packet_type {
             VERSION => Ok(MumblePacket::Version(Version::parse_from_bytes(&payload)?)),
-            UDP_TUNNEL => Ok(MumblePacket::UdpTunnel(VoicePacket::parse_from_bytes(payload)?)),
-            AUTHENTICATE => Ok(MumblePacket::Authenticate(Authenticate::parse_from_bytes(&payload)?)),
+            UDP_TUNNEL => Ok(MumblePacket::UdpTunnel(VoicePacket::parse_from_bytes(
+                payload,
+            )?)),
+            AUTHENTICATE => Ok(MumblePacket::Authenticate(Authenticate::parse_from_bytes(
+                &payload,
+            )?)),
             PING => Ok(MumblePacket::Ping(Ping::parse_from_bytes(&payload)?)),
             REJECT => Ok(MumblePacket::Reject(Reject::parse_from_bytes(&payload)?)),
-            SERVER_SYNC => Ok(MumblePacket::ServerSync(ServerSync::parse_from_bytes(&payload)?)),
-            CHANNEL_REMOVE => Ok(MumblePacket::ChannelRemove(ChannelRemove::parse_from_bytes(&payload)?)),
-            CHANNEL_STATE => Ok(MumblePacket::ChannelState(ChannelState::parse_from_bytes(&payload)?)),
-            USER_REMOVE => Ok(MumblePacket::UserRemove(UserRemove::parse_from_bytes(&payload)?)),
-            USER_STATE => Ok(MumblePacket::UserState(UserState::parse_from_bytes(&payload)?)),
+            SERVER_SYNC => Ok(MumblePacket::ServerSync(ServerSync::parse_from_bytes(
+                &payload,
+            )?)),
+            CHANNEL_REMOVE => Ok(MumblePacket::ChannelRemove(
+                ChannelRemove::parse_from_bytes(&payload)?,
+            )),
+            CHANNEL_STATE => Ok(MumblePacket::ChannelState(ChannelState::parse_from_bytes(
+                &payload,
+            )?)),
+            USER_REMOVE => Ok(MumblePacket::UserRemove(UserRemove::parse_from_bytes(
+                &payload,
+            )?)),
+            USER_STATE => Ok(MumblePacket::UserState(UserState::parse_from_bytes(
+                &payload,
+            )?)),
             BAN_LIST => Ok(MumblePacket::BanList(BanList::parse_from_bytes(&payload)?)),
-            TEXT_MESSAGE => Ok(MumblePacket::TextMessage(TextMessage::parse_from_bytes(&payload)?)),
-            PERMISSION_DENIED => Ok(MumblePacket::PermissionDenied(PermissionDenied::parse_from_bytes(&payload)?)),
+            TEXT_MESSAGE => Ok(MumblePacket::TextMessage(TextMessage::parse_from_bytes(
+                &payload,
+            )?)),
+            PERMISSION_DENIED => Ok(MumblePacket::PermissionDenied(
+                PermissionDenied::parse_from_bytes(&payload)?,
+            )),
             ACL => Ok(MumblePacket::Acl(Acl::parse_from_bytes(&payload)?)),
-            QUERY_USERS => Ok(MumblePacket::QueryUsers(QueryUsers::parse_from_bytes(&payload)?)),
-            CRYPT_SETUP => Ok(MumblePacket::CryptSetup(CryptSetup::parse_from_bytes(&payload)?)),
-            CONTEXT_ACTION_MODIFY => Ok(MumblePacket::ContextActionModify(ContextActionModify::parse_from_bytes(&payload)?)),
-            CONTEXT_ACTION => Ok(MumblePacket::ContextAction(ContextAction::parse_from_bytes(&payload)?)),
-            USER_LIST => Ok(MumblePacket::UserList(UserList::parse_from_bytes(&payload)?)),
-            VOICE_TARGET => Ok(MumblePacket::VoiceTarget(VoiceTarget::parse_from_bytes(&payload)?)),
-            PERMISSION_QUERY => Ok(MumblePacket::PermissionQuery(PermissionQuery::parse_from_bytes(&payload)?)),
-            CODEC_VERSION => Ok(MumblePacket::CodecVersion(CodecVersion::parse_from_bytes(&payload)?)),
-            USER_STATS => Ok(MumblePacket::UserStats(UserStats::parse_from_bytes(&payload)?)),
-            REQUEST_BLOB => Ok(MumblePacket::RequestBlob(RequestBlob::parse_from_bytes(&payload)?)),
-            SERVER_CONFIG => Ok(MumblePacket::ServerConfig(ServerConfig::parse_from_bytes(&payload)?)),
-            SUGGEST_CONFIG => Ok(MumblePacket::SuggestConfig(SuggestConfig::parse_from_bytes(&payload)?)),
-            _ => Err(Error::UnknownPacketType)
+            QUERY_USERS => Ok(MumblePacket::QueryUsers(QueryUsers::parse_from_bytes(
+                &payload,
+            )?)),
+            CRYPT_SETUP => Ok(MumblePacket::CryptSetup(CryptSetup::parse_from_bytes(
+                &payload,
+            )?)),
+            CONTEXT_ACTION_MODIFY => Ok(MumblePacket::ContextActionModify(
+                ContextActionModify::parse_from_bytes(&payload)?,
+            )),
+            CONTEXT_ACTION => Ok(MumblePacket::ContextAction(
+                ContextAction::parse_from_bytes(&payload)?,
+            )),
+            USER_LIST => Ok(MumblePacket::UserList(UserList::parse_from_bytes(
+                &payload,
+            )?)),
+            VOICE_TARGET => Ok(MumblePacket::VoiceTarget(VoiceTarget::parse_from_bytes(
+                &payload,
+            )?)),
+            PERMISSION_QUERY => Ok(MumblePacket::PermissionQuery(
+                PermissionQuery::parse_from_bytes(&payload)?,
+            )),
+            CODEC_VERSION => Ok(MumblePacket::CodecVersion(CodecVersion::parse_from_bytes(
+                &payload,
+            )?)),
+            USER_STATS => Ok(MumblePacket::UserStats(UserStats::parse_from_bytes(
+                &payload,
+            )?)),
+            REQUEST_BLOB => Ok(MumblePacket::RequestBlob(RequestBlob::parse_from_bytes(
+                &payload,
+            )?)),
+            SERVER_CONFIG => Ok(MumblePacket::ServerConfig(ServerConfig::parse_from_bytes(
+                &payload,
+            )?)),
+            SUGGEST_CONFIG => Ok(MumblePacket::SuggestConfig(
+                SuggestConfig::parse_from_bytes(&payload)?,
+            )),
+            _ => Err(Error::UnknownPacketType),
         }
     }
 
-    async fn read_varint(&mut self) -> Result<u64, Error> { //TODO negative number decode
+    async fn read_varint(&mut self) -> Result<u64, Error> {
+        //TODO negative number decode
         let header = self.reader.read_u8().await?;
 
         //7-bit number
@@ -159,29 +209,22 @@ impl<R> MumblePacketReader<R>
         //14-bit number
         if (header & 0b1100_0000) == 0b1000_0000 {
             let first_number_byte = header ^ 0b1000_0000;
-            return Ok(
-                ((first_number_byte as u64) << 8) |
-                    (self.reader.read_u8().await? as u64)
-            );
+            return Ok(((first_number_byte as u64) << 8) | (self.reader.read_u8().await? as u64));
         }
         //21-bit number
         if (header & 0b1110_0000) == 0b1100_0000 {
             let first_number_byte = header ^ 0b1100_0000;
-            return Ok(
-                ((first_number_byte as u64) << 16) |
-                    ((self.reader.read_u8().await? as u64) << 8) |
-                    (self.reader.read_u8().await? as u64)
-            );
+            return Ok(((first_number_byte as u64) << 16)
+                | ((self.reader.read_u8().await? as u64) << 8)
+                | (self.reader.read_u8().await? as u64));
         }
         //28-bit number
         if (header & 0b1111_0000) == 0b1110_0000 {
             let first_number_byte = header ^ 0b1110_0000;
-            return Ok(
-                ((first_number_byte as u64) << 24) |
-                    ((self.reader.read_u8().await? as u64) << 16) |
-                    ((self.reader.read_u8().await? as u64) << 8) |
-                    (self.reader.read_u8().await? as u64)
-            );
+            return Ok(((first_number_byte as u64) << 24)
+                | ((self.reader.read_u8().await? as u64) << 16)
+                | ((self.reader.read_u8().await? as u64) << 8)
+                | (self.reader.read_u8().await? as u64));
         }
         //32-bit number
         if (header & 0b1111_1100) == 0b1111_0000 {
@@ -203,13 +246,11 @@ impl<R> MumblePacketReader<R>
 }
 
 impl<W> MumblePacketWriter<W>
-    where
-        W: AsyncWrite + Unpin + Send,
+where
+    W: AsyncWrite + Unpin + Send,
 {
     pub fn new(writer: W) -> Self {
-        MumblePacketWriter {
-            writer
-        }
+        MumblePacketWriter { writer }
     }
 
     pub async fn write(&mut self, packet: MumblePacket) -> Result<(), Error> {
@@ -221,30 +262,65 @@ impl<W> MumblePacketWriter<W>
                 self.writer.write_all(&bytes).await?;
             }
             MumblePacket::Version(value) => self.write_protobuf_packet(value, VERSION).await?,
-            MumblePacket::Authenticate(value) => self.write_protobuf_packet(value, AUTHENTICATE).await?,
+            MumblePacket::Authenticate(value) => {
+                self.write_protobuf_packet(value, AUTHENTICATE).await?
+            }
             MumblePacket::Ping(value) => self.write_protobuf_packet(value, PING).await?,
             MumblePacket::Reject(value) => self.write_protobuf_packet(value, REJECT).await?,
-            MumblePacket::ServerSync(value) => self.write_protobuf_packet(value, SERVER_SYNC).await?,
-            MumblePacket::ChannelRemove(value) => self.write_protobuf_packet(value, CHANNEL_REMOVE).await?,
-            MumblePacket::ChannelState(value) => self.write_protobuf_packet(value, CHANNEL_STATE).await?,
-            MumblePacket::UserRemove(value) => self.write_protobuf_packet(value, USER_REMOVE).await?,
+            MumblePacket::ServerSync(value) => {
+                self.write_protobuf_packet(value, SERVER_SYNC).await?
+            }
+            MumblePacket::ChannelRemove(value) => {
+                self.write_protobuf_packet(value, CHANNEL_REMOVE).await?
+            }
+            MumblePacket::ChannelState(value) => {
+                self.write_protobuf_packet(value, CHANNEL_STATE).await?
+            }
+            MumblePacket::UserRemove(value) => {
+                self.write_protobuf_packet(value, USER_REMOVE).await?
+            }
             MumblePacket::UserState(value) => self.write_protobuf_packet(value, USER_STATE).await?,
             MumblePacket::BanList(value) => self.write_protobuf_packet(value, BAN_LIST).await?,
-            MumblePacket::TextMessage(value) => self.write_protobuf_packet(value, TEXT_MESSAGE).await?,
-            MumblePacket::PermissionDenied(value) => self.write_protobuf_packet(value, PERMISSION_DENIED).await?,
+            MumblePacket::TextMessage(value) => {
+                self.write_protobuf_packet(value, TEXT_MESSAGE).await?
+            }
+            MumblePacket::PermissionDenied(value) => {
+                self.write_protobuf_packet(value, PERMISSION_DENIED).await?
+            }
             MumblePacket::Acl(value) => self.write_protobuf_packet(value, ACL).await?,
-            MumblePacket::QueryUsers(value) => self.write_protobuf_packet(value, QUERY_USERS).await?,
-            MumblePacket::CryptSetup(value) => self.write_protobuf_packet(value, CRYPT_SETUP).await?,
-            MumblePacket::ContextActionModify(value) => self.write_protobuf_packet(value, CONTEXT_ACTION_MODIFY).await?,
-            MumblePacket::ContextAction(value) => self.write_protobuf_packet(value, CONTEXT_ACTION).await?,
+            MumblePacket::QueryUsers(value) => {
+                self.write_protobuf_packet(value, QUERY_USERS).await?
+            }
+            MumblePacket::CryptSetup(value) => {
+                self.write_protobuf_packet(value, CRYPT_SETUP).await?
+            }
+            MumblePacket::ContextActionModify(value) => {
+                self.write_protobuf_packet(value, CONTEXT_ACTION_MODIFY)
+                    .await?
+            }
+            MumblePacket::ContextAction(value) => {
+                self.write_protobuf_packet(value, CONTEXT_ACTION).await?
+            }
             MumblePacket::UserList(value) => self.write_protobuf_packet(value, USER_LIST).await?,
-            MumblePacket::VoiceTarget(value) => self.write_protobuf_packet(value, VOICE_TARGET).await?,
-            MumblePacket::PermissionQuery(value) => self.write_protobuf_packet(value, PERMISSION_QUERY).await?,
-            MumblePacket::CodecVersion(value) => self.write_protobuf_packet(value, CODEC_VERSION).await?,
+            MumblePacket::VoiceTarget(value) => {
+                self.write_protobuf_packet(value, VOICE_TARGET).await?
+            }
+            MumblePacket::PermissionQuery(value) => {
+                self.write_protobuf_packet(value, PERMISSION_QUERY).await?
+            }
+            MumblePacket::CodecVersion(value) => {
+                self.write_protobuf_packet(value, CODEC_VERSION).await?
+            }
             MumblePacket::UserStats(value) => self.write_protobuf_packet(value, USER_STATS).await?,
-            MumblePacket::RequestBlob(value) => self.write_protobuf_packet(value, REQUEST_BLOB).await?,
-            MumblePacket::ServerConfig(value) => self.write_protobuf_packet(value, SERVER_CONFIG).await?,
-            MumblePacket::SuggestConfig(value) => self.write_protobuf_packet(value, SUGGEST_CONFIG).await?,
+            MumblePacket::RequestBlob(value) => {
+                self.write_protobuf_packet(value, REQUEST_BLOB).await?
+            }
+            MumblePacket::ServerConfig(value) => {
+                self.write_protobuf_packet(value, SERVER_CONFIG).await?
+            }
+            MumblePacket::SuggestConfig(value) => {
+                self.write_protobuf_packet(value, SUGGEST_CONFIG).await?
+            }
         }
 
         self.writer.flush().await?;
@@ -252,7 +328,8 @@ impl<W> MumblePacketWriter<W>
     }
 
     async fn write_protobuf_packet<T>(&mut self, packet: T, packet_type: u16) -> Result<(), Error>
-        where T: Message
+    where
+        T: Message,
     {
         let bytes = packet.write_to_bytes()?;
         self.writer.write_u16(packet_type).await?;
@@ -266,15 +343,13 @@ impl<W> MumblePacketWriter<W>
 impl VoicePacket {
     fn parse_from_bytes(bytes: Vec<u8>) -> Result<VoicePacket, Error> {
         if bytes.is_empty() {
-            return Err(Error::ParsingError)
+            return Err(Error::ParsingError);
         }
 
         let header = bytes.first().unwrap();
         let (packet_type, _) = decode_header(header.clone());
         if packet_type == 1 {
-            return Ok(VoicePacket::Ping(VoicePing {
-                bytes
-            }))
+            return Ok(VoicePacket::Ping(VoicePing { bytes }));
         }
 
         Ok(VoicePacket::AudioData(AudioData {
@@ -294,7 +369,8 @@ fn encode_header(packet_type: u8, target: u8) -> u8 {
     (packet_type << 5) | target
 }
 
-fn encode_varint(number: u64) -> Vec<u8> { //TODO negative number encode
+fn encode_varint(number: u64) -> Vec<u8> {
+    //TODO negative number encode
     let mut result = vec![];
 
     if number < 0x80 {
@@ -394,13 +470,25 @@ mod tests {
         let varint_7bit_positive = vec![0b0000_1000];
         let varint_14bit_positive = vec![0b1010_0010, 0b0000_0011];
         let varint_21bit_positive = vec![0b1101_0100, 0b0000_0000, 0b0000_0000];
-        let varint_28bit_positive =
-            vec![0b1110_1100, 0b0100_0000, 0b0010_0000, 0b0000_0001];
-        let varint_32bit_positive =
-            vec![0b1111_0000, 0b1100_0000, 0b0000_0000, 0b0000_0000, 0b0000_0001];
-        let varint_64bit_positive =
-            vec![0b1111_0100, 0b1100_0000, 0b0000_0000, 0b0000_0000, 0b0000_0001,
-                 0b0000_0000, 0b0000_0000, 0b0000_0000, 0b0001_0000];
+        let varint_28bit_positive = vec![0b1110_1100, 0b0100_0000, 0b0010_0000, 0b0000_0001];
+        let varint_32bit_positive = vec![
+            0b1111_0000,
+            0b1100_0000,
+            0b0000_0000,
+            0b0000_0000,
+            0b0000_0001,
+        ];
+        let varint_64bit_positive = vec![
+            0b1111_0100,
+            0b1100_0000,
+            0b0000_0000,
+            0b0000_0000,
+            0b0000_0001,
+            0b0000_0000,
+            0b0000_0000,
+            0b0000_0000,
+            0b0001_0000,
+        ];
 
         assert_eq!(encode_varint(0x8), varint_7bit_positive);
         assert_eq!(encode_varint(0x2203), varint_14bit_positive);
@@ -410,4 +498,3 @@ mod tests {
         assert_eq!(encode_varint(0xc000000100000010), varint_64bit_positive);
     }
 }
-

+ 10 - 10
src/server.rs

@@ -4,8 +4,8 @@ use std::sync::Arc;
 
 use tokio::net::{TcpListener, TcpStream};
 use tokio::sync::RwLock;
-use tokio_rustls::{server::TlsStream, TlsAcceptor};
 use tokio_rustls::rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
+use tokio_rustls::{server::TlsStream, TlsAcceptor};
 
 use crate::client::{Client, Message, ResponseMessage};
 use crate::connection::{Connection, ConnectionConfig};
@@ -25,13 +25,12 @@ pub async fn run(config: Config) -> std::io::Result<()> {
     let db = Arc::new(Db::open(&config.path_to_db_file));
 
     let mut tls_config = ServerConfig::new(NoClientAuth::new());
-    tls_config.set_single_cert(vec![config.certificate], config.private_key)
+    tls_config
+        .set_single_cert(vec![config.certificate], config.private_key)
         .expect("Invalid private key");
 
     let acceptor = TlsAcceptor::from(Arc::new(tls_config));
-    let listener = TcpListener::bind(
-        SocketAddr::new(config.ip_address, config.port)
-    ).await?;
+    let listener = TcpListener::bind(SocketAddr::new(config.ip_address, config.port)).await?;
 
     let clients = Arc::new(RwLock::new(HashMap::new()));
     loop {
@@ -54,7 +53,8 @@ async fn process(db: Arc<Db>, stream: TlsStream<TcpStream>, clients: Clients) {
         max_bandwidth: 128000,
         welcome_text: "Welcome!".to_string(),
     };
-    let connection = match Connection::setup_connection(db.clone(), stream, connection_config).await {
+    let connection = match Connection::setup_connection(db.clone(), stream, connection_config).await
+    {
         Ok(connection) => connection,
         Err(_) => {
             eprintln!("Error establishing a connection");
@@ -89,13 +89,13 @@ async fn process(db: Arc<Db>, stream: TlsStream<TcpStream>, clients: Clients) {
             }
             ResponseMessage::Talking(audio_data) => {
                 let clients = clients.read().await;
-                for client in clients.values().filter(|client| client.session_id != session_id) {
+                for client in clients
+                    .values()
+                    .filter(|client| client.session_id != session_id)
+                {
                     client.post_message(Message::UserTalking(audio_data.clone()));
                 }
             }
         }
     }
 }
-
-
-