From 5a506863f0bb3780b63a7935c16039385bd83e69 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 15:33:27 -0500 Subject: [PATCH 1/9] add a memory session to retain tested indexes across requests --- handlers/stream_handler.go | 6 ++--- proxy/load_balancer.go | 6 ++--- store/sessions.go | 50 ++++++++++++++++++++++++++++++++++++++ utils/fingerprint.go | 26 ++++++++++++++++++++ 4 files changed, 82 insertions(+), 6 deletions(-) create mode 100644 store/sessions.go create mode 100644 utils/fingerprint.go diff --git a/handlers/stream_handler.go b/handlers/stream_handler.go index ccded22..c5958c8 100644 --- a/handlers/stream_handler.go +++ b/handlers/stream_handler.go @@ -35,7 +35,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency var selectedIndex int var selectedUrl string - testedIndexes := []int{} + session := store.GetOrCreateSession(r) firstWrite := true var resp *http.Response @@ -46,7 +46,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency }() for { - resp, selectedUrl, selectedIndex, err = stream.LoadBalancer(ctx, &testedIndexes, r.Method) + resp, selectedUrl, selectedIndex, err = stream.LoadBalancer(ctx, &session, r.Method) if err != nil { utils.SafeLogf("Error reloading stream for %s: %v\n", streamUrl, err) return @@ -78,7 +78,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency defer proxyCtxCancel() go stream.ProxyStream(proxyCtx, selectedIndex, resp, r, w, exitStatus) - testedIndexes = append(testedIndexes, selectedIndex) + session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex)) select { case <-ctx.Done(): diff --git a/proxy/load_balancer.go b/proxy/load_balancer.go index f3f3af1..04c6037 100644 --- a/proxy/load_balancer.go +++ b/proxy/load_balancer.go @@ -31,7 +31,7 @@ func NewStreamInstance(streamUrl string, cm *store.ConcurrencyManager) (*StreamI }, nil } -func (instance *StreamInstance) LoadBalancer(ctx context.Context, previous *[]int, method string) (*http.Response, string, int, error) { +func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store.Session, method string) (*http.Response, string, int, error) { debug := os.Getenv("DEBUG") == "true" m3uIndexes := utils.GetM3UIndexes() @@ -64,7 +64,7 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, previous *[]in return nil, "", -1, fmt.Errorf("Cancelling load balancer.") default: for _, index := range m3uIndexes { - if slices.Contains(*previous, index) { + if slices.Contains(session.TestedIndexes, index) { utils.SafeLogf("Skipping M3U_%d: marked as previous stream\n", index+1) continue } @@ -99,7 +99,7 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, previous *[]in if debug { utils.SafeLogf("[DEBUG] All streams skipped in lap %d\n", lap) } - *previous = []int{} + session.SetTestedIndexes([]int{}) } } diff --git a/store/sessions.go b/store/sessions.go new file mode 100644 index 0000000..07527d8 --- /dev/null +++ b/store/sessions.go @@ -0,0 +1,50 @@ +package store + +import ( + "m3u-stream-merger/utils" + "net/http" + "sync" + "time" +) + +type Session struct { + ID string + CreatedAt time.Time + TestedIndexes []int +} + +var sessionStore = struct { + sync.RWMutex + sessions map[string]Session +}{sessions: make(map[string]Session)} + +func GetOrCreateSession(r *http.Request) Session { + fingerprint := utils.GenerateFingerprint(r) + + sessionStore.RLock() + session, exists := sessionStore.sessions[fingerprint] + sessionStore.RUnlock() + if exists { + return session + } + + session = Session{ + ID: fingerprint, + CreatedAt: time.Now(), + TestedIndexes: []int{}, + } + + sessionStore.Lock() + sessionStore.sessions[session.ID] = session + sessionStore.Unlock() + + return session +} + +func (s *Session) SetTestedIndexes(indexes []int) { + s.TestedIndexes = indexes + + sessionStore.Lock() + sessionStore.sessions[s.ID] = *s + sessionStore.Unlock() +} diff --git a/utils/fingerprint.go b/utils/fingerprint.go new file mode 100644 index 0000000..9ae50a1 --- /dev/null +++ b/utils/fingerprint.go @@ -0,0 +1,26 @@ +package utils + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "net/http" +) + +func GenerateFingerprint(r *http.Request) string { + // Collect relevant attributes + ip := r.RemoteAddr + if xff := r.Header.Get("X-Forwarded-For"); xff != "" { + ip = xff + } + userAgent := r.Header.Get("User-Agent") + accept := r.Header.Get("Accept") + acceptLang := r.Header.Get("Accept-Language") + + // Combine into a single string + data := fmt.Sprintf("%s|%s|%s|%s", ip, userAgent, accept, acceptLang) + + // Hash the string for a compact, fixed-length identifier + hash := sha256.Sum256([]byte(data)) + return hex.EncodeToString(hash[:]) +} From c4d3ff2be83654c80645902b56e312a15cd95b83 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 15:40:05 -0500 Subject: [PATCH 2/9] add session clearing on sync --- store/sessions.go | 8 ++++++++ updater/updater.go | 2 ++ 2 files changed, 10 insertions(+) diff --git a/store/sessions.go b/store/sessions.go index 07527d8..69adacf 100644 --- a/store/sessions.go +++ b/store/sessions.go @@ -41,6 +41,14 @@ func GetOrCreateSession(r *http.Request) Session { return session } +func ClearSessionStore() { + sessionStore.Lock() + for k := range sessionStore.sessions { + delete(sessionStore.sessions, k) + } + sessionStore.Unlock() +} + func (s *Session) SetTestedIndexes(indexes []int) { s.TestedIndexes = indexes diff --git a/updater/updater.go b/updater/updater.go index e95b6bd..04e1943 100644 --- a/updater/updater.go +++ b/updater/updater.go @@ -95,6 +95,8 @@ func (instance *Updater) UpdateSources(ctx context.Context) { utils.SafeLogf("Background process: M3U fetching complete.\n") + store.ClearSessionStore() + cacheOnSync := os.Getenv("CACHE_ON_SYNC") if len(strings.TrimSpace(cacheOnSync)) == 0 { cacheOnSync = "false" From 42f09753330a3e5d32c848db129e8fa76c420736 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 15:46:15 -0500 Subject: [PATCH 3/9] add debug logs on sessions --- store/sessions.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/store/sessions.go b/store/sessions.go index 69adacf..7c3b73a 100644 --- a/store/sessions.go +++ b/store/sessions.go @@ -3,6 +3,7 @@ package store import ( "m3u-stream-merger/utils" "net/http" + "os" "sync" "time" ) @@ -19,12 +20,16 @@ var sessionStore = struct { }{sessions: make(map[string]Session)} func GetOrCreateSession(r *http.Request) Session { + debug := os.Getenv("DEBUG") == "true" fingerprint := utils.GenerateFingerprint(r) sessionStore.RLock() session, exists := sessionStore.sessions[fingerprint] sessionStore.RUnlock() if exists { + if debug { + utils.SafeLogf("[DEBUG] Existing session found: %s\n", fingerprint) + } return session } @@ -38,6 +43,10 @@ func GetOrCreateSession(r *http.Request) Session { sessionStore.sessions[session.ID] = session sessionStore.Unlock() + if debug { + utils.SafeLogf("[DEBUG] Generating new session: %s\n", fingerprint) + } + return session } @@ -50,8 +59,14 @@ func ClearSessionStore() { } func (s *Session) SetTestedIndexes(indexes []int) { + debug := os.Getenv("DEBUG") == "true" + s.TestedIndexes = indexes + if debug { + utils.SafeLogf("[DEBUG] Setting tested indexes for session - %s: %v\n", s.ID, s.TestedIndexes) + } + sessionStore.Lock() sessionStore.sessions[s.ID] = *s sessionStore.Unlock() From a4c8111c34bc8557dbcbbec16914b48218915190 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 15:50:09 -0500 Subject: [PATCH 4/9] add debug logs from fingerprint generation --- utils/fingerprint.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/utils/fingerprint.go b/utils/fingerprint.go index 9ae50a1..247669f 100644 --- a/utils/fingerprint.go +++ b/utils/fingerprint.go @@ -5,9 +5,12 @@ import ( "encoding/hex" "fmt" "net/http" + "os" ) func GenerateFingerprint(r *http.Request) string { + debug := os.Getenv("DEBUG") == "true" + // Collect relevant attributes ip := r.RemoteAddr if xff := r.Header.Get("X-Forwarded-For"); xff != "" { @@ -19,6 +22,9 @@ func GenerateFingerprint(r *http.Request) string { // Combine into a single string data := fmt.Sprintf("%s|%s|%s|%s", ip, userAgent, accept, acceptLang) + if debug { + SafeLogf("[DEBUG] Generating fingerprint from: %s\n", data) + } // Hash the string for a compact, fixed-length identifier hash := sha256.Sum256([]byte(data)) From d08d5ae08ff02755743b7a3c3aa41ece5dc079bd Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 15:53:58 -0500 Subject: [PATCH 5/9] remove port from ip on fingerprint generate --- utils/fingerprint.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/fingerprint.go b/utils/fingerprint.go index 247669f..b0b92a1 100644 --- a/utils/fingerprint.go +++ b/utils/fingerprint.go @@ -6,13 +6,14 @@ import ( "fmt" "net/http" "os" + "strings" ) func GenerateFingerprint(r *http.Request) string { debug := os.Getenv("DEBUG") == "true" // Collect relevant attributes - ip := r.RemoteAddr + ip := strings.Split(r.RemoteAddr, ":")[0] if xff := r.Header.Get("X-Forwarded-For"); xff != "" { ip = xff } From 2cbafccb5bf99e7797d2d90629e3da8d18ad424e Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 16:02:32 -0500 Subject: [PATCH 6/9] consider failed stream as skip --- proxy/load_balancer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/load_balancer.go b/proxy/load_balancer.go index 04c6037..a6e0886 100644 --- a/proxy/load_balancer.go +++ b/proxy/load_balancer.go @@ -80,10 +80,9 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store continue } - allSkipped = false // At least one URL is not skipped - resp, err := utils.CustomHttpRequest(method, url) if err == nil { + allSkipped = false // At least one URL is not skipped if debug { utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url) } @@ -93,6 +92,7 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store if debug { utils.SafeLogf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error()) } + session.SetTestedIndexes(append(session.TestedIndexes, index)) } if allSkipped { From a53e7194f001ec2f2df9367df267702428dad19f Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 16:07:49 -0500 Subject: [PATCH 7/9] include path in session fingerprint --- utils/fingerprint.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/fingerprint.go b/utils/fingerprint.go index b0b92a1..1efa90a 100644 --- a/utils/fingerprint.go +++ b/utils/fingerprint.go @@ -20,9 +20,10 @@ func GenerateFingerprint(r *http.Request) string { userAgent := r.Header.Get("User-Agent") accept := r.Header.Get("Accept") acceptLang := r.Header.Get("Accept-Language") + path := r.URL.Path // Combine into a single string - data := fmt.Sprintf("%s|%s|%s|%s", ip, userAgent, accept, acceptLang) + data := fmt.Sprintf("%s|%s|%s|%s|%s", ip, userAgent, accept, acceptLang, path) if debug { SafeLogf("[DEBUG] Generating fingerprint from: %s\n", data) } From b8fd18ba821f245dbdc50f6351537b0ec2de78df Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 16:13:22 -0500 Subject: [PATCH 8/9] only append failed indexes --- handlers/stream_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handlers/stream_handler.go b/handlers/stream_handler.go index c5958c8..1e9b84b 100644 --- a/handlers/stream_handler.go +++ b/handlers/stream_handler.go @@ -78,7 +78,6 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency defer proxyCtxCancel() go stream.ProxyStream(proxyCtx, selectedIndex, resp, r, w, exitStatus) - session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex)) select { case <-ctx.Done(): @@ -92,6 +91,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency return } else if streamExitCode == 1 || streamExitCode == 2 { // Retry on server-side connection errors + session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex)) utils.SafeLogf("Retrying other servers...\n") proxyCtxCancel() } else if streamExitCode == 4 { From 0189bef2877869c712a501d18d13ba5c25fa2881 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Fri, 20 Dec 2024 16:16:02 -0500 Subject: [PATCH 9/9] remove unnecessary all skipped --- proxy/load_balancer.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/proxy/load_balancer.go b/proxy/load_balancer.go index a6e0886..bca28b6 100644 --- a/proxy/load_balancer.go +++ b/proxy/load_balancer.go @@ -57,7 +57,6 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store if debug { utils.SafeLogf("[DEBUG] Stream attempt %d out of %d\n", lap+1, maxLaps) } - allSkipped := true // Assume all URLs might be skipped select { case <-ctx.Done(): @@ -82,7 +81,6 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store resp, err := utils.CustomHttpRequest(method, url) if err == nil { - allSkipped = false // At least one URL is not skipped if debug { utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url) } @@ -95,12 +93,10 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store session.SetTestedIndexes(append(session.TestedIndexes, index)) } - if allSkipped { - if debug { - utils.SafeLogf("[DEBUG] All streams skipped in lap %d\n", lap) - } - session.SetTestedIndexes([]int{}) + if debug { + utils.SafeLogf("[DEBUG] All streams skipped in lap %d\n", lap) } + session.SetTestedIndexes([]int{}) }