diff --git a/README.md b/README.md index e64c7307..240479df 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ Access the generated M3U playlist at `http://:8080/playlist.m3u`. | REDIS_DB | Set Redis server database to be used | 0 | 0 to 15 | | USER_AGENT | Set the User-Agent of HTTP requests. | IPTV Smarters/1.0.3 (iPad; iOS 16.6.1; Scale/2.00) | Any valid user agent | | ~~LOAD_BALANCING_MODE~~ (removed on version 0.10.0) | Set load balancing algorithm to a specific mode | brute-force | brute-force/round-robin | +| PARSER_WORKERS | Set number of workers to spawn for M3U parsing. | 5 | Any positive integer | | BUFFER_MB | Set buffer size in mb. | 0 (no buffer) | Any positive integer | | INCLUDE_GROUPS_1, INCLUDE_GROUPS_2, INCLUDE_GROUPS_X | Set channel groups to include | all | Comma-separated values | | TITLE_SUBSTR_FILTER | Sets a regex pattern used to exclude substrings from channel titles | none | Go regexp | diff --git a/m3u/parser.go b/m3u/parser.go index 309b30f9..7b3f188a 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -98,7 +98,7 @@ func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error { maxRetries := 10 var err error maxRetriesStr, maxRetriesExists := os.LookupEnv("MAX_RETRIES") - if !maxRetriesExists { + if maxRetriesExists { maxRetries, err = strconv.Atoi(maxRetriesStr) if err != nil { maxRetries = 10 @@ -125,49 +125,61 @@ func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error { scanner := bufio.NewScanner(&buffer) var wg sync.WaitGroup + // Create channels for workers + parserWorkers := os.Getenv("PARSER_WORKERS") + if parserWorkers != "" { + parserWorkers = "5" + } streamInfoCh := make(chan database.StreamInfo) errCh := make(chan error) + numWorkers, err := strconv.Atoi(parserWorkers) + if err != nil { + numWorkers = 5 + } + + // Worker pool + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for streamInfo := range streamInfoCh { + if err := db.SaveToDb([]database.StreamInfo{streamInfo}); err != nil { + errCh <- err + } + } + }() + } + + // Error handler + go func() { + for err := range errCh { + log.Printf("M3U Parser error: %v", err) + } + }() + // Parse lines and send to worker pool for scanner.Scan() { line := scanner.Text() if strings.HasPrefix(line, "#EXTINF:") && checkIncludeGroup(grps, line) { if scanner.Scan() { - wg.Add(2) nextLine := scanner.Text() - // Insert parsed stream to database - go func(c chan database.StreamInfo) { - defer wg.Done() - errCh <- db.SaveToDb([]database.StreamInfo{<-c}) - }(streamInfoCh) - - // Parse stream lines - go func(l string, nl string) { - defer wg.Done() - streamInfoCh <- parseLine(l, nl, m3uIndex) - }(line, nextLine) - - // Error handler - go func() { - err := <-errCh - if err != nil { - log.Printf("M3U Parser error: %v", err) - } - }() + streamInfo := parseLine(line, nextLine, m3uIndex) + streamInfoCh <- streamInfo } - //} } } + // Close channels after processing + close(streamInfoCh) + wg.Wait() + close(errCh) + if err := scanner.Err(); err != nil { return fmt.Errorf("scanner error: %v", err) } - wg.Wait() - close(streamInfoCh) - close(errCh) - // Free up memory used by buffer buffer.Reset()