Skip to content

Commit

Permalink
Merge pull request #203 from sonroyaalmerol/feature/one-m3u-duplicate
Browse files Browse the repository at this point in the history
Add subindex for inter-m3u duplicates
  • Loading branch information
sonroyaalmerol authored Dec 22, 2024
2 parents 8c9f44a + 931cf29 commit 8a9a12a
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 102 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
)

require golang.org/x/sys v0.1.0 // indirect
require (
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
9 changes: 5 additions & 4 deletions handlers/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
return
}

var selectedIndex int
var selectedIndex string
var selectedSubIndex string
var selectedUrl string

session := store.GetOrCreateSession(r)
Expand All @@ -46,7 +47,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
}()

for {
resp, selectedUrl, selectedIndex, err = stream.LoadBalancer(ctx, &session, r.Method)
resp, selectedUrl, selectedIndex, selectedSubIndex, err = stream.LoadBalancer(ctx, &session, r.Method)
if err != nil {
utils.SafeLogf("Error reloading stream for %s: %v\n", streamUrl, err)
return
Expand Down Expand Up @@ -77,7 +78,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
proxyCtx, proxyCtxCancel := context.WithCancel(ctx)
defer proxyCtxCancel()

go stream.ProxyStream(proxyCtx, selectedIndex, resp, r, w, exitStatus)
go stream.ProxyStream(proxyCtx, selectedIndex, selectedSubIndex, resp, r, w, exitStatus)

select {
case <-ctx.Done():
Expand All @@ -91,7 +92,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
return
} else if streamExitCode == 1 || streamExitCode == 2 {
// Retry on server-side connection errors
session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex))
session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex+"|"+selectedSubIndex))
utils.SafeLogf("Retrying other servers...\n")
proxyCtxCancel()
} else if streamExitCode == 4 {
Expand Down
52 changes: 27 additions & 25 deletions proxy/load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func NewStreamInstance(streamUrl string, cm *store.ConcurrencyManager) (*StreamI
}, nil
}

func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store.Session, method string) (*http.Response, string, int, error) {
func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store.Session, method string) (*http.Response, string, string, string, error) {
debug := os.Getenv("DEBUG") == "true"

m3uIndexes := utils.GetM3UIndexes()

sort.Slice(m3uIndexes, func(i, j int) bool {
return instance.Cm.ConcurrencyPriorityValue(i) > instance.Cm.ConcurrencyPriorityValue(j)
return instance.Cm.ConcurrencyPriorityValue(m3uIndexes[i]) > instance.Cm.ConcurrencyPriorityValue(m3uIndexes[j])
})

maxLapsString := os.Getenv("MAX_RETRIES")
Expand All @@ -60,43 +60,45 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store

select {
case <-ctx.Done():
return nil, "", -1, fmt.Errorf("Cancelling load balancer.")
return nil, "", "", "", fmt.Errorf("Cancelling load balancer.")
default:
for _, index := range m3uIndexes {
if slices.Contains(session.TestedIndexes, index) {
utils.SafeLogf("Skipping M3U_%d: marked as previous stream\n", index+1)
continue
}

url, ok := instance.Info.URLs[index]
innerMap, ok := instance.Info.URLs[index]
if !ok {
utils.SafeLogf("Channel not found from M3U_%d: %s\n", index+1, instance.Info.Title)
utils.SafeLogf("Channel not found from M3U_%s: %s\n", index, instance.Info.Title)
continue
}

if instance.Cm.CheckConcurrency(index) {
utils.SafeLogf("Concurrency limit reached for M3U_%d: %s\n", index+1, url)
continue
}
for subIndex, url := range innerMap {
if slices.Contains(session.TestedIndexes, index+"|"+subIndex) {
utils.SafeLogf("Skipping M3U_%s|%s: marked as previous stream\n", index, subIndex)
continue
}

if instance.Cm.CheckConcurrency(index) {
utils.SafeLogf("Concurrency limit reached for M3U_%s: %s\n", index, url)
continue
}

resp, err := utils.CustomHttpRequest(method, url)
if err == nil {
resp, err := utils.CustomHttpRequest(method, url)
if err == nil {
if debug {
utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url)
}
return resp, url, index, subIndex, nil
}
utils.SafeLogf("Error fetching stream: %s\n", err.Error())
if debug {
utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url)
utils.SafeLogf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error())
}
return resp, url, index, nil
}
utils.SafeLogf("Error fetching stream: %s\n", err.Error())
if debug {
utils.SafeLogf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error())
session.SetTestedIndexes(append(session.TestedIndexes, index+"|"+subIndex))
}
session.SetTestedIndexes(append(session.TestedIndexes, index))
}

