Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/rangefeed: per registration buffer #33557

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 43 additions & 83 deletions pkg/storage/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,7 @@ const (

// newErrBufferCapacityExceeded creates an error that is returned to subscribers
// if the rangefeed processor is not able to keep up with the flow of incoming
// events and is forced to drop events in order to not block. The error is
// usually associated with a slow consumer.
// TODO(nvanbenschoten): currently a single slow consumer can cause all
// rangefeeds on a Range to be shut down with this error. We should work
// on isolating buffering for individual consumers so that a slow consumer
// only affects itself. One idea for this is to replace Stream.Send with
// Stream.SendAsync and give each stream its own individual buffer. If
// an individual stream is unable to keep up, it should fail on its own.
// events and is forced to drop events in order to not block.
func newErrBufferCapacityExceeded() *roachpb.Error {
	// Build the error value first, then hand it back to the caller.
	pErr := roachpb.NewErrorf("rangefeed: buffer capacity exceeded due to slow consumer")
	return pErr
}
Expand Down Expand Up @@ -117,21 +110,14 @@ type Processor struct {
rts resolvedTimestamp

regC chan registration
catchUpC chan catchUpResult
unregC chan *registration
lenReqC chan struct{}
lenResC chan int
eventC chan event
stopC chan *roachpb.Error
stoppedC chan struct{}
}

// catchUpResult is delivered to the Processor goroutine when a catch up scan
// for a new registration has completed.
type catchUpResult struct {
// r is the registration whose catch-up scan has finished.
r *registration
// pErr is the outcome of the scan. When nil, the Processor marks the
// registration as caught up and publishes a checkpoint event to it; when
// non-nil, the Processor disconnects the registration with this error.
pErr *roachpb.Error
}

// event is a union of different event types that the Processor goroutine needs
// to be informed of. It is used so that all events can be sent over the same
// channel, which is necessary to prevent reordering.
Expand All @@ -140,6 +126,11 @@ type event struct {
ct hlc.Timestamp
initRTS bool
syncC chan struct{}
// This setting is used in conjunction with syncC in tests in order to ensure
// that all registrations have fully finished outputting their buffers. This
// has to be done by the processor in order to avoid race conditions with the
// registry. Should be used only in tests.
testRegCatchupSpan roachpb.Span
}

// NewProcessor creates a new rangefeed Processor. The corresponding goroutine
Expand All @@ -153,7 +144,7 @@ func NewProcessor(cfg Config) *Processor {
rts: makeResolvedTimestamp(),

regC: make(chan registration),
catchUpC: make(chan catchUpResult),
unregC: make(chan *registration),
lenReqC: make(chan struct{}),
lenResC: make(chan int),
eventC: make(chan event, cfg.EventChanCap),
Expand Down Expand Up @@ -200,13 +191,9 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator)
defer txnPushTicker.Stop()
}

// checkStreamsTicker periodically checks whether any streams have
// disconnected. If so, the registration is unregistered.
checkStreamsTicker := time.NewTicker(p.CheckStreamsInterval)
defer checkStreamsTicker.Stop()

