Skip to content

Commit

Permalink
revert concurrency system
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Dec 22, 2024
1 parent 19584b9 commit 30a03da
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 45 deletions.
4 changes: 2 additions & 2 deletions proxy/proxy_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex string
return
}

instance.Cm.UpdateConcurrency(m3uIndex, subIndex, true)
instance.Cm.UpdateConcurrency(m3uIndex, true)
defer func() {
if debug {
utils.SafeLogf("[DEBUG] Defer executed for stream: %s\n", r.RemoteAddr)
}
instance.Cm.UpdateConcurrency(m3uIndex, subIndex, false)
instance.Cm.UpdateConcurrency(m3uIndex, false)
}()

defer func() {
Expand Down
62 changes: 19 additions & 43 deletions store/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,34 @@ import (

type ConcurrencyManager struct {
mu sync.Mutex
count map[string]map[string]int
count map[string]int
}

func NewConcurrencyManager() *ConcurrencyManager {
return &ConcurrencyManager{count: make(map[string]map[string]int)}
return &ConcurrencyManager{count: make(map[string]int)}
}

func (cm *ConcurrencyManager) Increment(m3uIndex string, subIndex string) {
func (cm *ConcurrencyManager) Increment(m3uIndex string) {
cm.mu.Lock()
defer cm.mu.Unlock()

if _, ok := cm.count[m3uIndex]; !ok {
cm.count[m3uIndex] = make(map[string]int)
}

cm.count[m3uIndex][subIndex]++
cm.count[m3uIndex]++
}

func (cm *ConcurrencyManager) Decrement(m3uIndex string, subIndex string) {
func (cm *ConcurrencyManager) Decrement(m3uIndex string) {
cm.mu.Lock()
defer cm.mu.Unlock()

if _, ok := cm.count[m3uIndex]; !ok {
cm.count[m3uIndex] = make(map[string]int)
}

if cm.count[m3uIndex][subIndex] > 0 {
cm.count[m3uIndex][subIndex]--
if cm.count[m3uIndex] > 0 {
cm.count[m3uIndex]--
}
}

func (cm *ConcurrencyManager) GetCount(m3uIndex string, subIndex string) int {
func (cm *ConcurrencyManager) GetCount(m3uIndex string) int {
cm.mu.Lock()
defer cm.mu.Unlock()

if _, ok := cm.count[m3uIndex]; !ok {
cm.count[m3uIndex] = make(map[string]int)
}

return cm.count[m3uIndex][subIndex]
return cm.count[m3uIndex]
}

func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int {
Expand All @@ -61,13 +49,9 @@ func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int {
maxConcurrency = 1
}

totalCount := 0
for subIndex := range cm.count[m3uIndex] {
count := cm.GetCount(m3uIndex, subIndex)
totalCount += count
}
count := cm.GetCount(m3uIndex)

return maxConcurrency - totalCount
return maxConcurrency - count
}

func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool {
Expand All @@ -79,31 +63,23 @@ func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool {
maxConcurrency = 1
}

totalCount := 0
for subIndex := range cm.count[m3uIndex] {
count := cm.GetCount(m3uIndex, subIndex)
totalCount += count
}
count := cm.GetCount(m3uIndex)

utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, totalCount)
return totalCount >= maxConcurrency
utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count)
return count >= maxConcurrency
}

func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, subIndex string, incr bool) {
func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, incr bool) {
cm.mu.Lock()
defer cm.mu.Unlock()

if incr {
cm.Increment(m3uIndex, subIndex)
cm.Increment(m3uIndex)
} else {
cm.Decrement(m3uIndex, subIndex)
cm.Decrement(m3uIndex)
}

totalCount := 0
for subIndex := range cm.count[m3uIndex] {
count := cm.GetCount(m3uIndex, subIndex)
totalCount += count
}
count := cm.GetCount(m3uIndex)

utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, totalCount)
utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count)
}

0 comments on commit 30a03da

Please sign in to comment.