if debug {
utils.SafeLogf("[DEBUG] All streams skipped in lap %d\n", lap)
}
session.SetTestedIndexes([]int{})
session.SetTestedIndexes([]string{})

}

Expand All @@ -112,5 +114,5 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store
lap++
}

return nil, "", -1, fmt.Errorf("Error fetching stream. Exhausted all streams.")
return nil, "", "", "", fmt.Errorf("Error fetching stream. Exhausted all streams.")
}
2 changes: 1 addition & 1 deletion proxy/proxy_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"
)

func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex int, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) {
func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex string, subIndex string, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) {
debug := os.Getenv("DEBUG") == "true"

bufferMbInt, err := strconv.Atoi(os.Getenv("BUFFER_MB"))
Expand Down
27 changes: 15 additions & 12 deletions store/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,38 @@ import (

type ConcurrencyManager struct {
mu sync.Mutex
count map[int]int
count map[string]int
}

func NewConcurrencyManager() *ConcurrencyManager {
return &ConcurrencyManager{count: make(map[int]int)}
return &ConcurrencyManager{count: make(map[string]int)}
}

func (cm *ConcurrencyManager) Increment(m3uIndex int) {
func (cm *ConcurrencyManager) Increment(m3uIndex string) {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.count[m3uIndex]++
}

func (cm *ConcurrencyManager) Decrement(m3uIndex int) {
func (cm *ConcurrencyManager) Decrement(m3uIndex string) {
cm.mu.Lock()
defer cm.mu.Unlock()

if cm.count[m3uIndex] > 0 {
cm.count[m3uIndex]--
}
}

func (cm *ConcurrencyManager) GetCount(m3uIndex int) int {
func (cm *ConcurrencyManager) GetCount(m3uIndex string) int {
cm.mu.Lock()
defer cm.mu.Unlock()

return cm.count[m3uIndex]
}

func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex int) int {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex+1)))
func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex)))
if err != nil {
maxConcurrency = 1
}
Expand All @@ -49,25 +51,26 @@ func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex int) int {
return maxConcurrency - count
}

func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex int) bool {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex+1)))
func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex)))
if err != nil {
maxConcurrency = 1
}

count := cm.GetCount(m3uIndex)

utils.SafeLogf("Current number of connections for M3U_%d: %d", m3uIndex+1, count)
utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count)
return count >= maxConcurrency
}

func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex int, incr bool) {
func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, incr bool) {
if incr {
cm.Increment(m3uIndex)
} else {
cm.Decrement(m3uIndex)
}

count := cm.GetCount(m3uIndex)
utils.SafeLogf("Current number of connections for M3U_%d: %d", m3uIndex+1, count)

utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count)
}
4 changes: 2 additions & 2 deletions store/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"m3u-stream-merger/utils"
)

func DownloadM3USource(m3uIndex int) (err error) {
func DownloadM3USource(m3uIndex string) (err error) {
debug := os.Getenv("DEBUG") == "true"
m3uURL := os.Getenv(fmt.Sprintf("M3U_URL_%d", m3uIndex+1))
m3uURL := os.Getenv(fmt.Sprintf("M3U_URL_%s", m3uIndex))

if debug {
utils.SafeLogf("[DEBUG] Processing M3U from: %s\n", m3uURL)
Expand Down
Loading

0 comments on commit 8a9a12a

Please sign in to comment.