diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 1d421e146057..751da9878cb5 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "processor.go", "registry.go", "resolved_timestamp.go", + "scheduler.go", "task.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed", @@ -24,6 +25,7 @@ go_library( "//pkg/storage/enginepb", "//pkg/util/admission", "//pkg/util/bufalloc", + "//pkg/util/buildutil", "//pkg/util/envutil", "//pkg/util/future", "//pkg/util/hlc", @@ -52,6 +54,7 @@ go_test( "processor_test.go", "registry_test.go", "resolved_timestamp_test.go", + "scheduler_test.go", "task_test.go", ], args = ["-test.timeout=895s"], @@ -85,5 +88,6 @@ go_test( "@com_github_cockroachdb_pebble//vfs", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@org_golang_x_exp//slices", ], ) diff --git a/pkg/kv/kvserver/rangefeed/scheduler.go b/pkg/kv/kvserver/rangefeed/scheduler.go new file mode 100644 index 000000000000..759ca7c4c7ad --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/scheduler.go @@ -0,0 +1,514 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// Scheduler is used by rangefeed processors to schedule work to a pool of +// workers instead of running individual goroutines per range. Store will run +// a single scheduler for all its rangefeeds. +// +// When processor is started it registers a callback with scheduler. +// After that processor can enqueue work for itself by telling scheduler what +// types of events it plans to process. +// +// Scheduler maintains a queue of processor ids that wish to do processing and +// notifies callbacks in order passing them union of event types that were +// enqueued since last notification. + +// processorEventType is a mask for pending events for the processor. All event +// types that were enqueued between two callback invocations are coalesced into +// a single value. +type processorEventType int + +const ( + // queued is an internal event type that indicate that there's already a + // pending work for processor and it is already scheduled for execution. + // When more events types come in, they should just be added to existing + // pending value. + queued processorEventType = 1 << iota + // stopped is an event that indicates that there would be no more events + // scheduled for the processor. once it is enqueued, all subsequent events + // are rejected. processor should perform any cleanup when receiving this + // event that it needs to perform within callback context. + stopped + // numProcessorEventTypes is total number of event types. + numProcessorEventTypes int = iota +) + +var eventNames = map[processorEventType]string{ + queued: "Queued", + stopped: "Stopped", +} + +func (e processorEventType) String() string { + var evts []string + for i := 0; i < numProcessorEventTypes; i++ { + if eventType := processorEventType(1 << i); eventType&e != 0 { + evts = append(evts, eventNames[eventType]) + } + } + return strings.Join(evts, " | ") +} + +// enqueueBulkMaxChunk is max number of event enqueued in one go while holding +// scheduler lock. +const enqueueBulkMaxChunk = 100 + +// Callback is a callback to perform work set by processor. Event is a +// combination of all event types scheduled since last callback invocation. +// +// Once callback returns, event types considered to be processed. If a processor +// decided not to process everything, it can return remaining types which would +// instruct scheduler to re-enqueue processor. +// +// This mechanism allows processors to throttle processing if it has too much +// pending data to process in one go without blocking other processors. +type Callback func(event processorEventType) (remaining processorEventType) + +// SchedulerConfig contains configurable scheduler parameters. +type SchedulerConfig struct { + // Workers is the number of pool workers for scheduler to use. + Workers int + // BulkChunkSize is number of ids that would be enqueued in a single bulk + // enqueue operation. Chunking is done to avoid holding locks for too long + // as it will interfere with enqueue operations. + BulkChunkSize int +} + +// Scheduler is a simple scheduler that allows work to be scheduler +// against number of processors. Each processor is represented by unique id and +// a callback. +// +// Work is enqueued in a form of event type using processor id. +// Processors callback is then called by worker thread with all combined pending +// events. +// +// Each event is represented as a bit mask and multiple pending events could be +// ORed together before being delivered to processor. +type Scheduler struct { + SchedulerConfig + + mu struct { + syncutil.Mutex + nextID int64 + procs map[int64]Callback + status map[int64]processorEventType + queue *idQueue + // No more new registrations allowed. Workers are winding down. + quiescing bool + } + cond *sync.Cond + wg sync.WaitGroup +} + +// NewScheduler will instantiate an idle scheduler based on provided config. +// Scheduler needs to be started to become operational. +func NewScheduler(cfg SchedulerConfig) *Scheduler { + if cfg.BulkChunkSize == 0 { + cfg.BulkChunkSize = enqueueBulkMaxChunk + } + s := &Scheduler{ + SchedulerConfig: cfg, + wg: sync.WaitGroup{}, + } + s.mu.procs = make(map[int64]Callback) + s.mu.status = make(map[int64]processorEventType) + s.mu.queue = newIDQueue() + s.cond = sync.NewCond(&s.mu) + return s +} + +// Start scheduler workers. +func (s *Scheduler) Start(ctx context.Context, stopper *stop.Stopper) error { + for i := 0; i < s.Workers; i++ { + s.wg.Add(1) + workerID := i + if err := stopper.RunAsyncTask(ctx, fmt.Sprintf("rangefeed-scheduler-worker-%d", workerID), + func(ctx context.Context) { + log.VEventf(ctx, 3, "%d scheduler worker started", workerID) + defer s.wg.Done() + s.processEvents(ctx) + log.VEventf(ctx, 3, "%d scheduler worker finished", workerID) + }); err != nil { + s.wg.Done() + s.Stop() + return err + } + } + if err := stopper.RunAsyncTask(ctx, "terminate scheduler", + func(ctx context.Context) { + <-stopper.ShouldQuiesce() + log.VEvent(ctx, 2, "scheduler quiescing") + s.Stop() + }); err != nil { + s.Stop() + return err + } + return nil +} + +// Register callback to be able to schedule work. Returns allocated callback id +// which should be used to send notifications to the callback. Returns error if +// Scheduler is stopped. +func (s *Scheduler) Register(f Callback) (int64, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.mu.quiescing { + // Don't accept new registrations if quiesced. + return 0, errors.New("server stopping") + } + s.mu.nextID++ + id := s.mu.nextID + s.mu.procs[id] = f + return id, nil +} + +// Enqueue event for existing callback. Returns error if callback was not +// registered for the id or if processor is stopping. Error doesn't guarantee +// that processor actually handled stopped event it may either be pending or +// processed. +func (s *Scheduler) Enqueue(id int64, evt processorEventType) { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.mu.procs[id]; !ok { + return + } + newWork := s.enqueueInternalLocked(id, evt) + if newWork { + // Wake up potential waiting worker. + // We are allowed to do this under cond lock. + s.cond.Signal() + } +} + +func (s *Scheduler) enqueueInternalLocked(id int64, evt processorEventType) bool { + pending := s.mu.status[id] + if pending&stopped != 0 { + return false + } + if pending == 0 { + // Enqueue if processor was idle. + s.mu.queue.pushBack(id) + } + update := pending | evt | queued + if update != pending { + // Only update if event actually changed. + s.mu.status[id] = update + } + return pending == 0 +} + +// EnqueueAll enqueues event for all existing non-stopped id's. Enqueueing is +// done in chunks to avoid holding lock for too long and interfering with other +// enqueue operations. +// +// If id is not known or already stopped it is ignored. +func (s *Scheduler) EnqueueAll(ids []int64, evt processorEventType) { + scheduleChunk := func(chunk []int64) int { + s.mu.Lock() + defer s.mu.Unlock() + wake := 0 + for _, id := range chunk { + if _, ok := s.mu.procs[id]; ok { + if newWork := s.enqueueInternalLocked(id, evt); newWork { + wake++ + } + } + } + return wake + } + wake := 0 + total := len(ids) + for first := 0; first < total; first += s.BulkChunkSize { + last := first + s.BulkChunkSize + if last > total { + last = total + } + added := scheduleChunk(ids[first:last]) + wake += added + } + // Wake up potential waiting workers. We wake all of them as we expect more + // than total number of workers. + if wake >= s.Workers { + s.cond.Broadcast() + } else { + for ; wake > 0; wake-- { + s.cond.Signal() + } + } +} + +// StopProcessor instructs processor to stop gracefully by sending it stopped event. +// Once stop is called all subsequent Schedule calls for this id will return +// error. +func (s *Scheduler) StopProcessor(id int64) { + s.Enqueue(id, stopped) +} + +// processEvents is a main worker method of a scheduler pool. each one should +// be launched in separate goroutine and will loop until scheduler is stopped. +func (s *Scheduler) processEvents(ctx context.Context) { + for { + var id int64 + s.mu.Lock() + for { + if s.mu.quiescing { + s.mu.Unlock() + return + } + var ok bool + if id, ok = s.mu.queue.popFront(); ok { + break + } + s.cond.Wait() + } + + cb := s.mu.procs[id] + e := s.mu.status[id] + // Keep queued status and preserve stopped to block any more events. + s.mu.status[id] = queued | (e & stopped) + s.mu.Unlock() + + procEventType := queued ^ e + remaining := cb(procEventType) + + if remaining != 0 && buildutil.CrdbTestBuild { + if (remaining^procEventType)&remaining != 0 { + log.Fatalf(ctx, + "rangefeed processor attempted to reschedule event type %s that was not present in original event set %s", + procEventType, remaining) + } + } + + if e&stopped != 0 { + if remaining != 0 { + log.VWarningf(ctx, 5, + "rangefeed processor %d didn't process all events on close", id) + } + // We'll keep stopped state to avoid calling stopped processor again + // on scheduler shutdown. + s.mu.Lock() + s.mu.status[id] = stopped + s.mu.Unlock() + continue + } + + s.mu.Lock() + pendingStatus, ok := s.mu.status[id] + if !ok { + s.mu.Unlock() + continue + } + newStatus := pendingStatus | remaining + if newStatus == queued { + // If no events arrived, get rid of id. + delete(s.mu.status, id) + } else { + // Since more events arrived during processing, reschedule. + s.mu.queue.pushBack(id) + // If remaining work was returned and not already planned, then update + // pending status to reflect that. + if newStatus != pendingStatus { + s.mu.status[id] = newStatus + } + } + s.mu.Unlock() + } +} + +// Unregister a processor. This function is removing processor callback and +// status from scheduler. If processor is currently processing event it will +// finish processing. +// Processor won't receive stopped event if it wasn't explicitly sent. +// To make sure processor performs cleanup, it is easier to send it stopped +// event first and let it remove itself from registration during event handling. +// Any attempts to enqueue events for processor after this call will return an +// error. +func (s *Scheduler) Unregister(id int64) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.mu.procs, id) + delete(s.mu.status, id) +} + +func (s *Scheduler) Stop() { + // Stop all processors. + s.mu.Lock() + if !s.mu.quiescing { + // On first close attempt trigger termination of all unfinished callbacks, + // we only need to do that once to avoid closing s.drained channel multiple + // times. + s.mu.quiescing = true + } + s.mu.Unlock() + s.cond.Broadcast() + s.wg.Wait() + + // Synchronously notify all non-stopped processors about stop. + s.mu.Lock() + for id, p := range s.mu.procs { + pending := s.mu.status[id] + // Ignore processors that already processed their stopped event. + if pending == stopped { + continue + } + // Add stopped event on top of what was pending and remove queued. + pending = (^queued & pending) | stopped + s.mu.Unlock() + p(pending) + s.mu.Lock() + } + s.mu.Unlock() +} + +// ClientScheduler is a wrapper on top of scheduler that could be passed to a +// processor to be able to register itself with a pre-configured ID, enqueue +// events and terminate as needed. +type ClientScheduler struct { + id int64 + s *Scheduler +} + +// NewClientScheduler creates an instance of ClientScheduler for specific id. +// It is safe to use it as value as it is immutable and delegates all work to +// underlying scheduler. +func NewClientScheduler(s *Scheduler) ClientScheduler { + return ClientScheduler{ + s: s, + } +} + +// ID returns underlying callback id used to schedule work. +func (cs *ClientScheduler) ID() int64 { + return cs.id +} + +// Register registers processing callback in scheduler. Error is returned if +// callback was already registered for this ClientScheduler or if scheduler is +// already quiescing. +func (cs *ClientScheduler) Register(cb Callback) error { + if cs.id != 0 { + return errors.Newf("callback is already registered with id %d", cs.id) + } + var err error + cs.id, err = cs.s.Register(cb) + return err +} + +// Schedule schedules callback for event. Error is returned if client callback +// wasn't registered prior to this call. +func (cs *ClientScheduler) Schedule(event processorEventType) { + cs.s.Enqueue(cs.id, event) +} + +// Stop instructs processor to stop gracefully by sending it stopped event. +// Once stop is called all subsequent Schedule calls will return error. +func (cs *ClientScheduler) Stop() { + cs.s.StopProcessor(cs.id) +} + +// Unregister will remove callback associated with this processor. No stopped +// event will be scheduled. See Scheduler.Unregister for details. +func (cs *ClientScheduler) Unregister() { + cs.s.Unregister(cs.id) +} + +// Number of queue elements allocated at once to amortize queue allocations. +const idQueueChunkSize = 8000 + +// idQueueChunk is a queue chunk of a fixed size which idQueue uses to extend +// its storage. Chunks are kept in the pool to reduce allocations. +type idQueueChunk struct { + data [idQueueChunkSize]int64 + nextChunk *idQueueChunk +} + +var sharedIDQueueChunkSyncPool = sync.Pool{ + New: func() interface{} { + return new(idQueueChunk) + }, +} + +func getPooledIDQueueChunk() *idQueueChunk { + return sharedIDQueueChunkSyncPool.Get().(*idQueueChunk) +} + +func putPooledIDQueueChunk(e *idQueueChunk) { + // Don't need to cleanup chunk as it is an array of values. + e.nextChunk = nil + sharedIDQueueChunkSyncPool.Put(e) +} + +// idQueue stores pending processor ID's. Internally data is stored in +// idQueueChunkSize sized arrays that are added as needed and discarded once +// reader and writers finish working with it. Since we only have a single +// scheduler per store, we don't use a pool as only reuse could happen within +// the same queue and in that case we can just increase chunk size. +type idQueue struct { + first, last *idQueueChunk + read, write int + size int +} + +func newIDQueue() *idQueue { + chunk := getPooledIDQueueChunk() + return &idQueue{ + first: chunk, + last: chunk, + read: 0, + size: 0, + } +} + +func (q *idQueue) pushBack(id int64) { + if q.write == idQueueChunkSize { + nexChunk := getPooledIDQueueChunk() + q.last.nextChunk = nexChunk + q.last = nexChunk + q.write = 0 + } + q.last.data[q.write] = id + q.write++ + q.size++ +} + +func (q *idQueue) popFront() (int64, bool) { + if q.size == 0 { + return 0, false + } + if q.read == idQueueChunkSize { + removed := q.first + q.first = q.first.nextChunk + putPooledIDQueueChunk(removed) + q.read = 0 + } + res := q.first.data[q.read] + q.read++ + q.size-- + return res, true +} + +func (q *idQueue) Len() int { + return q.size +} diff --git a/pkg/kv/kvserver/rangefeed/scheduler_test.go b/pkg/kv/kvserver/rangefeed/scheduler_test.go new file mode 100644 index 000000000000..b9a3508afed4 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/scheduler_test.go @@ -0,0 +1,464 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" +) + +func TestStopEmpty(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + s.Stop() + + assertStopsWithinTimeout(t, s) +} + +func TestStopNonEmpty(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c := createAndRegisterConsumerOrFail(t, s) + s.StopProcessor(c.id) + assertStopsWithinTimeout(t, s) + c.requireStopped(t, time.Second*30) +} + +type schedulerConsumer struct { + c chan processorEventType + mu struct { + syncutil.RWMutex + wait chan interface{} + waiting chan interface{} + } + reschedule chan processorEventType + flat []processorEventType + sched *Scheduler + id int64 +} + +func createAndRegisterConsumerOrFail(t *testing.T, scheduler *Scheduler) *schedulerConsumer { + t.Helper() + c := &schedulerConsumer{ + c: make(chan processorEventType, 1000), + reschedule: make(chan processorEventType, 1), + sched: scheduler, + } + id, err := c.sched.Register(c.process) + require.NoError(t, err, "failed to register processor") + c.id = id + return c +} + +func (c *schedulerConsumer) process(ev processorEventType) processorEventType { + c.c <- ev + c.mu.RLock() + w, ww := c.mu.wait, c.mu.waiting + c.mu.RUnlock() + if w != nil { + close(ww) + <-w + } + select { + case r := <-c.reschedule: + // Tests don't try to do reschedule and stop at the same time, so it's ok + // not to fall through. + return r + default: + } + if ev&stopped != 0 { + c.sched.Unregister(c.id) + } + return 0 +} + +func (c *schedulerConsumer) pause() { + c.mu.Lock() + c.mu.wait = make(chan interface{}) + c.mu.waiting = make(chan interface{}) + c.mu.Unlock() +} + +func (c *schedulerConsumer) waitPaused() { + <-c.mu.waiting +} + +// Close waiter channel. Test should track state itself and don't use resume if +// pause was not issued. +func (c *schedulerConsumer) resume() { + c.mu.Lock() + w := c.mu.wait + c.mu.wait, c.mu.waiting = nil, nil + c.mu.Unlock() + close(w) +} + +func (c *schedulerConsumer) rescheduleNext(e processorEventType) { + c.reschedule <- e +} + +func (c *schedulerConsumer) assertTill( + t *testing.T, timeout time.Duration, assert func(flat []processorEventType) bool, +) bool { + t.Helper() + till := time.After(timeout) + for { + if assert(c.flat) { + return true + } + select { + case <-till: + return false + case e := <-c.c: + c.flat = append(c.flat, e) + } + } +} + +func (c *schedulerConsumer) requireEvent( + t *testing.T, timeout time.Duration, event processorEventType, count ...int, +) { + t.Helper() + min, max := 0, 0 + l := len(count) + switch { + case l == 1: + min, max = count[0], count[0] + case l == 2: + min, max = count[0], count[1] + default: + t.Fatal("event count limits must be 1 (exact) or 2 [mix, max]") + } + var lastHist []processorEventType + if !c.assertTill(t, timeout, func(flat []processorEventType) bool { + lastHist = flat + match := 0 + for _, e := range lastHist { + if e&event != 0 { + match++ + } + } + return match >= min && match <= max + }) { + t.Fatalf("failed to find event %08b between %d and %d times in history %08b", event, min, max, + lastHist) + } +} + +func (c *schedulerConsumer) requireHistory( + t *testing.T, timeout time.Duration, history []processorEventType, +) { + t.Helper() + var lastHist []processorEventType + if !c.assertTill(t, timeout, func(flat []processorEventType) bool { + lastHist = flat + return slices.Equal(history, lastHist) + }) { + t.Fatalf("expected history %08b found %08b", history, lastHist) + } +} + +func (c *schedulerConsumer) requireStopped(t *testing.T, timeout time.Duration) { + t.Helper() + lastEvent := processorEventType(0) + if !c.assertTill(t, timeout, func(flat []processorEventType) bool { + t.Helper() + if len(c.flat) == 0 { + return false + } + lastEvent = c.flat[len(c.flat)-1] + return lastEvent&stopped != 0 + }) { + t.Fatalf("failed to find stopped event at the end of history after %s, lastEvent=%08b", timeout, + lastEvent) + } +} + +const ( + te1 = 1 << 2 + te2 = 1 << 3 + te3 = 1 << 4 +) + +func TestDeliverEvents(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c := createAndRegisterConsumerOrFail(t, s) + s.Enqueue(c.id, te1) + c.requireEvent(t, time.Second*30000, te1, 1) + assertStopsWithinTimeout(t, s) +} + +func TestNoParallel(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 2}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c := createAndRegisterConsumerOrFail(t, s) + c.pause() + s.Enqueue(c.id, te1) + c.waitPaused() + s.Enqueue(c.id, te2) + c.resume() + c.requireHistory(t, time.Second*30, []processorEventType{te1, te2}) + assertStopsWithinTimeout(t, s) +} + +func TestProcessOtherWhilePaused(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 2}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c1 := createAndRegisterConsumerOrFail(t, s) + c2 := createAndRegisterConsumerOrFail(t, s) + c1.pause() + s.Enqueue(c1.id, te1) + c1.waitPaused() + s.Enqueue(c2.id, te1) + c2.requireHistory(t, time.Second*30, []processorEventType{te1}) + c1.resume() + c1.requireHistory(t, time.Second*30, []processorEventType{te1}) + assertStopsWithinTimeout(t, s) + c1.requireStopped(t, time.Second*30) + c2.requireStopped(t, time.Second*30) +} + +func TestEventsCombined(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 2}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c := createAndRegisterConsumerOrFail(t, s) + c.pause() + s.Enqueue(c.id, te1) + c.waitPaused() + s.Enqueue(c.id, te2) + s.Enqueue(c.id, te3) + c.resume() + c.requireHistory(t, time.Second*30, []processorEventType{te1, te2 | te3}) + assertStopsWithinTimeout(t, s) +} + +func TestRescheduleEvent(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 2}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c := createAndRegisterConsumerOrFail(t, s) + c.pause() + s.Enqueue(c.id, te1) + c.waitPaused() + s.Enqueue(c.id, te1) + c.resume() + c.requireHistory(t, time.Second*30, []processorEventType{te1, te1}) + assertStopsWithinTimeout(t, s) +} + +func TestClientScheduler(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 2}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + cs := NewClientScheduler(s) + // Manually create consumer as we don't want it to start, but want to use it + // via client scheduler. + c := &schedulerConsumer{ + c: make(chan processorEventType, 1000), + reschedule: make(chan processorEventType, 1), + sched: s, + id: 1, + } + require.NoError(t, cs.Register(c.process), "failed to register consumer") + require.Error(t, + cs.Register(func(event processorEventType) (remaining processorEventType) { return 0 }), + "reregistration must fail") + c.pause() + cs.Schedule(te2) + c.waitPaused() + cs.Unregister() + c.resume() + c.requireHistory(t, time.Second*30, []processorEventType{te2}) + assertStopsWithinTimeout(t, s) +} + +func TestScheduleMultiple(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 2, BulkChunkSize: 2}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + const consumerNumber = 10 + consumers := make([]*schedulerConsumer, consumerNumber) + ids := make([]int64, consumerNumber) + for i := 0; i < consumerNumber; i++ { + consumers[i] = createAndRegisterConsumerOrFail(t, s) + ids[i] = consumers[i].id + } + s.EnqueueAll(ids, te1) + for _, c := range consumers { + c.requireEvent(t, time.Second*30000, te1, 1) + } + assertStopsWithinTimeout(t, s) +} + +func TestPartialProcessing(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c := createAndRegisterConsumerOrFail(t, s) + // Set process response to trigger process once again. + c.rescheduleNext(te1) + s.Enqueue(c.id, te1) + c.requireHistory(t, time.Second*30, []processorEventType{te1, te1}) + assertStopsWithinTimeout(t, s) +} + +func assertStopsWithinTimeout(t *testing.T, s *Scheduler) { + stopC := make(chan interface{}) + go func() { + s.Stop() + close(stopC) + }() + select { + case <-stopC: + case <-time.After(30 * time.Second): + t.Fatalf("scheduler failed to stop after 30 seconds") + } +} + +func TestUnregisterWithoutStop(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c := createAndRegisterConsumerOrFail(t, s) + s.Enqueue(c.id, te1) + c.requireHistory(t, time.Second*30, []processorEventType{te1}) + s.Unregister(c.id) + assertStopsWithinTimeout(t, s) + // Ensure that we didn't send stop after callback was removed. + c.requireHistory(t, time.Second*30, []processorEventType{te1}) +} + +func TestStartupFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1}) + require.Error(t, s.Start(ctx, stopper), "started despite stopper stopped") +} + +func TestSchedulerShutdown(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + s := NewScheduler(SchedulerConfig{Workers: 1}) + require.NoError(t, s.Start(ctx, stopper), "failed to start") + c1 := createAndRegisterConsumerOrFail(t, s) + c2 := createAndRegisterConsumerOrFail(t, s) + s.StopProcessor(c2.id) + s.Stop() + // Ensure that we are not stopped twice. + c1.requireHistory(t, time.Second*30, []processorEventType{stopped}) + c2.requireHistory(t, time.Second*30, []processorEventType{stopped}) +} + +func TestQueueReadWrite1By1(t *testing.T) { + q := newIDQueue() + val := int64(7) + for i := 0; i < idQueueChunkSize*3; i++ { + q.pushBack(val) + require.Equal(t, 1, q.Len(), "queue size") + v, ok := q.popFront() + require.True(t, ok, "value not found after writing") + require.Equal(t, val, v, "read different from write") + val = val*3 + 7 + } + _, ok := q.popFront() + require.False(t, ok, "unexpected value after tail") +} + +func TestQueueReadWriteFull(t *testing.T) { + q := newIDQueue() + val := int64(7) + for i := 0; i < idQueueChunkSize*3; i++ { + require.Equal(t, i, q.Len(), "queue size") + q.pushBack(val) + val = val*3 + 7 + } + val = int64(7) + for i := 0; i < idQueueChunkSize*3; i++ { + require.Equal(t, idQueueChunkSize*3-i, q.Len(), "queue size") + v, ok := q.popFront() + require.True(t, ok, "value not found after writing") + require.Equal(t, val, v, "read different from write") + val = val*3 + 7 + } + require.Equal(t, 0, q.Len(), "queue size") + _, ok := q.popFront() + require.False(t, ok, "unexpected value after tail") +} + +func TestQueueReadEmpty(t *testing.T) { + q := newIDQueue() + _, ok := q.popFront() + require.False(t, ok, "unexpected value in empty queue") +}