Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Youtube Publishing reliability improvements. #1644

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 47 additions & 38 deletions youtube/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (

const (
WebSubCheckInterval = time.Second * 10
// PollInterval = time.Second * 5 // <- used for debug purposes
)

func (p *Plugin) StartFeed() {
p.Stop = make(chan *sync.WaitGroup)
p.runWebsubChecker()
go p.runWebsubChecker()
go p.autoSyncWebsubs()
go p.deleteOldVideos()
}

func (p *Plugin) StopFeed(wg *sync.WaitGroup) {
Expand All @@ -54,18 +55,45 @@ func (p *Plugin) SetupClient() error {
return nil
}

func (p *Plugin) deleteOldVideos() {
ticker := time.NewTicker(time.Minute * 1)
// Remove videos older than 24 hours
for {
select {
case <-ticker.C:
var expiring int64
videoCacheDays := confYoutubeVideoCacheDays.GetInt()
if videoCacheDays < 1 {
videoCacheDays = 1
}
common.RedisPool.Do(radix.FlatCmd(&expiring, "ZREMRANGEBYSCORE", RedisKeyPublishedVideoList, "-inf", time.Now().AddDate(0, 0, -1*videoCacheDays).Unix()))
logger.Infof("Removed %d old videos", expiring)
}
}
}

func (p *Plugin) autoSyncWebsubs() {
// force syncs all websubs from db every 24 hours in case of outages or missed updates
ticker := time.NewTicker(time.Hour * 24)
for {
select {
case <-ticker.C:
go p.syncWebSubs()
}
}
}

// keeps the subscriptions up to date by updating the ones soon to be expiring
func (p *Plugin) runWebsubChecker() {
go p.syncWebSubs()

websubTicker := time.NewTicker(WebSubCheckInterval)
ticker := time.NewTicker(WebSubCheckInterval)
for {
select {
case wg := <-p.Stop:
wg.Done()
return
case <-websubTicker.C:
p.checkExpiringWebsubs()
case <-ticker.C:
go p.checkExpiringWebsubs()
}
}
}
Expand Down Expand Up @@ -548,18 +576,21 @@ func (p *Plugin) CheckVideo(parsedVideo XMLFeed) error {
return err
}

lastVid, lastVidTime, err := p.getLastVidTimes(channelID)
if err != nil {
return err
videoCacheDays := confYoutubeVideoCacheDays.GetInt()
if videoCacheDays < 1 {
videoCacheDays = 1
}

if lastVidTime.After(parsedPublishedTime) {
// wasn't a new vid
if time.Since(parsedPublishedTime) > time.Hour*24*time.Duration(videoCacheDays) {
// don't post videos older than videoCacheDays
logger.Infof("Skipped Stale video for youtube channel %s: video_id: %s", channelID, videoID)
return nil
}

if lastVid == videoID {
// the video was already posted and was probably just edited
mn := radix.MaybeNil{}
common.RedisPool.Do(radix.Cmd(&mn, "ZSCORE", RedisKeyPublishedVideoList, videoID))
if !mn.Nil {
// video was already published, maybe just an update on it?
logger.Infof("Skipped Already Published video for youtube channel %s: video_id: %s", channelID, videoID)
return nil
}

Expand Down Expand Up @@ -620,19 +651,14 @@ func (p *Plugin) isShortsRedirect(videoId string) bool {
}

func (p *Plugin) postVideo(subs []*ChannelSubscription, publishedAt time.Time, video *youtube.Video, channelID string) error {
err := common.MultipleCmds(
radix.FlatCmd(nil, "SET", KeyLastVidTime(channelID), publishedAt.Unix()),
radix.FlatCmd(nil, "SET", KeyLastVidID(channelID), video.Id),
)
// add video to list of published videos
err := common.RedisPool.Do(radix.FlatCmd(nil, "ZADD", RedisKeyPublishedVideoList, publishedAt.Unix(), video.Id))
if err != nil {
return err
}

contentType := video.Snippet.LiveBroadcastContent
logger.Infof("Got a new video for channel %s (%s) with videoid %s (%s), of type %s and publishing to %d subscriptions", channelID, video.Snippet.ChannelTitle, video.Id, video.Snippet.Title, contentType, len(subs))
if contentType != "live" && contentType != "none" {
return nil
}

isLivestream := contentType == "live"
isUpcoming := contentType == "upcoming"
Expand Down Expand Up @@ -681,20 +707,3 @@ func (p *Plugin) getRemoveSubs(channelID string) ([]*ChannelSubscription, error)

return subs, nil
}

func (p *Plugin) getLastVidTimes(channelID string) (lastVid string, lastVidTime time.Time, err error) {
// Find the last video time for this channel
var unixSeconds int64
err = common.RedisPool.Do(radix.Cmd(&unixSeconds, "GET", KeyLastVidTime(channelID)))

var lastProcessedVidTime time.Time
if err != nil || unixSeconds == 0 {
lastProcessedVidTime = time.Time{}
} else {
lastProcessedVidTime = time.Unix(unixSeconds, 0)
}

var lastVidID string
err = common.RedisPool.Do(radix.Cmd(&lastVidID, "GET", KeyLastVidID(channelID)))
return lastVidID, lastProcessedVidTime, err
}
15 changes: 8 additions & 7 deletions youtube/youtube.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ import (
)

const (
RedisChannelsLockKey = "youtube_subbed_channel_lock"

RedisKeyWebSubChannels = "youtube_registered_websub_channels"
GoogleWebsubHub = "https://pubsubhubbub.appspot.com/subscribe"
RedisChannelsLockKey = "youtube_subbed_channel_lock"
RedisKeyPublishedVideoList = "youtube_published_videos"
RedisKeyWebSubChannels = "youtube_registered_websub_channels"
GoogleWebsubHub = "https://pubsubhubbub.appspot.com/subscribe"
)

var (
confWebsubVerifytoken = config.RegisterOption("yagpdb.youtube.verify_token", "Youtube websub push verify token, set it to a random string and never change it", "asdkpoasdkpaoksdpako")
confResubBatchSize = config.RegisterOption("yagpdb.youtube.resub_batch_size", "Number of Websubs to resubscribe to concurrently", 1)
logger = common.GetPluginLogger(&Plugin{})
confWebsubVerifytoken = config.RegisterOption("yagpdb.youtube.verify_token", "Youtube websub push verify token, set it to a random string and never change it", "asdkpoasdkpaoksdpako")
confResubBatchSize = config.RegisterOption("yagpdb.youtube.resub_batch_size", "Number of Websubs to resubscribe to concurrently", 1)
confYoutubeVideoCacheDays = config.RegisterOption("yagpdb.youtube.video_cache_duration", "Duration in days to cache youtube video data", 1)
logger = common.GetPluginLogger(&Plugin{})
)

func KeyLastVidTime(channel string) string { return "youtube_last_video_time:" + channel }
Expand Down