Skip to content

Commit

Permalink
rac2: add token tracker
Browse files Browse the repository at this point in the history
This commit introduces the token `Tracker`, which will be used to track
token deductions for in-flight entries at a given `RaftPriority`.

Resolves: cockroachdb#128026
Release note: None
  • Loading branch information
kvoli committed Aug 26, 2024
1 parent 408d512 commit 1c69cf0
Show file tree
Hide file tree
Showing 4 changed files with 408 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"range_controller.go",
"store_stream.go",
"token_counter.go",
"token_tracker.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2",
visibility = ["//visibility:public"],
Expand All @@ -31,6 +32,7 @@ go_test(
"priority_test.go",
"range_controller_test.go",
"token_counter_test.go",
"token_tracker_test.go",
],
data = glob(["testdata/**"]),
embed = [":rac2"],
Expand Down
148 changes: 148 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/testdata/token_tracker
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
track
term=1 index=10 tokens=100 pri=LowPri
term=1 index=20 tokens=200 pri=NormalPri
term=1 index=30 tokens=300 pri=HighPri
----
tracked: term=1 index=10 tokens=100 pri=LowPri
tracked: term=1 index=20 tokens=200 pri=NormalPri
tracked: term=1 index=30 tokens=300 pri=HighPri

state
----
LowPri:
term=1 index=10 tokens=100
NormalPri:
term=1 index=20 tokens=200
HighPri:
term=1 index=30 tokens=300

# The Tracker should maintain correct ordering even when entries are not added
# in ascending index order but are in-order w.r.t priority.
track
term=2 index=50 tokens=500 pri=NormalPri
term=2 index=40 tokens=400 pri=LowPri
term=2 index=51 tokens=400 pri=LowPri
----
tracked: term=2 index=50 tokens=500 pri=NormalPri
tracked: term=2 index=40 tokens=400 pri=LowPri
tracked: term=2 index=51 tokens=400 pri=LowPri

state
----
LowPri:
term=1 index=10 tokens=100
term=2 index=40 tokens=400
term=2 index=51 tokens=400
NormalPri:
term=1 index=20 tokens=200
term=2 index=50 tokens=500
HighPri:
term=1 index=30 tokens=300

track
term=3 index=60 tokens=600 pri=HighPri
term=3 index=70 tokens=700 pri=LowPri
----
tracked: term=3 index=60 tokens=600 pri=HighPri
tracked: term=3 index=70 tokens=700 pri=LowPri

state
----
LowPri:
term=1 index=10 tokens=100
term=2 index=40 tokens=400
term=2 index=51 tokens=400
term=3 index=70 tokens=700
NormalPri:
term=1 index=20 tokens=200
term=2 index=50 tokens=500
HighPri:
term=1 index=30 tokens=300
term=3 index=60 tokens=600

untrack term=2
LowPri=45
NormalPri=0
HighPri=0
----
returned: tokens=500 pri=LowPri
returned: tokens=200 pri=NormalPri
returned: tokens=300 pri=HighPri

state
----
LowPri:
term=2 index=51 tokens=400
term=3 index=70 tokens=700
NormalPri:
term=2 index=50 tokens=500
HighPri:
term=3 index=60 tokens=600

# Untrack with a higher term.
untrack term=3
NormalPri=60
HighPri=60
----
returned: tokens=400 pri=LowPri
returned: tokens=500 pri=NormalPri
returned: tokens=600 pri=HighPri

state
----
LowPri:
term=3 index=70 tokens=700

untrack_ge index=40
----
returned: tokens=700 pri=LowPri

state
----

untrack_all
----

state
----

# Test tracking and untracking with different terms
track
term=4 index=80 tokens=800 pri=NormalPri
term=5 index=90 tokens=900 pri=NormalPri
term=5 index=100 tokens=999 pri=HighPri
----
tracked: term=4 index=80 tokens=800 pri=NormalPri
tracked: term=5 index=90 tokens=900 pri=NormalPri
tracked: term=5 index=100 tokens=999 pri=HighPri

