Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async option to Replay() #8892

Closed
wants to merge 1 commit into from
Closed

Conversation

reductionista
Copy link
Contributor

An async option has been added to lp.Replay(), so it can be called either synchronously or asynchronously. This allows us to call it asynchronously from the CLI, but synchronously from other services.

This also makes synchronous Replay calls slightly more robust, as previously once the replay had started the only way to stop it was by cancelling the caller's context. Now it will stop immediately even if logpoller is shut down before the caller.

For async calls, it can also be cancelled at any time by shutting down the log poller service. It can also be cancelled by the caller after Replay() is called but before the main logpoller runloop receives the replay request. But as soon as the request is received, Replay() will return with no error, indicating that the replay request was successfully received, and the replay has begun. After returning to the caller, the only way to cancel it is by shutting down the logpoller.

@github-actions
Copy link
Contributor

github-actions bot commented Apr 1, 2023

I see that you haven't updated any CHANGELOG files. Would it make sense to do so?

@reductionista reductionista force-pushed the logpoller-async-cli branch 4 times, most recently from d043fc8 to cf6ac19 Compare April 3, 2023 21:51
@reductionista reductionista marked this pull request as ready for review April 3, 2023 21:52
Comment on lines +309 to +327
replayCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
// merge log poller context and parent context. If either of them are cancelled, abort replay
// This is so that lp.run() can pass a single context to lp.PollAndSaveLogs(). Not used in this
// function since we want to return differentiate between the two, for returning proper error code
select {
case <-lp.ctx.Done():
cancel()
case <-ctx.Done():
if !async {
// In the async case, we don't want to cancel the replay after we've returned to the caller (unless the
// log poller is shutting down). This is important for a web controller, since it will always
// cancel the context as soon as the request is complete, which will likely be before the replay is done.
cancel()
}
case <-replayCtx.Done():
}
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is extremely difficult to read and seems brittle since it tightly couples implementation details of systems that shouldn't know anything about each other. Why can't the caller own the context lifetime like usual?

Actually, why not simply this?

Suggested change
replayCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
// merge log poller context and parent context. If either of them are cancelled, abort replay
// This is so that lp.run() can pass a single context to lp.PollAndSaveLogs(). Not used in this
// function since we want to return differentiate between the two, for returning proper error code
select {
case <-lp.ctx.Done():
cancel()
case <-ctx.Done():
if !async {
// In the async case, we don't want to cancel the replay after we've returned to the caller (unless the
// log poller is shutting down). This is important for a web controller, since it will always
// cancel the context as soon as the request is complete, which will likely be before the replay is done.
cancel()
}
case <-replayCtx.Done():
}
}()
if async {
ctx = lp.ctx
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contexts need to be merged for the synchronous case, but you have a good point... I think for the async case it could be something similar to ctx = lp.ctx, although that would have to happen later not here. Thanks for the suggestion, it's at least in the right direction.

This go routine for merging the contexts doesn't necessarily have to be spawned by this function, it could be moved into lp.run(). I considered both possibilities and decided this one made slightly more sense, but I'm open to moving it if there's a good reason.

I've tried several simpler ideas to try to avoid merging the contexts, in earlier PR's, kept thinking there must be a better way. Each of them ended up falling short in one way or another, so I closed the PR's. I eventually decided that @connorwstein was right, that without a major re-arch of how the replays work in logpoller, this is the best option that does everything we need. Let's discuss offline if you have other ideas, as it's probably too complex for github.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contexts need to be merged for the synchronous case

Why must they be merged? In what scenario is one lp.ctx done but not the request ctx?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as it's probably too complex for github.

I disagree. This requires a nuanced discussion alongside the code.

Storing a context on the log poller in the first place is a misuse of the context API, but regardless of which type you store there is no need to merge the signals. The stored context only needs to be wired to things that don't already have one of their own - i.e. background routines literally using context.Background(). Incoming requests with a context arg will already be cancelled by the system on shutdown, or will encounter systems that have shut down and return an appropriate error. What difference does it make if we race to cancel the request context from another place?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

golang/go#36503 (comment)

I guess I'm a little confused about why the request context wouldn't already have the server context as a parent to begin with.

golang/go#36503 (comment)

Within Google's codebase, where the context package originated, we follow the rule that a context.Context should only be passed around via the call stack.
...
This rule means that at any point in the call stack, there should be exactly one applicable Context, received as a function parameter. When following this pattern, the merge operation never makes sense.

While merging context cancellation signals is straightforward, merging context values is not. Contexts can contain trace IDs and other information; which value would we pick when merging two contexts?

Comment on lines +320 to +322
// In the async case, we don't want to cancel the replay after we've returned to the caller (unless the
// log poller is shutting down). This is important for a web controller, since it will always
// cancel the context as soon as the request is complete, which will likely be before the replay is done.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But context cancellation isn't prevented by avoiding this particular call, because the replayCtx is derived directly from the original ctx argument and will automatically be cancelled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! I was wondering why these tests weren't passing. Kept tweaking the tests today thinking it was a problem with them, but you're right this is a bug... in fact, so is the defer cancel() line, for the same reason. Thanks for looking at it, guess I need to refactor some things.

return ErrReplayAbortedOnShutdown
case <-ctx.Done():
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are redundant, given the defer up top.

@@ -31,7 +31,7 @@ import (
//go:generate mockery --quiet --name LogPoller --output ./mocks/ --case=underscore --structname LogPoller --filename log_poller.go
type LogPoller interface {
services.ServiceCtx
Replay(ctx context.Context, fromBlock int64) error
Replay(ctx context.Context, fromBlock int64, async bool) error
Copy link
Contributor

@jmank88 jmank88 Apr 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider a separate method?

Suggested change
Replay(ctx context.Context, fromBlock int64, async bool) error
Replay(ctx context.Context, fromBlock int64) error
ReplayAsync(fromBlock int64) error

It seems like it would simplify the context situation. Could it just piggy back on the other method like this?

func (lp *logPoller) ReplayAsync(fromBlock int64) error {
	go lp.Replay(lp.ctx, fromBlock)
	return nil
}

@cl-sonarqube-production
Copy link

@reductionista reductionista marked this pull request as draft April 4, 2023 04:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants