|
@@ -1,25 +1,26 @@
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
+use log::error;
|
|
|
use tokio::sync::mpsc;
|
|
|
use tokio::sync::mpsc::{Receiver, Sender};
|
|
|
use tokio::task::JoinHandle;
|
|
|
-use log::{info, error};
|
|
|
|
|
|
use crate::client::Error::StreamError;
|
|
|
-use crate::connection::{AudioChannel, AudioChannelSender, ControlChannel, ControlChannelSender};
|
|
|
+use crate::connection::{AudioChannel, ControlChannel};
|
|
|
use crate::db::{Db, User};
|
|
|
use crate::proto::mumble::{
|
|
|
ChannelState, CodecVersion, CryptSetup, Ping, ServerConfig, ServerSync, UserRemove, UserState,
|
|
|
Version,
|
|
|
};
|
|
|
use crate::protocol::{AudioData, AudioPacket, MumblePacket, MUMBLE_PROTOCOL_VERSION};
|
|
|
+use std::sync::atomic::Ordering;
|
|
|
|
|
|
pub struct Client {
|
|
|
pub session_id: u32,
|
|
|
- inner_sender: Sender<InnerMessage>,
|
|
|
+ inner_event_sender: Sender<InnerEvent>,
|
|
|
handler_task: JoinHandle<()>,
|
|
|
- packet_task: JoinHandle<()>,
|
|
|
- audio_task: Option<JoinHandle<()>>,
|
|
|
+ control_channel_task: JoinHandle<()>,
|
|
|
+ audio_channel_task: Option<JoinHandle<()>>,
|
|
|
}
|
|
|
|
|
|
pub struct Config {
|
|
@@ -38,17 +39,17 @@ pub struct Config {
|
|
|
pub max_image_message_length: u32,
|
|
|
}
|
|
|
|
|
|
-pub enum Message {
|
|
|
+pub enum ClientEvent {
|
|
|
+ Talking(AudioData),
|
|
|
+ Disconnected,
|
|
|
+}
|
|
|
+
|
|
|
+pub enum ServerEvent {
|
|
|
UserConnected(u32),
|
|
|
UserDisconnected(u32),
|
|
|
UserTalking(AudioData),
|
|
|
}
|
|
|
|
|
|
-pub enum ResponseMessage {
|
|
|
- Disconnected,
|
|
|
- Talking(AudioData),
|
|
|
-}
|
|
|
-
|
|
|
pub enum Error {
|
|
|
AuthenticationError,
|
|
|
StreamError,
|
|
@@ -56,30 +57,28 @@ pub enum Error {
|
|
|
}
|
|
|
|
|
|
struct Handler {
|
|
|
- session_id: u32,
|
|
|
db: Arc<Db>,
|
|
|
- control_channel_sender: ControlChannelSender,
|
|
|
- audio_channel_sender: Option<AudioChannelSender>,
|
|
|
- response_sender: Sender<ResponseMessage>,
|
|
|
- is_audio_tunneling: bool,
|
|
|
+ control_channel: Arc<ControlChannel>,
|
|
|
+ audio_channel: Option<Arc<AudioChannel>>,
|
|
|
+ client_event_sender: Sender<ClientEvent>,
|
|
|
+ session_id: u32,
|
|
|
+ crypto_resyncs: u32,
|
|
|
}
|
|
|
|
|
|
-enum InnerMessage {
|
|
|
- Message(Message),
|
|
|
- Packet(Box<MumblePacket>),
|
|
|
- Audio(AudioPacket),
|
|
|
- AudioChannel(AudioChannelSender),
|
|
|
+enum InnerEvent {
|
|
|
+ ServerEvent(ServerEvent),
|
|
|
+ ControlPacket(MumblePacket),
|
|
|
+ AudioPacket(AudioPacket),
|
|
|
+ AudioChannel(Arc<AudioChannel>),
|
|
|
SelfDisconnected,
|
|
|
}
|
|
|
|
|
|
-type Responder = Receiver<ResponseMessage>;
|
|
|
-
|
|
|
impl Client {
|
|
|
pub async fn establish_connection(
|
|
|
db: Arc<Db>,
|
|
|
- mut control_channel: ControlChannel,
|
|
|
+ control_channel: ControlChannel,
|
|
|
config: Config,
|
|
|
- ) -> Result<(Self, Responder), Error> {
|
|
|
+ ) -> Result<(Self, Receiver<ClientEvent>), Error> {
|
|
|
match control_channel.receive().await? {
|
|
|
MumblePacket::Version(version) => version,
|
|
|
_ => return Err(Error::WrongPacket),
|
|
@@ -160,12 +159,8 @@ impl Client {
|
|
|
control_channel.send(version).await?;
|
|
|
control_channel.send(crypt_setup).await?;
|
|
|
control_channel.send(codec_version).await?;
|
|
|
- for channel_state in channel_states {
|
|
|
- control_channel.send(channel_state).await?;
|
|
|
- }
|
|
|
- for user_state in user_states {
|
|
|
- control_channel.send(user_state).await?;
|
|
|
- }
|
|
|
+ control_channel.send_multiple(channel_states).await?;
|
|
|
+ control_channel.send_multiple(user_states).await?;
|
|
|
control_channel.send(server_sync).await?;
|
|
|
control_channel.send(server_config).await?;
|
|
|
|
|
@@ -174,152 +169,134 @@ impl Client {
|
|
|
}
|
|
|
|
|
|
pub async fn set_audio_channel(&mut self, audio_channel: AudioChannel) {
|
|
|
- let (mut receiver, sender) = audio_channel.split();
|
|
|
- let inner_sender = self.inner_sender.clone();
|
|
|
- self.audio_task = Some(tokio::spawn(async move {
|
|
|
+ let audio_channel = Arc::new(audio_channel);
|
|
|
+ let channel = Arc::clone(&audio_channel);
|
|
|
+ let sender = self.inner_event_sender.clone();
|
|
|
+ self.audio_channel_task = Some(tokio::spawn(async move {
|
|
|
loop {
|
|
|
- match receiver.receive().await {
|
|
|
+ match channel.receive().await {
|
|
|
Ok(packet) => {
|
|
|
- inner_sender.try_send(InnerMessage::Audio(packet));
|
|
|
+ sender.send(InnerEvent::AudioPacket(packet)).await;
|
|
|
}
|
|
|
- Err(_) => return,
|
|
|
+ Err(_) => break,
|
|
|
}
|
|
|
}
|
|
|
}));
|
|
|
|
|
|
- self.inner_sender
|
|
|
- .send(InnerMessage::AudioChannel(sender))
|
|
|
+ self.inner_event_sender
|
|
|
+ .send(InnerEvent::AudioChannel(audio_channel))
|
|
|
.await;
|
|
|
}
|
|
|
|
|
|
- pub async fn send_message(&self, message: Message) {
|
|
|
- match message {
|
|
|
- Message::UserTalking(_) => {
|
|
|
- self.inner_sender.try_send(InnerMessage::Message(message));
|
|
|
- }
|
|
|
- _ => {
|
|
|
- self.inner_sender.send(InnerMessage::Message(message)).await;
|
|
|
- }
|
|
|
- }
|
|
|
+ pub async fn send_event(&self, event: ServerEvent) {
|
|
|
+ self.inner_event_sender
|
|
|
+ .send(InnerEvent::ServerEvent(event))
|
|
|
+ .await;
|
|
|
}
|
|
|
|
|
|
async fn new(
|
|
|
control_channel: ControlChannel,
|
|
|
db: Arc<Db>,
|
|
|
session_id: u32,
|
|
|
- ) -> (Client, Responder) {
|
|
|
- let (inner_sender, mut inner_receiver) = mpsc::channel(2);
|
|
|
- let (response_sender, response_receiver) = mpsc::channel(2);
|
|
|
-
|
|
|
- let (mut control_channel_receiver, control_channel_sender) = control_channel.split();
|
|
|
- let handler_task = tokio::spawn(async move {
|
|
|
- let mut handler = Handler {
|
|
|
- session_id,
|
|
|
- db,
|
|
|
- control_channel_sender,
|
|
|
- audio_channel_sender: None,
|
|
|
- response_sender,
|
|
|
- is_audio_tunneling: false,
|
|
|
- };
|
|
|
+ ) -> (Self, Receiver<ClientEvent>) {
|
|
|
+ let (inner_event_sender, inner_event_receiver) = mpsc::channel(1);
|
|
|
+ let (client_event_sender, response_receiver) = mpsc::channel(1);
|
|
|
+ let control_channel = Arc::new(control_channel);
|
|
|
+ let handler = Handler {
|
|
|
+ db,
|
|
|
+ control_channel: Arc::clone(&control_channel),
|
|
|
+ audio_channel: None,
|
|
|
+ client_event_sender,
|
|
|
+ session_id,
|
|
|
+ crypto_resyncs: 0,
|
|
|
+ };
|
|
|
+ let client = Client {
|
|
|
+ session_id,
|
|
|
+ inner_event_sender: inner_event_sender.clone(),
|
|
|
+ handler_task: Self::run_handler_task(handler, inner_event_receiver).await,
|
|
|
+ control_channel_task: Self::run_control_channel_task(
|
|
|
+ control_channel,
|
|
|
+ inner_event_sender,
|
|
|
+ )
|
|
|
+ .await,
|
|
|
+ audio_channel_task: None,
|
|
|
+ };
|
|
|
+
|
|
|
+ return (client, response_receiver);
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn run_handler_task(
|
|
|
+ mut handler: Handler,
|
|
|
+ mut inner_event_receiver: Receiver<InnerEvent>,
|
|
|
+ ) -> JoinHandle<()> {
|
|
|
+ tokio::spawn(async move {
|
|
|
loop {
|
|
|
- let message = match inner_receiver.recv().await {
|
|
|
+ let message = match inner_event_receiver.recv().await {
|
|
|
Some(msg) => msg,
|
|
|
- None => return,
|
|
|
+ None => {
|
|
|
+ error!("Handler task closed unexpectedly");
|
|
|
+ break;
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
- match message {
|
|
|
- InnerMessage::Message(msg) => {
|
|
|
- let result = handler.handle_message(msg).await;
|
|
|
- if result.is_err() {
|
|
|
- handler.self_disconnected().await;
|
|
|
- return;
|
|
|
- }
|
|
|
+ let result = match message {
|
|
|
+ InnerEvent::ServerEvent(event) => handler.handle_server_event(event).await,
|
|
|
+ InnerEvent::ControlPacket(packet) => {
|
|
|
+ handler.handle_control_packet(packet).await
|
|
|
}
|
|
|
- InnerMessage::Packet(packet) => {
|
|
|
- let result = handler.handle_mumble_packet(*packet).await;
|
|
|
- if result.is_err() {
|
|
|
- handler.self_disconnected().await;
|
|
|
- return;
|
|
|
- }
|
|
|
+ InnerEvent::AudioPacket(audio) => handler.handle_audio_packet(audio).await,
|
|
|
+ InnerEvent::AudioChannel(channel) => {
|
|
|
+ handler.audio_channel = Some(channel);
|
|
|
+ Ok(())
|
|
|
}
|
|
|
- InnerMessage::SelfDisconnected => {
|
|
|
+ InnerEvent::SelfDisconnected => {
|
|
|
handler.self_disconnected().await;
|
|
|
- return;
|
|
|
- }
|
|
|
- InnerMessage::Audio(audio) => {
|
|
|
- let result = handler.handle_audio_packet(audio).await;
|
|
|
- if result.is_err() {
|
|
|
- handler.self_disconnected().await;
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
- InnerMessage::AudioChannel(sender) => {
|
|
|
- handler.audio_channel_sender = Some(sender)
|
|
|
+ break;
|
|
|
}
|
|
|
+ };
|
|
|
+
|
|
|
+ if let Err(_) = result {
|
|
|
+ // TODO
|
|
|
+ error!("Handler task error");
|
|
|
}
|
|
|
}
|
|
|
- });
|
|
|
+ })
|
|
|
+ }
|
|
|
|
|
|
- let sender = inner_sender.clone();
|
|
|
- let packet_task = tokio::spawn(async move {
|
|
|
+ async fn run_control_channel_task(
|
|
|
+ control_channel: Arc<ControlChannel>,
|
|
|
+ sender: Sender<InnerEvent>,
|
|
|
+ ) -> JoinHandle<()> {
|
|
|
+ tokio::spawn(async move {
|
|
|
loop {
|
|
|
- match control_channel_receiver.receive().await {
|
|
|
- Ok(packet) => sender.send(InnerMessage::Packet(Box::from(packet))).await,
|
|
|
+ match control_channel.receive().await {
|
|
|
+ Ok(packet) => sender.send(InnerEvent::ControlPacket(packet)).await,
|
|
|
Err(_) => {
|
|
|
- sender.send(InnerMessage::SelfDisconnected).await;
|
|
|
+ // TODO
|
|
|
+ sender.send(InnerEvent::SelfDisconnected).await;
|
|
|
return;
|
|
|
}
|
|
|
};
|
|
|
}
|
|
|
- });
|
|
|
-
|
|
|
- return (
|
|
|
- Client {
|
|
|
- session_id,
|
|
|
- inner_sender,
|
|
|
- handler_task,
|
|
|
- packet_task,
|
|
|
- audio_task: None,
|
|
|
- },
|
|
|
- response_receiver,
|
|
|
- );
|
|
|
+ })
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl Drop for Client {
|
|
|
- fn drop(&mut self) {
|
|
|
- self.handler_task.abort();
|
|
|
- self.packet_task.abort();
|
|
|
- if let Some(audio_task) = self.audio_task.as_ref() {
|
|
|
- audio_task.abort();
|
|
|
+impl Handler {
|
|
|
+ async fn handle_server_event(&self, message: ServerEvent) -> Result<(), Error> {
|
|
|
+ match message {
|
|
|
+ ServerEvent::UserConnected(session_id) => self.new_user_connected(session_id).await?,
|
|
|
+ ServerEvent::UserDisconnected(session_id) => self.user_disconnected(session_id).await?,
|
|
|
+ ServerEvent::UserTalking(audio_data) => self.user_talking(audio_data).await?,
|
|
|
}
|
|
|
+
|
|
|
+ Ok(())
|
|
|
}
|
|
|
-}
|
|
|
|
|
|
-impl Handler {
|
|
|
- async fn handle_mumble_packet(&mut self, packet: MumblePacket) -> Result<(), Error> {
|
|
|
+ async fn handle_control_packet(&self, packet: MumblePacket) -> Result<(), Error> {
|
|
|
match packet {
|
|
|
- MumblePacket::Ping(ping) => {
|
|
|
- if ping.has_timestamp() {
|
|
|
- let mut ping = Ping::new();
|
|
|
- ping.set_timestamp(ping.get_timestamp());
|
|
|
- self.control_channel_sender
|
|
|
- .send(MumblePacket::Ping(ping))
|
|
|
- .await?;
|
|
|
- }
|
|
|
- }
|
|
|
- MumblePacket::UdpTunnel(voice) => match voice {
|
|
|
- AudioPacket::Ping(_) => {
|
|
|
- self.control_channel_sender
|
|
|
- .send(MumblePacket::UdpTunnel(voice))
|
|
|
- .await?;
|
|
|
- }
|
|
|
- AudioPacket::AudioData(mut audio_data) => {
|
|
|
- audio_data.session_id = Some(self.session_id);
|
|
|
- self.response_sender
|
|
|
- .try_send(ResponseMessage::Talking(audio_data));
|
|
|
- }
|
|
|
- },
|
|
|
+ MumblePacket::Ping(ping) => self.handle_control_channel_ping(ping).await?,
|
|
|
+ MumblePacket::UdpTunnel(packet) => self.handle_tunnel(packet).await?,
|
|
|
MumblePacket::ChannelRemove(_) => error!("ChannelRemove unimplemented!"),
|
|
|
MumblePacket::ChannelState(_) => error!("ChannelState unimplemented!"),
|
|
|
MumblePacket::UserRemove(_) => error!("UserRemove unimplemented!"),
|
|
@@ -328,99 +305,117 @@ impl Handler {
|
|
|
MumblePacket::TextMessage(_) => error!("TextMessage unimplemented!"),
|
|
|
MumblePacket::QueryUsers(_) => error!("TextMessage unimplemented!"),
|
|
|
MumblePacket::CryptSetup(_) => error!("CryptSetup unimplemented!"),
|
|
|
- MumblePacket::ContextActionModify(_) => error!("ContextActionModify unimplemented!"),
|
|
|
MumblePacket::ContextAction(_) => error!("ContextAction unimplemented!"),
|
|
|
MumblePacket::UserList(_) => error!("UserList unimplemented!"),
|
|
|
MumblePacket::VoiceTarget(_) => error!("VoiceTarget unimplemented!"),
|
|
|
MumblePacket::PermissionQuery(_) => error!("PermissionQuery unimplemented!"),
|
|
|
MumblePacket::UserStats(_) => error!("UserStats unimplemented!"),
|
|
|
MumblePacket::RequestBlob(_) => error!("RequestBlob unimplemented!"),
|
|
|
+ MumblePacket::Acl(_) => error!("Acl unimplemented!"),
|
|
|
// The rest is only sent by the server
|
|
|
_ => return Err(Error::WrongPacket),
|
|
|
}
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- async fn handle_message(&mut self, message: Message) -> Result<(), Error> {
|
|
|
- match message {
|
|
|
- Message::UserConnected(session_id) => self.new_user_connected(session_id).await?,
|
|
|
- Message::UserDisconnected(session_id) => self.user_disconnected(session_id).await?,
|
|
|
- Message::UserTalking(audio_data) => self.user_talking(audio_data).await?,
|
|
|
+ async fn handle_control_channel_ping(&self, ping: Ping) -> Result<(), Error> {
|
|
|
+ let timestamp = ping.get_timestamp();
|
|
|
+ let mut ping = Ping::new();
|
|
|
+ if ping.has_timestamp() {
|
|
|
+ ping.set_timestamp(timestamp);
|
|
|
+ }
|
|
|
+ if let Some(channel) = self.audio_channel.as_ref() {
|
|
|
+ ping.set_good(channel.good.load(Ordering::Acquire));
|
|
|
+ ping.set_late(channel.late.load(Ordering::Acquire));
|
|
|
+ ping.set_lost(channel.lost.load(Ordering::Acquire));
|
|
|
+ ping.set_resync(self.crypto_resyncs);
|
|
|
}
|
|
|
|
|
|
+ self.control_channel.send(MumblePacket::Ping(ping)).await?;
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- async fn handle_audio_packet(&mut self, packet: AudioPacket) -> Result<(), Error> {
|
|
|
+ async fn handle_tunnel(&self, packet: AudioPacket) -> Result<(), Error> {
|
|
|
match packet {
|
|
|
AudioPacket::Ping(_) => {
|
|
|
- if !self.is_audio_tunneling && self.audio_channel_sender.is_some() {
|
|
|
- self.audio_channel_sender
|
|
|
- .as_mut()
|
|
|
- .unwrap()
|
|
|
- .send(packet)
|
|
|
- .await?;
|
|
|
- } else {
|
|
|
- self.control_channel_sender
|
|
|
- .send(MumblePacket::UdpTunnel(packet))
|
|
|
- .await?;
|
|
|
- }
|
|
|
+ self.control_channel
|
|
|
+ .send(MumblePacket::UdpTunnel(packet))
|
|
|
+ .await?;
|
|
|
+ }
|
|
|
+ AudioPacket::AudioData(mut audio_data) => {
|
|
|
+ audio_data.session_id = Some(self.session_id);
|
|
|
+ self.client_event_sender
|
|
|
+ .send(ClientEvent::Talking(audio_data))
|
|
|
+ .await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn handle_audio_packet(&self, packet: AudioPacket) -> Result<(), Error> {
|
|
|
+ match packet {
|
|
|
+ AudioPacket::Ping(_) => {
|
|
|
+ self.audio_channel.as_ref().unwrap().send(packet).await;
|
|
|
}
|
|
|
AudioPacket::AudioData(mut audio_data) => {
|
|
|
audio_data.session_id = Some(self.session_id);
|
|
|
- // It isn't critical to lose some audio packets
|
|
|
- self.response_sender
|
|
|
- .try_send(ResponseMessage::Talking(audio_data));
|
|
|
+ self.client_event_sender
|
|
|
+ .send(ClientEvent::Talking(audio_data))
|
|
|
+ .await;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- async fn new_user_connected(&mut self, session_id: u32) -> Result<(), Error> {
|
|
|
+ async fn new_user_connected(&self, session_id: u32) -> Result<(), Error> {
|
|
|
if let Some(user) = self.db.get_user_by_session_id(session_id).await {
|
|
|
- self.control_channel_sender
|
|
|
- .send(MumblePacket::from(user))
|
|
|
- .await?;
|
|
|
+ self.control_channel.send(MumblePacket::from(user)).await?;
|
|
|
}
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- async fn user_disconnected(&mut self, session_id: u32) -> Result<(), Error> {
|
|
|
+ async fn user_disconnected(&self, session_id: u32) -> Result<(), Error> {
|
|
|
let mut user_remove = UserRemove::new();
|
|
|
user_remove.set_session(session_id);
|
|
|
Ok(self
|
|
|
- .control_channel_sender
|
|
|
+ .control_channel
|
|
|
.send(MumblePacket::UserRemove(user_remove))
|
|
|
.await?)
|
|
|
}
|
|
|
|
|
|
- async fn self_disconnected(&mut self) {
|
|
|
+ async fn self_disconnected(&self) {
|
|
|
self.db.remove_connected_user(self.session_id).await;
|
|
|
- self.response_sender
|
|
|
- .send(ResponseMessage::Disconnected)
|
|
|
+ self.client_event_sender
|
|
|
+ .send(ClientEvent::Disconnected)
|
|
|
.await;
|
|
|
}
|
|
|
|
|
|
- async fn user_talking(&mut self, audio_data: AudioData) -> Result<(), Error> {
|
|
|
+ async fn user_talking(&self, audio_data: AudioData) -> Result<(), Error> {
|
|
|
let audio_packet = AudioPacket::AudioData(audio_data);
|
|
|
-
|
|
|
- if !self.is_audio_tunneling && self.audio_channel_sender.is_some() {
|
|
|
- self.audio_channel_sender
|
|
|
- .as_mut()
|
|
|
- .unwrap()
|
|
|
- .send(audio_packet)
|
|
|
- .await?;
|
|
|
+ if let Some(channel) = self.audio_channel.as_ref() {
|
|
|
+ channel.send(audio_packet).await?;
|
|
|
} else {
|
|
|
- self.control_channel_sender
|
|
|
+ self.control_channel
|
|
|
.send(MumblePacket::UdpTunnel(audio_packet))
|
|
|
- .await?;
|
|
|
+ .await;
|
|
|
}
|
|
|
|
|
|
Ok(())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+impl Drop for Client {
|
|
|
+ fn drop(&mut self) {
|
|
|
+ self.handler_task.abort();
|
|
|
+ self.control_channel_task.abort();
|
|
|
+ if let Some(audio_task) = self.audio_channel_task.as_ref() {
|
|
|
+ audio_task.abort();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl From<User> for UserState {
|
|
|
fn from(user: User) -> Self {
|
|
|
let mut user_state = UserState::new();
|