Skip to content

Commit

Permalink
Merge pull request #55 from sonroyaalmerol/idle-connections
Browse files Browse the repository at this point in the history
Add timeout and use HTTP context with cancel
  • Loading branch information
sonroyaalmerol authored Mar 20, 2024
2 parents 382a52b + ef5c07a commit 7073cae
Showing 1 changed file with 40 additions and 37 deletions.
77 changes: 40 additions & 37 deletions mp4_handler.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package main

import (
"errors"
"context"
"fmt"
"io"
"log"
"m3u-stream-merger/database"
"m3u-stream-merger/utils"
"net/http"
"os"
"strconv"
"strings"
"syscall"
"time"
)

func loadBalancer(stream database.StreamInfo) (resp *http.Response, selectedUrl *database.StreamURL, err error) {
Expand Down Expand Up @@ -100,7 +99,8 @@ func loadBalancer(stream database.StreamInfo) (resp *http.Response, selectedUrl
}

func mp4Handler(w http.ResponseWriter, r *http.Request, db *database.Instance) {
ctx := r.Context()
ctx, cancel := context.WithCancel(r.Context())
defer cancel()

// Log the incoming request
log.Printf("Received request from %s for URL: %s\n", r.RemoteAddr, r.URL.Path)
Expand Down Expand Up @@ -149,12 +149,19 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *database.Instance) {
// Log the successful response
log.Printf("Sent MP4 stream to %s\n", r.RemoteAddr)

// Use a channel for goroutine synchronization
done := make(chan struct{})
// Set initial timer duration
timerDuration := 5 * time.Second
timer := time.NewTimer(timerDuration)

// Function to reset the timer
resetTimer := func() {
timer.Reset(timerDuration)
}

go func() {
defer func() {
log.Printf("Closed connection for %s\n", r.RemoteAddr)
close(done)
cancel()
}()

updateConcurrency(selectedUrl.M3UIndex, true)
Expand All @@ -173,45 +180,41 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *database.Instance) {
}
}

buffer := make([]byte, 512)
if bufferMbInt > 0 {
log.Printf("Buffer is set to %dmb.\n", bufferMbInt)
buffer := make([]byte, 1024*bufferMbInt)
for {
n, err := resp.Body.Read(buffer)
if err != nil {
if err != io.EOF {
log.Printf("Error reading MP4 stream: %s\n", err.Error())
}
break
}
if n > 0 {
_, err := w.Write(buffer[:n])
if err != nil {
log.Printf("Error writing to response: %s\n", err.Error())
break
}
}
}
} else {
_, err := io.Copy(w, resp.Body)
buffer = make([]byte, 1024*bufferMbInt)
}
for {
n, err := resp.Body.Read(buffer)
if err != nil {
// Log the error
if errors.Is(err, syscall.EPIPE) {
log.Println("Client disconnected after fetching MP4 stream")
} else {
log.Printf("Error copying MP4 stream to response: %s\n", err.Error())
log.Printf("Error reading MP4 stream: %s\n", err.Error())
cancel()
}
if n > 0 {
resetTimer()
_, err := w.Write(buffer[:n])
if err != nil {
log.Printf("Error writing to response: %s\n", err.Error())
cancel()
}
}
}
}()

go func() {
select {
case <-ctx.Done():
return
case <-timer.C:
log.Println("Timeout reached, closing connection.")
cancel()
}
}()

// Wait for the request context to be canceled or the stream to finish
select {
case <-ctx.Done():
log.Println("Client disconnected after fetching MP4 stream")
case <-done:
log.Println("MP4 source has closed the connection")
}
<-ctx.Done()
log.Printf("Client (%s) disconnected.\n", r.RemoteAddr)
updateConcurrency(selectedUrl.M3UIndex, false)
}

Expand Down

0 comments on commit 7073cae

Please sign in to comment.