Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis data structure and major code refactoring #144

Merged
merged 3 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ FROM alpine:latest

# add bash and timezone data
# hadolint ignore=DL3018
RUN apk --no-cache add tzdata
RUN apk --no-cache add tzdata \
ca-certificates \
&& update-ca-certificates

# set the current workdir
WORKDIR /app
Expand Down
19 changes: 6 additions & 13 deletions database/database_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package database

import (
"os"
"testing"
)

func TestSaveAndLoadFromDb(t *testing.T) {
// Test InitializeDb and check if the database file exists
REDIS_ADDR := "127.0.0.1:6379"
REDIS_PASS := ""
REDIS_DB := 1
os.Setenv("REDIS_DB", "1")

db, err := InitializeDb(REDIS_ADDR, REDIS_PASS, REDIS_DB)
db, err := InitializeDb()
if err != nil {
t.Errorf("InitializeDb returned error: %v", err)
}
Expand All @@ -21,7 +20,7 @@ func TestSaveAndLoadFromDb(t *testing.T) {
}

// Test LoadFromDb with existing data in the database
expected := []StreamInfo{{
expected := []*StreamInfo{{
Slug: "stream1",
Title: "stream1",
TvgID: "test1",
Expand Down Expand Up @@ -54,7 +53,7 @@ func TestSaveAndLoadFromDb(t *testing.T) {
}

for i, expectedStream := range expected {
if !streamInfoEqual(result[i], expectedStream) {
if !streamInfoEqual(result[i], *expectedStream) {
t.Errorf("GetStreams returned %+v, expected %+v", result[i], expectedStream)
}
}
Expand All @@ -64,11 +63,6 @@ func TestSaveAndLoadFromDb(t *testing.T) {
t.Errorf("DeleteStreamBySlug returned error: %v", err)
}

err = db.DeleteStreamURL(expected[0], 0)
if err != nil {
t.Errorf("DeleteStreamURL returned error: %v", err)
}

streamChan = db.GetStreams()

result = []StreamInfo{}
Expand All @@ -77,14 +71,13 @@ func TestSaveAndLoadFromDb(t *testing.T) {
}

expected = expected[:1]
expected[0].URLs = map[int]string{}

if len(result) != len(expected) {
t.Errorf("GetStreams returned %+v, expected %+v", result, expected)
}

for i, expectedStream := range expected {
if !streamInfoEqual(result[i], expectedStream) {
if !streamInfoEqual(result[i], *expectedStream) {
t.Errorf("GetStreams returned %+v, expected %+v", result[i], expectedStream)
}
}
Expand Down
200 changes: 44 additions & 156 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package database

import (
"context"
"encoding/json"
"fmt"
"log"
"m3u-stream-merger/utils"
Expand All @@ -19,7 +20,14 @@ type Instance struct {
Ctx context.Context
}

func InitializeDb(addr string, password string, db int) (*Instance, error) {
func InitializeDb() (*Instance, error) {
addr := os.Getenv("REDIS_ADDR")
password := os.Getenv("REDIS_PASS")
db := 0
if i, err := strconv.Atoi(os.Getenv("REDIS_DB")); err == nil {
db = i
}

var redisOptions *redis.Options

if password == "" {
Expand Down Expand Up @@ -58,39 +66,27 @@ func (db *Instance) ClearDb() error {
return nil
}

func (db *Instance) SaveToDb(streams []StreamInfo) error {
func (db *Instance) SaveToDb(streams []*StreamInfo) error {
var debug = os.Getenv("DEBUG") == "true"

pipeline := db.Redis.Pipeline()

for _, s := range streams {
streamKey := fmt.Sprintf("stream:%s", s.Slug)
streamData := map[string]interface{}{
"title": s.Title,
"tvg_id": s.TvgID,
"tvg_chno": s.TvgChNo,
"logo_url": s.LogoURL,
"group_name": s.Group,

streamDataJson, err := json.Marshal(s)
if err != nil {
return fmt.Errorf("SaveToDb error: %v", err)
}

if debug {
utils.SafeLogPrintf(nil, nil, "[DEBUG] Preparing to set data for stream key %s: %v\n", streamKey, streamData)
utils.SafeLogPrintf(nil, nil, "[DEBUG] Preparing to set data for stream key %s: %v\n", streamKey, s)
}

pipeline.HSet(db.Ctx, streamKey, streamData)

for index, u := range s.URLs {
streamURLKey := fmt.Sprintf("stream:%s:url:%d", s.Slug, index)

if debug {
utils.SafeLogPrintf(nil, nil, "[DEBUG] Preparing to set URL for key %s: %s\n", streamURLKey, u)
}

pipeline.Set(db.Ctx, streamURLKey, u, 0)
}
pipeline.Set(db.Ctx, streamKey, string(streamDataJson), 0)

// Add to the sorted set
sortScore := calculateSortScore(s)
sortScore := calculateSortScore(*s)

if debug {
utils.SafeLogPrintf(nil, nil, "[DEBUG] Adding to sorted set with score %f and member %s\n", sortScore, streamKey)
Expand Down Expand Up @@ -122,26 +118,6 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error {
func (db *Instance) DeleteStreamBySlug(slug string) error {
streamKey := fmt.Sprintf("stream:%s", slug)

// Delete associated URLs
cursor := uint64(0)
for {
keys, newCursor, err := db.Redis.Scan(db.Ctx, cursor, fmt.Sprintf("%s:url:*", streamKey), 10).Result()
if err != nil {
return fmt.Errorf("error scanning associated URLs: %v", err)
}

for _, key := range keys {
if err := db.Redis.Del(db.Ctx, key).Err(); err != nil {
return fmt.Errorf("error deleting stream URL from Redis: %v", err)
}
}

cursor = newCursor
if cursor == 0 {
break
}
}

// Delete from the sorted set
if err := db.Redis.ZRem(db.Ctx, "streams_sorted", streamKey).Err(); err != nil {
return fmt.Errorf("error removing stream from sorted set: %v", err)
Expand All @@ -155,71 +131,25 @@ func (db *Instance) DeleteStreamBySlug(slug string) error {
return nil
}

func (db *Instance) DeleteStreamURL(s StreamInfo, m3uIndex int) error {
if err := db.Redis.Del(db.Ctx, fmt.Sprintf("stream:%s:url:%d", s.Slug, m3uIndex)).Err(); err != nil {
return fmt.Errorf("error deleting stream URL from Redis: %v", err)
}

return nil
}

func (db *Instance) GetStreamBySlug(slug string) (StreamInfo, error) {
streamKey := fmt.Sprintf("stream:%s", slug)
streamData, err := db.Redis.HGetAll(db.Ctx, streamKey).Result()
streamDataJson, err := db.Redis.Get(db.Ctx, streamKey).Result()
if err != nil {
return StreamInfo{}, fmt.Errorf("error getting stream from Redis: %v", err)
}

if len(streamData) == 0 {
return StreamInfo{}, fmt.Errorf("stream not found: %s", slug)
}
stream := StreamInfo{}

s := StreamInfo{
Slug: slug,
Title: streamData["title"],
TvgID: streamData["tvg_id"],
TvgChNo: streamData["tvg_chno"],
LogoURL: streamData["logo_url"],
Group: streamData["group_name"],
URLs: map[int]string{},
err = json.Unmarshal([]byte(streamDataJson), &stream)
if err != nil {
return StreamInfo{}, fmt.Errorf("error getting stream: %v", err)
}

cursor := uint64(0)
for {
keys, newCursor, err := db.Redis.Scan(db.Ctx, cursor, fmt.Sprintf("%s:url:*", streamKey), 10).Result()
if err != nil {
return s, fmt.Errorf("error finding URLs for stream: %v", err)
}

if len(keys) > 0 {
results, err := db.Redis.Pipelined(db.Ctx, func(pipe redis.Pipeliner) error {
for _, key := range keys {
pipe.Get(db.Ctx, key)
}
return nil
})
if err != nil {
return s, fmt.Errorf("error getting URL data from Redis: %v", err)
}

for i, result := range results {
urlData := result.(*redis.StringCmd).Val()

m3uIndex, err := strconv.Atoi(extractM3UIndex(keys[i]))
if err != nil {
return s, fmt.Errorf("m3u index is not an integer: %v", err)
}
s.URLs[m3uIndex] = urlData
}
}

cursor = newCursor
if cursor == 0 {
break
}
if strings.TrimSpace(stream.Title) == "" {
return StreamInfo{}, fmt.Errorf("stream not found: %s", slug)
}

return s, nil
return stream, nil
}

func (db *Instance) GetStreams() <-chan StreamInfo {
Expand All @@ -239,52 +169,26 @@ func (db *Instance) GetStreams() <-chan StreamInfo {

// Filter out URL keys
for _, key := range keys {
if !strings.Contains(key, ":url:") {
streamData, err := db.Redis.HGetAll(db.Ctx, key).Result()
if err != nil {
utils.SafeLogPrintf(nil, nil, "error retrieving stream data: %v", err)
return
}

slug := extractSlug(key)
stream := StreamInfo{
Slug: slug,
Title: streamData["title"],
TvgID: streamData["tvg_id"],
TvgChNo: streamData["tvg_chno"],
LogoURL: streamData["logo_url"],
Group: streamData["group_name"],
URLs: map[int]string{},
}

if debug {
utils.SafeLogPrintf(nil, nil, "[DEBUG] Processing stream: %v\n", stream)
}

urlKeys, err := db.Redis.Keys(db.Ctx, fmt.Sprintf("%s:url:*", key)).Result()
if err != nil {
utils.SafeLogPrintf(nil, nil, "error finding URLs for stream: %v", err)
return
}

for _, urlKey := range urlKeys {
urlData, err := db.Redis.Get(db.Ctx, urlKey).Result()
if err != nil {
utils.SafeLogPrintf(nil, nil, "error getting URL data from Redis: %v", err)
return
}

m3uIndex, err := strconv.Atoi(extractM3UIndex(urlKey))
if err != nil {
utils.SafeLogPrintf(nil, nil, "m3u index is not an integer: %v", err)
return
}
stream.URLs[m3uIndex] = urlData
}

// Send the stream to the channel
streamChan <- stream
streamDataJson, err := db.Redis.Get(db.Ctx, key).Result()
if err != nil {
utils.SafeLogPrintf(nil, nil, "error retrieving stream data: %v", err)
return
}

stream := StreamInfo{}

err = json.Unmarshal([]byte(streamDataJson), &stream)
if err != nil {
utils.SafeLogPrintf(nil, nil, "error retrieving stream data: %v", err)
return
}

if debug {
utils.SafeLogPrintf(nil, nil, "[DEBUG] Processing stream: %v\n", stream)
}

// Send the stream to the channel
streamChan <- stream
}

if debug {
Expand Down Expand Up @@ -353,22 +257,6 @@ func (db *Instance) ClearConcurrencies() error {
return nil
}

func extractSlug(key string) string {
parts := strings.Split(key, ":")
if len(parts) > 1 {
return parts[1]
}
return ""
}

func extractM3UIndex(key string) string {
parts := strings.Split(key, ":")
if len(parts) > 1 {
return parts[3]
}
return ""
}

func getSortingValue(s StreamInfo) string {
key := os.Getenv("SORTING_KEY")

Expand Down
14 changes: 7 additions & 7 deletions database/types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package database

type StreamInfo struct {
Slug string
Title string
TvgID string
TvgChNo string
LogoURL string
Group string
URLs map[int]string
Slug string `json:"slug"`
Title string `json:"title"`
TvgID string `json:"tvg_id"`
TvgChNo string `json:"tvg_chno"`
LogoURL string `json:"logo_url"`
Group string `json:"group_name"`
URLs map[int]string `json:"urls"`
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module m3u-stream-merger

go 1.22.1
go 1.23.0

require (
github.com/gosimple/slug v1.14.0
Expand Down
Loading
Loading