use crate::protocol::connection::{ControlChannel, ControlChannelStats}; use async_trait::async_trait; use futures::stream::SplitSink; use futures::stream::SplitStream; use futures::stream::StreamExt; use futures::stream::TryStreamExt; use futures::SinkExt; use mumble_protocol::control::ControlPacket; use mumble_protocol::control::ServerControlCodec; use mumble_protocol::voice::{Clientbound, Serverbound}; use std::io::Error as IoError; use std::io::Error; use std::io::ErrorKind; use std::sync::atomic::{AtomicU32, Ordering}; use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio_rustls::server::TlsStream; use tokio_util::codec::Decoder; use tokio_util::codec::Framed; pub struct TcpControlChannel { received: AtomicU32, sink: Mutex< SplitSink, ServerControlCodec>, ControlPacket>, >, stream: Mutex, ServerControlCodec>>>, } impl TcpControlChannel { pub fn new(stream: TlsStream) -> Self { let (sink, stream) = ServerControlCodec::new().framed(stream).split(); TcpControlChannel { sink: Mutex::new(sink), stream: Mutex::new(stream), received: AtomicU32::new(0), } } } #[async_trait] impl ControlChannel for TcpControlChannel { async fn send(&self, message: ControlPacket) -> Result<(), Error> { let mut sink = self.sink.lock().await; Ok(sink.send(message).await?) } async fn receive(&self) -> Result, Error> { let mut stream = self.stream.lock().await; let message = stream.try_next().await?; match message { Some(msg) => { self.received.fetch_add(1, Ordering::Relaxed); Ok(msg) } None => Err(IoError::new(ErrorKind::BrokenPipe, "stream closed")), } } fn get_stats(&self) -> ControlChannelStats { ControlChannelStats { received: self.received.load(Ordering::Acquire), } } }