From a702e85915a16b1f22b0c835245ffd28ee7936e1 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:16:04 -0500 Subject: [PATCH 01/19] implement atomic renaming on swapDb --- main.go | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 5cf19e45..7e62b088 100644 --- a/main.go +++ b/main.go @@ -17,19 +17,46 @@ import ( var db *sql.DB func swapDb() error { - err := database.DeleteSQLite(db, "current_streams") + // Generate a unique temporary name + tempName := fmt.Sprintf("temp_%d", time.Now().UnixNano()) + + // Rename the current database to a temporary name + err := database.RenameSQLite("current_streams", tempName) if err != nil { - return fmt.Errorf("Error deleting current_streams: %v\n", err) + return fmt.Errorf("Error renaming current_streams to temp: %v\n", err) } + // Rename the next database to current err = database.RenameSQLite("next_streams", "current_streams") if err != nil { - return fmt.Errorf("Error renaming to current_streams: %v\n", err) + // If renaming fails, revert the previous renaming to maintain consistency + revertErr := database.RenameSQLite(tempName, "current_streams") + if revertErr != nil { + return fmt.Errorf("Error renaming back to current_streams: %v\n", revertErr) + } + return fmt.Errorf("Error renaming next_streams to current_streams: %v\n", err) } + // Initialize the new current database db, err = database.InitializeSQLite("current_streams") if err != nil { - return fmt.Errorf("Error reinitializing to current_streams: %v\n", err) + // If initialization fails, revert both renamings + revertErr := database.RenameSQLite(tempName, "current_streams") + if revertErr != nil { + return fmt.Errorf("Error renaming back to current_streams: %v\n", revertErr) + } + revertErr = database.RenameSQLite("current_streams", "next_streams") + if revertErr != nil { + return fmt.Errorf("Error renaming back to next_streams: %v\n", revertErr) + } + return fmt.Errorf("Error initializing current_streams: %v\n", err) + } + + // Delete the temporary database + err = database.DeleteSQLite(db, tempName) + if err != nil { + // Log the error but do not return as this is not a critical error + fmt.Printf("Error deleting temp database: %v\n", err) } return nil From f18e44a6b93730b4e4dfe0c4bc3e4cbca25c2ef0 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:24:05 -0500 Subject: [PATCH 02/19] add mutex to sqlite queries --- database/db.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/database/db.go b/database/db.go index e8f34b66..9b3b0154 100644 --- a/database/db.go +++ b/database/db.go @@ -5,10 +5,13 @@ import ( "fmt" "os" "path/filepath" + "sync" _ "github.com/mattn/go-sqlite3" ) +var mutex sync.Mutex + func InitializeSQLite(name string) (db *sql.DB, err error) { foldername := filepath.Join(".", "data") filename := filepath.Join(foldername, fmt.Sprintf("%s.db", name)) @@ -84,6 +87,9 @@ func RenameSQLite(prevName string, nextName string) error { } func SaveToSQLite(db *sql.DB, streams []StreamInfo) (err error) { + mutex.Lock() + defer mutex.Unlock() + tx, err := db.Begin() if err != nil { return fmt.Errorf("error beginning transaction: %v", err) @@ -134,6 +140,9 @@ func SaveToSQLite(db *sql.DB, streams []StreamInfo) (err error) { } func InsertStream(db *sql.DB, s StreamInfo) (i int64, err error) { + mutex.Lock() + defer mutex.Unlock() + tx, err := db.Begin() if err != nil { return -1, fmt.Errorf("error beginning transaction: %v", err) @@ -168,6 +177,9 @@ func InsertStream(db *sql.DB, s StreamInfo) (i int64, err error) { } func InsertStreamUrl(db *sql.DB, id int64, url StreamURL) (i int64, err error) { + mutex.Lock() + defer mutex.Unlock() + tx, err := db.Begin() if err != nil { return -1, fmt.Errorf("error beginning transaction: %v", err) @@ -203,6 +215,9 @@ func InsertStreamUrl(db *sql.DB, id int64, url StreamURL) (i int64, err error) { } func DeleteStreamByTitle(db *sql.DB, title string) error { + mutex.Lock() + defer mutex.Unlock() + tx, err := db.Begin() if err != nil { return fmt.Errorf("error beginning transaction: %v", err) @@ -233,6 +248,9 @@ func DeleteStreamByTitle(db *sql.DB, title string) error { } func DeleteStreamURL(db *sql.DB, streamURLID int64) error { + mutex.Lock() + defer mutex.Unlock() + tx, err := db.Begin() if err != nil { return fmt.Errorf("error beginning transaction: %v", err) @@ -263,6 +281,9 @@ func DeleteStreamURL(db *sql.DB, streamURLID int64) error { } func GetStreamByTitle(db *sql.DB, title string) (s StreamInfo, err error) { + mutex.Lock() + defer mutex.Unlock() + rows, err := db.Query("SELECT id, title, tvg_id, logo_url, group_name FROM streams WHERE title = ?", title) if err != nil { return s, fmt.Errorf("error querying streams: %v", err) @@ -308,6 +329,9 @@ func GetStreamByTitle(db *sql.DB, title string) (s StreamInfo, err error) { } func GetStreams(db *sql.DB) ([]StreamInfo, error) { + mutex.Lock() + defer mutex.Unlock() + rows, err := db.Query("SELECT id, title, tvg_id, logo_url, group_name FROM streams") if err != nil { return nil, fmt.Errorf("error querying streams: %v", err) From 6e99acf613833eeda8ee81d02c7765c4887b32a3 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:25:11 -0500 Subject: [PATCH 03/19] unimplemented mutex --- database/db.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/database/db.go b/database/db.go index 9b3b0154..9ec79844 100644 --- a/database/db.go +++ b/database/db.go @@ -13,6 +13,9 @@ import ( var mutex sync.Mutex func InitializeSQLite(name string) (db *sql.DB, err error) { + mutex.Lock() + defer mutex.Unlock() + foldername := filepath.Join(".", "data") filename := filepath.Join(foldername, fmt.Sprintf("%s.db", name)) @@ -65,6 +68,9 @@ func InitializeSQLite(name string) (db *sql.DB, err error) { // DeleteSQLite deletes the SQLite database file. func DeleteSQLite(db *sql.DB, name string) error { + mutex.Lock() + defer mutex.Unlock() + foldername := filepath.Join(".", "data") filename := filepath.Join(foldername, fmt.Sprintf("%s.db", name)) @@ -77,6 +83,9 @@ func DeleteSQLite(db *sql.DB, name string) error { } func RenameSQLite(prevName string, nextName string) error { + mutex.Lock() + defer mutex.Unlock() + foldername := filepath.Join(".", "data") prevFileName := filepath.Join(foldername, fmt.Sprintf("%s.db", prevName)) nextFileName := filepath.Join(foldername, fmt.Sprintf("%s.db", nextName)) From a9208e84d6020408a57d047b60fad50419546f9f Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:32:34 -0500 Subject: [PATCH 04/19] ensure redis initialized only once --- database/redis.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/database/redis.go b/database/redis.go index dbc29629..abb76b95 100644 --- a/database/redis.go +++ b/database/redis.go @@ -1,13 +1,22 @@ package database -import "github.com/redis/go-redis/v9" +import ( + "sync" + + "github.com/redis/go-redis/v9" +) func InitializeRedis() *redis.Client { + var redisClient *redis.Client + var redisOnce sync.Once + // Initialize Redis client - redisClient := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", // Change this to your Redis server address - Password: "", // No password set - DB: 0, // Use default DB + redisOnce.Do(func() { + redisClient = redis.NewClient(&redis.Options{ + Addr: "localhost:6379", // Change this to your Redis server address + Password: "", // No password set + DB: 0, // Use default DB + }) }) return redisClient From 26806bb013c99f1820bd0d45aef7777bb82a7c1a Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:38:06 -0500 Subject: [PATCH 05/19] add logging during parsing --- m3u/parser.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/m3u/parser.go b/m3u/parser.go index 4108d1d9..7571b649 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -82,14 +82,17 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int var dbId int64 if existingStream.Title != currentStream.Title { + log.Printf("Creating new database entry: %s", currentStream.Title) dbId, err = database.InsertStream(db, currentStream) if err != nil { return fmt.Errorf("InsertStream error (title: %s): %v", currentStream.Title, err) } } else { + log.Printf("Using existing database entry: %s", currentStream.Title) dbId = existingStream.DbId } + log.Printf("Adding MP4 url entry to %s: %s", currentStream.Title, line) _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ Content: line, M3UIndex: m3uIndex, From 72ed236eef5b0301bd1ccc59831533befc8f405f Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:45:41 -0500 Subject: [PATCH 06/19] ensure db closed before opening --- database/db.go | 10 +++++++++- main.go | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/database/db.go b/database/db.go index 9ec79844..5be363b3 100644 --- a/database/db.go +++ b/database/db.go @@ -3,6 +3,7 @@ package database import ( "database/sql" "fmt" + "log" "os" "path/filepath" "sync" @@ -16,6 +17,13 @@ func InitializeSQLite(name string) (db *sql.DB, err error) { mutex.Lock() defer mutex.Unlock() + if db != nil { + err := db.Close() + if err == nil { + log.Printf("Database session has already been closed: %v\n", err) + } + } + foldername := filepath.Join(".", "data") filename := filepath.Join(foldername, fmt.Sprintf("%s.db", name)) @@ -67,7 +75,7 @@ func InitializeSQLite(name string) (db *sql.DB, err error) { } // DeleteSQLite deletes the SQLite database file. -func DeleteSQLite(db *sql.DB, name string) error { +func DeleteSQLite(name string) error { mutex.Lock() defer mutex.Unlock() diff --git a/main.go b/main.go index 7e62b088..f303538f 100644 --- a/main.go +++ b/main.go @@ -53,7 +53,7 @@ func swapDb() error { } // Delete the temporary database - err = database.DeleteSQLite(db, tempName) + err = database.DeleteSQLite(tempName) if err != nil { // Log the error but do not return as this is not a critical error fmt.Printf("Error deleting temp database: %v\n", err) From e6eeb836139cc66011c46a74de9c478c4a5a4930 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:47:41 -0500 Subject: [PATCH 07/19] fix existing stream log --- m3u/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/m3u/parser.go b/m3u/parser.go index 7571b649..c86834c1 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -88,7 +88,7 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int return fmt.Errorf("InsertStream error (title: %s): %v", currentStream.Title, err) } } else { - log.Printf("Using existing database entry: %s", currentStream.Title) + log.Printf("Using existing database entry: %s", existingStream.Title) dbId = existingStream.DbId } From 6504cc4cd0274782a06b241ac2d00f25a3fddbec Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:50:35 -0500 Subject: [PATCH 08/19] fix deletesqlite arguments --- database/database_test.go | 2 +- m3u/m3u_test.go | 4 ++-- main.go | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/database/database_test.go b/database/database_test.go index 5a5689fa..40665b75 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -74,7 +74,7 @@ func TestSaveAndLoadFromSQLite(t *testing.T) { } } - err = DeleteSQLite(db, "test") + err = DeleteSQLite("test") if err != nil { t.Errorf("DeleteSQLite returned error: %v", err) } diff --git a/m3u/m3u_test.go b/m3u/m3u_test.go index 34d8aa77..b1de4191 100644 --- a/m3u/m3u_test.go +++ b/m3u/m3u_test.go @@ -67,7 +67,7 @@ func TestGenerateM3UContent(t *testing.T) { rr.Body.String(), expectedContent) } - err = database.DeleteSQLite(db, "test") + err = database.DeleteSQLite("test") if err != nil { t.Errorf("DeleteSQLite returned error: %v", err) } @@ -155,7 +155,7 @@ http://example.com/fox } } - err = database.DeleteSQLite(db, "test") + err = database.DeleteSQLite("test") if err != nil { t.Errorf("DeleteSQLite returned error: %v", err) } diff --git a/main.go b/main.go index f303538f..5fc81ac1 100644 --- a/main.go +++ b/main.go @@ -149,7 +149,8 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - db, err := database.InitializeSQLite("current_streams") + var err error + db, err = database.InitializeSQLite("current_streams") if err != nil { log.Fatalf("Error initializing current SQLite database: %v", err) } From 42dd95257d0912396ccd8e1662a7ca80e84460b0 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 13:57:47 -0500 Subject: [PATCH 09/19] fix empty tvg-name --- m3u/parser.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/m3u/parser.go b/m3u/parser.go index c86834c1..5c6f48b0 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -67,6 +67,11 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int case "tvg-logo": currentStream.LogoURL = value } + + // If tvg-name is empty, use the stream title as a fallback + if key == "tvg-name" && value == "" { + currentStream.Title = strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(line, "#EXTINF:-1"), ",")) + } } } else if strings.HasPrefix(line, "#EXTVLCOPT:") { // Extract logo URL from #EXTVLCOPT line From 6f3c183b62dc433766df962146b6ee47e031d56b Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:02:24 -0500 Subject: [PATCH 10/19] only use stream title as title instead of tvg-name --- m3u/parser.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/m3u/parser.go b/m3u/parser.go index 5c6f48b0..6ee76b43 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -46,6 +46,7 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int if strings.HasPrefix(line, "#EXTINF:") { currentStream = database.StreamInfo{} + currentStream.Title = strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(line, "#EXTINF:-1"), ",")) // Define a regular expression to capture key-value pairs regex := regexp.MustCompile(`(\S+?)="([^"]*?)"`) @@ -60,18 +61,11 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int switch key { case "tvg-id": currentStream.TvgID = value - case "tvg-name": - currentStream.Title = value case "group-title": currentStream.Group = value case "tvg-logo": currentStream.LogoURL = value } - - // If tvg-name is empty, use the stream title as a fallback - if key == "tvg-name" && value == "" { - currentStream.Title = strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(line, "#EXTINF:-1"), ",")) - } } } else if strings.HasPrefix(line, "#EXTVLCOPT:") { // Extract logo URL from #EXTVLCOPT line From 381c1e7616fce51b01652b521f9e1f4a073aeb32 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:04:34 -0500 Subject: [PATCH 11/19] fix stream title parsing --- m3u/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/m3u/parser.go b/m3u/parser.go index 6ee76b43..e0a4bc63 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -46,7 +46,7 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int if strings.HasPrefix(line, "#EXTINF:") { currentStream = database.StreamInfo{} - currentStream.Title = strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(line, "#EXTINF:-1"), ",")) + currentStream.Title = strings.TrimSpace(strings.SplitN(line, ",", 2)[1]) // Define a regular expression to capture key-value pairs regex := regexp.MustCompile(`(\S+?)="([^"]*?)"`) From 9bd9c2fb054b146d7253db67acd6f09cb697887c Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:06:18 -0500 Subject: [PATCH 12/19] fix m3u parse test --- m3u/m3u_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/m3u/m3u_test.go b/m3u/m3u_test.go index b1de4191..1dda2fe0 100644 --- a/m3u/m3u_test.go +++ b/m3u/m3u_test.go @@ -80,7 +80,7 @@ func TestParseM3UFromURL(t *testing.T) { http://example.com/bbc1 #EXTINF:-1 tvg-id="bbc2" tvg-name="BBC Two" group-title="UK",BBC Two http://example.com/bbc2 -#EXTINF:-1 tvg-id="cnn" tvg-name="CNN International" group-title="News",CNN +#EXTINF:-1 tvg-id="cnn" tvg-name="CNN International" group-title="News",CNN International http://example.com/cnn #EXTVLCOPT:logo=http://example.com/bbc_logo.png #EXTINF:-1 tvg-id="fox" tvg-name="FOX" group-title="Entertainment",FOX From 2e9e25f4fba88054a8b816fe57a88bc6470d4d7d Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:19:34 -0500 Subject: [PATCH 13/19] listen for client disconnect --- mp4_handler.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index d8df4aad..0b368304 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -17,10 +17,6 @@ import ( ) func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { - // Create a context with cancellation capability - ctx, cancel := context.WithCancel(r.Context()) - defer cancel() - // Log the incoming request log.Printf("Received request from %s for URL: %s\n", r.RemoteAddr, r.URL.Path) @@ -60,14 +56,14 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Concurrency check mode for _, url := range stream.URLs { - if checkConcurrency(ctx, url.Content, url.MaxConcurrency) { + if checkConcurrency(r.Context(), url.Content, url.MaxConcurrency) { continue // Skip this stream if concurrency limit reached } resp, err = http.Get(url.Content) if err == nil { selectedUrl = &url - updateConcurrency(ctx, url.Content, true) + updateConcurrency(r.Context(), url.Content, true) break } @@ -80,8 +76,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { for _, url := range stream.URLs { resp, err = http.Get(url.Content) if err == nil { - selectedUrl = &url - updateConcurrency(ctx, url.Content, true) + updateConcurrency(r.Context(), url.Content, true) break } // Log the error @@ -108,12 +103,19 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Log the successful response log.Printf("Sent MP4 stream to %s\n", r.RemoteAddr) + // Listen for client disconnect + go func() { + <-r.Context().Done() + log.Println("Client disconnected") + updateConcurrency(r.Context(), selectedUrl.Content, false) + }() + // Check if the connection is still open before copying the MP4 stream to the response select { case <-r.Context().Done(): // Connection closed, handle accordingly log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(ctx, selectedUrl.Content, false) + updateConcurrency(r.Context(), selectedUrl.Content, false) return default: // Connection still open, proceed with writing to the response @@ -122,7 +124,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Log the error if errors.Is(err, syscall.EPIPE) { log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(ctx, selectedUrl.Content, false) + updateConcurrency(r.Context(), selectedUrl.Content, false) } else { log.Printf("Error copying MP4 stream to response: %s\n", err.Error()) } From 1ce7c143724a2c8f0ae9e45d821a506c1d72cef3 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:25:19 -0500 Subject: [PATCH 14/19] implement separate redis context --- mp4_handler.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index 0b368304..949844de 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -17,6 +17,10 @@ import ( ) func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { + // Create a context with cancellation capability + redisCtx, cancel := context.WithCancel(r.Context()) + defer cancel() + // Log the incoming request log.Printf("Received request from %s for URL: %s\n", r.RemoteAddr, r.URL.Path) @@ -56,14 +60,14 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Concurrency check mode for _, url := range stream.URLs { - if checkConcurrency(r.Context(), url.Content, url.MaxConcurrency) { + if checkConcurrency(redisCtx, url.Content, url.MaxConcurrency) { continue // Skip this stream if concurrency limit reached } resp, err = http.Get(url.Content) if err == nil { selectedUrl = &url - updateConcurrency(r.Context(), url.Content, true) + updateConcurrency(redisCtx, url.Content, true) break } @@ -76,7 +80,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { for _, url := range stream.URLs { resp, err = http.Get(url.Content) if err == nil { - updateConcurrency(r.Context(), url.Content, true) + updateConcurrency(redisCtx, url.Content, true) break } // Log the error @@ -107,7 +111,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { go func() { <-r.Context().Done() log.Println("Client disconnected") - updateConcurrency(r.Context(), selectedUrl.Content, false) + updateConcurrency(redisCtx, selectedUrl.Content, false) }() // Check if the connection is still open before copying the MP4 stream to the response @@ -115,7 +119,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { case <-r.Context().Done(): // Connection closed, handle accordingly log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(r.Context(), selectedUrl.Content, false) + updateConcurrency(redisCtx, selectedUrl.Content, false) return default: // Connection still open, proceed with writing to the response @@ -124,7 +128,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Log the error if errors.Is(err, syscall.EPIPE) { log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(r.Context(), selectedUrl.Content, false) + updateConcurrency(redisCtx, selectedUrl.Content, false) } else { log.Printf("Error copying MP4 stream to response: %s\n", err.Error()) } From 8bef991e1e8701d019d4e2cd09a363640fac413c Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:34:34 -0500 Subject: [PATCH 15/19] organize contexts --- mp4_handler.go | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index 949844de..32284698 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -18,7 +18,7 @@ import ( func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Create a context with cancellation capability - redisCtx, cancel := context.WithCancel(r.Context()) + ctx, cancel := context.WithCancel(r.Context()) defer cancel() // Log the incoming request @@ -60,14 +60,14 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Concurrency check mode for _, url := range stream.URLs { - if checkConcurrency(redisCtx, url.Content, url.MaxConcurrency) { + if checkConcurrency(ctx, url.Content, url.MaxConcurrency) { continue // Skip this stream if concurrency limit reached } resp, err = http.Get(url.Content) if err == nil { selectedUrl = &url - updateConcurrency(redisCtx, url.Content, true) + updateConcurrency(ctx, url.Content, true) break } @@ -80,7 +80,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { for _, url := range stream.URLs { resp, err = http.Get(url.Content) if err == nil { - updateConcurrency(redisCtx, url.Content, true) + updateConcurrency(ctx, url.Content, true) break } // Log the error @@ -92,7 +92,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { log.Println("Error fetching MP4 stream. Exhausted all streams.") // Check if the connection is still open before writing to the response select { - case <-r.Context().Done(): + case <-ctx.Done(): // Connection closed, handle accordingly log.Println("Client disconnected") return @@ -107,19 +107,12 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Log the successful response log.Printf("Sent MP4 stream to %s\n", r.RemoteAddr) - // Listen for client disconnect - go func() { - <-r.Context().Done() - log.Println("Client disconnected") - updateConcurrency(redisCtx, selectedUrl.Content, false) - }() - // Check if the connection is still open before copying the MP4 stream to the response select { - case <-r.Context().Done(): + case <-ctx.Done(): // Connection closed, handle accordingly log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(redisCtx, selectedUrl.Content, false) + updateConcurrency(ctx, selectedUrl.Content, false) return default: // Connection still open, proceed with writing to the response @@ -128,7 +121,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Log the error if errors.Is(err, syscall.EPIPE) { log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(redisCtx, selectedUrl.Content, false) + updateConcurrency(ctx, selectedUrl.Content, false) } else { log.Printf("Error copying MP4 stream to response: %s\n", err.Error()) } From c45a5cad384a786f26b3886838fbdc014478782d Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:47:03 -0500 Subject: [PATCH 16/19] separate loadbalancing function --- mp4_handler.go | 92 ++++++++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index 32284698..fa44fd49 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "fmt" "io" "log" "m3u-stream-merger/database" @@ -16,6 +17,48 @@ import ( "github.com/redis/go-redis/v9" ) +func loadBalancer(ctx context.Context, stream database.StreamInfo) (selectedUrl *database.StreamURL, err error) { + var resp *http.Response + + // Concurrency check mode + for _, url := range stream.URLs { + if checkConcurrency(ctx, url.Content, url.MaxConcurrency) { + continue // Skip this stream if concurrency limit reached + } + + resp, err = http.Get(url.Content) + if err == nil { + selectedUrl = &url + break + } + + // Log the error + log.Printf("Error fetching MP4 stream (concurrency check mode): %s\n", err.Error()) + } + + if selectedUrl == nil { + // Connection check mode + for _, url := range stream.URLs { + resp, err = http.Get(url.Content) + if err == nil { + selectedUrl = &url + break + } + // Log the error + return nil, fmt.Errorf("Error fetching MP4 stream (connection check mode): %s\n", err.Error()) + } + + if resp == nil { + // Log the error + return nil, fmt.Errorf("Error fetching MP4 stream. Exhausted all streams.") + } + + return selectedUrl, nil + } + + return selectedUrl, nil +} + func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Create a context with cancellation capability ctx, cancel := context.WithCancel(r.Context()) @@ -58,51 +101,12 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Iterate through the streams and select one based on concurrency and availability var selectedUrl *database.StreamURL - // Concurrency check mode - for _, url := range stream.URLs { - if checkConcurrency(ctx, url.Content, url.MaxConcurrency) { - continue // Skip this stream if concurrency limit reached - } - - resp, err = http.Get(url.Content) - if err == nil { - selectedUrl = &url - updateConcurrency(ctx, url.Content, true) - break - } - - // Log the error - log.Printf("Error fetching MP4 stream (concurrency check mode): %s\n", err.Error()) - } - - if selectedUrl == nil { - // Connection check mode - for _, url := range stream.URLs { - resp, err = http.Get(url.Content) - if err == nil { - updateConcurrency(ctx, url.Content, true) - break - } - // Log the error - log.Printf("Error fetching MP4 stream (connection check mode): %s\n", err.Error()) - } - - if resp == nil { - // Log the error - log.Println("Error fetching MP4 stream. Exhausted all streams.") - // Check if the connection is still open before writing to the response - select { - case <-ctx.Done(): - // Connection closed, handle accordingly - log.Println("Client disconnected") - return - default: - // Connection still open, proceed with writing to the response - http.Error(w, "Error fetching MP4 stream. Exhausted all streams.", http.StatusInternalServerError) - return - } - } + selectedUrl, err = loadBalancer(ctx, stream) + if err != nil { + http.Error(w, "Error fetching MP4 stream. Exhausted all streams.", http.StatusInternalServerError) + return } + updateConcurrency(ctx, selectedUrl.Content, true) // Log the successful response log.Printf("Sent MP4 stream to %s\n", r.RemoteAddr) From ef30191e1ba1aaace434bfab29e68a9bf369a5ef Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:49:10 -0500 Subject: [PATCH 17/19] fix golang linting --- mp4_handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index fa44fd49..c7fac7cc 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -43,9 +43,10 @@ func loadBalancer(ctx context.Context, stream database.StreamInfo) (selectedUrl if err == nil { selectedUrl = &url break + } else { + // Log the error + return nil, fmt.Errorf("Error fetching MP4 stream (connection check mode): %s\n", err.Error()) } - // Log the error - return nil, fmt.Errorf("Error fetching MP4 stream (connection check mode): %s\n", err.Error()) } if resp == nil { From acda21da4931819480fc640be37ffa2f4a5ac782 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:54:57 -0500 Subject: [PATCH 18/19] fix nil resp --- mp4_handler.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index c7fac7cc..c4e93045 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -17,9 +17,7 @@ import ( "github.com/redis/go-redis/v9" ) -func loadBalancer(ctx context.Context, stream database.StreamInfo) (selectedUrl *database.StreamURL, err error) { - var resp *http.Response - +func loadBalancer(ctx context.Context, stream database.StreamInfo) (resp *http.Response, selectedUrl *database.StreamURL, err error) { // Concurrency check mode for _, url := range stream.URLs { if checkConcurrency(ctx, url.Content, url.MaxConcurrency) { @@ -45,19 +43,19 @@ func loadBalancer(ctx context.Context, stream database.StreamInfo) (selectedUrl break } else { // Log the error - return nil, fmt.Errorf("Error fetching MP4 stream (connection check mode): %s\n", err.Error()) + return nil, nil, fmt.Errorf("Error fetching MP4 stream (connection check mode): %s\n", err.Error()) } } if resp == nil { // Log the error - return nil, fmt.Errorf("Error fetching MP4 stream. Exhausted all streams.") + return nil, nil, fmt.Errorf("Error fetching MP4 stream. Exhausted all streams.") } - return selectedUrl, nil + return resp, selectedUrl, nil } - return selectedUrl, nil + return resp, selectedUrl, nil } func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { @@ -102,7 +100,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Iterate through the streams and select one based on concurrency and availability var selectedUrl *database.StreamURL - selectedUrl, err = loadBalancer(ctx, stream) + resp, selectedUrl, err = loadBalancer(ctx, stream) if err != nil { http.Error(w, "Error fetching MP4 stream. Exhausted all streams.", http.StatusInternalServerError) return From de8b27e5d4729979b3f98e57b65dd0187356bbb7 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sat, 2 Mar 2024 14:58:46 -0500 Subject: [PATCH 19/19] remove cancelability context --- mp4_handler.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mp4_handler.go b/mp4_handler.go index c4e93045..8917c4f1 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -59,9 +59,7 @@ func loadBalancer(ctx context.Context, stream database.StreamInfo) (resp *http.R } func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { - // Create a context with cancellation capability - ctx, cancel := context.WithCancel(r.Context()) - defer cancel() + ctx := r.Context() // Log the incoming request log.Printf("Received request from %s for URL: %s\n", r.RemoteAddr, r.URL.Path)