Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC/etcd_worker: add rate limiter to limit EtcdWorker tick frequency (#3219) #3269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
)

// EtcdWorker handles all interactions with Etcd
Expand Down Expand Up @@ -121,6 +122,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}
lastReceivedEventTime := time.Now()

// tickRate represents the number of times EtcdWorker can tick
// the reactor per second
tickRate := time.Second / timerInterval
rl := rate.NewLimiter(rate.Limit(tickRate), 1)
for {
var response clientv3.WatchResponse
select {
Expand All @@ -137,7 +142,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}
case response = <-watchCh:
// In this select case, we receive new events from Etcd, and call handleEvent if appropriate.

if err := response.Err(); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -184,6 +188,15 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
if err := worker.applyUpdates(); err != nil {
return errors.Trace(err)
}

// If !rl.Allow(), skip this Tick to avoid etcd worker tick reactor too frequency.
// It make etcdWorker to batch etcd changed event in worker.state.
// The semantics of `ReactorState` requires that any implementation
// can batch updates internally.
if !rl.Allow() {
continue
}
// it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick
nextState, err := worker.reactor.Tick(ctx, worker.state)
if err != nil {
if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
Expand Down
21 changes: 14 additions & 7 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
type intReactorState struct {
val int
isUpdated bool
lastVal int
}

func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
Expand All @@ -290,6 +291,12 @@ func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) er
if err != nil {
log.Panic("intReactorState", zap.Error(err))
}
// As long as we can ensure that val is monotonically increasing,
// we can ensure that the linearizability of state changes
if s.lastVal > s.val {
log.Panic("linearizability check failed, lastVal must less than current val", zap.Int("lastVal", s.lastVal), zap.Int("val", s.val))
}
s.lastVal = s.val
s.isUpdated = !isInit
return nil
}
Expand All @@ -299,17 +306,17 @@ func (s *intReactorState) GetPatches() [][]DataPatch {
}

type linearizabilityReactor struct {
state *intReactorState
expected int
state *intReactorState
tickCount int
}

func (r *linearizabilityReactor) Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error) {
r.state = state.(*intReactorState)
if r.state.isUpdated {
if r.state.val != r.expected {
log.Panic("linearizability check failed", zap.Int("expected", r.expected), zap.Int("actual", r.state.val))
if r.state.val < r.tickCount {
log.Panic("linearizability check failed, val must larger than tickCount", zap.Int("expected", r.tickCount), zap.Int("actual", r.state.val))
}
r.expected++
r.tickCount++
}
if r.state.val == 1999 {
return r.state, cerrors.ErrReactorFinished
Expand All @@ -335,8 +342,8 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) {
}

reactor, err := NewEtcdWorker(cli0, testEtcdKeyPrefix+"/lin", &linearizabilityReactor{
state: nil,
expected: 999,
state: nil,
tickCount: 999,
}, &intReactorState{
val: 0,
isUpdated: false,
Expand Down