Skip to content

Commit

Permalink
Merge pull request #10 from sonroyaalmerol/atomic-renaming
Browse files Browse the repository at this point in the history
Stabilize concurrency and disconnection handling
  • Loading branch information
sonroyaalmerol authored Mar 2, 2024
2 parents e25c384 + de8b27e commit 61beeda
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 66 deletions.
2 changes: 1 addition & 1 deletion database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestSaveAndLoadFromSQLite(t *testing.T) {
}
}

err = DeleteSQLite(db, "test")
err = DeleteSQLite("test")
if err != nil {
t.Errorf("DeleteSQLite returned error: %v", err)
}
Expand Down
43 changes: 42 additions & 1 deletion database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,27 @@ package database
import (
"database/sql"
"fmt"
"log"
"os"
"path/filepath"
"sync"

_ "github.com/mattn/go-sqlite3"
)

var mutex sync.Mutex

func InitializeSQLite(name string) (db *sql.DB, err error) {
mutex.Lock()
defer mutex.Unlock()

if db != nil {
err := db.Close()
if err == nil {
log.Printf("Database session has already been closed: %v\n", err)
}
}

foldername := filepath.Join(".", "data")
filename := filepath.Join(foldername, fmt.Sprintf("%s.db", name))

Expand Down Expand Up @@ -61,7 +75,10 @@ func InitializeSQLite(name string) (db *sql.DB, err error) {
}

// DeleteSQLite deletes the SQLite database file.
func DeleteSQLite(db *sql.DB, name string) error {
func DeleteSQLite(name string) error {
mutex.Lock()
defer mutex.Unlock()

foldername := filepath.Join(".", "data")
filename := filepath.Join(foldername, fmt.Sprintf("%s.db", name))

Expand All @@ -74,6 +91,9 @@ func DeleteSQLite(db *sql.DB, name string) error {
}

func RenameSQLite(prevName string, nextName string) error {
mutex.Lock()
defer mutex.Unlock()

foldername := filepath.Join(".", "data")
prevFileName := filepath.Join(foldername, fmt.Sprintf("%s.db", prevName))
nextFileName := filepath.Join(foldername, fmt.Sprintf("%s.db", nextName))
Expand All @@ -84,6 +104,9 @@ func RenameSQLite(prevName string, nextName string) error {
}

func SaveToSQLite(db *sql.DB, streams []StreamInfo) (err error) {
mutex.Lock()
defer mutex.Unlock()

tx, err := db.Begin()
if err != nil {
return fmt.Errorf("error beginning transaction: %v", err)
Expand Down Expand Up @@ -134,6 +157,9 @@ func SaveToSQLite(db *sql.DB, streams []StreamInfo) (err error) {
}

func InsertStream(db *sql.DB, s StreamInfo) (i int64, err error) {
mutex.Lock()
defer mutex.Unlock()

tx, err := db.Begin()
if err != nil {
return -1, fmt.Errorf("error beginning transaction: %v", err)
Expand Down Expand Up @@ -168,6 +194,9 @@ func InsertStream(db *sql.DB, s StreamInfo) (i int64, err error) {
}

func InsertStreamUrl(db *sql.DB, id int64, url StreamURL) (i int64, err error) {
mutex.Lock()
defer mutex.Unlock()

tx, err := db.Begin()
if err != nil {
return -1, fmt.Errorf("error beginning transaction: %v", err)
Expand Down Expand Up @@ -203,6 +232,9 @@ func InsertStreamUrl(db *sql.DB, id int64, url StreamURL) (i int64, err error) {
}

func DeleteStreamByTitle(db *sql.DB, title string) error {
mutex.Lock()
defer mutex.Unlock()

tx, err := db.Begin()
if err != nil {
return fmt.Errorf("error beginning transaction: %v", err)
Expand Down Expand Up @@ -233,6 +265,9 @@ func DeleteStreamByTitle(db *sql.DB, title string) error {
}

func DeleteStreamURL(db *sql.DB, streamURLID int64) error {
mutex.Lock()
defer mutex.Unlock()

tx, err := db.Begin()
if err != nil {
return fmt.Errorf("error beginning transaction: %v", err)
Expand Down Expand Up @@ -263,6 +298,9 @@ func DeleteStreamURL(db *sql.DB, streamURLID int64) error {
}

func GetStreamByTitle(db *sql.DB, title string) (s StreamInfo, err error) {
mutex.Lock()
defer mutex.Unlock()

rows, err := db.Query("SELECT id, title, tvg_id, logo_url, group_name FROM streams WHERE title = ?", title)
if err != nil {
return s, fmt.Errorf("error querying streams: %v", err)
Expand Down Expand Up @@ -308,6 +346,9 @@ func GetStreamByTitle(db *sql.DB, title string) (s StreamInfo, err error) {
}

func GetStreams(db *sql.DB) ([]StreamInfo, error) {
mutex.Lock()
defer mutex.Unlock()

rows, err := db.Query("SELECT id, title, tvg_id, logo_url, group_name FROM streams")
if err != nil {
return nil, fmt.Errorf("error querying streams: %v", err)
Expand Down
19 changes: 14 additions & 5 deletions database/redis.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package database

import "github.com/redis/go-redis/v9"
import (
"sync"

"github.com/redis/go-redis/v9"
)

func InitializeRedis() *redis.Client {
var redisClient *redis.Client
var redisOnce sync.Once

// Initialize Redis client
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Change this to your Redis server address
Password: "", // No password set
DB: 0, // Use default DB
redisOnce.Do(func() {
redisClient = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Change this to your Redis server address
Password: "", // No password set
DB: 0, // Use default DB
})
})

return redisClient
Expand Down
6 changes: 3 additions & 3 deletions m3u/m3u_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestGenerateM3UContent(t *testing.T) {
rr.Body.String(), expectedContent)
}

err = database.DeleteSQLite(db, "test")
err = database.DeleteSQLite("test")
if err != nil {
t.Errorf("DeleteSQLite returned error: %v", err)
}
Expand All @@ -80,7 +80,7 @@ func TestParseM3UFromURL(t *testing.T) {
http://example.com/bbc1
#EXTINF:-1 tvg-id="bbc2" tvg-name="BBC Two" group-title="UK",BBC Two
http://example.com/bbc2
#EXTINF:-1 tvg-id="cnn" tvg-name="CNN International" group-title="News",CNN
#EXTINF:-1 tvg-id="cnn" tvg-name="CNN International" group-title="News",CNN International
http://example.com/cnn
#EXTVLCOPT:logo=http://example.com/bbc_logo.png
#EXTINF:-1 tvg-id="fox" tvg-name="FOX" group-title="Entertainment",FOX
Expand Down Expand Up @@ -155,7 +155,7 @@ http://example.com/fox
}
}

err = database.DeleteSQLite(db, "test")
err = database.DeleteSQLite("test")
if err != nil {
t.Errorf("DeleteSQLite returned error: %v", err)
}
Expand Down
6 changes: 4 additions & 2 deletions m3u/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int

if strings.HasPrefix(line, "#EXTINF:") {
currentStream = database.StreamInfo{}
currentStream.Title = strings.TrimSpace(strings.SplitN(line, ",", 2)[1])

// Define a regular expression to capture key-value pairs
regex := regexp.MustCompile(`(\S+?)="([^"]*?)"`)
Expand All @@ -60,8 +61,6 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int
switch key {
case "tvg-id":
currentStream.TvgID = value
case "tvg-name":
currentStream.Title = value
case "group-title":
currentStream.Group = value
case "tvg-logo":
Expand All @@ -82,14 +81,17 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int

var dbId int64
if existingStream.Title != currentStream.Title {
log.Printf("Creating new database entry: %s", currentStream.Title)
dbId, err = database.InsertStream(db, currentStream)
if err != nil {
return fmt.Errorf("InsertStream error (title: %s): %v", currentStream.Title, err)
}
} else {
log.Printf("Using existing database entry: %s", existingStream.Title)
dbId = existingStream.DbId
}

log.Printf("Adding MP4 url entry to %s: %s", currentStream.Title, line)
_, err = database.InsertStreamUrl(db, dbId, database.StreamURL{
Content: line,
M3UIndex: m3uIndex,
Expand Down
38 changes: 33 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,46 @@ import (
var db *sql.DB

func swapDb() error {
err := database.DeleteSQLite(db, "current_streams")
// Generate a unique temporary name
tempName := fmt.Sprintf("temp_%d", time.Now().UnixNano())

// Rename the current database to a temporary name
err := database.RenameSQLite("current_streams", tempName)
if err != nil {
return fmt.Errorf("Error deleting current_streams: %v\n", err)
return fmt.Errorf("Error renaming current_streams to temp: %v\n", err)
}

// Rename the next database to current
err = database.RenameSQLite("next_streams", "current_streams")
if err != nil {
return fmt.Errorf("Error renaming to current_streams: %v\n", err)
// If renaming fails, revert the previous renaming to maintain consistency
revertErr := database.RenameSQLite(tempName, "current_streams")
if revertErr != nil {
return fmt.Errorf("Error renaming back to current_streams: %v\n", revertErr)
}
return fmt.Errorf("Error renaming next_streams to current_streams: %v\n", err)
}

// Initialize the new current database
db, err = database.InitializeSQLite("current_streams")
if err != nil {
return fmt.Errorf("Error reinitializing to current_streams: %v\n", err)
// If initialization fails, revert both renamings
revertErr := database.RenameSQLite(tempName, "current_streams")
if revertErr != nil {
return fmt.Errorf("Error renaming back to current_streams: %v\n", revertErr)
}
revertErr = database.RenameSQLite("current_streams", "next_streams")
if revertErr != nil {
return fmt.Errorf("Error renaming back to next_streams: %v\n", revertErr)
}
return fmt.Errorf("Error initializing current_streams: %v\n", err)
}

// Delete the temporary database
err = database.DeleteSQLite(tempName)
if err != nil {
// Log the error but do not return as this is not a critical error
fmt.Printf("Error deleting temp database: %v\n", err)
}

return nil
Expand Down Expand Up @@ -122,7 +149,8 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, err := database.InitializeSQLite("current_streams")
var err error
db, err = database.InitializeSQLite("current_streams")
if err != nil {
log.Fatalf("Error initializing current SQLite database: %v", err)
}
Expand Down
Loading

0 comments on commit 61beeda

Please sign in to comment.