1 Commits

Author SHA1 Message Date
7edd14c628 save tracks
All checks were successful
tests / fmt (push) Successful in 48s
tests / clippy (push) Successful in 1m14s
tests / pre-commit (push) Successful in 1m21s
tests / test (push) Successful in 1m32s
tests / build (push) Successful in 1m48s
deploy / release-image (push) Successful in 4m45s
2024-06-27 22:45:09 +02:00
10 changed files with 608 additions and 1024 deletions

1016
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@ edition = "2021"
license = "MIT"
[dependencies]
symphonia = { version = "0.5.2", features=["all", "opt-simd"] }
symphonia = { version = "0.5.2", features=["all"] }
songbird = { version = "0.4.0", features = ["driver", "gateway", "twilight", "rustls", "builtin-queue"] }
tokio = { features = ["macros", "rt-multi-thread", "signal", "sync"], version = "1" }
sqlx = { version = "0.7", features = [ "runtime-tokio", "sqlite", "chrono", "migrate"] }

View File

@@ -20,7 +20,7 @@ RUN cargo build --release --locked
# Release image
FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y python3-pip
RUN apt-get update && apt-get install -y python3-pip ffmpeg
RUN pip install -U yt-dlp
COPY --from=build /app/target/release/ohrwurm .

View File

@@ -1,10 +0,0 @@
services:
ohrwurm:
container_name: ohrwurm
image: jheuel/ohrwurm:latest
# build: .
restart: unless-stopped
volumes:
- ./data:/data
env_file:
- .env

View File

@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS blobs
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
data BLOB
);

View File

