diff --git a/database/db.go b/database/db.go index 542b86f4..ed96c5a4 100644 --- a/database/db.go +++ b/database/db.go @@ -3,6 +3,7 @@ package database import ( "context" "fmt" + "log" "math" "os" "strconv" @@ -61,6 +62,8 @@ func (db *Instance) ClearDb() error { } func (db *Instance) SaveToDb(streams []StreamInfo) error { + var debug = os.Getenv("DEBUG") == "true" + pipeline := db.Redis.Pipeline() for _, s := range streams { @@ -72,15 +75,30 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error { "logo_url": s.LogoURL, "group_name": s.Group, } + + if debug { + log.Printf("[DEBUG] Preparing to set data for stream key %s: %v\n", streamKey, streamData) + } + pipeline.HSet(db.Ctx, streamKey, streamData) for index, u := range s.URLs { streamURLKey := fmt.Sprintf("stream:%s:url:%d", s.Slug, index) + + if debug { + log.Printf("[DEBUG] Preparing to set URL for key %s: %s\n", streamURLKey, u) + } + pipeline.Set(db.Ctx, streamURLKey, u, 0) } // Add to the sorted set sortScore := calculateSortScore(s) + + if debug { + log.Printf("[DEBUG] Adding to sorted set with score %f and member %s\n", sortScore, streamKey) + } + pipeline.ZAdd(db.Ctx, "streams_sorted", redis.Z{ Score: sortScore, Member: streamKey, @@ -88,16 +106,28 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error { } if len(streams) > 0 { + if debug { + log.Println("[DEBUG] Executing pipeline...") + } + _, err := pipeline.Exec(db.Ctx) if err != nil { return fmt.Errorf("SaveToDb error: %v", err) } + + if debug { + log.Println("[DEBUG] Pipeline executed successfully.") + } } db.Cache.Clear("streams_sorted_cache") + + if debug { + log.Println("[DEBUG] Cache cleared.") + } + return nil } - func (db *Instance) DeleteStreamBySlug(slug string) error { streamKey := fmt.Sprintf("stream:%s", slug) @@ -204,12 +234,21 @@ func (db *Instance) GetStreamBySlug(slug string) (StreamInfo, error) { } func (db *Instance) GetStreams() ([]StreamInfo, error) { + var debug = os.Getenv("DEBUG") == "true" + // Check if the data is 
in the cache cacheKey := "streams_sorted_cache" if data, found := db.Cache.Get(cacheKey); found { + if debug { + log.Printf("[DEBUG] Cache hit for key %s\n", cacheKey) + } return data, nil } + if debug { + log.Println("[DEBUG] Cache miss. Retrieving streams from Redis...") + } + keys, err := db.Redis.ZRange(db.Ctx, "streams_sorted", 0, -1).Result() if err != nil { return nil, fmt.Errorf("error retrieving streams: %v", err) @@ -223,6 +262,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { } } + if debug { + log.Printf("[DEBUG] Filtered stream keys: %v\n", streamKeys) + } + // Split the stream keys into chunks chunkSize := 100 var chunks [][]string @@ -234,6 +277,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { chunks = append(chunks, streamKeys[i:end]) } + if debug { + log.Printf("[DEBUG] Chunks created: %d chunks\n", len(chunks)) + } + // Create channels for work distribution and results collection workChan := make(chan []string, len(chunks)) resultChan := make(chan []StreamInfo, len(chunks)) @@ -241,7 +288,7 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { // Define the number of workers parserWorkers := os.Getenv("PARSER_WORKERS") - if parserWorkers != "" { + if parserWorkers == "" { parserWorkers = "5" } numWorkers, err := strconv.Atoi(parserWorkers) @@ -249,13 +296,20 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { numWorkers = 5 } + if debug { + log.Printf("[DEBUG] Number of workers: %d\n", numWorkers) + } + var wg sync.WaitGroup // Start the worker pool for i := 0; i < numWorkers; i++ { wg.Add(1) - go func() { + go func(workerID int) { defer wg.Done() + if debug { + log.Printf("[DEBUG] Worker %d started\n", workerID) + } for chunk := range workChan { pipe := db.Redis.Pipeline() cmds := make([]*redis.MapStringStringCmd, len(chunk)) @@ -264,6 +318,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { cmds[i] = pipe.HGetAll(db.Ctx, key) } + if debug { + log.Printf("[DEBUG] Executing pipeline for chunk: 
%v\n", chunk) + } + _, err := pipe.Exec(db.Ctx) if err != nil { errChan <- fmt.Errorf("error executing Redis pipeline: %v", err) @@ -288,6 +346,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { URLs: map[int]string{}, } + if debug { + log.Printf("[DEBUG] Processing stream: %v\n", stream) + } + urlKeys, err := db.Redis.Keys(db.Ctx, fmt.Sprintf("%s:url:*", chunk[i])).Result() if err != nil { errChan <- fmt.Errorf("error finding URLs for stream: %v", err) @@ -314,7 +376,7 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { resultChan <- chunkStreams } - }() + }(i) } // Send work to the workers @@ -345,6 +407,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) { // Store the result in the cache before returning db.Cache.Set(cacheKey, streams) + if debug { + log.Println("[DEBUG] Streams retrieved and cached successfully.") + } + return streams, nil } diff --git a/m3u/generate.go b/m3u/generate.go index 5eecc9cf..228ec585 100644 --- a/m3u/generate.go +++ b/m3u/generate.go @@ -8,6 +8,7 @@ import ( "m3u-stream-merger/utils" "net/http" "net/url" + "os" "strings" ) @@ -32,12 +33,22 @@ func GenerateStreamURL(baseUrl string, slug string, sampleUrl string) string { } func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Instance) { + debug := os.Getenv("DEBUG") == "true" + + if debug { + log.Println("[DEBUG] Generating M3U content") + } + streams, err := db.GetStreams() if err != nil { log.Println(fmt.Errorf("GetStreams error: %v", err)) } - w.Header().Set("Content-Type", "text/plain") // Set the Content-Type header to M3U + if debug { + log.Printf("[DEBUG] Retrieved %d streams\n", len(streams)) + } + + w.Header().Set("Content-Type", "text/plain") w.Header().Set("Access-Control-Allow-Origin", "*") baseUrl := "" @@ -47,6 +58,10 @@ func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Ins baseUrl = fmt.Sprintf("https://%s/stream", r.Host) } + if debug { + log.Printf("[DEBUG] Base URL set to %s\n", 
baseUrl) + } + _, err = fmt.Fprintf(w, "#EXTM3U\n") if err != nil { log.Println(fmt.Errorf("Fprintf error: %v", err)) @@ -57,17 +72,29 @@ func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Ins continue } - // Write #EXTINF line + if debug { + log.Printf("[DEBUG] Processing stream with TVG ID: %s\n", stream.TvgID) + } + _, err := fmt.Fprintf(w, "#EXTINF:-1 channelID=\"x-ID.%s\" tvg-chno=\"%s\" tvg-id=\"%s\" tvg-name=\"%s\" tvg-logo=\"%s\" group-title=\"%s\",%s\n", stream.TvgID, stream.TvgChNo, stream.TvgID, stream.Title, stream.LogoURL, stream.Group, stream.Title) if err != nil { + if debug { + log.Printf("[DEBUG] Error writing #EXTINF line for stream %s: %v\n", stream.TvgID, err) + } continue } - // Write stream URL _, err = fmt.Fprintf(w, "%s", GenerateStreamURL(baseUrl, stream.Slug, stream.URLs[0])) if err != nil { + if debug { + log.Printf("[DEBUG] Error writing stream URL for stream %s: %v\n", stream.TvgID, err) + } continue } } + + if debug { + log.Println("[DEBUG] Finished generating M3U content") + } } diff --git a/m3u/parser.go b/m3u/parser.go index 9097f9a9..7ecb666e 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -20,13 +20,19 @@ import ( ) func parseLine(line string, nextLine string, m3uIndex int) database.StreamInfo { + debug := os.Getenv("DEBUG") == "true" + if debug { + log.Printf("[DEBUG] Parsing line: %s\n", line) + log.Printf("[DEBUG] Next line: %s\n", nextLine) + log.Printf("[DEBUG] M3U index: %d\n", m3uIndex) + } + var currentStream database.StreamInfo currentStream.URLs = map[int]string{m3uIndex: strings.TrimSpace(nextLine)} lineWithoutPairs := line // Define a regular expression to capture key-value pairs - // regex := regexp.MustCompile(`([a-zA-Z0-9_-]+)=("[^"]+"|[^",]+)`) regex := regexp.MustCompile(`([a-zA-Z0-9_-]+)="([^"]+)"`) // Find all key-value pairs in the line @@ -36,6 +42,10 @@ func parseLine(line string, nextLine string, m3uIndex int) database.StreamInfo { key := strings.TrimSpace(match[1]) value := 
strings.TrimSpace(match[2]) + if debug { + log.Printf("[DEBUG] Processing attribute: %s=%s\n", key, value) + } + switch strings.ToLower(key) { case "tvg-id": currentStream.TvgID = tvgIdParser(value) @@ -48,8 +58,8 @@ func parseLine(line string, nextLine string, m3uIndex int) database.StreamInfo { case "tvg-logo": currentStream.LogoURL = tvgLogoParser(value) default: - if os.Getenv("DEBUG") == "true" { - log.Printf("Uncaught attribute: %s=%s\n", key, value) + if debug { + log.Printf("[DEBUG] Uncaught attribute: %s=%s\n", key, value) } } @@ -59,21 +69,36 @@ func parseLine(line string, nextLine string, m3uIndex int) database.StreamInfo { lineCommaSplit := strings.SplitN(lineWithoutPairs, ",", 2) if len(lineCommaSplit) > 1 { + if debug { + log.Printf("[DEBUG] Line comma split detected, title: %s\n", strings.TrimSpace(lineCommaSplit[1])) + } currentStream.Title = tvgNameParser(strings.TrimSpace(lineCommaSplit[1])) } currentStream.Slug = slug.Make(currentStream.Title) + if debug { + log.Printf("[DEBUG] Generated slug: %s\n", currentStream.Slug) + } + return currentStream } func checkIncludeGroup(groups []string, line string) bool { + debug := os.Getenv("DEBUG") == "true" + if debug { + log.Printf("[DEBUG] Checking if line includes group: %s\n", line) + } + if len(groups) == 0 { return true } else { for _, group := range groups { toMatch := "group-title=" + "\"" + group + "\"" if strings.Contains(line, toMatch) { + if debug { + log.Printf("[DEBUG] Line matches group: %s\n", group) + } return true } } @@ -82,6 +107,11 @@ func checkIncludeGroup(groups []string, line string) bool { } func downloadM3UToBuffer(m3uURL string, buffer *bytes.Buffer) (err error) { + debug := os.Getenv("DEBUG") == "true" + if debug { + log.Printf("[DEBUG] Downloading M3U from: %s\n", m3uURL) + } + var file io.Reader if strings.HasPrefix(m3uURL, "file://") { @@ -111,10 +141,16 @@ func downloadM3UToBuffer(m3uURL string, buffer *bytes.Buffer) (err error) { return fmt.Errorf("Error reading file: %v", 
err) } + if debug { + log.Println("[DEBUG] Successfully copied M3U content to buffer") + } + return nil } func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error { + debug := os.Getenv("DEBUG") == "true" + maxRetries := 10 var err error maxRetriesStr, maxRetriesExists := os.LookupEnv("MAX_RETRIES") @@ -125,15 +161,25 @@ } } + if debug { + log.Printf("[DEBUG] Max retries set to %d\n", maxRetries) + } + var buffer bytes.Buffer var grps []string includeGroups := os.Getenv(fmt.Sprintf("INCLUDE_GROUPS_%d", m3uIndex+1)) if includeGroups != "" { grps = strings.Split(includeGroups, ",") + if debug { + log.Printf("[DEBUG] Include groups: %v\n", grps) + } } for i := 0; i <= maxRetries; i++ { + if debug { + log.Printf("[DEBUG] Attempt %d to download M3U\n", i+1) + } err := downloadM3UToBuffer(m3uURL, &buffer) if err != nil { log.Printf("downloadM3UToBuffer error. Retrying in 5 secs... (error: %v)\n", err) @@ -145,11 +191,15 @@ scanner := bufio.NewScanner(&buffer) var wg sync.WaitGroup - // Create channels for workers parserWorkers := os.Getenv("PARSER_WORKERS") - if parserWorkers != "" { + if parserWorkers == "" { parserWorkers = "5" } + + if debug { + log.Printf("[DEBUG] Using %s parser workers\n", parserWorkers) + } + streamInfoCh := make(chan database.StreamInfo) errCh := make(chan error) numWorkers, err := strconv.Atoi(parserWorkers) @@ -157,17 +207,17 @@ numWorkers = 5 } - // Shared slice to collect parsed data var streamInfos []database.StreamInfo var mu sync.Mutex - // Worker pool for parsing for w := 0; w < numWorkers; w++ { wg.Add(1) go func() { defer wg.Done() for streamInfo := range streamInfoCh { - // Collect parsed data + if debug { + log.Printf("[DEBUG] Worker processing stream info: %s\n", streamInfo.Slug) + } mu.Lock() 
streamInfos = append(streamInfos, streamInfo) mu.Unlock() @@ -175,28 +225,33 @@ func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error { }() } - // Error handler go func() { for err := range errCh { - log.Printf("M3U Parser error: %v", err) + log.Printf("M3U Parser error: %v\n", err) } }() - // Parse lines and send to worker pool for scanner.Scan() { line := scanner.Text() + if debug { + log.Printf("[DEBUG] Scanning line: %s\n", line) + } + if strings.HasPrefix(line, "#EXTINF:") && checkIncludeGroup(grps, line) { if scanner.Scan() { nextLine := scanner.Text() + if debug { + log.Printf("[DEBUG] Found next line for EXTINF: %s\n", nextLine) + } + streamInfo := parseLine(line, nextLine, m3uIndex) streamInfoCh <- streamInfo } } } - // Close channels after processing close(streamInfoCh) wg.Wait() close(errCh) @@ -206,16 +261,23 @@ func ParseM3UFromURL(db *database.Instance, m3uURL string, m3uIndex int) error { } if len(streamInfos) > 0 { + if debug { + log.Printf("[DEBUG] Saving %d stream infos to database\n", len(streamInfos)) + } if err := db.SaveToDb(streamInfos); err != nil { return fmt.Errorf("failed to save data to database: %v", err) } } - // Free up memory used by buffer buffer.Reset() + if debug { + log.Println("[DEBUG] Buffer reset and memory freed") + } + return nil } return fmt.Errorf("Max retries reached without success. 
Failed to fetch %s\n", m3uURL) } + diff --git a/stream_handler.go b/stream_handler.go index 10062aa8..57ad72d8 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -16,6 +16,8 @@ import ( ) func loadBalancer(stream database.StreamInfo, previous []int) (*http.Response, string, int, error) { + debug := os.Getenv("DEBUG") == "true" + m3uIndexes := utils.GetM3UIndexes() sort.Slice(m3uIndexes, func(i, j int) bool { @@ -49,12 +51,21 @@ func loadBalancer(stream database.StreamInfo, previous []int) (*http.Response, s resp, err := utils.CustomHttpRequest("GET", url) if err == nil { + if debug { + log.Printf("[DEBUG] Successfully fetched stream from %s\n", url) + } return resp, url, index, nil } log.Printf("Error fetching stream: %s\n", err.Error()) + if debug { + log.Printf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error()) + } } if allSkipped { + if debug { + log.Printf("[DEBUG] All streams skipped in lap %d\n", lap) + } break } @@ -104,6 +115,8 @@ func proxyStream(m3uIndex int, resp *http.Response, r *http.Request, w http.Resp } func streamHandler(w http.ResponseWriter, r *http.Request, db *database.Instance) { + debug := os.Getenv("DEBUG") == "true" + ctx, cancel := context.WithCancel(r.Context()) defer cancel() @@ -164,6 +177,9 @@ func streamHandler(w http.ResponseWriter, r *http.Request, db *database.Instance } } } + if debug { + log.Printf("[DEBUG] Headers set for response: %v\n", w.Header()) + } } exitStatus := make(chan int) @@ -188,3 +204,4 @@ func streamHandler(w http.ResponseWriter, r *http.Request, db *database.Instance } } } +