state
----
NormalPri:
term=4 index=80 tokens=800
term=5 index=90 tokens=900
HighPri:
term=5 index=100 tokens=999

untrack term=4
NormalPri=95
HighPri=95
----
returned: tokens=800 pri=NormalPri

state
----
NormalPri:
term=5 index=90 tokens=900
HighPri:
term=5 index=100 tokens=999

untrack_all
----
returned: tokens=900 pri=NormalPri
returned: tokens=999 pri=HighPri

state
----

# vim:ft=sh
129 changes: 129 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rac2

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Tracker tracks flow token deductions for a replicaSendStream. Tokens are
// deducted for an in-flight log entry identified by its raft log index and
// term with a given RaftPriority.
type Tracker struct {
// tracked entries are stored in increasing order of (term, index), per
// priority.
tracked [raftpb.NumPriorities][]tracked

stream kvflowcontrol.Stream // used for logging only
}

// tracked represents tracked flow tokens; they're tracked with respect to a
// raft log index and term.
type tracked struct {
tokens kvflowcontrol.Tokens
index, term uint64
}

func NewTracker(stream kvflowcontrol.Stream) *Tracker {
return &Tracker{
tracked: [int(raftpb.NumPriorities)][]tracked{},
stream: stream,
}
}

// Track token deductions of the given priority with the given raft log index and term.
func (t *Tracker) Track(
ctx context.Context, term uint64, index uint64, pri raftpb.Priority, tokens kvflowcontrol.Tokens,
) bool {
if len(t.tracked[pri]) >= 1 {
last := t.tracked[pri][len(t.tracked[pri])-1]
// Tracker exists in the context of a single replicaSendStream, which cannot
// span the leader losing leadership and regaining it. So the indices must
// advance.
if last.index >= index {
log.Fatalf(ctx, "expected in order tracked log indexes (%d < %d)",
last.index, index)
return false
}
if last.term > term {
log.Fatalf(ctx, "expected in order tracked leader terms (%d < %d)",
last.term, term)
return false
}
}

t.tracked[pri] = append(t.tracked[pri], tracked{
tokens: tokens,
index: index,
term: term,
})

return true
}

// Untrack all token deductions of the given priority that have indexes less
// than or equal to the one provided, per priority, and terms less than or
// equal to the leader term.
func (t *Tracker) Untrack(
term uint64, admitted [raftpb.NumPriorities]uint64,
) (returned [raftpb.NumPriorities]kvflowcontrol.Tokens) {
for pri := range admitted {
uptoIndex := admitted[pri]
var untracked int
for n := len(t.tracked[pri]); untracked < n; untracked++ {
tracked := t.tracked[pri][untracked]
if tracked.term > term || (tracked.term == term && tracked.index > uptoIndex) {
break
}
returned[pri] += tracked.tokens
}
t.tracked[pri] = t.tracked[pri][untracked:]
}

return returned
}

// UntrackGE untracks all token deductions of the given priority that have
// indexes greater than or equal to the one provided.
func (t *Tracker) UntrackGE(index uint64) (returned [raftpb.NumPriorities]kvflowcontrol.Tokens) {
for pri := range t.tracked {
j := len(t.tracked[pri]) - 1
for j >= 0 {
tr := t.tracked[pri][j]
if tr.index >= index {
returned[pri] += tr.tokens
j--
} else {
break
}
}
t.tracked[pri] = t.tracked[pri][:j+1]
}

return returned
}

// UntrackAll iterates through all tracked token deductions, untracking all of them
// and returning the sum of tokens for each priority.
func (t *Tracker) UntrackAll() (returned [raftpb.NumPriorities]kvflowcontrol.Tokens) {
for pri, deductions := range t.tracked {
for _, deduction := range deductions {
returned[pri] += deduction.tokens
}
}
t.tracked = [raftpb.NumPriorities][]tracked{}

return returned
}
Loading

0 comments on commit 1c69cf0

Please sign in to comment.