@@ -1,5 +1,4 @@
use crate::state::State;
use anyhow::Context;
use std::error::Error;
use tracing::debug;
use twilight_model::{
@@ -23,7 +22,7 @@ pub(crate) async fn join_channel(
let channel_id = state
.cache
.voice_state(user_id, guild_id)
.context("Could not get voice state for user")?
.ok_or("Cannot get voice state for user")?
.channel_id();
// join the voice channel
@@ -31,12 +30,12 @@ pub(crate) async fn join_channel(
.songbird
.join(guild_id.cast(), channel_id)
.await
.context("Could not join voice channel")?;
.map_err(|e| format!("Could not join voice channel: {:?}", e))?;
// signal that we are not listening
if let Some(call_lock) = state.songbird.get(guild_id.cast()) {
let mut call = call_lock.lock().await;
call.deafen(true).await.context("Could not deafen")?;
call.deafen(true).await?;
}
// create guild config

View File

@@ -1,39 +1,31 @@
use crate::commands::join::join_channel;
use crate::db::track::{insert_blob, Blob};
use crate::metadata::{Metadata, MetadataMap};
use crate::state::State;
use crate::state::{State, StateRef};
use crate::{colors, db};
use anyhow::Context;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use songbird::input::cached::Memory;
use songbird::input::{Compose, YoutubeDl};
use songbird::tracks::Track;
use songbird::{Event, EventContext, EventHandler, TrackEvent};
use std::io::{BufRead, BufReader, Read};
use std::ops::Sub;
use std::sync::Arc;
use std::{error::Error, time::Duration};
use std::{
io::{BufRead, BufReader},
ops::Sub,
};
use tokio::process::Command;
use tracing::debug;
use twilight_model::channel::message::embed::{
EmbedAuthor, EmbedField, EmbedFooter, EmbedThumbnail,
};
use twilight_model::channel::message::{Embed, MessageFlags};
use twilight_model::channel::message::MessageFlags;
use twilight_model::gateway::payload::incoming::InteractionCreate;
use twilight_model::http::interaction::{InteractionResponse, InteractionResponseType};
use twilight_util::builder::embed::EmbedBuilder;
use twilight_util::builder::InteractionResponseDataBuilder;
use url::Url;
#[derive(Debug)]
struct TrackType {
url: String,
title: Option<String>,
duration_string: String,
channel: String,
thumbnail: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct YouTubeTrack {
url: Option<String>,
@@ -43,7 +35,6 @@ struct YouTubeTrack {
playlist: Option<String>,
playlist_id: Option<String>,
duration_string: String,
thumbnail: Option<String>,
}
fn build_playlist_url(playlist_id: &str) -> String {
@@ -58,18 +49,12 @@ async fn get_tracks(
.output()
.await?;
tracing::info!(
"yt-dlp output: {:?}",
String::from_utf8_lossy(&output.stdout)
);
let reader = BufReader::new(output.stdout.as_slice());
let tracks: Vec<YouTubeTrack> = reader
.lines()
.map_while(Result::ok)
.flat_map(|line| serde_json::from_str(&line))
.collect();
tracing::info!("yt-dlp tracks: {:?}", tracks);
if tracks.is_empty() {
if let Ok(stderr) = String::from_utf8(output.stderr) {
@@ -82,210 +67,22 @@ async fn get_tracks(
}
return Err("No tracks found".into());
}
tracing::info!("tracks: {:?}", tracks);
tracing::debug!("tracks: {:?}", tracks);
Ok(tracks)
}
async fn persistence(
interaction: &InteractionCreate,
track: &YouTubeTrack,
state: State,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let Some(guild_id) = interaction.guild_id else {
return Ok(());
};
let Some(user_id) = interaction.author_id() else {
return Ok(());
};
let url = track
.original_url
.clone()
.or(track.url.clone())
.ok_or("Could not find url")?;
let (author_name, author_global_name) = if let Some(author) = interaction.author() {
(author.name.clone(), author.global_name.clone())
} else {
("".to_string(), None)
};
db::track::insert_guild(&state.pool, db::track::Guild::new(guild_id.to_string()))
.await
.context("failed to insert guild")?;
db::track::insert_user(
&state.pool,
db::track::User::new(user_id.to_string(), author_name, author_global_name),
)
.await
.context("failed to insert user")?;
let track_id = db::track::insert_track(
&state.pool,
db::track::Track::new(
url.clone(),
track.title.clone(),
track.channel.clone(),
track.duration_string.clone(),
track.thumbnail.clone().unwrap_or_default(),
),
)
.await
.context("failed to insert track")?;
db::track::insert_query(
&state.pool,
db::track::Query::new(user_id.to_string(), guild_id.to_string(), track_id),
)
.await
.context("failed to insert query")?;
Ok(())
}
fn build_single_track_added_embeds(tracks_added: &[TrackType]) -> Vec<Embed> {
let track = tracks_added.first().unwrap();
let host = if let Ok(host) = Url::parse(&track.url) {
Some(
host.host_str()
.unwrap_or_default()
.trim_start_matches("www.")
.to_string(),
)
} else {
None
};
let footer = match host {
Some(host) => EmbedFooter {
text: format!("Streaming from {}", host),
icon_url: Some(format!(
"https://www.google.com/s2/favicons?domain={}",
host
)),
proxy_icon_url: None,
},
None => EmbedFooter {
text: String::new(),
icon_url: None,
proxy_icon_url: None,
},
};
let mut embed = EmbedBuilder::new()
.author(EmbedAuthor {
name: "🔊 Added to queue".to_string(),
icon_url: None,
proxy_icon_url: None,
url: None,
})
.title(track.title.clone().unwrap_or("Unknown".to_string()))
.url(track.url.clone())
.color(colors::BLURPLE)
.footer(footer)
.field(EmbedField {
inline: true,
name: "Duration".to_string(),
value: track.duration_string.clone(),
})
.field(EmbedField {
inline: true,
name: "Channel".to_string(),
value: track.channel.clone(),
})
.build();
if let Some(thumbnail) = &track.thumbnail {
embed.thumbnail = Some(EmbedThumbnail {
height: None,
proxy_url: None,
url: thumbnail.to_string(),
width: None,
});
}
vec![embed]
}
fn build_playlist_added_embeds(tracks: &[YouTubeTrack], num_tracks_added: usize) -> Vec<Embed> {
let mut content = String::new();
let first_track = tracks.first().unwrap();
content.push_str(&format!(
"Adding playlist: [{}]({})\n",
&first_track
.playlist
.clone()
.unwrap_or("Unknown".to_string()),
build_playlist_url(
&first_track
.playlist_id
.clone()
.unwrap_or("Unknown".to_string())
)
));
content.push_str(&format!(
"Added {} tracks to the queue.\n",
num_tracks_added
));
let embed = EmbedBuilder::new()
.description(content)
.color(colors::BLURPLE)
.build();
vec![embed]
}
fn build_embeds(tracks: &[YouTubeTrack], tracks_added: &[TrackType]) -> Vec<Embed> {
let num_tracks_added = tracks_added.len();
match num_tracks_added {
0 => vec![],
1 => build_single_track_added_embeds(tracks_added),
_ => build_playlist_added_embeds(tracks, num_tracks_added),
}
}
pub(crate) async fn play(
interaction: Box<InteractionCreate>,
state: State,
query: String,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
tracing::info!(
debug!(
"play command in channel {:?} by {:?}",
interaction.channel,
interaction.author(),
);
match play_inner(&interaction, Arc::clone(&state), query).await {
Ok(_) => Ok(()),
Err(e) => {
tracing::debug!("Search did not result in any tracks: {}", e);
let content = "Search did not result in any tracks.".to_string();
let embeds = vec![EmbedBuilder::new()
.description(content)
.color(colors::RED)
.build()];
state
.http
.interaction(interaction.application_id)
.update_response(&interaction.token)
.embeds(Some(&embeds))?
.await?;
Ok(())
}
}
}
pub(crate) async fn play_inner(
interaction: &InteractionCreate,
state: State,
query: String,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
tracing::info!(
"play_inner in channel {:?} by {:?}",
interaction.channel,
interaction.author(),
);
let content = format!("Adding track(s) to the queue: {}", query);
tracing::info!("content: {:?}", content);
let embeds = vec![EmbedBuilder::new()
.description(content)
.color(colors::YELLOW)
@@ -320,10 +117,25 @@ pub(crate) async fn play_inner(
query
};
tracing::info!("query: {:?}", query);
debug!("query: {:?}", query);
let tracks = get_tracks(query).await?;
tracing::info!("got tracks: {:?}", tracks);
let tracks = match get_tracks(query).await {
Err(e) => {
let content = format!("{}", e);
let embeds = vec![EmbedBuilder::new()
.description(content)
.color(colors::RED)
.build()];
state
.http
.interaction(interaction.application_id)
.update_response(&interaction.token)
.embeds(Some(&embeds))?
.await?;
return Ok(());
}
Ok(tracks) => tracks,
};
if tracks.len() > 1 {
let first_track = tracks.first().unwrap();
@@ -349,13 +161,20 @@ pub(crate) async fn play_inner(
.interaction(interaction.application_id)
.update_response(&interaction.token)
.embeds(Some(&embeds))?
.await
.context("Could not send playlist loading message")?;
.await?;
}
if let Some(call_lock) = state.songbird.get(guild_id) {
let call = call_lock.lock().await;
call.queue().resume().context("Could not resume playing")?;
call.queue().resume()?;
}
struct TrackType {
url: String,
title: Option<String>,
duration_string: String,
channel: String,
thumbnail: Option<String>,
}
let mut tracks_added = vec![];
@@ -365,88 +184,210 @@ pub(crate) async fn play_inner(
.original_url
.clone()
.or(yttrack.url.clone())
.context("Could not find url")?;
.ok_or("Could not find url")?;
let mut src = YoutubeDl::new(state.client.clone(), url.clone());
let track: Track = src.clone().into();
let mut src = YoutubeDl::new(reqwest::Client::new(), url.clone());
let memory = Memory::new(src.clone().into()).await.unwrap();
let track: Track = memory.new_handle().into();
match src.aux_metadata().await {
Ok(metadata) => {
debug!("metadata: {:?}", metadata);
if let Ok(metadata) = src.aux_metadata().await {
debug!("metadata: {:?}", metadata);
persistence(interaction, yttrack, Arc::clone(&state))
.await
.unwrap_or_else(|e| {
tracing::error!("could not persist track: {:?}", e);
});
let (author_name, author_global_name) = if let Some(author) = interaction.author() {
(author.name.clone(), author.global_name.clone())
} else {
("".to_string(), None)
};
tracks_added.push(TrackType {
url: url.clone(),
title: metadata.title.clone(),
duration_string: yttrack.duration_string.clone(),
channel: yttrack.channel.clone(),
thumbnail: metadata.thumbnail.clone(),
db::track::insert_guild(&state.pool, db::track::Guild::new(guild_id.to_string()))
.await
.expect("failed to insert guild: {e}");
db::track::insert_user(
&state.pool,
db::track::User::new(user_id.to_string(), author_name, author_global_name),
)
.await
.expect("failed to insert user: {e}");
let track_id = db::track::insert_track(
&state.pool,
db::track::Track::new(
url.clone(),
yttrack.title.clone(),
yttrack.channel.clone(),
yttrack.duration_string.clone(),
metadata.thumbnail.clone().unwrap_or_default(),
),
)
.await
.expect("failed to insert track: {e}");
db::track::insert_query(
&state.pool,
db::track::Query::new(user_id.to_string(), guild_id.to_string(), track_id),
)
.await
.expect("failed to insert track: {e}");
tracks_added.push(TrackType {
url: url.clone(),
title: metadata.title.clone(),
duration_string: yttrack.duration_string.clone(),
channel: yttrack.channel.clone(),
thumbnail: metadata.thumbnail.clone(),
});
if let Some(call_lock) = state.songbird.get(guild_id) {
let mut call = call_lock.lock().await;
let handle = call.enqueue_with_preload(
track,
metadata.duration.map(|duration| -> Duration {
if duration.as_secs() > 5 {
duration.sub(Duration::from_secs(5))
} else {
duration
}
}),
);
let mut x = handle.typemap().write().await;
x.insert::<MetadataMap>(Metadata {
title: metadata.title,
duration: metadata.duration,
url,
src,
});
match state.songbird.get(guild_id) {
Some(call_lock) => {
let mut call = call_lock.lock().await;
let handle = call.enqueue_with_preload(
track,
metadata.duration.map(|duration| -> Duration {
if duration.as_secs() > 5 {
duration.sub(Duration::from_secs(5))
} else {
duration
}
}),
);
let mut x = handle.typemap().write().await;
x.insert::<MetadataMap>(Metadata {
title: metadata.title,
duration: metadata.duration,
url,
src,
});
}
None => tracing::error!("could not get call lock"),
}
}
Err(e) => {
tracing::error!("could not get metadata: {:?}", e);
if e.to_string()
.contains("Sign in to confirm youre not a bot.")
{
let content =
"I seem to have been flagged by YouTube as a bot. :-(".to_string();
let embeds = vec![EmbedBuilder::new()
.description(content)
.color(colors::RED)
.build()];
state
.http
.interaction(interaction.application_id)
.update_response(&interaction.token)
.embeds(Some(&embeds))?
.await?;
return Ok(());
}
handle
.add_event(
Event::Track(TrackEvent::Preparing),
TrackPreparingNotifier {
memory,
track_id,
state: Arc::clone(&state),
},
)
.expect("could not add event");
}
}
}
let mut content = String::new();
let num_tracks_added = tracks_added.len();
let embeds = match num_tracks_added {
0 => {
vec![]
}
1 => {
let track = tracks_added.first().unwrap();
let host = Url::parse(&track.url)?;
let host = host
.host_str()
.unwrap_or_default()
.trim_start_matches("www.");
let mut embed = EmbedBuilder::new()
.author(EmbedAuthor {
name: "🔊 Added to queue".to_string(),
icon_url: None,
proxy_icon_url: None,
url: None,
})
.title(track.title.clone().unwrap_or("Unknown".to_string()))
.url(track.url.clone())
.color(colors::BLURPLE)
.footer(EmbedFooter {
text: format!("Streaming from {}", host),
icon_url: Some(format!(
"https://www.google.com/s2/favicons?domain={}",
host
)),
proxy_icon_url: None,
})
.field(EmbedField {
inline: true,
name: "Duration".to_string(),
value: track.duration_string.clone(),
})
.field(EmbedField {
inline: true,
name: "Channel".to_string(),
value: track.channel.clone(),
})
.build();
if let Some(thumbnail) = &track.thumbnail {
embed.thumbnail = Some(EmbedThumbnail {
height: None,
proxy_url: None,
url: thumbnail.to_string(),
width: None,
});
}
vec![embed]
}
_ => {
let first_track = tracks.first().unwrap();
content.push_str(&format!(
"Adding playlist: [{}]({})\n",
&first_track
.playlist
.clone()
.unwrap_or("Unknown".to_string()),
build_playlist_url(
&first_track
.playlist_id
.clone()
.unwrap_or("Unknown".to_string())
)
));
content.push_str(&format!(
"Added {} tracks to the queue:\n",
num_tracks_added
));
let embed = EmbedBuilder::new()
.description(content)
.color(colors::BLURPLE)
.build();
vec![embed]
}
};
let embeds = build_embeds(&tracks, &tracks_added);
state
.http
.interaction(interaction.application_id)
.update_response(&interaction.token)
.embeds(Some(&embeds))?
.await
.context("Could not send final play message")?;
.await?;
Ok(())
}
struct TrackPreparingNotifier {
memory: Memory,
track_id: i64,
state: Arc<StateRef>,
}
#[async_trait]
impl EventHandler for TrackPreparingNotifier {
async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
tracing::info!("Build buffer");
let mut reader = BufReader::new(self.memory.new_handle());
let mut bytes = Vec::new();
reader
.read_to_end(&mut bytes)
.expect("could not read track in memory");
tracing::info!("Saving track");
insert_blob(&self.state.pool, Blob::new(self.track_id, bytes))
.await
.expect("could not insert blob");
tracing::info!("Saved");
None
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -36,29 +36,18 @@ pub(crate) async fn insert_track(
pool: &sqlx::SqlitePool,
track: Track,
) -> Result<i64, sqlx::Error> {
let query = r#"
INSERT INTO tracks (url, title, channel, duration, thumbnail, updated)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (url) DO UPDATE SET
title = EXCLUDED.title,
channel = EXCLUDED.channel,
duration = EXCLUDED.duration,
thumbnail = EXCLUDED.thumbnail,
updated = EXCLUDED.updated
RETURNING id
"#;
let id = sqlx::query_scalar(query)
.bind(&track.url)
let query =
"INSERT OR REPLACE INTO tracks (url, title, channel, duration, thumbnail, updated) VALUES ($1, $2, $3, $4, $5, $6)";
let res = sqlx::query(query)
.bind(track.url)
.bind(track.title)
.bind(track.channel)
.bind(track.duration)
.bind(track.thumbnail)
.bind(track.updated)
.fetch_one(pool)
.execute(pool)
.await?;
Ok(id)
Ok(res.last_insert_rowid())
}
#[derive(Debug, FromRow)]
@@ -81,14 +70,8 @@ impl User {
}
pub(crate) async fn insert_user(pool: &sqlx::SqlitePool, user: User) -> Result<(), sqlx::Error> {
let query = r#"
INSERT INTO users (id, name, global_name, updated)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
global_name = EXCLUDED.global_name,
updated = EXCLUDED.updated
"#;
let query =
"INSERT OR REPLACE INTO users (id, name, global_name, updated) VALUES ($1, $2, $3, $4)";
sqlx::query(query)
.bind(user.id)
.bind(user.name)
@@ -122,10 +105,8 @@ impl Query {
}
pub(crate) async fn insert_query(pool: &sqlx::SqlitePool, q: Query) -> Result<i64, sqlx::Error> {
let query = r#"
INSERT INTO queries (user_id, guild_id, track_id, updated)
VALUES (?, ?, ?, ?)
"#;
let query =
"INSERT OR REPLACE INTO queries (user_id, guild_id, track_id, updated) VALUES ($1, $2, $3, $4)";
let res = sqlx::query(query)
.bind(q.user_id)
.bind(q.guild_id)
@@ -151,17 +132,37 @@ impl Guild {
}
}
pub(crate) async fn insert_guild(pool: &sqlx::SqlitePool, guild: Guild) -> Result<(), sqlx::Error> {
let query = r#"
INSERT INTO guilds (id, updated)
VALUES (?, ?)
ON CONFLICT (id) DO UPDATE SET
updated = EXCLUDED.updated
"#;
sqlx::query(query)
pub(crate) async fn insert_guild(
pool: &sqlx::SqlitePool,
guild: Guild,
) -> Result<i64, sqlx::Error> {
let query = "INSERT OR REPLACE INTO guilds (id, updated) VALUES ($1, $2)";
let res = sqlx::query(query)
.bind(guild.id)
.bind(guild.updated)
.execute(pool)
.await?;
Ok(res.last_insert_rowid())
}
#[derive(Debug, FromRow)]
pub(crate) struct Blob {
pub(crate) id: i64,
pub(crate) data: Vec<u8>,
}
impl Blob {
pub(crate) fn new(id: i64, data: Vec<u8>) -> Self {
Self { id, data }
}
}
pub(crate) async fn insert_blob(pool: &sqlx::SqlitePool, blob: Blob) -> Result<(), sqlx::Error> {
let query = "INSERT OR REPLACE INTO blobs (id, data) VALUES ($1, $2)";
sqlx::query(query)
.bind(blob.id)
.bind(blob.data)
.execute(pool)
.await?;
Ok(())
}

View File

@@ -16,7 +16,7 @@ use futures::StreamExt;
use signal::signal_handler;
use songbird::{shards::TwilightMap, Songbird};
use state::StateRef;
use std::{env, error::Error, str::FromStr, sync::Arc, time::Duration};
use std::{env, error::Error, str::FromStr, sync::Arc};
use tokio::select;
use tracing::{debug, info};
use twilight_cache_inmemory::InMemoryCache;
@@ -34,6 +34,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
println!("Starting up...");
// Initialize the tracing subscriber.
tracing_subscriber::fmt::init();
info!("Starting up...");
@@ -82,11 +83,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
);
let songbird = Songbird::twilight(Arc::new(senders), user_id);
let cache = InMemoryCache::new();
let client = reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(3600))
.build()
.expect("could not build http client");
(
shards,
@@ -97,7 +93,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
standby: Standby::new(),
guild_settings: Default::default(),
pool,
client,
}),
)
};

View File

@@ -33,5 +33,4 @@ pub(crate) struct StateRef {
pub(crate) standby: Standby,
pub(crate) guild_settings: DashMap<Id<GuildMarker>, Settings>,
pub(crate) pool: sqlx::SqlitePool,
pub(crate) client: reqwest::Client,
}