Skip to content

Commit

Permalink
Add preliminary check before registering new receivers in the log han…
Browse files Browse the repository at this point in the history
…dler (#173)
  • Loading branch information
Aayyush authored Feb 4, 2022
1 parent 5e86742 commit 316147d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 9 deletions.
7 changes: 7 additions & 0 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package websocket

import (
"fmt"
"net/http"

"github.com/gorilla/websocket"
Expand All @@ -18,6 +19,7 @@ type PartitionKeyGenerator interface {
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
IsKeyExists(key string) bool
}

// Multiplexor is responsible for handling the data transfer between the storage layer
Expand Down Expand Up @@ -51,6 +53,11 @@ func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
return errors.Wrapf(err, "generating partition key")
}

// check if the job ID exists before registering receiver
if !m.registry.IsKeyExists(key) {
return fmt.Errorf("invalid key: %s", key)
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)
Expand Down
42 changes: 42 additions & 0 deletions server/handlers/mocks/mock_project_command_output_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 14 additions & 9 deletions server/handlers/project_command_output_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type ProjectCommandOutputHandler interface {
// Deregister removes a channel from successive updates and closes it.
Deregister(jobID string, receiver chan string)

IsKeyExists(key string) bool

// Listens for msg from channel
Handle()

Expand Down Expand Up @@ -114,6 +116,13 @@ func NewAsyncProjectCommandOutputHandler(
}
}

func (p *AsyncProjectCommandOutputHandler) IsKeyExists(key string) bool {
p.receiverBuffersLock.RLock()
defer p.receiverBuffersLock.RUnlock()
_, ok := p.receiverBuffers[key]
return ok
}

func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) {
p.projectCmdOutput <- &ProjectCmdOutputLine{
JobID: ctx.JobID,
Expand Down Expand Up @@ -219,12 +228,7 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(jobID string, line strin
select {
case ch <- line:
default:
// Client ws conn could be closed in two ways:
// 1. Client closes the conn gracefully -> the closeHandler() is executed which
// closes the channel and cleans up resources.
// 2. Client does not close the conn and the closeHandler() is not executed -> the
// receiverChan will be blocking for N number of messages (equal to buffer size)
// before we delete the channel and clean up the resources.
// Delete buffered channel if it's blocking.
delete(p.receiverBuffers[jobID], ch)
}
}
Expand Down Expand Up @@ -274,9 +278,6 @@ func (p *AsyncProjectCommandOutputHandler) CleanUp(pullContext PullContext) {
delete(p.projectOutputBuffers, jobID)
p.projectOutputBuffersLock.Unlock()

// Only delete the pull record from receiver buffers.
// WS channel will be closed when the user closes the browser tab
// in closeHanlder().
p.receiverBuffersLock.Lock()
delete(p.receiverBuffers, jobID)
p.receiverBuffersLock.Unlock()
Expand Down Expand Up @@ -305,3 +306,7 @@ func (p *NoopProjectOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommand

func (p *NoopProjectOutputHandler) CleanUp(pullContext PullContext) {
}

func (p *NoopProjectOutputHandler) IsKeyExists(key string) bool {
return false
}

0 comments on commit 316147d

Please sign in to comment.