Skip to content

Commit

Permalink
[wip,dnm,dnr] kvserver,kvcoord: batch txn heartbeats
Browse files Browse the repository at this point in the history
See cockroachdb#45013. Picking up from cockroachdb#52705 left off.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Madeline Liao <[email protected]>
  • Loading branch information
madelineliao and irfansharif committed Sep 14, 2020
1 parent db627d7 commit 6e554ce
Show file tree
Hide file tree
Showing 27 changed files with 2,039 additions and 1,371 deletions.
163 changes: 152 additions & 11 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Large diffs are not rendered by default.

177 changes: 177 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
})
tsf := kvcoord.NewTxnCoordSenderFactory(
kvcoord.TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Clock: s.Clock(),
Stopper: s.Stopper(),
AmbientCtx: ambient,
Clock: s.Clock(),
Stopper: s.Stopper(),
RangeDescriptorCache: ds.RangeDescriptorCache(),
},
ds,
)
Expand Down
13 changes: 8 additions & 5 deletions pkg/kv/kvclient/kvcoord/local_test_cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,17 @@ func InitFactoryForLocalTestCluster(
stopper *stop.Stopper,
gossip *gossip.Gossip,
) kv.TxnSenderFactory {
ds := NewDistSenderForLocalTestCluster(st, nodeDesc, tracer, clock, latency, stores, stopper, gossip)
rdc := ds.RangeDescriptorCache()
return NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Settings: st,
Clock: clock,
Stopper: stopper,
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Settings: st,
Clock: clock,
Stopper: stopper,
RangeDescriptorCache: rdc,
},
NewDistSenderForLocalTestCluster(st, nodeDesc, tracer, clock, latency, stores, stopper, gossip),
ds,
)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func newRootTxnCoordSender(
tcs.clock,
&tcs.metrics,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
tcf.heartbeatBatcher, // XXX: We bypass the lock gatekeeper entirely here.
&tcs.mu.Mutex,
&tcs.mu.txn,
)
Expand Down Expand Up @@ -675,7 +675,7 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
tc.metrics.RestartsReadWithinUncertainty.Inc()

case *roachpb.TransactionAbortedError:
tc.metrics.RestartsTxnAborted.Inc()
tc.metrics.RestartsTxnAborted.Inc() // XXX: We'll need to make sure this codepath still gets exercised.

case *roachpb.TransactionPushError:
tc.metrics.RestartsTxnPush.Inc()
Expand Down
23 changes: 18 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client/requestbatcher"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -34,24 +36,26 @@ type TxnCoordSenderFactory struct {
linearizable bool // enables linearizable behavior
stopper *stop.Stopper
metrics TxnMetrics
heartbeatBatcher *TxnHeartbeatBatcher

testingKnobs ClientTestingKnobs
}

var _ kv.TxnSenderFactory = &TxnCoordSenderFactory{}

// TxnCoordSenderFactoryConfig holds configuration and auxiliary objects that can be passed
// to NewTxnCoordSenderFactory.
// TxnCoordSenderFactoryConfig holds configuration and auxiliary objects that
// can be passed to NewTxnCoordSenderFactory.
type TxnCoordSenderFactoryConfig struct {
AmbientCtx log.AmbientContext

Settings *cluster.Settings
Clock *hlc.Clock
Stopper *stop.Stopper

HeartbeatInterval time.Duration
Linearizable bool
Metrics TxnMetrics
HeartbeatInterval time.Duration
Linearizable bool
Metrics TxnMetrics
RangeDescriptorCache kvbase.RangeDescriptorCache

TestingKnobs ClientTestingKnobs
}
Expand All @@ -61,6 +65,14 @@ type TxnCoordSenderFactoryConfig struct {
func NewTxnCoordSenderFactory(
cfg TxnCoordSenderFactoryConfig, wrapped kv.Sender,
) *TxnCoordSenderFactory {
batcherCfg := requestbatcher.Config{
// XXX: These configurations need to be tuned.
Name: "heartbeat_batcher",
MaxMsgsPerBatch: 10,
Stopper: cfg.Stopper,
Sender: wrapped,
MaxWait: time.Second,
}
tcf := &TxnCoordSenderFactory{
AmbientContext: cfg.AmbientCtx,
st: cfg.Settings,
Expand All @@ -71,6 +83,7 @@ func NewTxnCoordSenderFactory(
heartbeatInterval: cfg.HeartbeatInterval,
metrics: cfg.Metrics,
testingKnobs: cfg.TestingKnobs,
heartbeatBatcher: NewTxnHeartbeatBatcher(cfg.RangeDescriptorCache, cfg.Clock, batcherCfg),
}
if tcf.st == nil {
tcf.st = cluster.MakeTestingClusterSettings()
Expand Down
12 changes: 7 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,18 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {

// Make a db with a short heartbeat interval.
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
ds := s.DistSenderI().(*kvcoord.DistSender)
tsf := kvcoord.NewTxnCoordSenderFactory(
kvcoord.TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
// Short heartbeat interval.
HeartbeatInterval: time.Millisecond,
Settings: s.ClusterSettings(),
Clock: s.Clock(),
Stopper: s.Stopper(),
HeartbeatInterval: time.Millisecond,
Settings: s.ClusterSettings(),
Clock: s.Clock(),
Stopper: s.Stopper(),
RangeDescriptorCache: ds.RangeDescriptorCache(),
},
s.DistSenderI().(*kvcoord.DistSender),
ds,
)
db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())
txn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
Expand Down
Loading

0 comments on commit 6e554ce

Please sign in to comment.