11 Incheckningar e876a0d594 ... bac5b3e138

Upphovsman SHA1 Meddelande Datum
  Frans Bergman bac5b3e138 Correct ffmpeg output format argument 1 månad sedan
  Frans Bergman 316a092a05 Update crates 1 månad sedan
  Frans Bergman aca10a6533 Update Discord API libraries 1 år sedan
  Frans Bergman a97380f26a Switch to yt-dlp instead of youtube-dl 1 år sedan
  Frans Bergman 23060fe13c Do not leave when a song is loading 1 år sedan
  Frans Bergman 744606c542 Use a single Mutex for entire AudioState 1 år sedan
  Frans Bergman a3431ab84a Only remove song once it has been played 1 år sedan
  Frans Bergman 519d616555 Add volume command 2 år sedan
  Frans Bergman ac11c8079b Implement fair queueing 2 år sedan
  Frans Bergman 8da8d57c37 Move SongQueue locking to audio_state.rs 2 år sedan
  Frans Bergman fb800f7bcb Get video information in JSON format 2 år sedan
8 ändrade filer med 574 tillägg och 413 borttagningar
  1. 334 275
      Cargo.lock
  2. 4 3
      Cargo.toml
  3. 45 16
      src/audio/audio.rs
  4. 101 74
      src/audio/audio_state.rs
  5. 11 8
      src/audio/song.rs
  6. 67 25
      src/audio/song_queue.rs
  7. 8 10
      src/audio/subprocess.rs
  8. 4 2
      src/main.rs

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 334 - 275
Cargo.lock


+ 4 - 3
Cargo.toml

@@ -10,7 +10,8 @@ edition = "2018"
 futures = "0.3.5"
 lazy_static = "1.4.0"
 rand = "0.8.3"
-serenity = {version = "0.10", features = ["standard_framework", "voice", "rustls_backend", "cache", "framework"] }
-songbird = "0.2.0-beta.4"
+serenity = {version = "0.11", features = ["standard_framework", "voice", "rustls_backend", "cache", "framework"] }
+songbird = "0.3.2"
+symphonia-core = "0.5.2"
 tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "time", "sync"] }
-symphonia-core = "0.2.0"
+serde_json = "1.0.81"

+ 45 - 16
src/audio/audio.rs

@@ -25,6 +25,7 @@ use crate::util::{message_react, send_embed};
     pause,
     resume,
     change_loop,
+    volume,
     shuffle,
     clear,
     queue
@@ -32,11 +33,11 @@ use crate::util::{message_react, send_embed};
 struct Audio;
 
 lazy_static! {
-    static ref AUDIO_STATES: Mutex<HashMap<GuildId, Arc<AudioState>>> = Mutex::new(HashMap::new());
+    static ref AUDIO_STATES: Mutex<HashMap<GuildId, Arc<Mutex<AudioState>>>> = Mutex::new(HashMap::new());
 }
 
