Skip to content

Commit

Permalink
Merge pull request #135 from sonroyaalmerol/improve-m3u-gen
Browse files Browse the repository at this point in the history
Add file cache layer for M3U generate
  • Loading branch information
sonroyaalmerol authored Aug 26, 2024
2 parents 330fd71 + 24194ef commit a7b617f
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 66 deletions.
124 changes: 89 additions & 35 deletions m3u/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,35 @@ type Cache struct {

var M3uCache = &Cache{}

const cacheFilePath = "/cache.m3u"

func InitCache(db *database.Instance) {
debug := isDebugMode()

M3uCache.Lock()
if M3uCache.Revalidating {
M3uCache.Unlock()
if debug {
log.Println("[DEBUG] Cache revalidation is already in progress. Skipping.")
}
return
}
M3uCache.Revalidating = true
M3uCache.Unlock()

go func() {
content := GenerateAndCacheM3UContent(db, nil)
err := WriteCacheToFile(content)
if err != nil {
log.Printf("Error writing cache to file: %v\n", err)
}
}()
}

func isDebugMode() bool {
return os.Getenv("DEBUG") == "true"
}

func getFileExtensionFromUrl(rawUrl string) (string, error) {
u, err := url.Parse(rawUrl)
if err != nil {
Expand All @@ -42,32 +71,19 @@ func GenerateStreamURL(baseUrl string, slug string, sampleUrl string) string {
}

func GenerateAndCacheM3UContent(db *database.Instance, r *http.Request) string {
debug := os.Getenv("DEBUG") == "true"
debug := isDebugMode()
if debug {
log.Println("[DEBUG] Regenerating M3U cache in the background")
}

var content string

baseUrl := "" // Setup base URL logic
if r != nil {
if r.TLS == nil {
baseUrl = fmt.Sprintf("http://%s/stream", r.Host)
} else {
baseUrl = fmt.Sprintf("https://%s/stream", r.Host)
}
}

if customBase, ok := os.LookupEnv("BASE_URL"); ok {
customBase = strings.TrimSuffix(customBase, "/")
baseUrl = fmt.Sprintf("%s/stream", customBase)
}
baseUrl := determineBaseURL(r)

if debug {
utils.SafeLogPrintf(r, nil, "[DEBUG] Base URL set to %s\n", baseUrl)
}

content += "#EXTM3U\n"
var content strings.Builder
content.WriteString("#EXTM3U\n")

// Retrieve the streams from the database using channels
streamChan := db.GetStreams()
Expand All @@ -80,12 +96,10 @@ func GenerateAndCacheM3UContent(db *database.Instance, r *http.Request) string {
utils.SafeLogPrintf(nil, nil, "[DEBUG] Processing stream with TVG ID: %s\n", stream.TvgID)
}

// Append the stream info to content
content += fmt.Sprintf("#EXTINF:-1 channelID=\"x-ID.%s\" tvg-chno=\"%s\" tvg-id=\"%s\" tvg-name=\"%s\" tvg-logo=\"%s\" group-title=\"%s\",%s\n",
stream.TvgID, stream.TvgChNo, stream.TvgID, stream.Title, stream.LogoURL, stream.Group, stream.Title)
content.WriteString(fmt.Sprintf("#EXTINF:-1 channelID=\"x-ID.%s\" tvg-chno=\"%s\" tvg-id=\"%s\" tvg-name=\"%s\" tvg-logo=\"%s\" group-title=\"%s\",%s\n",
stream.TvgID, stream.TvgChNo, stream.TvgID, stream.Title, stream.LogoURL, stream.Group, stream.Title))

// Append the actual stream URL to content
content += GenerateStreamURL(baseUrl, stream.Slug, stream.URLs[0])
content.WriteString(GenerateStreamURL(baseUrl, stream.Slug, stream.URLs[0]))
}

if debug {
Expand All @@ -94,15 +108,31 @@ func GenerateAndCacheM3UContent(db *database.Instance, r *http.Request) string {

// Update cache
M3uCache.Lock()
M3uCache.data = content
M3uCache.data = content.String()
M3uCache.Revalidating = false
M3uCache.Unlock()

return content
return content.String()
}

func determineBaseURL(r *http.Request) string {
if r != nil {
if r.TLS == nil {
return fmt.Sprintf("http://%s/stream", r.Host)
} else {
return fmt.Sprintf("https://%s/stream", r.Host)
}
}

if customBase, ok := os.LookupEnv("BASE_URL"); ok {
return fmt.Sprintf("%s/stream", strings.TrimSuffix(customBase, "/"))
}

return ""
}

func Handler(w http.ResponseWriter, r *http.Request, db *database.Instance) {
debug := os.Getenv("DEBUG") == "true"
debug := isDebugMode()

if debug {
log.Println("[DEBUG] Generating M3U content")
Expand All @@ -116,24 +146,48 @@ func Handler(w http.ResponseWriter, r *http.Request, db *database.Instance) {
cacheData := M3uCache.data
M3uCache.Unlock()

if cacheData == "#EXTM3U\n" || cacheData == "" {
// Check the file-based cache
if fileData, err := ReadCacheFromFile(); err == nil {
cacheData = fileData
M3uCache.Lock()
M3uCache.data = fileData // update in-memory cache
M3uCache.Unlock()
}
}

// serve old cache and regenerate in the background
if cacheData != "" {
if cacheData != "#EXTM3U\n" && cacheData != "" {
if debug {
log.Println("[DEBUG] Serving old cache and regenerating in background")
}
_, _ = w.Write([]byte(cacheData))
if !M3uCache.Revalidating {
M3uCache.Revalidating = true
go GenerateAndCacheM3UContent(db, r)
} else {
if debug {
log.Println("[DEBUG] Cache revalidation is already in progress. Skipping.")
}
if _, err := w.Write([]byte(cacheData)); err != nil {
log.Printf("[ERROR] Failed to write response: %v\n", err)
}

InitCache(db)
return
}

// If no valid cache, generate content and update cache
content := GenerateAndCacheM3UContent(db, r)
_, _ = w.Write([]byte(content))
if err := WriteCacheToFile(content); err != nil {
log.Printf("[ERROR] Failed to write cache to file: %v\n", err)
}

if _, err := w.Write([]byte(content)); err != nil {
log.Printf("[ERROR] Failed to write response: %v\n", err)
}
}

func ReadCacheFromFile() (string, error) {
data, err := os.ReadFile(cacheFilePath)
if err != nil {
return "", err
}
return string(data), nil
}

func WriteCacheToFile(content string) error {
return os.WriteFile(cacheFilePath, []byte(content), 0644)
}
2 changes: 1 addition & 1 deletion m3u/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error {
var wg sync.WaitGroup

parserWorkers := os.Getenv("PARSER_WORKERS")
if parserWorkers != "" {
if strings.TrimSpace(parserWorkers) != "" {
parserWorkers = "5"
}

Expand Down
44 changes: 14 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ func updateSources(ctx context.Context, ewg *sync.WaitGroup) {
}

func main() {
debug := os.Getenv("DEBUG") == "true"

// Context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -101,6 +99,18 @@ func main() {
log.Fatalf("Error clearing concurrency database: %v", err)
}

clearOnBoot := os.Getenv("CLEAR_ON_BOOT")
if len(strings.TrimSpace(clearOnBoot)) == 0 {
clearOnBoot = "false"
}

if clearOnBoot == "true" {
log.Println("CLEAR_ON_BOOT enabled. Clearing current database.")
if err := db.ClearDb(); err != nil {
log.Fatalf("Error clearing database: %v", err)
}
}

cacheOnSync := os.Getenv("CACHE_ON_SYNC")
if len(strings.TrimSpace(cacheOnSync)) == 0 {
cacheOnSync = "false"
Expand All @@ -124,14 +134,7 @@ func main() {
}
wg.Wait()
log.Println("CACHE_ON_SYNC enabled. Building cache.")
if !m3u.M3uCache.Revalidating {
m3u.M3uCache.Revalidating = true
go m3u.GenerateAndCacheM3UContent(db, nil)
} else {
if debug {
log.Println("[DEBUG] Cache revalidation is already in progress. Skipping.")
}
}
m3u.InitCache(db)
}
})
if err != nil {
Expand All @@ -157,26 +160,7 @@ func main() {
}
wg.Wait()
log.Println("CACHE_ON_SYNC enabled. Building cache.")
if !m3u.M3uCache.Revalidating {
m3u.M3uCache.Revalidating = true
go m3u.GenerateAndCacheM3UContent(db, nil)
} else {
if debug {
log.Println("[DEBUG] Cache revalidation is already in progress. Skipping.")
}
}
}
}

clearOnBoot := os.Getenv("CLEAR_ON_BOOT")
if len(strings.TrimSpace(clearOnBoot)) == 0 {
clearOnBoot = "false"
}

if clearOnBoot == "true" {
log.Println("CLEAR_ON_BOOT enabled. Clearing current database.")
if err := db.ClearDb(); err != nil {
log.Fatalf("Error clearing database: %v", err)
m3u.InitCache(db)
}
}

Expand Down

0 comments on commit a7b617f

Please sign in to comment.