From 8c9f59b2cb4dabc3f9ccc8e5219167a31915afe4 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 15:39:54 -0500 Subject: [PATCH 01/13] use m3u index as concurrency key --- mp4_handler.go | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index 7b7b23f5..91b78850 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -10,6 +10,7 @@ import ( "m3u-stream-merger/database" "m3u-stream-merger/utils" "net/http" + "os" "strconv" "strings" "syscall" @@ -20,7 +21,7 @@ import ( func loadBalancer(ctx context.Context, stream database.StreamInfo) (resp *http.Response, selectedUrl *database.StreamURL, err error) { // Concurrency check mode for _, url := range stream.URLs { - if checkConcurrency(ctx, url.Content, url.MaxConcurrency) { + if checkConcurrency(ctx, url.M3UIndex) { log.Printf("Concurrency limit reached (%d): %s", url.MaxConcurrency, url.Content) continue // Skip this stream if concurrency limit reached } @@ -105,7 +106,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { return } log.Printf("Proxying %s to %s\n", r.RemoteAddr, selectedUrl.Content) - updateConcurrency(ctx, selectedUrl.Content, true) + updateConcurrency(ctx, selectedUrl.M3UIndex, true) // Log the successful response log.Printf("Sent MP4 stream to %s\n", r.RemoteAddr) @@ -115,7 +116,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { case <-ctx.Done(): // Connection closed, handle accordingly log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(ctx, selectedUrl.Content, false) + updateConcurrency(ctx, selectedUrl.M3UIndex, false) return default: // Connection still open, proceed with writing to the response @@ -124,7 +125,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Log the error if errors.Is(err, syscall.EPIPE) { log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(ctx, selectedUrl.Content, false) + updateConcurrency(ctx, selectedUrl.M3UIndex, false) } else { log.Printf("Error copying MP4 stream to response: %s\n", err.Error()) } @@ -133,9 +134,19 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { } } -func checkConcurrency(ctx context.Context, url string, maxConcurrency int) bool { +func checkConcurrency(ctx context.Context, m3uIndex int) bool { + maxConcurrency := 1 + var err error + rawMaxConcurrency, maxConcurrencyExists := os.LookupEnv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex)) + if maxConcurrencyExists { + maxConcurrency, err = strconv.Atoi(rawMaxConcurrency) + if err != nil { + maxConcurrency = 1 + } + } + redisClient := database.InitializeRedis() - val, err := redisClient.Get(ctx, url).Result() + val, err := redisClient.Get(ctx, strconv.Itoa(m3uIndex)).Result() if err == redis.Nil { return false // Key does not exist } else if err != nil { @@ -147,13 +158,13 @@ func checkConcurrency(ctx context.Context, url string, maxConcurrency int) bool return count >= maxConcurrency } -func updateConcurrency(ctx context.Context, url string, incr bool) { +func updateConcurrency(ctx context.Context, m3uIndex int, incr bool) { redisClient := database.InitializeRedis() var err error if incr { - err = redisClient.Incr(ctx, url).Err() + err = redisClient.Incr(ctx, strconv.Itoa(m3uIndex)).Err() } else { - err = redisClient.Decr(ctx, url).Err() + err = redisClient.Decr(ctx, strconv.Itoa(m3uIndex)).Err() } if err != nil { log.Printf("Error updating concurrency: %s\n", err.Error()) From ef0bb843a4d9693dd5e4f12f437b819bea680cef Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 15:49:14 -0500 Subject: [PATCH 02/13] add log for current concurrent connections --- mp4_handler.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index 91b78850..5323c9ce 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -146,7 +146,7 @@ func checkConcurrency(ctx context.Context, m3uIndex int) bool { } redisClient := database.InitializeRedis() - val, err := redisClient.Get(ctx, strconv.Itoa(m3uIndex)).Result() + val, err := redisClient.Get(ctx, fmt.Sprintf("m3u_%d", m3uIndex)).Result() if err == redis.Nil { return false // Key does not exist } else if err != nil { @@ -154,7 +154,12 @@ func checkConcurrency(ctx context.Context, m3uIndex int) bool { return false // Error occurred, treat as concurrency not reached } - count, _ := strconv.Atoi(val) + count, err := strconv.Atoi(val) + if err != nil { + count = 0 + } + + log.Printf("Current concurrent connections for M3U_%d: %d", m3uIndex, count) return count >= maxConcurrency } @@ -162,9 +167,9 @@ func updateConcurrency(ctx context.Context, m3uIndex int, incr bool) { redisClient := database.InitializeRedis() var err error if incr { - err = redisClient.Incr(ctx, strconv.Itoa(m3uIndex)).Err() + err = redisClient.Incr(ctx, fmt.Sprintf("m3u_%d", m3uIndex)).Err() } else { - err = redisClient.Decr(ctx, strconv.Itoa(m3uIndex)).Err() + err = redisClient.Decr(ctx, fmt.Sprintf("m3u_%d", m3uIndex)).Err() } if err != nil { log.Printf("Error updating concurrency: %s\n", err.Error()) From f73cfbd05a429c03042182f6a9e01bc8fa13b6a3 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 17:10:14 -0500 Subject: [PATCH 03/13] add DEBUG env var for optional logs --- m3u/parser.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/m3u/parser.go b/m3u/parser.go index e0a4bc63..b58de387 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -81,17 +81,23 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int var dbId int64 if existingStream.Title != currentStream.Title { - log.Printf("Creating new database entry: %s", currentStream.Title) + if os.Getenv("DEBUG") == "true" { + log.Printf("Creating new database entry: %s", currentStream.Title) + } dbId, err = database.InsertStream(db, currentStream) if err != nil { return fmt.Errorf("InsertStream error (title: %s): %v", currentStream.Title, err) } } else { - log.Printf("Using existing database entry: %s", existingStream.Title) + if os.Getenv("DEBUG") == "true" { + log.Printf("Using existing database entry: %s", existingStream.Title) + } dbId = existingStream.DbId } - log.Printf("Adding MP4 url entry to %s: %s", currentStream.Title, line) + if os.Getenv("DEBUG") == "true" { + log.Printf("Adding MP4 url entry to %s: %s", currentStream.Title, line) + } _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ Content: line, M3UIndex: m3uIndex, From e460c8a5d36d44c83b7bfa8f6ac1b31a83042d68 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 17:45:16 -0500 Subject: [PATCH 04/13] use tvg-name as fallback option for title --- m3u/parser.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/m3u/parser.go b/m3u/parser.go index b58de387..d46f57e3 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -43,10 +43,17 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int for scanner.Scan() { line := scanner.Text() + extInfLine := "" if strings.HasPrefix(line, "#EXTINF:") { currentStream = database.StreamInfo{} - currentStream.Title = strings.TrimSpace(strings.SplitN(line, ",", 2)[1]) + + lineCommaSplit := strings.SplitN(line, ",", 2) + extInfLine = line + + if len(lineCommaSplit) > 1 { + currentStream.Title = strings.TrimSpace(lineCommaSplit[1]) + } // Define a regular expression to capture key-value pairs regex := regexp.MustCompile(`(\S+?)="([^"]*?)"`) @@ -61,6 +68,10 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int switch key { case "tvg-id": currentStream.TvgID = value + case "tvg-name": + if len(strings.TrimSpace(currentStream.Title)) == 0 { + currentStream.Title = value + } case "group-title": currentStream.Group = value case "tvg-logo": @@ -74,6 +85,10 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int currentStream.LogoURL = parts[1] } } else if strings.HasPrefix(line, "http") { + if len(strings.TrimSpace(currentStream.Title)) == 0 { + log.Printf("Error capturing title, line will be skipped: %s\n", extInfLine) + } + existingStream, err := database.GetStreamByTitle(db, currentStream.Title) if err != nil { return fmt.Errorf("GetStreamByTitle error (title: %s): %v", currentStream.Title, err) From a2f49ba39a79b6531a5b289d93ab1a2398d19644 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 18:01:35 -0500 Subject: [PATCH 05/13] skip if line has not been captured --- m3u/parser.go | 1 + 1 file changed, 1 insertion(+) diff --git a/m3u/parser.go b/m3u/parser.go index d46f57e3..e704f1fd 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -87,6 +87,7 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int } else if strings.HasPrefix(line, "http") { if len(strings.TrimSpace(currentStream.Title)) == 0 { log.Printf("Error capturing title, line will be skipped: %s\n", extInfLine) + continue } existingStream, err := database.GetStreamByTitle(db, currentStream.Title) From f25b202a672313c226687a738d0514ac1cf12151 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 19:04:19 -0500 Subject: [PATCH 06/13] more generalized regex parsing --- m3u/parser.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/m3u/parser.go b/m3u/parser.go index e704f1fd..15676748 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -47,16 +47,12 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int if strings.HasPrefix(line, "#EXTINF:") { currentStream = database.StreamInfo{} - - lineCommaSplit := strings.SplitN(line, ",", 2) extInfLine = line - if len(lineCommaSplit) > 1 { - currentStream.Title = strings.TrimSpace(lineCommaSplit[1]) - } + lineWithoutPairs := line // Define a regular expression to capture key-value pairs - regex := regexp.MustCompile(`(\S+?)="([^"]*?)"`) + regex := regexp.MustCompile(`([a-zA-Z0-9_-]+)=("[^"]+"|[^",]+)`) // Find all key-value pairs in the line matches := regex.FindAllStringSubmatch(line, -1) @@ -65,18 +61,36 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int key := strings.TrimSpace(match[1]) value := strings.TrimSpace(match[2]) + if strings.HasPrefix(value, `"`) && strings.HasSuffix(value, `"`) { + value = strings.Trim(value, `"`) + } + switch key { case "tvg-id": currentStream.TvgID = value case "tvg-name": - if len(strings.TrimSpace(currentStream.Title)) == 0 { - currentStream.Title = value - } + currentStream.Title = value case "group-title": currentStream.Group = value case "tvg-logo": currentStream.LogoURL = value } + + var pair string + if strings.Contains(value, `"`) || strings.Contains(value, ",") { + // If the value contains double quotes or commas, format it as key="value" + pair = fmt.Sprintf(`%s="%s"`, key, value) + } else { + // Otherwise, format it as key=value + pair = fmt.Sprintf(`%s=%s`, key, value) + } + lineWithoutPairs = strings.Replace(lineWithoutPairs, pair, "", 1) + } + + lineCommaSplit := strings.SplitN(lineWithoutPairs, ",", 2) + + if len(lineCommaSplit) > 1 { + currentStream.Title = strings.TrimSpace(lineCommaSplit[1]) } } else if strings.HasPrefix(line, "#EXTVLCOPT:") { // Extract logo URL from #EXTVLCOPT line From 986db473d919a1c398d8dec6ef0b5e583ec64c9b Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 19:30:14 -0500 Subject: [PATCH 07/13] add uncaught attribute log --- m3u/parser.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/m3u/parser.go b/m3u/parser.go index 15676748..022c2c15 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -74,6 +74,10 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int currentStream.Group = value case "tvg-logo": currentStream.LogoURL = value + default: + if os.Getenv("DEBUG") == "true" { + log.Printf("Uncaught attribute: %s=%s\n", key, value) + } } var pair string From 341cd3e9705a69fa52bc73e6cdc91276391f6f95 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 19:47:01 -0500 Subject: [PATCH 08/13] replace switch with if-else --- m3u/parser.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/m3u/parser.go b/m3u/parser.go index 022c2c15..eb7fbb84 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -65,16 +65,15 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int value = strings.Trim(value, `"`) } - switch key { - case "tvg-id": + if key == "tvg-id" { currentStream.TvgID = value - case "tvg-name": + } else if key == "tvg-name" { currentStream.Title = value - case "group-title": + } else if key == "group-title" { currentStream.Group = value - case "tvg-logo": + } else if key == "tvg-logo" { currentStream.LogoURL = value - default: + } else { if os.Getenv("DEBUG") == "true" { log.Printf("Uncaught attribute: %s=%s\n", key, value) } From f2141c8cc62f9fe81b98383607e969ac3b3742f8 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 19:49:59 -0500 Subject: [PATCH 09/13] revert changes --- m3u/parser.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/m3u/parser.go b/m3u/parser.go index eb7fbb84..022c2c15 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -65,15 +65,16 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int value = strings.Trim(value, `"`) } - if key == "tvg-id" { + switch key { + case "tvg-id": currentStream.TvgID = value - } else if key == "tvg-name" { + case "tvg-name": currentStream.Title = value - } else if key == "group-title" { + case "group-title": currentStream.Group = value - } else if key == "tvg-logo" { + case "tvg-logo": currentStream.LogoURL = value - } else { + default: if os.Getenv("DEBUG") == "true" { log.Printf("Uncaught attribute: %s=%s\n", key, value) } From 49eec24cd3309b466779933c3797f658f845f931 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 20:02:19 -0500 Subject: [PATCH 10/13] remove logo url parsing from extvlcopt --- m3u/parser.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/m3u/parser.go b/m3u/parser.go index 022c2c15..e3040c6d 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -100,7 +100,9 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int // Extract logo URL from #EXTVLCOPT line parts := strings.SplitN(line, "=", 2) if len(parts) == 2 { - currentStream.LogoURL = parts[1] + if os.Getenv("DEBUG") == "true" { + log.Printf("Uncaught attribute (#EXTVLCOPT): %s=%s\n", parts[0], parts[1]) + } } } else if strings.HasPrefix(line, "http") { if len(strings.TrimSpace(currentStream.Title)) == 0 { From 33a6ff9eee20abdd4d1f98319799e64e5e426e3a Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 20:24:41 -0500 Subject: [PATCH 11/13] catch unexpected eof and add MAX_RETRIES --- m3u/parser.go | 205 ++++++++++++++++++++++++++++---------------------- 1 file changed, 114 insertions(+), 91 deletions(-) diff --git a/m3u/parser.go b/m3u/parser.go index e3040c6d..e44255b0 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -4,11 +4,14 @@ import ( "bufio" "database/sql" "fmt" + "io" "log" "net/http" "os" "regexp" + "strconv" "strings" + "time" "m3u-stream-merger/database" ) @@ -20,6 +23,16 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int userAgent = "IPTV Smarters/1.0.3 (iPad; iOS 16.6.1; Scale/2.00)" } + maxRetries := 10 + var err error + maxRetriesStr, maxRetriesExists := os.LookupEnv("MAX_RETRIES") + if !maxRetriesExists { + maxRetries, err = strconv.Atoi(maxRetriesStr) + if err != nil { + maxRetries = 10 + } + } + // Create a new HTTP client with a custom User-Agent header client := &http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { @@ -31,123 +44,133 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int log.Printf("Parsing M3U from URL: %s\n", m3uURL) - resp, err := client.Get(m3uURL) - if err != nil { - return fmt.Errorf("HTTP GET error: %v", err) - } - defer resp.Body.Close() + for i := 0; i <= maxRetries; i++ { + resp, err := client.Get(m3uURL) + if err != nil { + return fmt.Errorf("HTTP GET error: %v", err) + } + defer resp.Body.Close() - scanner := bufio.NewScanner(resp.Body) + scanner := bufio.NewScanner(resp.Body) - var currentStream database.StreamInfo + var currentStream database.StreamInfo - for scanner.Scan() { - line := scanner.Text() - extInfLine := "" + for scanner.Scan() { + line := scanner.Text() + extInfLine := "" - if strings.HasPrefix(line, "#EXTINF:") { - currentStream = database.StreamInfo{} - extInfLine = line + if strings.HasPrefix(line, "#EXTINF:") { + currentStream = database.StreamInfo{} + extInfLine = line - lineWithoutPairs := line + lineWithoutPairs := line - // Define a regular expression to capture key-value pairs - regex := regexp.MustCompile(`([a-zA-Z0-9_-]+)=("[^"]+"|[^",]+)`) + // Define a regular expression to capture key-value pairs + regex := regexp.MustCompile(`([a-zA-Z0-9_-]+)=("[^"]+"|[^",]+)`) - // Find all key-value pairs in the line - matches := regex.FindAllStringSubmatch(line, -1) + // Find all key-value pairs in the line + matches := regex.FindAllStringSubmatch(line, -1) - for _, match := range matches { - key := strings.TrimSpace(match[1]) - value := strings.TrimSpace(match[2]) + for _, match := range matches { + key := strings.TrimSpace(match[1]) + value := strings.TrimSpace(match[2]) - if strings.HasPrefix(value, `"`) && strings.HasSuffix(value, `"`) { - value = strings.Trim(value, `"`) + if strings.HasPrefix(value, `"`) && strings.HasSuffix(value, `"`) { + value = strings.Trim(value, `"`) + } + + switch key { + case "tvg-id": + currentStream.TvgID = value + case "tvg-name": + currentStream.Title = value + case "group-title": + currentStream.Group = value + case "tvg-logo": + currentStream.LogoURL = value + default: + if os.Getenv("DEBUG") == "true" { + log.Printf("Uncaught attribute: %s=%s\n", key, value) + } + } + + var pair string + if strings.Contains(value, `"`) || strings.Contains(value, ",") { + // If the value contains double quotes or commas, format it as key="value" + pair = fmt.Sprintf(`%s="%s"`, key, value) + } else { + // Otherwise, format it as key=value + pair = fmt.Sprintf(`%s=%s`, key, value) + } + lineWithoutPairs = strings.Replace(lineWithoutPairs, pair, "", 1) } - switch key { - case "tvg-id": - currentStream.TvgID = value - case "tvg-name": - currentStream.Title = value - case "group-title": - currentStream.Group = value - case "tvg-logo": - currentStream.LogoURL = value - default: + lineCommaSplit := strings.SplitN(lineWithoutPairs, ",", 2) + + if len(lineCommaSplit) > 1 { + currentStream.Title = strings.TrimSpace(lineCommaSplit[1]) + } + } else if strings.HasPrefix(line, "#EXTVLCOPT:") { + // Extract logo URL from #EXTVLCOPT line + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { if os.Getenv("DEBUG") == "true" { - log.Printf("Uncaught attribute: %s=%s\n", key, value) + log.Printf("Uncaught attribute (#EXTVLCOPT): %s=%s\n", parts[0], parts[1]) } } - - var pair string - if strings.Contains(value, `"`) || strings.Contains(value, ",") { - // If the value contains double quotes or commas, format it as key="value" - pair = fmt.Sprintf(`%s="%s"`, key, value) - } else { - // Otherwise, format it as key=value - pair = fmt.Sprintf(`%s=%s`, key, value) + } else if strings.HasPrefix(line, "http") { + if len(strings.TrimSpace(currentStream.Title)) == 0 { + log.Printf("Error capturing title, line will be skipped: %s\n", extInfLine) + continue } - lineWithoutPairs = strings.Replace(lineWithoutPairs, pair, "", 1) - } - lineCommaSplit := strings.SplitN(lineWithoutPairs, ",", 2) - - if len(lineCommaSplit) > 1 { - currentStream.Title = strings.TrimSpace(lineCommaSplit[1]) - } - } else if strings.HasPrefix(line, "#EXTVLCOPT:") { - // Extract logo URL from #EXTVLCOPT line - parts := strings.SplitN(line, "=", 2) - if len(parts) == 2 { - if os.Getenv("DEBUG") == "true" { - log.Printf("Uncaught attribute (#EXTVLCOPT): %s=%s\n", parts[0], parts[1]) + existingStream, err := database.GetStreamByTitle(db, currentStream.Title) + if err != nil { + return fmt.Errorf("GetStreamByTitle error (title: %s): %v", currentStream.Title, err) } - } - } else if strings.HasPrefix(line, "http") { - if len(strings.TrimSpace(currentStream.Title)) == 0 { - log.Printf("Error capturing title, line will be skipped: %s\n", extInfLine) - continue - } - existingStream, err := database.GetStreamByTitle(db, currentStream.Title) - if err != nil { - return fmt.Errorf("GetStreamByTitle error (title: %s): %v", currentStream.Title, err) - } + var dbId int64 + if existingStream.Title != currentStream.Title { + if os.Getenv("DEBUG") == "true" { + log.Printf("Creating new database entry: %s", currentStream.Title) + } + dbId, err = database.InsertStream(db, currentStream) + if err != nil { + return fmt.Errorf("InsertStream error (title: %s): %v", currentStream.Title, err) + } + } else { + if os.Getenv("DEBUG") == "true" { + log.Printf("Using existing database entry: %s", existingStream.Title) + } + dbId = existingStream.DbId + } - var dbId int64 - if existingStream.Title != currentStream.Title { if os.Getenv("DEBUG") == "true" { - log.Printf("Creating new database entry: %s", currentStream.Title) + log.Printf("Adding MP4 url entry to %s: %s", currentStream.Title, line) } - dbId, err = database.InsertStream(db, currentStream) + _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ + Content: line, + M3UIndex: m3uIndex, + MaxConcurrency: maxConcurrency, + }) if err != nil { - return fmt.Errorf("InsertStream error (title: %s): %v", currentStream.Title, err) + return fmt.Errorf("InsertStreamUrl error (title: %s): %v", currentStream.Title, err) } - } else { - if os.Getenv("DEBUG") == "true" { - log.Printf("Using existing database entry: %s", existingStream.Title) - } - dbId = existingStream.DbId } + } - if os.Getenv("DEBUG") == "true" { - log.Printf("Adding MP4 url entry to %s: %s", currentStream.Title, line) - } - _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ - Content: line, - M3UIndex: m3uIndex, - MaxConcurrency: maxConcurrency, - }) - if err != nil { - return fmt.Errorf("InsertStreamUrl error (title: %s): %v", currentStream.Title, err) - } + if scanner.Err() == io.EOF { + // Unexpected EOF, retry + log.Printf("Unexpected EOF. Retrying in 5 secs...") + time.Sleep(5 * time.Second) + continue } - } - if err := scanner.Err(); err != nil { - return fmt.Errorf("scanner error: %v", err) - } + if err := scanner.Err(); err != nil { + return fmt.Errorf("scanner error: %v", err) + } - return nil + return nil + } + return fmt.Errorf("Max retries reached without success. Failed to fetch %s\n", m3uURL) } From 232075b771a223e22782e3526a55d79606cc90fc Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 20:29:30 -0500 Subject: [PATCH 12/13] fix logs without newlines --- m3u/parser.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/m3u/parser.go b/m3u/parser.go index e44255b0..9588ef58 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -132,7 +132,7 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int var dbId int64 if existingStream.Title != currentStream.Title { if os.Getenv("DEBUG") == "true" { - log.Printf("Creating new database entry: %s", currentStream.Title) + log.Printf("Creating new database entry: %s\n", currentStream.Title) } dbId, err = database.InsertStream(db, currentStream) if err != nil { @@ -140,14 +140,15 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int } } else { if os.Getenv("DEBUG") == "true" { - log.Printf("Using existing database entry: %s", existingStream.Title) + log.Printf("Using existing database entry: %s\n", existingStream.Title) } dbId = existingStream.DbId } if os.Getenv("DEBUG") == "true" { - log.Printf("Adding MP4 url entry to %s: %s", currentStream.Title, line) + log.Printf("Adding MP4 url entry to %s: %s\n", currentStream.Title, line) } + _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ Content: line, M3UIndex: m3uIndex, @@ -161,7 +162,7 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int if scanner.Err() == io.EOF { // Unexpected EOF, retry - log.Printf("Unexpected EOF. Retrying in 5 secs...") + log.Printf("Unexpected EOF. Retrying in 5 secs... (url: %s)\n", m3uURL) time.Sleep(5 * time.Second) continue } From 90e04784a475c5f022b1c63306729d3b93e992da Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 20:36:32 -0500 Subject: [PATCH 13/13] prevent url duplicates on retry --- database/db.go | 24 ++++++++++++++++++++++++ m3u/parser.go | 18 +++++++++++++----- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/database/db.go b/database/db.go index 5be363b3..999c29db 100644 --- a/database/db.go +++ b/database/db.go @@ -345,6 +345,30 @@ func GetStreamByTitle(db *sql.DB, title string) (s StreamInfo, err error) { return s, nil } +func GetStreamUrlByUrlAndIndex(db *sql.DB, url string, m3u_index int) (s StreamURL, err error) { + mutex.Lock() + defer mutex.Unlock() + + rows, err := db.Query("SELECT id, content, m3u_index, max_concurrency FROM stream_urls WHERE content = ? AND m3u_index = ?", url, m3u_index) + if err != nil { + return s, fmt.Errorf("error querying streams: %v", err) + } + defer rows.Close() + + for rows.Next() { + err = rows.Scan(&s.DbId, &s.Content, &s.M3UIndex, &s.MaxConcurrency) + if err != nil { + return s, fmt.Errorf("error scanning stream: %v", err) + } + } + + if err := rows.Err(); err != nil { + return s, fmt.Errorf("error iterating over rows: %v", err) + } + + return s, nil +} + func GetStreams(db *sql.DB) ([]StreamInfo, error) { mutex.Lock() defer mutex.Unlock() diff --git a/m3u/parser.go b/m3u/parser.go index 9588ef58..ffe7cbf2 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -149,11 +149,19 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int log.Printf("Adding MP4 url entry to %s: %s\n", currentStream.Title, line) } - _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ - Content: line, - M3UIndex: m3uIndex, - MaxConcurrency: maxConcurrency, - }) + existingUrl, err := database.GetStreamUrlByUrlAndIndex(db, line, m3uIndex) + if err != nil { + return fmt.Errorf("GetStreamUrlByUrlAndIndex error (url: %s): %v", line, err) + } + + if existingUrl.Content != line || existingUrl.M3UIndex != m3uIndex { + _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ + Content: line, + M3UIndex: m3uIndex, + MaxConcurrency: maxConcurrency, + }) + } + if err != nil { return fmt.Errorf("InsertStreamUrl error (title: %s): %v", currentStream.Title, err) }