Skip to content

Commit

Permalink
major refactoring client and background stream
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Oct 26, 2024
1 parent 5574a58 commit cc03232
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 252 deletions.
142 changes: 0 additions & 142 deletions proxy/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ package proxy
import (
"context"
"fmt"
"io"
"m3u-stream-merger/database"
"m3u-stream-merger/utils"
"net/http"
"os"
"strconv"
"time"

"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
)

Expand Down Expand Up @@ -99,141 +95,3 @@ func (b *Buffer) Subscribe(ctx context.Context) (*chan []byte, error) {

return &ch, nil
}

func BufferStream(instance *StreamInstance, m3uIndex int, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) {
debug := os.Getenv("DEBUG") == "true"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

locker := redislock.New(instance.Database.Redis)

lock, err := locker.Obtain(ctx, instance.Buffer.streamKey, time.Minute, nil)
if err == redislock.ErrNotObtained {
return
} else if err != nil {
utils.SafeLogf("Obtaining lock error: %v\n", err)
return
}
defer func() {
err := lock.Release(context.Background())
if err != nil && debug {
utils.SafeLogf("Releasing lock error: %v\n", err)
}
}()

instance.Database.UpdateConcurrency(m3uIndex, true)
defer instance.Database.UpdateConcurrency(m3uIndex, false)

timeoutSecond, err := strconv.Atoi(os.Getenv("STREAM_TIMEOUT"))
if err != nil || timeoutSecond < 0 {
timeoutSecond = 3
}

timeoutDuration := time.Duration(timeoutSecond) * time.Second
if timeoutSecond == 0 {
timeoutDuration = time.Minute
}
timer := time.NewTimer(timeoutDuration)
defer timer.Stop()

// Backoff settings
initialBackoff := 200 * time.Millisecond
maxBackoff := time.Duration(timeoutSecond-1) * time.Second
currentBackoff := initialBackoff

returnStatus := 0

sourceChunk := make([]byte, 1024)

for {
select {
case <-ctx.Done(): // handle context cancellation
utils.SafeLogf("Context canceled for stream: %s\n", r.RemoteAddr)
statusChan <- 0
return
case <-timer.C:
utils.SafeLogf("Timeout reached while trying to stream: %s\n", r.RemoteAddr)
statusChan <- returnStatus
return
default:
err := lock.Refresh(ctx, time.Minute, nil)
if err != nil {
utils.SafeLogf("Failed to refresh lock: %s\n", err)
}

clients, err := instance.Database.GetBufferUser(instance.Buffer.streamKey)
if err != nil {
utils.SafeLogf("Failed to get number of clients: %s\n", err)
}

if clients <= 0 {
cancel()
continue
}

n, err := resp.Body.Read(sourceChunk)
if err != nil {
if err == io.EOF {
utils.SafeLogf("Stream ended (EOF reached): %s\n", r.RemoteAddr)
if timeoutSecond == 0 {
statusChan <- 2
return
}

returnStatus = 2
utils.SafeLogf("Retrying same stream until timeout (%d seconds) is reached...\n", timeoutSecond)
if debug {
utils.SafeLogf("[DEBUG] Retrying same stream with backoff of %v...\n", currentBackoff)
}

time.Sleep(currentBackoff)
currentBackoff *= 2
if currentBackoff > maxBackoff {
currentBackoff = maxBackoff
}

continue
}

utils.SafeLogf("Error reading stream: %s\n", err.Error())

returnStatus = 1

if timeoutSecond == 0 {
statusChan <- 1
return
}

if debug {
utils.SafeLogf("[DEBUG] Retrying same stream with backoff of %v...\n", currentBackoff)
}

time.Sleep(currentBackoff)
currentBackoff *= 2
if currentBackoff > maxBackoff {
currentBackoff = maxBackoff
}

continue
}

err = instance.Buffer.Write(ctx, sourceChunk[:n])
if err != nil {
utils.SafeLogf("Failed to store buffer: %s\n", err.Error())
}

// Reset the timer on each successful write and backoff
if !timer.Stop() {
select {
case <-timer.C: // drain the channel to avoid blocking
default:
}
}
timer.Reset(timeoutDuration)

// Reset the backoff duration after successful read/write
currentBackoff = initialBackoff
}
}
}
Loading

0 comments on commit cc03232

Please sign in to comment.