Skip to content

Commit

Permalink
CDC/etcd_worker: add rate limiter to limit EtcdWorker tick frequency (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 4, 2021
1 parent 8470bb3 commit 85749b9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
15 changes: 14 additions & 1 deletion pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
)

// EtcdWorker handles all interactions with Etcd
Expand Down Expand Up @@ -121,6 +122,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}
lastReceivedEventTime := time.Now()

// tickRate represents the number of times EtcdWorker can tick
// the reactor per second
tickRate := time.Second / timerInterval
rl := rate.NewLimiter(rate.Limit(tickRate), 1)
for {
var response clientv3.WatchResponse
select {
Expand All @@ -137,7 +142,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}
case response = <-watchCh:
// In this select case, we receive new events from Etcd, and call handleEvent if appropriate.

if err := response.Err(); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -184,6 +188,15 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
if err := worker.applyUpdates(); err != nil {
return errors.Trace(err)
}

// If !rl.Allow(), skip this Tick to avoid etcd worker tick reactor too frequency.
// It make etcdWorker to batch etcd changed event in worker.state.
// The semantics of `ReactorState` requires that any implementation
// can batch updates internally.
if !rl.Allow() {
continue
}
// it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick
nextState, err := worker.reactor.Tick(ctx, worker.state)
if err != nil {
if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
Expand Down
21 changes: 14 additions & 7 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
type intReactorState struct {
val int
isUpdated bool
lastVal int
}

func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
Expand All @@ -290,6 +291,12 @@ func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er
if err != nil {
log.Panic("intReactorState", zap.Error(err))
}
// As long as we can ensure that val is monotonically increasing,
// we can ensure that the linearizability of state changes
if s.lastVal > s.val {
log.Panic("linearizability check failed, lastVal must less than current val", zap.Int("lastVal", s.lastVal), zap.Int("val", s.val))
}
s.lastVal = s.val
s.isUpdated = !isInit
return nil
}
Expand All @@ -299,17 +306,17 @@ func (s *intReactorState) GetPatches() [][]DataPatch {
}

type linearizabilityReactor struct {
state *intReactorState
expected int
state *intReactorState
tickCount int
}

func (r *linearizabilityReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) {
r.state = state.(*intReactorState)
if r.state.isUpdated {
if r.state.val != r.expected {
log.Panic("linearizability check failed", zap.Int("expected", r.expected), zap.Int("actual", r.state.val))
if r.state.val < r.tickCount {
log.Panic("linearizability check failed, val must larger than tickCount", zap.Int("expected", r.tickCount), zap.Int("actual", r.state.val))
}
r.expected++
r.tickCount++
}
if r.state.val == 1999 {
return r.state, cerrors.ErrReactorFinished
Expand All @@ -335,8 +342,8 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) {
}

reactor, err := NewEtcdWorker(cli0, testEtcdKeyPrefix+"/lin", &linearizabilityReactor{
state: nil,
expected: 999,
state: nil,
tickCount: 999,
}, &intReactorState{
val: 0,
isUpdated: false,
Expand Down

0 comments on commit 85749b9

Please sign in to comment.