diff --git a/proxy/buffer.go b/proxy/buffer.go index bd2aa081..1b977bb6 100644 --- a/proxy/buffer.go +++ b/proxy/buffer.go @@ -4,7 +4,8 @@ import ( "context" "fmt" "m3u-stream-merger/database" - "m3u-stream-merger/utils" + "os" + "strconv" "sync" "time" @@ -18,14 +19,22 @@ type Buffer struct { testedIndexes []int ingest sync.Mutex latestMsgId string + bufferSize int64 } // NewBuffer creates a new Redis-backed buffer with a unique stream key func NewBuffer(db *database.Instance, id string) (*Buffer, error) { + bufferSize := 1024 + bufferMbInt, err := strconv.Atoi(os.Getenv("BUFFER_MB")) + if err == nil && bufferMbInt > 0 { + bufferSize = bufferMbInt * 1024 + } + return &Buffer{ db: db, streamKey: "streambuffer:" + id, testedIndexes: []int{}, + bufferSize: int64(bufferSize), }, nil } @@ -34,14 +43,12 @@ func (b *Buffer) Write(ctx context.Context, data []byte) error { msgId, err := b.db.Redis.XAdd(ctx, &redis.XAddArgs{ Stream: b.streamKey, Values: map[string]interface{}{"data": data}, - MaxLen: 4096, + MaxLen: b.bufferSize, }).Result() if err != nil { return err } - utils.SafeLogf("written msg: %s\n", msgId) - b.latestMsgId = msgId _, err = b.db.Redis.Expire(ctx, b.streamKey, time.Minute).Result()