Refactor job controller and websocket logic. #150

Merged
merged 11 commits into release-v0.17.3-lyft.1 from nish/refactor-jobs-controller
Dec 2, 2021

Conversation

nishkrishnan
Contributor

A number of things are going on here:

  • Removed the feature-aware items related to the log streaming work.
  • Created a websocket package, which handles writing to individual websockets.
  • Created Multiplexor, which should be the only thing the controller interacts with, and moved all primary logic there.
  • Created PartitionKeyGenerator for pulling the projectinfo, which makes it generic.
  • Changed Receive/removeChan to Register/Deregister to remove the reading logic from the outputhandler, since that struct has way too many responsibilities right now.

msarvar
msarvar previously approved these changes Nov 12, 2021

@msarvar msarvar left a comment

Looks nice!

upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
return &Multiplexor{
	writer: &Writer{

Could we use the writer.NewWriter() method instead of instantiating the upgrader here?

Contributor Author

That method is unused. I'll remove it.

		upgrader: upgrader,
		log:      log,
	},
	keyGenerator: keyGenerator,

We probably need to assign the registry to the multiplexor? Don't we have a linter that checks for these kinds of errors, where a passed argument is not used in the method body?

Contributor Author

Oops, classic. I don't think we have linters for that. Will fix.
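
The bug flagged in this thread (a constructor argument that is never stored on the struct) can be sketched as follows. This is a minimal illustration, not the PR's actual code: the `keyGenerator` and `registry` interfaces, their method sets, the `staticKey` and `memRegistry` helpers, and the `NewMultiplexor` signature are all assumptions. On the linter question, tools such as `unparam` or revive's `unused-parameter` rule report parameters that a function body never uses, so they may catch this class of mistake.

```go
package main

import "fmt"

// keyGenerator and registry are hypothetical stand-ins for the
// dependencies passed to the constructor in the PR.
type keyGenerator interface {
	Generate() string
}

type registry interface {
	Register(key string)
}

// staticKey is a trivial keyGenerator used only for this example.
type staticKey string

func (k staticKey) Generate() string { return string(k) }

// memRegistry is a trivial in-memory registry used only for this example.
type memRegistry struct{ keys []string }

func (m *memRegistry) Register(key string) { m.keys = append(m.keys, key) }

// Multiplexor keeps every dependency it is constructed with.
type Multiplexor struct {
	keyGenerator keyGenerator
	registry     registry
}

// NewMultiplexor stores both arguments on the struct. Leaving out the
// registry assignment would still compile, but Handle would then panic
// on a nil interface at runtime.
func NewMultiplexor(kg keyGenerator, r registry) *Multiplexor {
	return &Multiplexor{
		keyGenerator: kg,
		registry:     r,
	}
}

// Handle uses both dependencies.
func (m *Multiplexor) Handle() {
	m.registry.Register(m.keyGenerator.Generate())
}

func main() {
	reg := &memRegistry{}
	m := NewMultiplexor(staticKey("owner/repo/1"), reg)
	m.Handle()
	fmt.Println(reg.keys)
}
```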

}
p.receiverBuffers[pull][ch] = true
p.receiverBuffersLock.Unlock()

p.projectOutputBuffersLock.RLock()
buffer := p.projectOutputBuffers[pull]
p.projectOutputBuffersLock.RUnlock()

for _, line := range buffer {

I'm curious how the buffer would behave if new messages come in to projectOutputBuffers while we are sending messages into the ch channel. Since we only add the channel ch to receiverBuffers[pull] later in the method, the writelogLine method would not forward the new messages into the channel, nor would they be copied over from the output buffer unless the buffer is being passed by reference. If it is being passed by reference, should we just leave it inside the RLock to ensure two goroutines don't read and write the buffer at the same time?

for ch := range p.receiverBuffers[pull] {
	select {
	case ch <- line:
	default:
		// Client ws conn could be closed in two ways:
		// 1. Client closes the conn gracefully -> the closeHandler() is executed which
		//    closes the channel and cleans up resources.
		// 2. Client does not close the conn and the closeHandler() is not executed -> the
		//    receiverChan will be blocking for N number of messages (equal to buffer size)
		//    before we delete the channel and clean up the resources.
		delete(p.receiverBuffers[pull], ch)
	}
}
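
The select/default in the snippet above is a non-blocking send: when a receiver's buffered channel is full, the sender drops that receiver instead of stalling. A minimal standalone sketch of that pattern, where the `broadcast` helper is hypothetical and a plain map stands in for `p.receiverBuffers[pull]`:

```go
package main

import "fmt"

// broadcast tries to deliver line to every receiver without blocking.
// Receivers whose buffer is full are removed from the set; the number
// of removed receivers is returned.
func broadcast(receivers map[chan string]bool, line string) int {
	dropped := 0
	for ch := range receivers {
		select {
		case ch <- line:
			// Delivered: the channel had free buffer space.
		default:
			// Buffer full: assume the client is gone and drop it.
			// Deleting from a map while ranging over it is allowed in Go.
			delete(receivers, ch)
			dropped++
		}
	}
	return dropped
}

func main() {
	healthy := make(chan string, 2)
	stalled := make(chan string, 1)
	stalled <- "old line" // buffer is full and nobody is reading

	receivers := map[chan string]bool{healthy: true, stalled: true}
	dropped := broadcast(receivers, "new line")
	fmt.Println(dropped, len(receivers), <-healthy)
}
```

The sender never waits on a slow client here, at the cost of silently dropping a receiver that is merely slow rather than gone.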

Contributor Author

It's not a reference; the underlying array gets copied. Realistically we should be using sync.Map here; not sure why we aren't. I plan on moving to that in a future PR.

The problem you're mentioning would happen regardless of whether it's inside the RLock, since there is a chance we don't get the receiver buffer lock in this method, and then lines are stored in the buffer but not forwarded to the channel. That's something we'll have to fix in a future refactor. These two data objects should be accessed atomically, imo.
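
Whether the local `buffer` is an independent copy depends on the element type of `projectOutputBuffers`, which this excerpt does not show. As a sketch, assuming it is a map of string slices: assigning a slice copies only the slice header (pointer, length, capacity), so the local value's length is fixed and later appends are not visible through it, while in-place writes to existing elements are. The `snapshot` helper below is hypothetical and shows how `copy` gives a fully independent view.

```go
package main

import "fmt"

// snapshot returns an independent copy of lines.
func snapshot(lines []string) []string {
	out := make([]string, len(lines))
	copy(out, lines)
	return out
}

func main() {
	buffers := map[string][]string{"pull": {"a", "b"}}

	header := buffers["pull"]         // copies the slice header only
	deep := snapshot(buffers["pull"]) // copies the elements as well

	buffers["pull"][0] = "z"                       // in-place write, shared backing array
	buffers["pull"] = append(buffers["pull"], "c") // grows the slice stored in the map

	fmt.Println(len(header), header[0]) // 2 z
	fmt.Println(len(deep), deep[0])     // 2 a
	fmt.Println(len(buffers["pull"]))   // 3
}
```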


Gotcha! Thanks for the clarification :)
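
One way to get the atomic access discussed in this thread is to guard the buffer and the receiver set with a single mutex, so that a new receiver is handed the backlog and registered in one critical section. This is only a sketch of the idea, not the planned refactor: the `hub` type, its fields, and its method names are hypothetical, and the backlog is assumed to fit in the channel's buffer.

```go
package main

import (
	"fmt"
	"sync"
)

// hub keeps the output buffers and the receiver sets under one lock.
type hub struct {
	mu        sync.Mutex
	buffers   map[string][]string
	receivers map[string]map[chan string]bool
}

func newHub() *hub {
	return &hub{
		buffers:   map[string][]string{},
		receivers: map[string]map[chan string]bool{},
	}
}

// Register replays the backlog for key into ch and adds ch to the
// receiver set while holding the lock, so no Write can run in between.
func (h *hub) Register(key string, ch chan string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, line := range h.buffers[key] {
		select {
		case ch <- line:
		default:
			// Channel full: drop the line rather than block under the lock.
		}
	}
	if h.receivers[key] == nil {
		h.receivers[key] = map[chan string]bool{}
	}
	h.receivers[key][ch] = true
}

// Write stores line and forwards it to every registered receiver,
// dropping receivers whose buffer is full.
func (h *hub) Write(key, line string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.buffers[key] = append(h.buffers[key], line)
	for ch := range h.receivers[key] {
		select {
		case ch <- line:
		default:
			delete(h.receivers[key], ch)
		}
	}
}

func main() {
	h := newHub()
	h.Write("pull", "line 1")

	ch := make(chan string, 8)
	h.Register("pull", ch)
	h.Write("pull", "line 2")

	fmt.Println(<-ch, <-ch)
}
```

A single lock trades some concurrency for simplicity; because both sends are non-blocking, the lock is never held while waiting on a client.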

Aayyush
Aayyush previously approved these changes Nov 16, 2021

@Aayyush Aayyush left a comment

Left a few comments, but looks great overall 👍🏽! This will definitely help when we start working on log persistence :)

@nishkrishnan nishkrishnan dismissed stale reviews from Aayyush and msarvar via 1df15b6 November 16, 2021 17:04
Aayyush
Aayyush previously approved these changes Nov 17, 2021
@nishkrishnan nishkrishnan merged commit 8a79e1b into release-v0.17.3-lyft.1 Dec 2, 2021
@nishkrishnan nishkrishnan deleted the nish/refactor-jobs-controller branch December 2, 2021 17:57

3 participants