Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a worker pool pattern to limit the number of goroutines #76

Merged
merged 2 commits into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ Access the generated M3U playlist at `http://<server ip>:8080/playlist.m3u`.
| REDIS_DB | Set Redis server database to be used | 0 | 0 to 15 |
| USER_AGENT | Set the User-Agent of HTTP requests. | IPTV Smarters/1.0.3 (iPad; iOS 16.6.1; Scale/2.00) | Any valid user agent |
| ~~LOAD_BALANCING_MODE~~ (removed on version 0.10.0) | Set load balancing algorithm to a specific mode | brute-force | brute-force/round-robin |
| PARSER_WORKERS | Set number of workers to spawn for M3U parsing. | 5 | Any positive integer |
| BUFFER_MB | Set buffer size in mb. | 0 (no buffer) | Any positive integer |
| INCLUDE_GROUPS_1, INCLUDE_GROUPS_2, INCLUDE_GROUPS_X | Set channel groups to include | all | Comma-separated values |
| TITLE_SUBSTR_FILTER | Sets a regex pattern used to exclude substrings from channel titles | none | Go regexp |
Expand Down
64 changes: 38 additions & 26 deletions m3u/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error {
maxRetries := 10
var err error
maxRetriesStr, maxRetriesExists := os.LookupEnv("MAX_RETRIES")
if !maxRetriesExists {
if maxRetriesExists {
maxRetries, err = strconv.Atoi(maxRetriesStr)
if err != nil {
maxRetries = 10
Expand All @@ -125,49 +125,61 @@ func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error {
scanner := bufio.NewScanner(&buffer)
var wg sync.WaitGroup

// Create channels for workers
parserWorkers := os.Getenv("PARSER_WORKERS")
if parserWorkers != "" {
parserWorkers = "5"
}
streamInfoCh := make(chan database.StreamInfo)
errCh := make(chan error)
numWorkers, err := strconv.Atoi(parserWorkers)
if err != nil {
numWorkers = 5
}

// Worker pool
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for streamInfo := range streamInfoCh {
if err := db.SaveToDb([]database.StreamInfo{streamInfo}); err != nil {
errCh <- err
}
}
}()
}

// Error handler
go func() {
for err := range errCh {
log.Printf("M3U Parser error: %v", err)
}
}()

// Parse lines and send to worker pool
for scanner.Scan() {
line := scanner.Text()

if strings.HasPrefix(line, "#EXTINF:") && checkIncludeGroup(grps, line) {
if scanner.Scan() {
wg.Add(2)
nextLine := scanner.Text()

// Insert parsed stream to database
go func(c chan database.StreamInfo) {
defer wg.Done()
errCh <- db.SaveToDb([]database.StreamInfo{<-c})
}(streamInfoCh)

// Parse stream lines
go func(l string, nl string) {
defer wg.Done()
streamInfoCh <- parseLine(l, nl, m3uIndex)
}(line, nextLine)

// Error handler
go func() {
err := <-errCh
if err != nil {
log.Printf("M3U Parser error: %v", err)
}
}()
streamInfo := parseLine(line, nextLine, m3uIndex)
streamInfoCh <- streamInfo
}
//}
}
}

// Close channels after processing
close(streamInfoCh)
wg.Wait()
close(errCh)

if err := scanner.Err(); err != nil {
return fmt.Errorf("scanner error: %v", err)
}

wg.Wait()
close(streamInfoCh)
close(errCh)

// Free up memory used by buffer
buffer.Reset()

Expand Down