Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subindex for inter-m3u duplicates #203

Merged
merged 13 commits into from
Dec 22, 2024
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
)

require golang.org/x/sys v0.1.0 // indirect
require (
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
9 changes: 5 additions & 4 deletions handlers/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
return
}

var selectedIndex int
var selectedIndex string
var selectedSubIndex string
var selectedUrl string

session := store.GetOrCreateSession(r)
Expand All @@ -46,7 +47,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
}()

for {
resp, selectedUrl, selectedIndex, err = stream.LoadBalancer(ctx, &session, r.Method)
resp, selectedUrl, selectedIndex, selectedSubIndex, err = stream.LoadBalancer(ctx, &session, r.Method)
if err != nil {
utils.SafeLogf("Error reloading stream for %s: %v\n", streamUrl, err)
return
Expand Down Expand Up @@ -77,7 +78,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
proxyCtx, proxyCtxCancel := context.WithCancel(ctx)
defer proxyCtxCancel()

go stream.ProxyStream(proxyCtx, selectedIndex, resp, r, w, exitStatus)
go stream.ProxyStream(proxyCtx, selectedIndex, selectedSubIndex, resp, r, w, exitStatus)

select {
case <-ctx.Done():
Expand All @@ -91,7 +92,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency
return
} else if streamExitCode == 1 || streamExitCode == 2 {
// Retry on server-side connection errors
session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex))
session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex+"|"+selectedSubIndex))
utils.SafeLogf("Retrying other servers...\n")
proxyCtxCancel()
} else if streamExitCode == 4 {
Expand Down
52 changes: 27 additions & 25 deletions proxy/load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func NewStreamInstance(streamUrl string, cm *store.ConcurrencyManager) (*StreamI
}, nil
}

func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store.Session, method string) (*http.Response, string, int, error) {
func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store.Session, method string) (*http.Response, string, string, string, error) {
debug := os.Getenv("DEBUG") == "true"

m3uIndexes := utils.GetM3UIndexes()

sort.Slice(m3uIndexes, func(i, j int) bool {
return instance.Cm.ConcurrencyPriorityValue(i) > instance.Cm.ConcurrencyPriorityValue(j)
return instance.Cm.ConcurrencyPriorityValue(m3uIndexes[i]) > instance.Cm.ConcurrencyPriorityValue(m3uIndexes[j])
})

maxLapsString := os.Getenv("MAX_RETRIES")
Expand All @@ -60,43 +60,45 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store

select {
case <-ctx.Done():
return nil, "", -1, fmt.Errorf("Cancelling load balancer.")
return nil, "", "", "", fmt.Errorf("Cancelling load balancer.")
default:
for _, index := range m3uIndexes {
if slices.Contains(session.TestedIndexes, index) {
utils.SafeLogf("Skipping M3U_%d: marked as previous stream\n", index+1)
continue
}

url, ok := instance.Info.URLs[index]
innerMap, ok := instance.Info.URLs[index]
if !ok {
utils.SafeLogf("Channel not found from M3U_%d: %s\n", index+1, instance.Info.Title)
utils.SafeLogf("Channel not found from M3U_%s: %s\n", index, instance.Info.Title)
continue
}

if instance.Cm.CheckConcurrency(index) {
utils.SafeLogf("Concurrency limit reached for M3U_%d: %s\n", index+1, url)
continue
}
for subIndex, url := range innerMap {
if slices.Contains(session.TestedIndexes, index+"|"+subIndex) {
utils.SafeLogf("Skipping M3U_%s|%s: marked as previous stream\n", index, subIndex)
continue
}

if instance.Cm.CheckConcurrency(index) {
utils.SafeLogf("Concurrency limit reached for M3U_%s: %s\n", index, url)
continue
}

resp, err := utils.CustomHttpRequest(method, url)
if err == nil {
resp, err := utils.CustomHttpRequest(method, url)
if err == nil {
if debug {
utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url)
}
return resp, url, index, subIndex, nil
}
utils.SafeLogf("Error fetching stream: %s\n", err.Error())
if debug {
utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url)
utils.SafeLogf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error())
}
return resp, url, index, nil
}
utils.SafeLogf("Error fetching stream: %s\n", err.Error())
if debug {
utils.SafeLogf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error())
session.SetTestedIndexes(append(session.TestedIndexes, index+"|"+subIndex))
}
session.SetTestedIndexes(append(session.TestedIndexes, index))
}

if debug {
utils.SafeLogf("[DEBUG] All streams skipped in lap %d\n", lap)
}
session.SetTestedIndexes([]int{})
session.SetTestedIndexes([]string{})

}

Expand All @@ -112,5 +114,5 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store
lap++
}

return nil, "", -1, fmt.Errorf("Error fetching stream. Exhausted all streams.")
return nil, "", "", "", fmt.Errorf("Error fetching stream. Exhausted all streams.")
}
2 changes: 1 addition & 1 deletion proxy/proxy_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"
)

func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex int, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) {
func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex string, subIndex string, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) {
debug := os.Getenv("DEBUG") == "true"

bufferMbInt, err := strconv.Atoi(os.Getenv("BUFFER_MB"))
Expand Down
27 changes: 15 additions & 12 deletions store/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,38 @@ import (

type ConcurrencyManager struct {
mu sync.Mutex
count map[int]int
count map[string]int
}

func NewConcurrencyManager() *ConcurrencyManager {
return &ConcurrencyManager{count: make(map[int]int)}
return &ConcurrencyManager{count: make(map[string]int)}
}

func (cm *ConcurrencyManager) Increment(m3uIndex int) {
func (cm *ConcurrencyManager) Increment(m3uIndex string) {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.count[m3uIndex]++
}

func (cm *ConcurrencyManager) Decrement(m3uIndex int) {
func (cm *ConcurrencyManager) Decrement(m3uIndex string) {
cm.mu.Lock()
defer cm.mu.Unlock()

if cm.count[m3uIndex] > 0 {
cm.count[m3uIndex]--
}
}

func (cm *ConcurrencyManager) GetCount(m3uIndex int) int {
func (cm *ConcurrencyManager) GetCount(m3uIndex string) int {
cm.mu.Lock()
defer cm.mu.Unlock()

return cm.count[m3uIndex]
}

func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex int) int {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex+1)))
func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex)))
if err != nil {
maxConcurrency = 1
}
Expand All @@ -49,25 +51,26 @@ func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex int) int {
return maxConcurrency - count
}

func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex int) bool {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex+1)))
func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool {
maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex)))
if err != nil {
maxConcurrency = 1
}

count := cm.GetCount(m3uIndex)

utils.SafeLogf("Current number of connections for M3U_%d: %d", m3uIndex+1, count)
utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count)
return count >= maxConcurrency
}

func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex int, incr bool) {
func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, incr bool) {
if incr {
cm.Increment(m3uIndex)
} else {
cm.Decrement(m3uIndex)
}

count := cm.GetCount(m3uIndex)
utils.SafeLogf("Current number of connections for M3U_%d: %d", m3uIndex+1, count)

utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count)
}
4 changes: 2 additions & 2 deletions store/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"m3u-stream-merger/utils"
)

func DownloadM3USource(m3uIndex int) (err error) {
func DownloadM3USource(m3uIndex string) (err error) {
debug := os.Getenv("DEBUG") == "true"
m3uURL := os.Getenv(fmt.Sprintf("M3U_URL_%d", m3uIndex+1))
m3uURL := os.Getenv(fmt.Sprintf("M3U_URL_%s", m3uIndex))

if debug {
utils.SafeLogf("[DEBUG] Processing M3U from: %s\n", m3uURL)
Expand Down
Loading
Loading