1
0

3 Revīzijas 19365465d5 ... 65f8b21c6a

Autors SHA1 Ziņojums Datums
  Frans Bergman 65f8b21c6a Do not leave when a song is loading 1 gadu atpakaļ
  Frans Bergman 3dedaf6494 Use a single Mutex for entire AudioState 1 gadu atpakaļ
  Frans Bergman fab707e6c3 WIP: Only remove song once it has been played 1 gadu atpakaļ
4 mainītis faili ar 126 papildinājumiem un 103 dzēšanām
  1. 23 13
      src/audio/audio.rs
  2. 94 89
      src/audio/audio_state.rs
  3. 1 1
      src/audio/song.rs
  4. 8 0
      src/audio/song_queue.rs

+ 23 - 13
src/audio/audio.rs

@@ -33,10 +33,10 @@ use crate::util::{message_react, send_embed};
 struct Audio;
 
 lazy_static! {
-    static ref AUDIO_STATES: Mutex<HashMap<GuildId, Arc<AudioState>>> = Mutex::new(HashMap::new());
+    static ref AUDIO_STATES: Mutex<HashMap<GuildId, Arc<Mutex<AudioState>>>> = Mutex::new(HashMap::new());
 }
 
-async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<AudioState>> {
+async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<Mutex<AudioState>>> {
     let guild = msg.guild(&ctx.cache).await.unwrap();
     let guild_id = guild.id;
 
@@ -147,7 +147,7 @@ async fn disconnect(ctx: &Context, msg: &Message) -> CommandResult {
 
 #[command]
 async fn play(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
-    let query = args.rest();
+    let query = args.rest().to_string();
 
     message_react(ctx, msg, "🎶").await;
 
@@ -157,16 +157,26 @@ async fn play(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
         None => return Ok(()),
     };
 
-    match Song::from_query(msg.author.clone(), query).await {
-        Ok(song) => {
-            AudioState::add_audio(audio_state, song).await;
-            message_react(ctx, msg, "✅").await;
-        }
-        Err(why) => {
-            message_react(ctx, msg, "❎").await;
-            send_embed(ctx, msg, &format!("Error: {}", why)).await;
-        }
-    }
+    let song = {
+        let ctx = ctx.clone();
+        let msg = msg.clone();
+        tokio::spawn(async move {
+            let song = Song::from_query(msg.author.clone(), query).await;
+            match song {
+                Ok(song) => {
+                    message_react(&ctx, &msg, "✅").await;
+                    Some(song)
+                }
+                Err(why) => {
+                    message_react(&ctx, &msg, "❎").await;
+                    send_embed(&ctx, &msg, &format!("Error: {}", why)).await;
+                    None
+                }
+            }
+        })
+    };
+
+    AudioState::add_audio(audio_state, song).await;
 
     Ok(())
 }

+ 94 - 89
src/audio/audio_state.rs

@@ -13,43 +13,47 @@ use songbird::{
     Call, Event, EventContext, EventHandler as VoiceEventHandler, TrackEvent,
 };
 use std::time::Duration;
-use std::{mem::drop, sync::Arc};
+use std::sync::Arc;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
+use tokio::task::JoinHandle;
 
 pub struct AudioState {
-    queue: Arc<Mutex<SongQueue>>,
+    queue: SongQueue,
+    in_flight: usize,
     handler: Arc<SerenityMutex<Call>>,
-    current_song: Mutex<Option<Song>>,
-    track_handle: Mutex<Option<TrackHandle>>,
-    is_looping: Mutex<bool>,
-    volume: Mutex<f32>,
+    current_song: Option<Song>,
+    track_handle: Option<TrackHandle>,
+    is_looping: bool,
+    volume: f32,
 
-    channel_id: Mutex<ChannelId>,
-    http: Mutex<Arc<Http>>,
+    channel_id: ChannelId,
+    http: Arc<Http>,
 }
 
 impl AudioState {
-    pub fn new(handler: Arc<SerenityMutex<Call>>, ctx: &Context, msg: &Message) -> Arc<AudioState> {
+    pub fn new(handler: Arc<SerenityMutex<Call>>, ctx: &Context, msg: &Message) -> Arc<Mutex<AudioState>> {
         let audio_state = AudioState {
-            queue: Arc::new(Mutex::new(SongQueue::new())),
+            queue: SongQueue::new(),
+            in_flight: 0,
             handler,
-            current_song: Mutex::new(None),
-            track_handle: Mutex::new(None),
-            is_looping: Mutex::new(false),
-            volume: Mutex::new(1.0),
+            current_song: None,
+            track_handle: None,
+            is_looping: false,
+            volume: 1.0,
 
-            channel_id: Mutex::new(msg.channel_id),
-            http: Mutex::new(ctx.http.clone()),
+            channel_id: msg.channel_id,
+            http: ctx.http.clone(),
         };
-        let audio_state = Arc::new(audio_state);
+        let audio_state = Arc::new(Mutex::new(audio_state));
         let my_audio_state = audio_state.clone();
         tokio::spawn(async move {
             // Leave if no music is playing within 1 minute
             sleep(Duration::from_secs(60)).await;
-            let current_song = my_audio_state.current_song.lock().await;
+            let audio_state = my_audio_state.lock().await;
+            let current_song = audio_state.current_song.clone();
             if current_song.is_none() {
-                let mut handler = my_audio_state.handler.lock().await;
+                let mut handler = audio_state.handler.lock().await;
                 if let Err(e) = handler.leave().await {
                     println!("Automatic leave failed: {:?}", e);
                 }
@@ -58,34 +62,29 @@ impl AudioState {
         audio_state
     }
 
-    pub async fn set_context(audio_state: Arc<AudioState>, ctx: &Context, msg: &Message) {
-        {
-            let mut channel_id = audio_state.channel_id.lock().await;
-            *channel_id = msg.channel_id;
-        }
-        {
-            let mut http = audio_state.http.lock().await;
-            *http = ctx.http.clone();
-        }
+    pub async fn set_context(audio_state: Arc<Mutex<AudioState>>, ctx: &Context, msg: &Message) {
+        let mut state = audio_state.lock().await;
+        state.channel_id = msg.channel_id;
+        state.http = ctx.http.clone();
     }
 
-    async fn play_audio(audio_state: Arc<AudioState>) {
-        let is_looping = audio_state.is_looping.lock().await;
-        let mut queue = audio_state.queue.lock().await;
-        let song = if *is_looping {
-            let mut current_song = audio_state.current_song.lock().await;
-            current_song.take()
+    async fn play_audio(audio_state: Arc<Mutex<AudioState>>) {
+        let mut state = audio_state.lock().await;
+        let song = if state.is_looping {
+            state.current_song.take()
         } else {
-            queue.pop()
+            state.queue.peek()
         };
-        drop(is_looping);
 
         let song = match song {
             Some(song) => song,
             None => {
-                let mut handler = audio_state.handler.lock().await;
-                if let Err(e) = handler.leave().await {
-                    println!("Error leaving channel: {:?}", e);
+                state.current_song = None;
+                if state.in_flight == 0 {
+                    let mut handler = state.handler.lock().await;
+                    if let Err(e) = handler.leave().await {
+                        println!("Error leaving channel: {:?}", e);
+                    }
                 }
                 return;
             }
@@ -102,11 +101,11 @@ impl AudioState {
         let reader = Reader::Extension(source);
         let source = input::Input::float_pcm(true, reader);
 
-        let mut handler = audio_state.handler.lock().await;
-        let volume = audio_state.volume.lock().await;
+        let handler = state.handler.clone();
+        let mut handler = handler.lock().await;
 
         let handle = handler.play_source(source);
-        if let Err(e) = handle.set_volume(*volume) {
+        if let Err(e) = handle.set_volume(state.volume) {
             println!("{}", e);
         }
 
@@ -120,39 +119,46 @@ impl AudioState {
         }
         {
             let text = song.get_string();
-            let channel_id = audio_state.channel_id.lock().await;
-            let http = audio_state.http.lock().await;
             send_embed_http(
-                *channel_id,
-                http.clone(),
+                state.channel_id,
+                state.http.clone(),
                 &format!("Now playing:\n\n {}", text),
             )
             .await;
         }
-        let mut current_song = audio_state.current_song.lock().await;
-        *current_song = Some(song);
-        let mut track_handle = audio_state.track_handle.lock().await;
-        *track_handle = Some(handle);
+        state.current_song = Some(song);
+        state.track_handle = Some(handle);
     }
 
-    pub async fn add_audio(audio_state: Arc<AudioState>, song: Song) {
-        let mut queue = audio_state.queue.lock().await;
-        queue.push(vec![song]);
-        let current_song = audio_state.current_song.lock().await;
-        if current_song.is_none() {
-            let audio_state = audio_state.clone();
-            tokio::spawn(async {
-                AudioState::play_audio(audio_state).await;
-            });
+    pub async fn add_audio(audio_state: Arc<Mutex<AudioState>>, song: JoinHandle<Option<Song>>) {
+        {
+            let mut state = audio_state.lock().await;
+            state.in_flight += 1;
+        }
+        {
+            if let Ok(Some(song)) = song.await {
+                let mut state = audio_state.lock().await;
+                state.queue.push(vec![song]);
+                if state.current_song.is_none() {
+                    let audio_state = audio_state.clone();
+                    tokio::spawn(async {
+                        AudioState::play_audio(audio_state).await;
+                    });
+                }
+            }
+        }
+        {
+            let mut state = audio_state.lock().await;
+            state.in_flight -= 1;
         }
     }
 
     pub async fn send_track_command(
-        audio_state: Arc<AudioState>,
+        audio_state: Arc<Mutex<AudioState>>,
         cmd: TrackCommand,
     ) -> Result<(), String> {
-        let track_handle = audio_state.track_handle.lock().await;
-        match track_handle.as_ref() {
+        let state = audio_state.lock().await;
+        match state.track_handle.as_ref() {
             Some(track_handle) => match track_handle.send(cmd) {
                 Ok(()) => Ok(()),
                 Err(why) => Err(format!("{:?}", why)),
@@ -161,62 +167,61 @@ impl AudioState {
         }
     }
 
-    pub async fn shuffle(audio_state: Arc<AudioState>) -> Result<(), String> {
-        let mut queue = audio_state.queue.lock().await;
-        queue.shuffle()
+    pub async fn shuffle(audio_state: Arc<Mutex<AudioState>>) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        state.queue.shuffle()
     }
 
-    pub async fn clear(audio_state: Arc<AudioState>) -> Result<(), String> {
-        let mut queue = audio_state.queue.lock().await;
-        queue.clear()
+    pub async fn clear(audio_state: Arc<Mutex<AudioState>>) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        state.queue.clear()
     }
 
     // on success, returns a bool that specifies whether the queue is now being looped
-    pub async fn change_looping(audio_state: Arc<AudioState>) -> Result<bool, String> {
-        {
-            let current_song = audio_state.current_song.lock().await;
-            if current_song.is_none() {
-                return Err("no song is playing".to_string());
-            }
+    pub async fn change_looping(audio_state: Arc<Mutex<AudioState>>) -> Result<bool, String> {
+        let mut state = audio_state.lock().await;
+        if state.current_song.is_none() {
+            return Err("no song is playing".to_string());
         }
-        let mut is_looping = audio_state.is_looping.lock().await;
-        *is_looping = !*is_looping;
-        Ok(*is_looping)
+        state.is_looping = !state.is_looping;
+        Ok(state.is_looping)
     }
 
-    pub async fn set_volume(audio_state: Arc<AudioState>, new_volume: f32) -> Result<(), String> {
-        let mut track_handle = audio_state.track_handle.lock().await;
-        let mut volume = audio_state.volume.lock().await;
-
-        *volume = new_volume / 100.0;
+    pub async fn set_volume(audio_state: Arc<Mutex<AudioState>>, new_volume: f32) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        let new_volume = new_volume / 100.0;
 
-        track_handle.as_mut().map(move |handle| handle.set_volume(*volume));
+        state.volume = new_volume;
+        state.track_handle.as_mut().map(move |handle| handle.set_volume(new_volume));
 
         Ok(())
     }
 
-    pub async fn get_string(audio_state: Arc<AudioState>) -> String {
-        let current_song = audio_state.current_song.lock().await;
-        let current_song = match &*current_song {
+    pub async fn get_string(audio_state: Arc<Mutex<AudioState>>) -> String {
+        let state = audio_state.lock().await;
+        let current_song = match state.current_song.clone() {
             Some(song) => song.get_string(),
             None => "*Not playing*\n".to_string(),
         };
-        let queue = audio_state.queue.lock().await;
         format!(
             "**Current Song:**\n{}\n\n**Queue:**\n{}",
             current_song,
-            queue.get_string()
+            state.queue.get_string()
         )
     }
 }
 
 struct SongEndNotifier {
-    audio_state: Arc<AudioState>,
+    audio_state: Arc<Mutex<AudioState>>,
 }
 
 #[async_trait]
 impl VoiceEventHandler for SongEndNotifier {
     async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
+        {
+            let mut state = self.audio_state.lock().await;
+            state.queue.pop();
+        }
         AudioState::play_audio(self.audio_state.clone()).await;
 
         None

+ 1 - 1
src/audio/song.rs

@@ -11,7 +11,7 @@ pub struct Song {
 }
 
 impl Song {
-    pub async fn from_query(user: User, query: &str) -> Result<Song, std::io::Error> {
+    pub async fn from_query(user: User, query: String) -> Result<Song, std::io::Error> {
         let query = if query.contains("watch?v=") {
             query.to_string()
         } else {

+ 8 - 0
src/audio/song_queue.rs

@@ -32,6 +32,14 @@ impl SongQueue {
             deque.push_back(song);
         }
     }
+    pub fn peek(&self) -> Option<Song> {
+        let user = match self.users.front() {
+            Some(user) => user,
+            None => return None,
+        };
+        let deque = self.queues.get(&user).unwrap();
+        deque.front().cloned()
+    }
     pub fn pop(&mut self) -> Option<Song> {
         let user = match self.users.pop_front() {
             Some(user) => user,