diff --git a/cmd/main.go b/cmd/main.go index 4cc1277..b3186d0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -90,6 +90,9 @@ func createDiscordSession(token string) *discordgo.Session { if err != nil { log.Fatal("Error creating Discord session", err) } + + session.ShouldReconnectOnError = true // Not sure if this is needed + return session } diff --git a/mod-music/player/play.go b/mod-music/player/play.go index 7e0f76c..a3e8542 100644 --- a/mod-music/player/play.go +++ b/mod-music/player/play.go @@ -6,6 +6,7 @@ import ( "net/url" "time" + "github.com/bwmarrin/discordgo" "github.com/gookit/slog" "github.com/keshon/melodix-discord-player/internal/config" "github.com/keshon/melodix-discord-player/mod-music/history" @@ -13,80 +14,142 @@ import ( "github.com/keshon/melodix-discord-player/mod-music/utils" ) +// Down below is One Big Fat Function to play a song + func (p *Player) Play(startAt int, song *Song) error { // Get current song (from queue / as arg) - var currentSong *Song - if song == nil { - var err error - currentSong, err = p.Dequeue() + currentSong, err := func() (*Song, error) { + if song != nil { + return song, nil + } + + dequedSong, err := p.Dequeue() if err != nil { - slog.Error(err) - return fmt.Errorf("failed to dequeue song: %w", err) + return nil, fmt.Errorf("failed to dequeue song: %w", err) } - } else { - currentSong = song + + return dequedSong, nil + }() + if err != nil { + return fmt.Errorf("failed to get current song: %w", err) } + p.SetCurrentSong(currentSong) - // Setup encoding - options, err := p.createEncodeOptions(startAt) + // Setup and start encoding + options, err := func(startAt int) (*dca.EncodeOptions, error) { + config, err := config.NewConfig() + if err != nil { + return nil, fmt.Errorf("error loading config: %w", err) + } + options := &dca.EncodeOptions{ + Volume: 1.0, + FrameDuration: config.DcaFrameDuration, + Bitrate: config.DcaBitrate, + PacketLoss: config.DcaPacketLoss, + RawOutput: config.DcaRawOutput, + Application: config.DcaApplication, + CompressionLevel: config.DcaCompressionLevel, + BufferedFrames: config.DcaBufferedFrames, + VBR: config.DcaVBR, + StartTime: startAt, + ReconnectAtEOF: config.DcaReconnectAtEOF, + ReconnectStreamed: config.DcaReconnectStreamed, + ReconnectOnNetworkError: config.DcaReconnectOnNetworkError, + ReconnectOnHttpError: config.DcaReconnectOnHttpError, + ReconnectDelayMax: config.DcaReconnectDelayMax, + FfmpegBinaryPath: config.DcaFfmpegBinaryPath, + EncodingLineLog: config.DcaEncodingLineLog, + UserAgent: config.DcaUserAgent, + } + + return options, nil + }(startAt) if err != nil { - slog.Errorf("Failed to create encode options: %v", err) return fmt.Errorf("failed to create encode options: %w", err) } - // Start encoding encoding, err := dca.EncodeFile(p.GetCurrentSong().DownloadURL, options) if err != nil { - slog.Error(err) return fmt.Errorf("failed to encode file: %w", err) } + p.SetEncodingSession(encoding) - //defer p.GetEncodingSession().Cleanup() - - // Connect to Discord channel and be ready - // https://github.com/bwmarrin/discordgo/issues/1357 - voiceConnection, ok := p.GetDiscordSession().VoiceConnections[p.GetGuildID()] - if !ok { - slog.Warn("No voice connection found. Attempting to join voice channel") - voiceConnection, err = p.GetDiscordSession().ChannelVoiceJoin(p.GetGuildID(), p.GetChannelID(), true, false) - if err != nil { - return fmt.Errorf("failed to join voice channel: %w", err) - } - } + defer p.GetEncodingSession().Cleanup() + + // Set up voice connection for sending audio + // Helpful: https://github.com/bwmarrin/discordgo/issues/1357 + // voiceConnection, ok := p.GetDiscordSession().VoiceConnections[p.GetGuildID()] + // p.GetDiscordSession().ShouldReconnectOnError = true + // if !ok { + // slog.Warn("No voice connection found. Attempting to join voice channel", p.GetChannelID()) + // voiceConnection, err = p.GetDiscordSession().ChannelVoiceJoin(p.GetGuildID(), p.GetChannelID(), true, false) + // if err != nil { + // return fmt.Errorf("failed to join voice channel: %w", err) + // } + // } + + // slog.Info("Found voice connection", voiceConnection.ChannelID) + // slog.Info("Setting it as active", voiceConnection.ChannelID) + // p.SetVoiceConnection(voiceConnection) + // // defer p.GetVoiceConnection().Close() + + // err = p.GetVoiceConnection().Speaking(true) + // if err != nil { + // slog.Error("Failed to start speaking in existing voice connection", err) + // slog.Info("Attempting for a new join to voice channel and setting it as active") + // voiceConnection, err = p.GetDiscordSession().ChannelVoiceJoin(p.GetGuildID(), p.GetChannelID(), true, false) + // if err != nil { + // return fmt.Errorf("failed to join voice channel: %w", err) + // } + + // p.SetVoiceConnection(voiceConnection) + + // err = p.GetVoiceConnection().Speaking(true) + // if err != nil { + // return fmt.Errorf("failed to start speaking after two attempts: %w", err) + // } + // } + // // defer p.GetVoiceConnection().Speaking(false) + setupVoiceConnection := func() (*discordgo.VoiceConnection, error) { + session := p.GetDiscordSession() + guildID, channelID := p.GetGuildID(), p.GetChannelID() + + var voiceConnection *discordgo.VoiceConnection + var err error - slog.Info("Found voice connection", voiceConnection.ChannelID) - // voiceConnection.ChangeChannel(p.GetChannelID(), false, false) + for attempts := 0; attempts < 5; attempts++ { + voiceConnection, err = session.ChannelVoiceJoin(guildID, channelID, false, false) + if err == nil { + break + } - slog.Info("Setting it as active", voiceConnection.ChannelID) - p.SetVoiceConnection(voiceConnection) - // defer p.GetVoiceConnection().Close() + slog.Warnf("Failed to join voice channel (attempt %d): %v", attempts+1, err) + time.Sleep(2 * time.Second) // Wait for 2 seconds before retrying + } - err = p.GetVoiceConnection().Speaking(true) - if err != nil { - slog.Error("Failed to start speaking in existing voice connection", err) - slog.Info("Attempting for a new join to voice channel and setting it as active") - voiceConnection, err = p.GetDiscordSession().ChannelVoiceJoin(p.GetGuildID(), p.GetChannelID(), true, false) if err != nil { - return fmt.Errorf("failed to join voice channel: %w", err) + return nil, fmt.Errorf("failed to join voice channel after multiple attempts (aww fuck!): %w", err) } - p.SetVoiceConnection(voiceConnection) + return voiceConnection, nil + } - err = p.GetVoiceConnection().Speaking(true) - if err != nil { - return fmt.Errorf("failed to start speaking after two attempts: %w", err) - } + voiceConnection, err := setupVoiceConnection() + if err != nil { + return err } - // defer p.GetVoiceConnection().Speaking(false) - // Send encoding to Discord stream + slog.Info("Found voice connection and setting it as active", voiceConnection.ChannelID) + p.SetVoiceConnection(voiceConnection) + + // Send encoding stream to voice connection done := make(chan error, 1) stream := dca.NewStream(p.GetEncodingSession(), p.GetVoiceConnection(), done) p.SetStreamingSession(stream) p.SetCurrentStatus(StatusPlaying) - // Add song playback duration to history + // Add song to history historySong := &history.Song{ Name: p.GetCurrentSong().Title, UserURL: p.GetCurrentSong().UserURL, @@ -95,27 +158,32 @@ func (p *Player) Play(startAt int, song *Song) error { ID: p.GetCurrentSong().ID, Thumbnail: history.Thumbnail(p.GetCurrentSong().Thumbnail), } - h := history.NewHistory() - h.AddTrackToHistory(p.GetVoiceConnection().GuildID, historySong) + p.GetHistory().AddTrackToHistory(p.GetVoiceConnection().GuildID, historySong) + + if err := p.GetHistory().AddPlaybackCountStats(p.GetVoiceConnection().GuildID, p.GetCurrentSong().ID); err != nil { + slog.Errorf("error adding playback count stats to history: %v", err) + } + // Set up periodic playback duration stats update to history interval := 2 * time.Second ticker := time.NewTicker(interval) - defer ticker.Stop() - tickerDone := make(chan bool) + tickerStop := make(chan bool) + defer func() { + ticker.Stop() + tickerStop <- true + }() go func() { for { select { case <-ticker.C: - if p.GetVoiceConnection() != nil && p.GetStreamingSession() != nil && p.GetCurrentSong() != nil { - if !p.GetStreamingSession().Paused() { - err := h.AddPlaybackDurationStats(p.GetVoiceConnection().GuildID, p.GetCurrentSong().ID, float64(interval.Seconds())) - if err != nil { - slog.Warnf("Error adding playback duration stats to history: %v", err) - } + if p.GetVoiceConnection() != nil && p.GetStreamingSession() != nil && p.GetCurrentSong() != nil && !p.GetStreamingSession().Paused() { + err := p.GetHistory().AddPlaybackDurationStats(p.GetVoiceConnection().GuildID, p.GetCurrentSong().ID, float64(interval.Seconds())) + if err != nil { + slog.Warnf("Error adding playback duration stats to history: %v", err) } } - case <-tickerDone: + case <-tickerStop: return } } @@ -123,7 +191,7 @@ func (p *Player) Play(startAt int, song *Song) error { // Handle signals (done / skip / stop) select { - case <-done: + case err := <-done: slog.Info("Song is interrupted due to done signal") p.SetCurrentStatus(StatusResting) @@ -133,8 +201,8 @@ func (p *Player) Play(startAt int, song *Song) error { if p.GetVoiceConnection() != nil { p.GetVoiceConnection().Speaking(false) } - p.GetEncodingSession().Cleanup() + // p.GetEncodingSession().Cleanup() // TODO: Is this needed? return fmt.Errorf("unexpected error occurred: %w", err) } @@ -145,14 +213,14 @@ func (p *Player) Play(startAt int, song *Song) error { if p.GetStreamingSession() == nil { slog.Error("StreamingSession is nil") - //return nil + //return nil // ! Quite unpredictable. } if p.GetCurrentSong() != nil { - if p.GetCurrentSong().Source != SourceStream { - // Treat as a song - slog.Info("Checking for song metrics if interruption was unintentional") + switch { + case p.GetCurrentSong().Source == SourceYouTube: + slog.Info("Source is a YouTube video, checking for song metrics if unexpected interruption") songDuration, songPosition, err := p.calculateSongMetrics(p.GetEncodingSession(), p.GetStreamingSession(), p.GetCurrentSong()) @@ -163,10 +231,10 @@ func (p *Player) Play(startAt int, song *Song) error { if p.GetEncodingSession().Stats().Duration.Seconds() > 0 && songPosition.Seconds() > 0 && songPosition < songDuration { startAt := songPosition.Seconds() - p.GetEncodingSession().Cleanup() p.GetVoiceConnection().Speaking(false) - slog.Warnf("Interruption detected, restarting song %v from %v", p.GetCurrentSong().Title, int(startAt)) + slog.Warnf("Unexpected interruption confirmed, restarting song: \"%v\" from %vs", p.GetCurrentSong().Title, int(startAt)) + go func() { err := p.Play(int(startAt), p.GetCurrentSong()) if err != nil { @@ -174,53 +242,47 @@ func (p *Player) Play(startAt int, song *Song) error { } }() + // p.GetEncodingSession().Cleanup() return nil } + // fallthrough + case p.GetCurrentSong().Source == SourceStream: + slog.Info("Source is a stream, should always restart (unless manually interrupted)") - } else { - // Treat as a stream - slog.Info("Song is a stream, should always restart") - - p.GetEncodingSession().Cleanup() p.GetVoiceConnection().Speaking(false) slog.Infof("Restarting stream %v", p.GetCurrentSong().Title) + go func() { err := p.Play(int(0), p.GetCurrentSong()) if err != nil { slog.Errorf("error restarting song: %w", err) } }() - } - time.Sleep(250 * time.Millisecond) - if err := h.AddPlaybackCountStats(p.GetVoiceConnection().GuildID, p.GetCurrentSong().ID); err != nil { - return fmt.Errorf("error adding playback count stats to history: %v", err) + // p.GetEncodingSession().Cleanup() // TODO: Is this needed? + return nil } } if p.GetCurrentSong() == nil { - slog.Warn("CurrentSong is nil at this point") + slog.Error("CurrentSong is nil at this point (should not happen)") } else { - slog.Warn("CurrentSong is NOT nil at this point") + slog.Warn("CurrentSong is NOT nil at this point:", p.GetCurrentSong().Title) } p.GetVoiceConnection().Speaking(false) - // p.GetEncodingSession().Cleanup() if len(p.GetSongQueue()) == 0 { time.Sleep(250 * time.Millisecond) - slog.Info("Audio done") - p.GetVoiceConnection().Speaking(false) - p.GetVoiceConnection().Disconnect() - - p.GetEncodingSession().Cleanup() if p.GetVoiceConnection() != nil { p.GetVoiceConnection().Speaking(false) - p.SetStreamingSession(nil) + p.GetVoiceConnection().Disconnect() } + p.SetStreamingSession(nil) + p.SetCurrentStatus(StatusResting) p.SetSongQueue(make([]*Song, 0)) p.SetCurrentSong(nil) @@ -228,11 +290,14 @@ func (p *Player) Play(startAt int, song *Song) error { p.StopInterrupt = make(chan bool, 1) p.SwitchChannelInterrupt = make(chan bool, 1) + slog.Info("Stop playing after all signals passed, audio is done") + // p.GetEncodingSession().Cleanup() // TODO: Is this needed? return nil } + slog.Info("Play next in queue after all signals passed") + time.Sleep(250 * time.Millisecond) - slog.Warn("Play next in queue after all signals passed") go func() { err := p.Play(0, nil) if err != nil { @@ -241,30 +306,32 @@ func (p *Player) Play(startAt int, song *Song) error { }() slog.Info("..finished processing done signal") + + // p.GetEncodingSession().Cleanup() // TODO: Is this needed? Wasn't in the original code return nil case <-p.SkipInterrupt: slog.Info("Song is interrupted due to skip signal") - p.SetCurrentStatus(StatusResting) - p.GetEncodingSession().Cleanup() if p.GetVoiceConnection() != nil { p.GetVoiceConnection().Speaking(false) } + p.SetCurrentStatus(StatusResting) + slog.Info("..finished processing skip signal") + + // p.GetEncodingSession().Cleanup() // TODO: Is this needed? return nil case <-p.StopInterrupt: slog.Info("Song is interrupted due to stop signal") - p.GetVoiceConnection().Speaking(false) - p.GetVoiceConnection().Disconnect() - - p.GetEncodingSession().Cleanup() if p.GetVoiceConnection() != nil { p.GetVoiceConnection().Speaking(false) - p.SetStreamingSession(nil) + p.GetVoiceConnection().Disconnect() } + p.SetStreamingSession(nil) + p.SetCurrentStatus(StatusResting) p.SetSongQueue(make([]*Song, 0)) p.SetCurrentSong(nil) @@ -273,50 +340,25 @@ func (p *Player) Play(startAt int, song *Song) error { p.SwitchChannelInterrupt = make(chan bool, 1) slog.Info("..finish processing stop signal") + + // p.GetEncodingSession().Cleanup() // TODO: Is this needed? return nil case <-p.SwitchChannelInterrupt: slog.Info("Song is interrupted due to switch channel signal") - p.GetVoiceConnection().Disconnect() - p.GetEncodingSession().Cleanup() + if p.GetVoiceConnection() != nil { + p.GetVoiceConnection().Disconnect() + } go p.Play(0, p.GetCurrentSong()) slog.Info("..finish processing switch channel signal") - return nil - } - -} -func (p *Player) createEncodeOptions(startAt int) (*dca.EncodeOptions, error) { - config, err := config.NewConfig() - if err != nil { - return nil, fmt.Errorf("error loading config: %w", err) - } - - options := &dca.EncodeOptions{ - Volume: 1.0, - FrameDuration: config.DcaFrameDuration, - Bitrate: config.DcaBitrate, - PacketLoss: config.DcaPacketLoss, - RawOutput: config.DcaRawOutput, - Application: config.DcaApplication, - CompressionLevel: config.DcaCompressionLevel, - BufferedFrames: config.DcaBufferedFrames, - VBR: config.DcaVBR, - StartTime: startAt, - ReconnectAtEOF: config.DcaReconnectAtEOF, - ReconnectStreamed: config.DcaReconnectStreamed, - ReconnectOnNetworkError: config.DcaReconnectOnNetworkError, - ReconnectOnHttpError: config.DcaReconnectOnHttpError, - ReconnectDelayMax: config.DcaReconnectDelayMax, - FfmpegBinaryPath: config.DcaFfmpegBinaryPath, - EncodingLineLog: config.DcaEncodingLineLog, - UserAgent: config.DcaUserAgent, + //p.GetEncodingSession().Cleanup() // TODO: Is this needed? + return nil } - return options, nil } func (p *Player) calculateSongMetrics(encodingSession *dca.EncodeSession, streamingSession *dca.StreamingSession, song *Song) (duration, position time.Duration, err error) { @@ -343,8 +385,8 @@ func (p *Player) calculateSongMetrics(encodingSession *dca.EncodeSession, stream } position = encodingStartTime + streamingPosition + delay.Abs() // delay is negative so we make it positive to jump ahead - slog.Debugf("Total song duration: %s, Stopped at: %s", duration, position) - slog.Debugf("Encoding ahead of streaming: %s, Encoding started time: %s", delay, encodingStartTime) + slog.Debugf("Song stopped at:\t%s,\tSong duration:\t%s", position, duration) + slog.Debugf("Encoding started at:\t%s,\tEncoding ahead:\t%s", encodingStartTime, delay) return duration, position, nil } diff --git a/mod-music/player/player.go b/mod-music/player/player.go index adc00d7..6ab6953 100644 --- a/mod-music/player/player.go +++ b/mod-music/player/player.go @@ -7,6 +7,7 @@ import ( "github.com/bwmarrin/discordgo" + "github.com/keshon/melodix-discord-player/mod-music/history" "github.com/keshon/melodix-discord-player/mod-music/pkg/dca" ) @@ -50,6 +51,7 @@ type Player struct { channelID string guildID string session *discordgo.Session + history history.IHistory SkipInterrupt chan bool StopInterrupt chan bool SwitchChannelInterrupt chan bool @@ -127,6 +129,7 @@ func NewPlayer(guildID string, session *discordgo.Session) IPlayer { status: StatusResting, guildID: guildID, session: session, + history: history.NewHistory(), SkipInterrupt: make(chan bool, 1), StopInterrupt: make(chan bool, 1), SwitchChannelInterrupt: make(chan bool, 1), @@ -222,3 +225,7 @@ func (p *Player) GetGuildID() string { func (p *Player) SetGuildID(guildID string) { p.guildID = guildID } + +func (p *Player) GetHistory() history.IHistory { + return p.history +} diff --git a/mod-music/player/unpause.go b/mod-music/player/unpause.go index c8860fe..e90ffd3 100644 --- a/mod-music/player/unpause.go +++ b/mod-music/player/unpause.go @@ -1,7 +1,6 @@ package player import ( - "errors" "fmt" "github.com/gookit/slog" @@ -29,24 +28,24 @@ func (p *Player) Unpause(channelID string) error { err := p.Play(0, p.GetCurrentSong()) if err != nil { - slog.Error("Error: ", err) + return fmt.Errorf("error: %v", err) } } else { slog.Warn("Current song is nil") err := p.Play(0, nil) if err != nil { - slog.Error("Error: ", err) + return fmt.Errorf("error: %v", err) } } return nil } else { finished, err := p.GetStreamingSession().Finished() if err != nil { - slog.Error("Error: ", err) + return fmt.Errorf("error: %v", err) } if finished { - return errors.New("failed to resume audio playback: stream finished") + return fmt.Errorf("failed to resume audio playback: stream finished") } p.GetStreamingSession().SetPaused(false)