for {
select {

// Handle new registrations.
case r := <-p.regC:
if !p.Span.AsRawSpanWithNoLocals().Contains(r.span) {
Expand All @@ -216,21 +203,28 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator)
// Add the new registration to the registry.
p.reg.Register(&r)

// Launch an async catch-up scan for the new registration.
// Ignore error if quiescing.
if r.catchUpIter != nil {
catchUp := newCatchUpScan(p, &r)
err := stopper.RunAsyncTask(ctx, "rangefeed: catch-up scan", catchUp.Run)
if err != nil {
catchUp.Cancel()
// Immediately publish a checkpoint event to the registration. This will
// be the first event published to this registration after its initial
// catch-up scan completes.
r.publish(p.newCheckpointEvent())

// Run an output loop for the registration.
runOutputLoop := func(ctx context.Context) {
r.runOutputLoop(ctx)
select {
case p.unregC <- &r:
case <-p.stoppedC:
}
} else {
p.handleCatchUpScanRes(ctx, catchUpResult{r: &r})
}
if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
r.disconnect(roachpb.NewError(err))
p.reg.Unregister(&r)
}

// React to registrations finishing their catch up scan.
case res := <-p.catchUpC:
p.handleCatchUpScanRes(ctx, res)
// Respond to unregistration requests; these come from registrations that
// encounter an error during their output loop.
case r := <-p.unregC:
p.reg.Unregister(r)

// Respond to answers about the processor goroutine state.
case <-p.lenReqC:
Expand Down Expand Up @@ -285,10 +279,6 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator)
txnPushTickerC = txnPushTicker.C
txnPushAttemptC = nil

// Check whether any streams have disconnected.
case <-checkStreamsTicker.C:
p.reg.CheckStreams()

// Close registrations and exit when signaled.
case pErr := <-p.stopC:
p.reg.DisconnectWithErr(all, pErr)
Expand Down Expand Up @@ -340,17 +330,14 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
// events that are consumed concurrently with this call. The channel will be
// provided an error when the registration closes.
//
// The provided iterator is used to catch the registration up from its starting
// timestamp with value events for all committed values. It must obey the
// contract of an iterator used for a catchUpScan. The Processor promises to
// clean up the iterator by calling its Close method when it is finished. If the
// iterator is nil then no catch-up scan will be performed.
// The optionally provided "catch-up" iterator is used to read changes from the
// engine which occurred after the provided start timestamp.
//
// NOT safe to call on nil Processor.
func (p *Processor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchUpIter engine.SimpleIterator,
catchupIter engine.SimpleIterator,
stream Stream,
errC chan<- *roachpb.Error,
) {
Expand All @@ -359,19 +346,12 @@ func (p *Processor) Register(
// it should see these events during its catch up scan.
p.syncEventC()

r := registration{
span: span.AsRawSpanWithNoLocals(),
startTS: startTS,
catchUpIter: catchUpIter,
stream: stream,
errC: errC,
}
r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIter, p.Config.EventChanCap, stream, errC,
)
select {
case p.regC <- r:
case <-p.stoppedC:
if catchUpIter != nil {
catchUpIter.Close() // clean up
}
// errC has a capacity of 1. If it is already full, we don't need to send
// another error.
select {
Expand All @@ -397,16 +377,6 @@ func (p *Processor) Len() int {
}
}

// deliverCatchUpScanRes hands the outcome of a registration's catch-up scan
// (the registration plus an optional error) to the Processor over catchUpC.
// If the Processor has already stopped, the result is dropped.
func (p *Processor) deliverCatchUpScanRes(r *registration, pErr *roachpb.Error) {
	res := catchUpResult{r: r, pErr: pErr}
	select {
	case <-p.stoppedC:
		// The Processor is already stopped; there is nobody to notify.
	case p.catchUpC <- res:
	}
}

// ConsumeLogicalOps informs the rangefeed processor of the set of logical
// operations. It returns false if consuming the operations hit a timeout, as
// specified by the EventChanTimeout configuration. If the method returns false,
Expand Down Expand Up @@ -497,25 +467,6 @@ func (p *Processor) syncEventC() {
}
}

//
// Methods called from Processor goroutine.
//

// handleCatchUpScanRes reacts to a finished catch-up scan. A failed scan
// disconnects the registration with the scan's error; a successful scan marks
// the registration as caught up and publishes a checkpoint event to it.
func (p *Processor) handleCatchUpScanRes(ctx context.Context, res catchUpResult) {
	if pErr := res.pErr; pErr != nil {
		p.reg.DisconnectRegWithError(res.r, pErr)
		return
	}

	caughtUp := res.r
	caughtUp.SetCaughtUp()

	// The checkpoint is published even when the resolved timestamp has not
	// been initialized yet. Its timestamp is then empty, but the event still
	// signals that the catch-up scan has completed, which lets clients rely
	// on stronger ordering semantics once they observe their first checkpoint.
	p.reg.PublishToReg(caughtUp, p.newCheckpointEvent())
}

func (p *Processor) consumeEvent(ctx context.Context, e event) {
switch {
case len(e.ops) > 0:
Expand All @@ -525,6 +476,15 @@ func (p *Processor) consumeEvent(ctx context.Context, e event) {
case e.initRTS:
p.initResolvedTS(ctx)
case e.syncC != nil:
if e.testRegCatchupSpan.Valid() {
if err := p.reg.waitForCaughtUp(e.testRegCatchupSpan); err != nil {
log.Errorf(
ctx,
"error waiting for registries to catch up during test, results might be impacted: %s",
err,
)
}
}
close(e.syncC)
default:
panic("missing event variant")
Expand Down
Loading