1
0

11 Коммиты e876a0d594 ... bac5b3e138

Автор SHA1 Сообщение Дата
  Frans Bergman bac5b3e138 Correct ffmpeg output format argument 10 месяцев назад
  Frans Bergman 316a092a05 Update crates 10 месяцев назад
  Frans Bergman aca10a6533 Update Discord API libraries 2 лет назад
  Frans Bergman a97380f26a Switch to yt-dlp instead of youtube-dl 2 лет назад
  Frans Bergman 23060fe13c Do not leave when a song is loading 2 лет назад
  Frans Bergman 744606c542 Use a single Mutex for entire AudioState 2 лет назад
  Frans Bergman a3431ab84a Only remove song once it has been played 2 лет назад
  Frans Bergman 519d616555 Add volume command 2 лет назад
  Frans Bergman ac11c8079b Implement fair queueing 3 лет назад
  Frans Bergman 8da8d57c37 Move SongQueue locking to audio_state.rs 3 лет назад
  Frans Bergman fb800f7bcb Get video information in JSON format 3 лет назад
8 измененных файлов с 574 добавлено и 413 удалено
  1. 334 275
      Cargo.lock
  2. 4 3
      Cargo.toml
  3. 45 16
      src/audio/audio.rs
  4. 101 74
      src/audio/audio_state.rs
  5. 11 8
      src/audio/song.rs
  6. 67 25
      src/audio/song_queue.rs
  7. 8 10
      src/audio/subprocess.rs
  8. 4 2
      src/main.rs

Разница между файлами не показана из-за своего большого размера
+ 334 - 275
Cargo.lock


+ 4 - 3
Cargo.toml

@@ -10,7 +10,8 @@ edition = "2018"
 futures = "0.3.5"
 lazy_static = "1.4.0"
 rand = "0.8.3"
-serenity = {version = "0.10", features = ["standard_framework", "voice", "rustls_backend", "cache", "framework"] }
-songbird = "0.2.0-beta.4"
+serenity = {version = "0.11", features = ["standard_framework", "voice", "rustls_backend", "cache", "framework"] }
+songbird = "0.3.2"
+symphonia-core = "0.5.2"
 tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "time", "sync"] }
-symphonia-core = "0.2.0"
+serde_json = "1.0.81"

+ 45 - 16
src/audio/audio.rs

@@ -25,6 +25,7 @@ use crate::util::{message_react, send_embed};
     pause,
     resume,
     change_loop,
+    volume,
     shuffle,
     clear,
     queue
@@ -32,11 +33,11 @@ use crate::util::{message_react, send_embed};
 struct Audio;
 
 lazy_static! {
-    static ref AUDIO_STATES: Mutex<HashMap<GuildId, Arc<AudioState>>> = Mutex::new(HashMap::new());
+    static ref AUDIO_STATES: Mutex<HashMap<GuildId, Arc<Mutex<AudioState>>>> = Mutex::new(HashMap::new());
 }
 
-async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<AudioState>> {
-    let guild = msg.guild(&ctx.cache).await.unwrap();
+async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<Mutex<AudioState>>> {
+    let guild = msg.guild(&ctx.cache).unwrap();
     let guild_id = guild.id;
 
     let mut audio_states = AUDIO_STATES.lock().await;
@@ -87,7 +88,7 @@ async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<AudioState>
     let mut call = call_lock.lock().await;
 
     if call.current_channel() != Some(channel_id.into()) {
-        if let Err(err) = call.join(channel_id.into()).await {
+        if let Err(err) = call.join(channel_id).await {
             println!("Error joining call: {:?}", err);
         }
     }
@@ -107,7 +108,7 @@ async fn get_audio_state(ctx: &Context, msg: &Message) -> Option<Arc<AudioState>
 }
 
 async fn remove_audio_state(ctx: &Context, msg: &Message) -> Result<(), String> {
-    let guild = msg.guild(&ctx.cache).await.unwrap();
+    let guild = msg.guild(&ctx.cache).unwrap();
     let guild_id = guild.id;
 
     let mut audio_states = AUDIO_STATES.lock().await;
@@ -146,7 +147,7 @@ async fn disconnect(ctx: &Context, msg: &Message) -> CommandResult {
 
 #[command]
 async fn play(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
-    let query = args.rest();
+    let query = args.rest().to_string();
 
     message_react(ctx, msg, "🎶").await;
 
@@ -156,16 +157,26 @@ async fn play(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
         None => return Ok(()),
     };
 
-    match Song::from_query(query).await {
-        Ok(song) => {
-            AudioState::add_audio(audio_state, song).await;
-            message_react(ctx, msg, "✅").await;
-        }
-        Err(why) => {
-            message_react(ctx, msg, "❎").await;
-            send_embed(ctx, msg, &format!("Error: {}", why)).await;
-        }
-    }
+    let song = {
+        let ctx = ctx.clone();
+        let msg = msg.clone();
+        tokio::spawn(async move {
+            let song = Song::from_query(msg.author.clone(), query).await;
+            match song {
+                Ok(song) => {
+                    message_react(&ctx, &msg, "✅").await;
+                    Some(song)
+                }
+                Err(why) => {
+                    message_react(&ctx, &msg, "❎").await;
+                    send_embed(&ctx, &msg, &format!("Error: {}", why)).await;
+                    None
+                }
+            }
+        })
+    };
+
+    AudioState::add_audio(audio_state, song).await;
 
     Ok(())
 }
@@ -268,6 +279,24 @@ async fn change_loop(ctx: &Context, msg: &Message) -> CommandResult {
     Ok(())
 }
 
+#[command]
+#[aliases("vol")]
+async fn volume(ctx: &Context, msg: &Message, args: Args) -> CommandResult {
+    let audio_state = get_audio_state(ctx, msg).await;
+    let audio_state = match audio_state {
+        Some(audio_state) => audio_state,
+        None => return Ok(()),
+    };
+
+    let volume: u8 = args.rest().parse()?;
+
+    match AudioState::set_volume(audio_state, volume.into()).await {
+        Ok(()) => message_react(ctx, msg, "🎶").await,
+        Err(why) => send_embed(ctx, msg, &format!("Error: {}", why)).await,
+    };
+    Ok(())
+}
+
 #[command]
 async fn queue(ctx: &Context, msg: &Message) -> CommandResult {
     let audio_state = get_audio_state(ctx, msg).await;

+ 101 - 74
src/audio/audio_state.rs

@@ -13,41 +13,47 @@ use songbird::{
     Call, Event, EventContext, EventHandler as VoiceEventHandler, TrackEvent,
 };
 use std::time::Duration;
-use std::{mem::drop, sync::Arc};
+use std::sync::Arc;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
+use tokio::task::JoinHandle;
 
 pub struct AudioState {
     queue: SongQueue,
+    in_flight: usize,
     handler: Arc<SerenityMutex<Call>>,
-    current_song: Mutex<Option<Song>>,
-    track_handle: Mutex<Option<TrackHandle>>,
-    is_looping: Mutex<bool>,
+    current_song: Option<Song>,
+    track_handle: Option<TrackHandle>,
+    is_looping: bool,
+    volume: f32,
 
-    channel_id: Mutex<ChannelId>,
-    http: Mutex<Arc<Http>>,
+    channel_id: ChannelId,
+    http: Arc<Http>,
 }
 
 impl AudioState {
-    pub fn new(handler: Arc<SerenityMutex<Call>>, ctx: &Context, msg: &Message) -> Arc<AudioState> {
+    pub fn new(handler: Arc<SerenityMutex<Call>>, ctx: &Context, msg: &Message) -> Arc<Mutex<AudioState>> {
         let audio_state = AudioState {
             queue: SongQueue::new(),
+            in_flight: 0,
             handler,
-            current_song: Mutex::new(None),
-            track_handle: Mutex::new(None),
-            is_looping: Mutex::new(false),
+            current_song: None,
+            track_handle: None,
+            is_looping: false,
+            volume: 1.0,
 
-            channel_id: Mutex::new(msg.channel_id),
-            http: Mutex::new(ctx.http.clone()),
+            channel_id: msg.channel_id,
+            http: ctx.http.clone(),
         };
-        let audio_state = Arc::new(audio_state);
+        let audio_state = Arc::new(Mutex::new(audio_state));
         let my_audio_state = audio_state.clone();
         tokio::spawn(async move {
             // Leave if no music is playing within 1 minute
             sleep(Duration::from_secs(60)).await;
-            let current_song = my_audio_state.current_song.lock().await;
+            let audio_state = my_audio_state.lock().await;
+            let current_song = audio_state.current_song.clone();
             if current_song.is_none() {
-                let mut handler = my_audio_state.handler.lock().await;
+                let mut handler = audio_state.handler.lock().await;
                 if let Err(e) = handler.leave().await {
                     println!("Automatic leave failed: {:?}", e);
                 }
@@ -56,33 +62,29 @@ impl AudioState {
         audio_state
     }
 
-    pub async fn set_context(audio_state: Arc<AudioState>, ctx: &Context, msg: &Message) {
-        {
-            let mut channel_id = audio_state.channel_id.lock().await;
-            *channel_id = msg.channel_id;
-        }
-        {
-            let mut http = audio_state.http.lock().await;
-            *http = ctx.http.clone();
-        }
+    pub async fn set_context(audio_state: Arc<Mutex<AudioState>>, ctx: &Context, msg: &Message) {
+        let mut state = audio_state.lock().await;
+        state.channel_id = msg.channel_id;
+        state.http = ctx.http.clone();
     }
 
-    async fn play_audio(audio_state: Arc<AudioState>) {
-        let is_looping = audio_state.is_looping.lock().await;
-        let song = if *is_looping {
-            let mut current_song = audio_state.current_song.lock().await;
-            current_song.take()
+    async fn play_audio(audio_state: Arc<Mutex<AudioState>>) {
+        let mut state = audio_state.lock().await;
+        let song = if state.is_looping {
+            state.current_song.take()
         } else {
-            audio_state.queue.pop().await
+            state.queue.peek()
         };
-        drop(is_looping);
 
         let song = match song {
             Some(song) => song,
             None => {
-                let mut handler = audio_state.handler.lock().await;
-                if let Err(e) = handler.leave().await {
-                    println!("Error leaving channel: {:?}", e);
+                state.current_song = None;
+                if state.in_flight == 0 {
+                    let mut handler = state.handler.lock().await;
+                    if let Err(e) = handler.leave().await {
+                        println!("Error leaving channel: {:?}", e);
+                    }
                 }
                 return;
             }
@@ -99,9 +101,13 @@ impl AudioState {
         let reader = Reader::Extension(source);
         let source = input::Input::float_pcm(true, reader);
 
-        let mut handler = audio_state.handler.lock().await;
+        let handler = state.handler.clone();
+        let mut handler = handler.lock().await;
 
         let handle = handler.play_source(source);
+        if let Err(e) = handle.set_volume(state.volume) {
+            println!("{}", e);
+        }
 
         if let Err(why) = handle.add_event(
             Event::Track(TrackEvent::End),
@@ -112,39 +118,47 @@ impl AudioState {
             panic!("Err AudioState::play_audio: {:?}", why);
         }
         {
-            let text = song.get_string().await;
-            let channel_id = audio_state.channel_id.lock().await;
-            let http = audio_state.http.lock().await;
+            let text = song.get_string();
             send_embed_http(
-                *channel_id,
-                http.clone(),
+                state.channel_id,
+                state.http.clone(),
                 &format!("Now playing:\n\n {}", text),
             )
             .await;
         }
-        let mut current_song = audio_state.current_song.lock().await;
-        *current_song = Some(song);
-        let mut track_handle = audio_state.track_handle.lock().await;
-        *track_handle = Some(handle);
+        state.current_song = Some(song);
+        state.track_handle = Some(handle);
     }
 
-    pub async fn add_audio(audio_state: Arc<AudioState>, song: Song) {
-        audio_state.queue.push(vec![song]).await;
-        let current_song = audio_state.current_song.lock().await;
-        if current_song.is_none() {
-            let audio_state = audio_state.clone();
-            tokio::spawn(async {
-                AudioState::play_audio(audio_state).await;
-            });
+    pub async fn add_audio(audio_state: Arc<Mutex<AudioState>>, song: JoinHandle<Option<Song>>) {
+        {
+            let mut state = audio_state.lock().await;
+            state.in_flight += 1;
+        }
+        {
+            if let Ok(Some(song)) = song.await {
+                let mut state = audio_state.lock().await;
+                state.queue.push(vec![song]);
+                if state.current_song.is_none() {
+                    let audio_state = audio_state.clone();
+                    tokio::spawn(async {
+                        AudioState::play_audio(audio_state).await;
+                    });
+                }
+            }
+        }
+        {
+            let mut state = audio_state.lock().await;
+            state.in_flight -= 1;
         }
     }
 
     pub async fn send_track_command(
-        audio_state: Arc<AudioState>,
+        audio_state: Arc<Mutex<AudioState>>,
         cmd: TrackCommand,
     ) -> Result<(), String> {
-        let track_handle = audio_state.track_handle.lock().await;
-        match track_handle.as_ref() {
+        let state = audio_state.lock().await;
+        match state.track_handle.as_ref() {
             Some(track_handle) => match track_handle.send(cmd) {
                 Ok(()) => Ok(()),
                 Err(why) => Err(format!("{:?}", why)),
@@ -153,48 +167,61 @@ impl AudioState {
         }
     }
 
-    pub async fn shuffle(audio_state: Arc<AudioState>) -> Result<(), String> {
-        audio_state.queue.shuffle().await
+    pub async fn shuffle(audio_state: Arc<Mutex<AudioState>>) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        state.queue.shuffle()
     }
 
-    pub async fn clear(audio_state: Arc<AudioState>) -> Result<(), String> {
-        audio_state.queue.clear().await
+    pub async fn clear(audio_state: Arc<Mutex<AudioState>>) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        state.queue.clear()
     }
 
     // on success, returns a bool that specifies whether the queue is now being looped
-    pub async fn change_looping(audio_state: Arc<AudioState>) -> Result<bool, String> {
-        {
-            let current_song = audio_state.current_song.lock().await;
-            if current_song.is_none() {
-                return Err("no song is playing".to_string());
-            }
+    pub async fn change_looping(audio_state: Arc<Mutex<AudioState>>) -> Result<bool, String> {
+        let mut state = audio_state.lock().await;
+        if state.current_song.is_none() {
+            return Err("no song is playing".to_string());
         }
-        let mut is_looping = audio_state.is_looping.lock().await;
-        *is_looping = !*is_looping;
-        Ok(*is_looping)
+        state.is_looping = !state.is_looping;
+        Ok(state.is_looping)
     }
 
-    pub async fn get_string(audio_state: Arc<AudioState>) -> String {
-        let current_song = audio_state.current_song.lock().await;
-        let current_song = match &*current_song {
-            Some(song) => song.get_string().await,
+    pub async fn set_volume(audio_state: Arc<Mutex<AudioState>>, new_volume: f32) -> Result<(), String> {
+        let mut state = audio_state.lock().await;
+        let new_volume = new_volume / 100.0;
+
+        state.volume = new_volume;
+        state.track_handle.as_mut().map(move |handle| handle.set_volume(new_volume));
+
+        Ok(())
+    }
+
+    pub async fn get_string(audio_state: Arc<Mutex<AudioState>>) -> String {
+        let state = audio_state.lock().await;
+        let current_song = match state.current_song.clone() {
+            Some(song) => song.get_string(),
             None => "*Not playing*\n".to_string(),
         };
         format!(
             "**Current Song:**\n{}\n\n**Queue:**\n{}",
             current_song,
-            audio_state.queue.get_string().await
+            state.queue.get_string()
         )
     }
 }
 
 struct SongEndNotifier {
-    audio_state: Arc<AudioState>,
+    audio_state: Arc<Mutex<AudioState>>,
 }
 
 #[async_trait]
 impl VoiceEventHandler for SongEndNotifier {
     async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
+        {
+            let mut state = self.audio_state.lock().await;
+            state.queue.pop();
+        }
         AudioState::play_audio(self.audio_state.clone()).await;
 
         None

+ 11 - 8
src/audio/song.rs

@@ -1,33 +1,36 @@
 use super::subprocess::ytdl;
+use serenity::model::user::User;
 
+#[derive(Clone)]
 pub struct Song {
     pub url: String,
     pub artist: Option<String>,
     pub title: Option<String>,
     pub duration: Option<u64>,
+    pub queuer: User,
 }
 
 impl Song {
-    pub async fn from_query(query: &str) -> Result<Song, std::io::Error> {
+    pub async fn from_query(user: User, query: String) -> Result<Song, std::io::Error> {
         let query = if query.contains("watch?v=") {
             query.to_string()
         } else {
             format!("ytsearch:{}", query)
         };
 
-        let stdout = ytdl(&query).await?;
-        let mut lines = stdout.lines();
+        let video = ytdl(&query).await?;
 
         let song = Song {
-            artist: None,
-            title: lines.next().map(|t| t.to_string()),
-            url: lines.next().unwrap().to_string(),
-            duration: None,
+            artist: video["channel"].as_str().map(|t| t.to_string()),
+            title: video["title"].as_str().map(|t| t.to_string()),
+            url: video["url"].as_str().unwrap().to_string(),
+            duration: video["duration"].as_u64(),
+            queuer: user,
         };
         Ok(song)
     }
 
-    pub async fn get_string(&self) -> String {
+    pub fn get_string(&self) -> String {
         let artist = self.artist.clone().unwrap_or("unknown".to_string());
         let title = self.title.clone().unwrap_or("unknown".to_string());
         let duration = match self.duration {

+ 67 - 25
src/audio/song_queue.rs

@@ -1,60 +1,102 @@
 use super::song::Song;
 use rand::seq::SliceRandom;
-use std::{cmp::min, collections::VecDeque, sync::Arc};
-use tokio::sync::Mutex;
+use serenity::model::id::UserId;
+use std::collections::HashMap;
+use std::{cmp::min, collections::VecDeque};
+
+#[derive(Clone)]
 pub struct SongQueue {
-    queue: Arc<Mutex<VecDeque<Song>>>,
+    queues: HashMap<UserId, VecDeque<Song>>,
+    users: VecDeque<UserId>,
 }
 
 impl SongQueue {
     pub fn new() -> SongQueue {
         SongQueue {
-            queue: Arc::new(Mutex::new(VecDeque::new())),
+            queues: HashMap::new(),
+            users: VecDeque::new(),
+        }
+    }
+    fn new_user_queue(&mut self, user: UserId) {
+        if self.queues.get(&user).is_none() {
+            self.queues.insert(user, VecDeque::new());
+        }
+        if !self.users.contains(&user) {
+            self.users.push_back(user);
         }
     }
-    pub async fn push(&self, songs: Vec<Song>) {
-        let mut queue = self.queue.lock().await;
-        for item in songs.into_iter() {
-            queue.push_back(item);
+    pub fn push(&mut self, songs: Vec<Song>) {
+        for song in songs.into_iter() {
+            self.new_user_queue(song.queuer.id);
+            let deque = self.queues.get_mut(&song.queuer.id).unwrap();
+            deque.push_back(song);
         }
     }
-    pub async fn pop(&self) -> Option<Song> {
-        let mut queue = self.queue.lock().await;
-        queue.pop_front()
+    pub fn peek(&self) -> Option<Song> {
+        let user = match self.users.front() {
+            Some(user) => user,
+            None => return None,
+        };
+        let deque = self.queues.get(&user).unwrap();
+        deque.front().cloned()
     }
-    pub async fn shuffle(&self) -> Result<(), String> {
-        let mut queue = self.queue.lock().await;
-        if queue.len() == 0 {
+    pub fn pop(&mut self) -> Option<Song> {
+        let user = match self.users.pop_front() {
+            Some(user) => user,
+            None => return None,
+        };
+        let deque = self.queues.get_mut(&user).unwrap();
+        let song = deque.pop_front();
+        if deque.len() > 0 {
+            self.users.push_back(user);
+        }
+        song
+    }
+    pub fn shuffle(&mut self) -> Result<(), String> {
+        if self.users.len() == 0 {
             return Err("queue is empty".to_string());
         }
-        queue.make_contiguous().shuffle(&mut rand::thread_rng());
+        for deque in self.queues.values_mut() {
+            deque.make_contiguous().shuffle(&mut rand::thread_rng());
+        }
 
         Ok(())
     }
-    pub async fn clear(&self) -> Result<(), String> {
-        let mut queue = self.queue.lock().await;
-        if queue.len() == 0 {
+    pub fn clear(&mut self) -> Result<(), String> {
+        if self.users.len() == 0 {
             return Err("queue is empty".to_string());
         };
-        queue.clear();
+        self.queues.clear();
+        self.users.clear();
         Ok(())
     }
-    pub async fn get_string(&self) -> String {
-        let queue = self.queue.lock().await;
-        if queue.len() == 0 {
+    pub fn get_string(&self) -> String {
+        if self.users.len() == 0 {
             return "*empty*".to_string();
         };
         let mut s = String::new();
+        let queue = self.as_vec();
         s.push_str(&format!(
             "*Showing {} of {} songs*\n",
             min(20, queue.len()),
             queue.len()
         ));
         for (i, song) in queue.iter().take(20).enumerate() {
-            s += &format!("{}: ", i);
-            s += &song.get_string().await;
-            s += "\n";
+            s += &format!(
+                "{}: {} (queued by {})\n",
+                i + 1,
+                &song.get_string(),
+                song.queuer.name
+            );
         }
         s
     }
+    fn as_vec(&self) -> Vec<Song> {
+        let mut clone = self.clone();
+        let mut queue = vec![];
+        while let Some(song) = clone.pop() {
+            queue.push(song);
+        }
+        queue
+    }
 }

+ 8 - 10
src/audio/subprocess.rs

@@ -1,3 +1,4 @@
+use serde_json::Value;
 use songbird::input::reader::MediaSource;
 use std::{
     io::BufReader,
@@ -6,15 +7,12 @@ use std::{
 use symphonia_core::io::ReadOnlySource;
 use tokio::process::Command as TokioCommand;
 
-pub async fn ytdl(query: &str) -> Result<String, std::io::Error> {
-    let mut cmd = TokioCommand::new("youtube-dl");
+pub async fn ytdl(query: &str) -> Result<Value, std::io::Error> {
+    let mut cmd = TokioCommand::new("yt-dlp");
     let cmd = cmd
-        .arg("-x")
-        .arg("--skip-download")
-        .arg("--get-title")
-        .arg("--get-url")
-        .arg("--audio-quality")
-        .arg("128k")
+        .arg("-j")
+        .arg("-f")
+        .arg("bestaudio")
         .arg(query)
         .stdout(Stdio::piped())
         .stderr(Stdio::piped());
@@ -23,7 +21,7 @@ pub async fn ytdl(query: &str) -> Result<String, std::io::Error> {
     let stdout = String::from_utf8(output.stdout).unwrap();
     let stderr = String::from_utf8(output.stderr).unwrap();
     if output.status.success() {
-        Ok(stdout)
+        Ok(serde_json::from_str(&stdout)?)
     } else {
         Err(std::io::Error::new(
             std::io::ErrorKind::Other,
@@ -47,7 +45,7 @@ pub async fn ffmpeg_pcm(url: &str) -> Result<Box<dyn MediaSource + Send>, String
         .arg("-i")
         .arg(url)
         .arg("-f")
-        .arg("s16le")
+        .arg("f32le")
         .arg("-ar")
         .arg("48000")
         .arg("-ac")

+ 4 - 2
src/main.rs

@@ -2,7 +2,7 @@ use serenity::{
     async_trait,
     client::{Client, Context, EventHandler},
     framework::StandardFramework,
-    model::gateway::Ready,
+    model::gateway::{GatewayIntents, Ready},
 };
 use std::env;
 
@@ -27,7 +27,9 @@ async fn main() {
         .configure(|c| c.prefix("!"))
         .group(&audio::AUDIO_GROUP);
 
-    let mut client = Client::builder(token)
+    let intents = GatewayIntents::non_privileged() | GatewayIntents::MESSAGE_CONTENT;
+
+    let mut client = Client::builder(token, intents)
         .event_handler(Handler)
         .framework(framework)
         .register_songbird()

Некоторые файлы не были показаны из-за большого количества измененных файлов