Skip to content

Commit

Permalink
decrement concurrency on disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Mar 2, 2024
1 parent 6d42fce commit eaa12cc
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions mp4_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,24 @@ func mp4Handler(w http.ResponseWriter, r *http.Request) {
}()

// Iterate through the streams and select one based on concurrency and availability
var selectedUrl *database.StreamURL
for _, url := range stream.URLs {
if checkConcurrency(ctx, url.Content, url.MaxConcurrency) {
continue // Skip this stream if concurrency limit reached
}

resp, err = http.Get(url.Content)
if err == nil {
updateConcurrency(ctx, url.Content)
selectedUrl = &url
updateConcurrency(ctx, url.Content, true)
break
}

// Log the error
log.Printf("Error fetching MP4 stream: %s\n", err.Error())
}

if resp == nil {
if selectedUrl == nil {
// Log the error
log.Println("Error fetching MP4 stream. Exhausted all streams.")
// Check if the connection is still open before writing to the response
Expand All @@ -94,6 +96,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request) {
case <-r.Context().Done():
// Connection closed, handle accordingly
log.Println("Client disconnected after fetching MP4 stream")
updateConcurrency(ctx, selectedUrl.Content, false)
return
default:
// Connection still open, proceed with writing to the response
Expand All @@ -102,6 +105,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request) {
// Log the error
if errors.Is(err, syscall.EPIPE) {
log.Println("Client disconnected after fetching MP4 stream")
updateConcurrency(ctx, selectedUrl.Content, false)
} else {
log.Printf("Error copying MP4 stream to response: %s\n", err.Error())
}
Expand All @@ -124,9 +128,14 @@ func checkConcurrency(ctx context.Context, url string, maxConcurrency int) bool
return count >= maxConcurrency
}

func updateConcurrency(ctx context.Context, url string) {
func updateConcurrency(ctx context.Context, url string, incr bool) {
redisClient := database.InitializeRedis()
err := redisClient.Incr(ctx, url).Err()
var err error
if incr {
err = redisClient.Incr(ctx, url).Err()
} else {
err = redisClient.Decr(ctx, url).Err()
}
if err != nil {
log.Printf("Error updating concurrency: %s\n", err.Error())
}
Expand Down

0 comments on commit eaa12cc

Please sign in to comment.