-async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<AudioState>> {
-    let guild = msg.guild(&ctx.cache).await.unwrap();
+async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<Mutex<AudioState>>> {
+    let guild = msg.guild(&ctx.cache).unwrap();
     let guild_id = guild.id;
 
     let mut audio_states = AUDIO_STATES.lock().await;
@@ -87,7 +88,7 @@ async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<AudioState>
     let mut call = call_lock.lock().await;
 
     if call.current_channel() != Some(channel_id.into()) {
-        if let Err(err) = call.join(channel_id.into()).await {
+        if let Err(err) = call.join(channel_id).await {
             println!("Error joining call: {:?}", err);
         }
     }
@@ -107,7 +108,7 @@ async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<AudioState>
 }
 
 async fn remove_audio_state(ctx: &Context, msg: &Message) -> Result<(), String> {
-    let guild = msg.guild(&ctx.cache).await.unwrap();
+    let guild = msg.guild(&ctx.cache).unwrap();
     let guild_id = guild.id;
 
     let mut audio_states = AUDIO_STATES.lock().await;
@@ -146,7 +147,7 @@ async fn disconnect(ctx: &Context, msg: &Message) -> CommandResult {
 
 #[command]
 async fn play(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
-    let query = args.rest();
+    let query = args.rest().to_string();
 
     message_react(ctx, msg, "🎶").await;
 
@@ -156,16 +157,26 @@ async fn play(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
         None => return Ok(()),
     };
 
-    match Song::from_query(query).await {
-        Ok(song) => {
-            AudioState::add_audio(audio_state, song).await;
-            message_react(ctx, msg, "✅").await;
-        }
-        Err(why) => {
-            message_react(ctx, msg, "❎").await;
-            send_embed(ctx, msg, &format!("Error: {}", why)).await;
-        }
-    }
+    let song = {
+        let ctx = ctx.clone();
+        let msg = msg.clone();
+        tokio::spawn(async move {
+            let song = Song::from_query(msg.author.clone(), query).await;
+            match song {
+                Ok(song) => {
+                    message_react(&ctx, &msg, "✅").await;
+                    Some(song)
+                }
+                Err(why) => {
+                    message_react(&ctx, &msg, "❎").await;
+                    send_embed(&ctx, &msg, &format!("Error: {}", why)).await;
+                    None
+                }
+            }
+        })
+    };
+
+    AudioState::add_audio(audio_state, song).await;
 
     Ok(())
 }
@@ -268,6 +279,24 @@ async fn change_loop(ctx: &Context, msg: &Message) -> CommandResult {
     Ok(())
 }
 
+#[command]
+#[aliases("vol")]
+async fn volume(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
+    let audio_state = get_audio_state(ctx, msg).await;
+    let audio_state = match audio_state {
+        Some(audio_state) => audio_state,
+        None => return Ok(()),
+    };
+
+    let volume: u8 = args.rest().parse()?;
+
+    match AudioState::set_volume(audio_state, volume.into()).await {
+        Ok(()) => message_react(ctx, msg, "🎶").await,
+        Err(why) => send_embed(ctx, msg, &format!("Error: {}", why)).await,
+    };
+    Ok(())
+}
+
 #[command]
 async fn queue(ctx: &Context, msg: &Message) -> CommandResult {
     let audio_state = get_audio_state(ctx, msg).await;

+ 101 - 74
src/audio/audio_state.rs

@@ -13,41 +13,47 @@ use songbird::{
     Call, Event, EventContext, EventHandler as VoiceEventHandler, TrackEvent,
 };
 use std::time::Duration;
-use std::{mem::drop, sync::Arc};
+use std::sync::Arc;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
+use tokio::task::JoinHandle;
 
 pub struct AudioState {
     queue: SongQueue,
+    in_flight: usize,
     handler: Arc<SerenityMutex<Call>>,
-    current_song: Mutex<Option<Song>>,
-    track_handle: Mutex<Option<TrackHandle>>,
-    is_looping: Mutex<bool>,
+    current_song: Option<Song>,
+    track_handle: Option<TrackHandle>,
+    is_looping: bool,
+    volume: f32,
 
-    channel_id: Mutex<ChannelId>,
-    http: Mutex<Arc<Http>>,
+    channel_id: ChannelId,
+    http: Arc<Http>,
 }
 
 impl AudioState {
-    pub fn new(handler: Arc<SerenityMutex<Call>>, ctx: &Context, msg: &Message) -> Arc<AudioState> {
+    pub fn new(handler: Arc<SerenityMutex<Call>>, ctx: &Context, msg: &Message) -> Arc<Mutex<AudioState>> {
         let audio_state = AudioState {
             queue: SongQueue::new(),
+            in_flight: 0,
             handler,
-            current_song: Mutex::new(None),
-            track_handle: Mutex::new(None),
-            is_looping: Mutex::new(false),
+            current_song: None,
+            track_handle: None,
+            is_looping: false,
+            volume: 1.0,
 
-            channel_id: Mutex::new(msg.channel_id),
-            http: Mutex::new(ctx.http.clone()),
+            channel_id: msg.channel_id,
+            http: ctx.http.clone(),
         };
-        let audio_state = Arc::new(audio_state);
+        let audio_state = Arc::new(Mutex::new(audio_state));
         let my_audio_state = audio_state.clone();
         tokio::spawn(async move {
             // Leave if no music is playing within 1 minute
             sleep(Duration::from_secs(60)).await;
-            let current_song = my_audio_state.current_song.lock().await;
+            let audio_state = my_audio_state.lock().await;
+            let current_song = audio_state.current_song.clone();
             if current_song.is_none() {
-                let mut handler = my_audio_state.handler.lock().await;
+                let mut handler = audio_state.handler.lock().await;
                 if let Err(e) = handler.leave().await {
                     println!("Automatic leave failed: {:?}", e);
                 }
@@ -56,33 +62,29 @@ impl AudioState {
         audio_state
     }
 
-    pub async fn set_context(audio_state: Arc<AudioState>, ctx: &Context, msg: &Message) {
-        {
-            let mut channel_id = audio_state.channel_id.lock().await;
-            *channel_id = msg.channel_id;
-        }
-        {
-            let mut http = audio_state.http.lock().await;
-            *http = ctx.http.clone();
-        }
+    pub async fn set_context(audio_state: Arc<Mutex<AudioState>>, ctx: &Context, msg: &Message) {
+        let mut state = audio_state.lock().await;
+        state.channel_id = msg.channel_id;
+        state.http = ctx.http.clone();
     }
 
-    async fn play_audio(audio_state: Arc<AudioState>) {
-        let is_looping = audio_state.is_looping.lock().await;
-        let song = if *is_looping {
-            let mut current_song = audio_state.current_song.lock().await;
-            current_song.take()
+    async fn play_audio(audio_state: Arc<Mutex<AudioState>>) {
+        let mut state = audio_state.lock().await;
+        let song = if state.is_looping {
+            state.current_song.take()
         } else {
-            audio_state.queue.pop().await
+            state.queue.peek()
         };
-        drop(is_looping);
 
         let song = match song {
             Some(song) => song,
             None => {
-                let mut handler = audio_state.handler.lock().await;
-                if let Err(e) = handler.leave().await {
-                    println!("Error leaving channel: {:?}", e);
+                state.current_song = None;
+                if state.in_flight == 0 {
+                    let mut handler = state.handler.lock().await;
+                    if let Err(e) = handler.leave().await {
+                        println!("Error leaving channel: {:?}", e);
+                    }
                 }
                 return;
             }
@@ -99,9 +101,13 @@ impl AudioState {
         let reader = Reader::Extension(source);
         let source = input::Input::float_pcm(true, reader);
 
-        let mut handler = audio_state.handler.lock().await;
+        let handler = state.handler.clone();
+        let mut handler = handler.lock().await;
 
         let handle = handler.play_source(source);
+        if let Err(e) = handle.set_volume(state.volume) {
+            println!("{}", e);
+        }
 
         if let Err(why) = handle.add_event(
             Event::Track(TrackEvent::End),
@@ -112,39 +118,47 @@ impl AudioState {
             panic!("Err AudioState::play_audio: {:?}", why);
         }
         {
-            let text = song.get_string().await;
-            let channel_id = audio_state.channel_id.lock().await;
-            let http = audio_state.http.lock().await;
+            let text = song.get_string();
             send_embed_http(
-                *channel_id,
-                http.clone(),
+                state.channel_id,
+                state.http.clone(),
                 &format!("Now playing:\n\n {}", text),
             )
             .await;
         }
-        let mut current_song = audio_state.current_song.lock().await;
-        *current_song = Some(song);
-        let mut track_handle = audio_state.track_handle.lock().await;
-        *track_handle = Some(handle);
+        state.current_song = Some(song);
+        state.track_handle = Some(handle);
     }
 
-    pub async fn add_audio(audio_state: Arc<AudioState>, song: Song) {
-        audio_state.queue.push(vec![song]).await;
-        let current_song = audio_state.current_song.lock().await;
-        if current_song.is_none() {
-            let audio_state = audio_state.clone();
-            tokio::spawn(async {
-                AudioState::play_audio(audio_state).await;
-            });
+    pub async fn add_audio(audio_state: Arc<Mutex<AudioState>>, song: JoinHandle<Option<Song>>) {
+        {
+            let mut state = audio_state.lock().await;
+            state.in_flight += 1;
+        }
+        {
+            if let Ok(Some(song)) = song.await {
+                let mut state = audio_state.lock().await;
+                state.queue.push(vec![song]);
+                if state.current_song.is_none() {
+                    let audio_state = audio_state.clone();
+                    tokio::spawn(async {
+                        AudioState::play_audio(audio_state).await;
+                    });
+                }
+            }
+        }
+        {
+            let mut state = audio_state.lock().await;
+            state.in_flight -= 1;
         }
     }
 
     pub async fn send_track_command(
-        audio_state: Arc<AudioState>,
+        audio_state: Arc<Mutex<AudioState>>,
         cmd: TrackCommand,
     ) -> Result<(), String> {
-        let track_handle = audio_state.track_handle.lock().await;
-        match track_handle.as_ref() {
+        let state = audio_state.lock().await;
+        match state.track_handle.as_ref() {
             Some(track_handle) => match track_handle.send(cmd) {
                 Ok(()) => Ok(()),
                 Err(why) => Err(format!("{:?}", why)),
@@ -153,48 +167,61 @@ impl AudioState {
         }
     }
 
-    pub async fn shuffle(audio_state: Arc<AudioState>) -> Result<(), String> {
-        audio_state.queue.shuffle().await
+    pub async fn shuffle(audio_state: Arc<Mutex<AudioState>>) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        state.queue.shuffle()
     }
 
-    pub async fn clear(audio_state: Arc<AudioState>) -> Result<(), String> {
-        audio_state.queue.clear().await
+    pub async fn clear(audio_state: Arc<Mutex<AudioState>>) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        state.queue.clear()
     }
 
     // on success, returns a bool that specifies whether the queue is now being looped
-    pub async fn change_looping(audio_state: Arc<AudioState>) -> Result<bool, String> {
-        {
-            let current_song = audio_state.current_song.lock().await;
-            if current_song.is_none() {
-                return Err("no song is playing".to_string());
-            }
+    pub async fn change_looping(audio_state: Arc<Mutex<AudioState>>) -> Result<bool, String> {
+        let mut state = audio_state.lock().await;
+        if state.current_song.is_none() {
+            return Err("no song is playing".to_string());
         }
-        let mut is_looping = audio_state.is_looping.lock().await;
-        *is_looping = !*is_looping;
-        Ok(*is_looping)
+        state.is_looping = !state.is_looping;
+        Ok(state.is_looping)
     }
 
-    pub async fn get_string(audio_state: Arc<AudioState>) -> String {
-        let current_song = audio_state.current_song.lock().await;
-        let current_song = match &*current_song {
-            Some(song) => song.get_string().await,
+    pub async fn set_volume(audio_state: Arc<Mutex<AudioState>>, new_volume: f32) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        let new_volume = new_volume / 100.0;
+
+        state.volume = new_volume;
+        state.track_handle.as_mut().map(move |handle| handle.set_volume(new_volume));
+
+        Ok(())
+    }
+
+    pub async fn get_string(audio_state: Arc<Mutex<AudioState>>) -> String {
+        let state = audio_state.lock().await;
+        let current_song = match state.current_song.clone() {
+            Some(song) => song.get_string(),
             None => "*Not playing*\n".to_string(),
         };
         format!(
             "**Current Song:**\n{}\n\n**Queue:**\n{}",
             current_song,
-            audio_state.queue.get_string().await
+            state.queue.get_string()
         )
     }
 }
 
 struct SongEndNotifier {
-    audio_state: Arc<AudioState>,
+    audio_state: Arc<Mutex<AudioState>>,
 }
 
 #[async_trait]
 impl VoiceEventHandler for SongEndNotifier {
     async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
+        {
+            let mut state = self.audio_state.lock().await;
+            state.queue.pop();
+        }
         AudioState::play_audio(self.audio_state.clone()).await;
 
         None

+ 11 - 8
src/audio/song.rs

@@ -1,33 +1,36 @@
 use super::subprocess::ytdl;
+use serenity::model::user::User;
 
+#[derive(Clone)]
 pub struct Song {
     pub url: String,
     pub artist: Option<String>,
     pub title: Option<String>,
     pub duration: Option<u64>,
+    pub queuer: User,
 }
 
 impl Song {
-    pub async fn from_query(query: &str) -> Result<Song, std::io::Error> {
+    pub async fn from_query(user: User, query: String) -> Result<Song, std::io::Error> {
         let query = if query.contains("watch?v=") {
             query.to_string()
         } else {
             format!("ytsearch:{}", query)
         };
 
-        let stdout = ytdl(&query).await?;
-        let mut lines = stdout.lines();
+        let video = ytdl(&query).await?;
 
         let song = Song {
-            artist: None,
-            title: lines.next().map(|t| t.to_string()),
-            url: lines.next().unwrap().to_string(),
-            duration: None,
+            artist: video["channel"].as_str().map(|t| t.to_string()),
+            title: video["title"].as_str().map(|t| t.to_string()),
+            url: video["url"].as_str().unwrap().to_string(),
+            duration: video["duration"].as_u64(),
+            queuer: user,
         };
         Ok(song)
     }
 
-    pub async fn get_string(&self) -> String {
+    pub fn get_string(&self) -> String {
         let artist = self.artist.clone().unwrap_or("unknown".to_string());
         let title = self.title.clone().unwrap_or("unknown".to_string());
         let duration = match self.duration {

+ 67 - 25
src/audio/song_queue.rs

@@ -1,60 +1,102 @@
 use super::song::Song;
 use rand::seq::SliceRandom;
-use std::{cmp::min, collections::VecDeque, sync::Arc};
-use tokio::sync::Mutex;
+use serenity::model::id::UserId;
+use std::collections::HashMap;
+use std::{cmp::min, collections::VecDeque};
+
+#[derive(Clone)]
 pub struct SongQueue {
-    queue: Arc<Mutex<VecDeque<Song>>>,
+    queues: HashMap<UserId, VecDeque<Song>>,
+    users: VecDeque<UserId>,
 }
 
 impl SongQueue {
     pub fn new() -> SongQueue {
         SongQueue {
-            queue: Arc::new(Mutex::new(VecDeque::new())),
+            queues: HashMap::new(),
+            users: VecDeque::new(),
+        }
+    }
+    fn new_user_queue(&mut self, user: UserId) {
+        if self.queues.get(&user).is_none() {
+            self.queues.insert(user, VecDeque::new());
+        }
+        if !self.users.contains(&user) {
+            self.users.push_back(user);
         }
     }
-    pub async fn push(&self, songs: Vec<Song>) {
-        let mut queue = self.queue.lock().await;
-        for item in songs.into_iter() {
-            queue.push_back(item);
+    pub fn push(&mut self, songs: Vec<Song>) {
+        for song in songs.into_iter() {
+            self.new_user_queue(song.queuer.id);
+            let deque = self.queues.get_mut(&song.queuer.id).unwrap();
+            deque.push_back(song);
         }
     }
-    pub async fn pop(&self) -> Option<Song> {
-        let mut queue = self.queue.lock().await;
-        queue.pop_front()
+    pub fn peek(&self) -> Option<Song> {
+        let user = match self.users.front() {
+            Some(user) => user,
+            None => return None,
+        };
+        let deque = self.queues.get(&user).unwrap();
+        deque.front().cloned()
     }
-    pub async fn shuffle(&self) -> Result<(), String> {
-        let mut queue = self.queue.lock().await;
-        if queue.len() == 0 {
+    pub fn pop(&mut self) -> Option<Song> {
+        let user = match self.users.pop_front() {
+            Some(user) => user,
+            None => return None,
+        };
+        let deque = self.queues.get_mut(&user).unwrap();
+        let song = deque.pop_front();
+        if deque.len() > 0 {
+            self.users.push_back(user);
+        }
+        song
+    }
+    pub fn shuffle(&mut self) -> Result<(), String> {
+        if self.users.len() == 0 {
             return Err("queue is empty".to_string());
         }
-        queue.make_contiguous().shuffle(&mut rand::thread_rng());
+        for deque in self.queues.values_mut() {
+            deque.make_contiguous().shuffle(&mut rand::thread_rng());
+        }
 
         Ok(())
     }
-    pub async fn clear(&self) -> Result<(), String> {
-        let mut queue = self.queue.lock().await;
-        if queue.len() == 0 {
+    pub fn clear(&mut self) -> Result<(), String> {
+        if self.users.len() == 0 {
             return Err("queue is empty".to_string());
         };
-        queue.clear();
+        self.queues.clear();
+        self.users.clear();
         Ok(())
     }
-    pub async fn get_string(&self) -> String {
-        let queue = self.queue.lock().await;
-        if queue.len() == 0 {
+    pub fn get_string(&self) -> String {
+        if self.users.len() == 0 {
             return "*empty*".to_string();
         };
         let mut s = String::new();
+        let queue = self.as_vec();
         s.push_str(&format!(
             "*Showing {} of {} songs*\n",
             min(20, queue.len()),
             queue.len()
         ));
         for (i, song) in queue.iter().take(20).enumerate() {
-            s += &format!("{}: ", i);
-            s += &song.get_string().await;
-            s += "\n";
+            s += &format!(
+                "{}: {} (queued by {})\n",
+                i + 1,
+                &song.get_string(),
+                song.queuer.name
+            );
         }
         s
     }
+    fn as_vec(&self) -> Vec<Song> {
+        let mut clone = self.clone();
+        let mut queue = vec![];
+        while let Some(song) = clone.pop() {
+            queue.push(song);
+        }
+        queue
+    }
 }

+ 8 - 10
src/audio/subprocess.rs

@@ -1,3 +1,4 @@
+use serde_json::Value;
 use songbird::input::reader::MediaSource;
 use std::{
     io::BufReader,
@@ -6,15 +7,12 @@ use std::{
 use symphonia_core::io::ReadOnlySource;
 use tokio::process::Command as TokioCommand;
 
-pub async fn ytdl(query: &str) -> Result<String, std::io::Error> {
-    let mut cmd = TokioCommand::new("youtube-dl");
+pub async fn ytdl(query: &str) -> Result<Value, std::io::Error> {
+    let mut cmd = TokioCommand::new("yt-dlp");
     let cmd = cmd
-        .arg("-x")
-        .arg("--skip-download")
-        .arg("--get-title")
-        .arg("--get-url")
-        .arg("--audio-quality")
-        .arg("128k")
+        .arg("-j")
+        .arg("-f")
+        .arg("bestaudio")
         .arg(query)
         .stdout(Stdio::piped())
         .stderr(Stdio::piped());
@@ -23,7 +21,7 @@ pub async fn ytdl(query: &str) -> Result<String, std::io::Error> {
     let stdout = String::from_utf8(output.stdout).unwrap();
     let stderr = String::from_utf8(output.stderr).unwrap();
     if output.status.success() {
-        Ok(stdout)
+        Ok(serde_json::from_str(&stdout)?)
     } else {
         Err(std::io::Error::new(
             std::io::ErrorKind::Other,
@@ -47,7 +45,7 @@ pub async fn ffmpeg_pcm(url: &str) -> Result<Box<dyn MediaSource + Send>, String
         .arg("-i")
         .arg(url)
         .arg("-f")
-        .arg("s16le")
+        .arg("f32le")
         .arg("-ar")
         .arg("48000")
         .arg("-ac")

+ 4 - 2
src/main.rs

@@ -2,7 +2,7 @@ use serenity::{
     async_trait,
     client::{Client, Context, EventHandler},
     framework::StandardFramework,
-    model::gateway::Ready,
+    model::gateway::{GatewayIntents, Ready},
 };
 use std::env;
 
@@ -27,7 +27,9 @@ async fn main() {
         .configure(|c| c.prefix("!"))
         .group(&audio::AUDIO_GROUP);
 
-    let mut client = Client::builder(token)
+    let intents = GatewayIntents::non_privileged() | GatewayIntents::MESSAGE_CONTENT;
+
+    let mut client = Client::builder(token, intents)
         .event_handler(Handler)
         .framework(framework)
         .register_songbird()

Vissa filer visades inte eftersom för många filer har ändrats