Skip to content

Commit

Permalink
use redis to lock multi streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Oct 25, 2024
1 parent f222edc commit bd02958
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
32 changes: 30 additions & 2 deletions proxy/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package proxy

import (
"context"
"errors"
"fmt"
"m3u-stream-merger/database"
"m3u-stream-merger/utils"
"os"
"strconv"
"sync"
"time"

"github.com/redis/go-redis/v9"
Expand All @@ -15,9 +16,9 @@ import (
// Buffer struct using Redis Streams
type Buffer struct {
streamKey string
lockKey string
db *database.Instance
testedIndexes []int
ingest sync.Mutex
latestMsgId string
bufferSize int64
}
Expand All @@ -33,6 +34,7 @@ func NewBuffer(db *database.Instance, id string) (*Buffer, error) {
return &Buffer{
db: db,
streamKey: "streambuffer:" + id,
lockKey: "streambuffer:" + id + ":streaming",
testedIndexes: []int{},
bufferSize: int64(bufferSize),
}, nil
Expand Down Expand Up @@ -95,3 +97,29 @@ func (b *Buffer) Subscribe(ctx context.Context) (*chan []byte, error) {

return &ch, nil
}

func (b *Buffer) TryLock() bool {
_, err := b.db.Redis.Get(context.Background(), b.lockKey).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
_, err := b.db.Redis.Set(context.Background(), b.lockKey, true, 0).Result()
if err != nil {
utils.SafeLogf("Setting lock error: %v\n", err)
return false
}
return true
}

utils.SafeLogf("Getting lock status error: %v\n", err)
return false
}

return false
}

func (b *Buffer) Unlock() {
_, err := b.db.Redis.Del(context.Background(), b.lockKey).Result()
if err != nil {
utils.SafeLogf("Error unlocking: %v\n", err)
}
}
4 changes: 2 additions & 2 deletions proxy/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,15 @@ func Handler(w http.ResponseWriter, r *http.Request) {
exitStatus <- 0
return
default:
if !stream.Buffer.ingest.TryLock() {
if !stream.Buffer.TryLock() {
if !alreadyLogged {
utils.SafeLogf("Using shared stream buffer with other existing clients for %s\n", r.URL.Path)
alreadyLogged = true
}
continue
}

defer stream.Buffer.ingest.Unlock()
defer stream.Buffer.Unlock()

stream.BufferStream(ctx, selectedIndex, resp, r, w, exitStatus)
return
Expand Down

0 comments on commit bd02958

Please sign in to comment.