Skip to content

Commit

Permalink
use buffer_mb env var
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Oct 25, 2024
1 parent 04414c9 commit f222edc
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions proxy/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"context"
"fmt"
"m3u-stream-merger/database"
"m3u-stream-merger/utils"
"os"
"strconv"
"sync"
"time"

Expand All @@ -18,14 +19,22 @@ type Buffer struct {
testedIndexes []int
ingest sync.Mutex
latestMsgId string
bufferSize int64
}

// NewBuffer creates a new Redis-backed buffer with a unique stream key
func NewBuffer(db *database.Instance, id string) (*Buffer, error) {
bufferSize := 1024
bufferMbInt, err := strconv.Atoi(os.Getenv("BUFFER_MB"))
if err == nil && bufferMbInt > 0 {
bufferSize = bufferMbInt * 1024
}

return &Buffer{
db: db,
streamKey: "streambuffer:" + id,
testedIndexes: []int{},
bufferSize: int64(bufferSize),
}, nil
}

Expand All @@ -34,14 +43,12 @@ func (b *Buffer) Write(ctx context.Context, data []byte) error {
msgId, err := b.db.Redis.XAdd(ctx, &redis.XAddArgs{
Stream: b.streamKey,
Values: map[string]interface{}{"data": data},
MaxLen: 4096,
MaxLen: b.bufferSize,
}).Result()
if err != nil {
return err
}

utils.SafeLogf("written msg: %s\n", msgId)

b.latestMsgId = msgId

_, err = b.db.Redis.Expire(ctx, b.streamKey, time.Minute).Result()
Expand Down

0 comments on commit f222edc

Please sign in to comment.