Skip to content

Commit

Permalink
Youtube Publishing reliability improvements. (#1644)
Browse files Browse the repository at this point in the history
* Changes to improve youtube video reliability

* make video list cache duration optionally configurable

* use logger instead of logrus

---------

Co-authored-by: Ashish Jhanwar <[email protected]>
  • Loading branch information
ashishjh-bst and ashishjh-bst authored May 8, 2024
1 parent 733d886 commit 447a527
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 45 deletions.
85 changes: 47 additions & 38 deletions youtube/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (

const (
WebSubCheckInterval = time.Second * 10
// PollInterval = time.Second * 5 // <- used for debug purposes
)

func (p *Plugin) StartFeed() {
p.Stop = make(chan *sync.WaitGroup)
p.runWebsubChecker()
go p.runWebsubChecker()
go p.autoSyncWebsubs()
go p.deleteOldVideos()
}

func (p *Plugin) StopFeed(wg *sync.WaitGroup) {
Expand All @@ -54,18 +55,45 @@ func (p *Plugin) SetupClient() error {
return nil
}

func (p *Plugin) deleteOldVideos() {
ticker := time.NewTicker(time.Minute * 1)
// Remove videos older than 24 hours
for {
select {
case <-ticker.C:
var expiring int64
videoCacheDays := confYoutubeVideoCacheDays.GetInt()
if videoCacheDays < 1 {
videoCacheDays = 1
}
common.RedisPool.Do(radix.FlatCmd(&expiring, "ZREMRANGEBYSCORE", RedisKeyPublishedVideoList, "-inf", time.Now().AddDate(0, 0, -1*videoCacheDays).Unix()))
logger.Infof("Removed %d old videos", expiring)
}
}
}

func (p *Plugin) autoSyncWebsubs() {
// force syncs all websubs from db every 24 hours in case of outages or missed updates
ticker := time.NewTicker(time.Hour * 24)
for {
select {
case <-ticker.C:
go p.syncWebSubs()
}
}
}

// keeps the subscriptions up to date by updating the ones soon to be expiring
func (p *Plugin) runWebsubChecker() {
go p.syncWebSubs()

websubTicker := time.NewTicker(WebSubCheckInterval)
ticker := time.NewTicker(WebSubCheckInterval)
for {
select {
case wg := <-p.Stop:
wg.Done()
return
case <-websubTicker.C:
p.checkExpiringWebsubs()
case <-ticker.C:
go p.checkExpiringWebsubs()
}
}
}
Expand Down Expand Up @@ -548,18 +576,21 @@ func (p *Plugin) CheckVideo(parsedVideo XMLFeed) error {
return err
}

lastVid, lastVidTime, err := p.getLastVidTimes(channelID)
if err != nil {
return err
videoCacheDays := confYoutubeVideoCacheDays.GetInt()
if videoCacheDays < 1 {
videoCacheDays = 1
}

if lastVidTime.After(parsedPublishedTime) {
// wasn't a new vid
if time.Since(parsedPublishedTime) > time.Hour*24*time.Duration(videoCacheDays) {
// don't post videos older than videoCacheDays
logger.Infof("Skipped Stale video for youtube channel %s: video_id: %s", channelID, videoID)
return nil
}

if lastVid == videoID {
// the video was already posted and was probably just edited
mn := radix.MaybeNil{}
common.RedisPool.Do(radix.Cmd(&mn, "ZSCORE", RedisKeyPublishedVideoList, videoID))
if !mn.Nil {
// video was already published, maybe just an update on it?
logger.Infof("Skipped Already Published video for youtube channel %s: video_id: %s", channelID, videoID)
return nil
}

Expand Down Expand Up @@ -620,19 +651,14 @@ func (p *Plugin) isShortsRedirect(videoId string) bool {
}

func (p *Plugin) postVideo(subs []*ChannelSubscription, publishedAt time.Time, video *youtube.Video, channelID string) error {
err := common.MultipleCmds(
radix.FlatCmd(nil, "SET", KeyLastVidTime(channelID), publishedAt.Unix()),
radix.FlatCmd(nil, "SET", KeyLastVidID(channelID), video.Id),
)
// add video to list of published videos
err := common.RedisPool.Do(radix.FlatCmd(nil, "ZADD", RedisKeyPublishedVideoList, publishedAt.Unix(), video.Id))
if err != nil {
return err
}

contentType := video.Snippet.LiveBroadcastContent
logger.Infof("Got a new video for channel %s (%s) with videoid %s (%s), of type %s and publishing to %d subscriptions", channelID, video.Snippet.ChannelTitle, video.Id, video.Snippet.Title, contentType, len(subs))
if contentType != "live" && contentType != "none" {
return nil
}

isLivestream := contentType == "live"
isUpcoming := contentType == "upcoming"
Expand Down Expand Up @@ -681,20 +707,3 @@ func (p *Plugin) getRemoveSubs(channelID string) ([]*ChannelSubscription, error)

return subs, nil
}

func (p *Plugin) getLastVidTimes(channelID string) (lastVid string, lastVidTime time.Time, err error) {
// Find the last video time for this channel
var unixSeconds int64
err = common.RedisPool.Do(radix.Cmd(&unixSeconds, "GET", KeyLastVidTime(channelID)))

var lastProcessedVidTime time.Time
if err != nil || unixSeconds == 0 {
lastProcessedVidTime = time.Time{}
} else {
lastProcessedVidTime = time.Unix(unixSeconds, 0)
}

var lastVidID string
err = common.RedisPool.Do(radix.Cmd(&lastVidID, "GET", KeyLastVidID(channelID)))
return lastVidID, lastProcessedVidTime, err
}
15 changes: 8 additions & 7 deletions youtube/youtube.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ import (
)

const (
RedisChannelsLockKey = "youtube_subbed_channel_lock"

RedisKeyWebSubChannels = "youtube_registered_websub_channels"
GoogleWebsubHub = "https://pubsubhubbub.appspot.com/subscribe"
RedisChannelsLockKey = "youtube_subbed_channel_lock"
RedisKeyPublishedVideoList = "youtube_published_videos"
RedisKeyWebSubChannels = "youtube_registered_websub_channels"
GoogleWebsubHub = "https://pubsubhubbub.appspot.com/subscribe"
)

var (
confWebsubVerifytoken = config.RegisterOption("yagpdb.youtube.verify_token", "Youtube websub push verify token, set it to a random string and never change it", "asdkpoasdkpaoksdpako")
confResubBatchSize = config.RegisterOption("yagpdb.youtube.resub_batch_size", "Number of Websubs to resubscribe to concurrently", 1)
logger = common.GetPluginLogger(&Plugin{})
confWebsubVerifytoken = config.RegisterOption("yagpdb.youtube.verify_token", "Youtube websub push verify token, set it to a random string and never change it", "asdkpoasdkpaoksdpako")
confResubBatchSize = config.RegisterOption("yagpdb.youtube.resub_batch_size", "Number of Websubs to resubscribe to concurrently", 1)
confYoutubeVideoCacheDays = config.RegisterOption("yagpdb.youtube.video_cache_duration", "Duration in days to cache youtube video data", 1)
logger = common.GetPluginLogger(&Plugin{})
)

func KeyLastVidTime(channel string) string { return "youtube_last_video_time:" + channel }
Expand Down

0 comments on commit 447a527

Please sign in to comment.