Skip to content

Commit

Permalink
kv: delay starting heartbeat loop until after first interval
Browse files Browse the repository at this point in the history
Before this PR, every transaction would start a goroutine for its heartbeat loop
immediately upon creation. Most transactions never need to heartbeat, so this
goroutine is wasteful. This change delays the start of the heartbeat loop until
after the first heartbeatInterval has passed by using the new DelayedStopper.

Fixes cockroachdb#35009

Release note: None
  • Loading branch information
ajwerner committed Feb 19, 2019
1 parent 4098122 commit 5127770
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 75 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ type TxnCoordSenderFactory struct {
clock *hlc.Clock
heartbeatInterval time.Duration
linearizable bool // enables linearizable behavior
stopper *stop.Stopper
stopper *stop.DelayedStopper
metrics TxnMetrics

testingKnobs ClientTestingKnobs
Expand Down Expand Up @@ -377,7 +377,7 @@ func NewTxnCoordSenderFactory(
st: cfg.Settings,
wrapped: wrapped,
clock: cfg.Clock,
stopper: cfg.Stopper,
stopper: stop.NewDelayedStopper(cfg.Stopper),
linearizable: cfg.Linearizable,
heartbeatInterval: cfg.HeartbeatInterval,
metrics: cfg.Metrics,
Expand Down
60 changes: 0 additions & 60 deletions pkg/kv/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
Expand All @@ -34,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/opentracing/opentracing-go"
)

// Test that a transaction gets cleaned up when the heartbeat loop finds out
Expand Down Expand Up @@ -139,61 +137,3 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {
t.Fatalf("expected aborted error, got: %s", err)
}
}

// TestNoDuplicateHeartbeatLoops verifies that a transaction which restarts
// does not end up with a second heartbeat loop; a bug of this kind occurred
// in the past.
//
// The transaction runs under a recording trace span, and the recorded spans
// are scanned afterwards to count how many heartbeat loops were started.
func TestNoDuplicateHeartbeatLoops(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(ctx)

	key := roachpb.Key("a")

	// Set up a recording span so that everything done on behalf of the
	// transaction can be inspected once it has finished.
	tracer := tracing.NewTracer()
	rootSpan := tracer.StartSpan("test", tracing.Recordable)
	tracing.StartRecording(rootSpan, tracing.SingleNodeRecording)
	txnCtx := opentracing.ContextWithSpan(context.Background(), rootSpan)

	attempts := 0
	txnErr := db.Txn(txnCtx, func(ctx context.Context, txn *client.Txn) error {
		attempts++
		if attempts == 1 {
			// On the first attempt only, write to the key outside of the
			// transaction. The test asserts below that this results in
			// exactly two attempts.
			pushErr := db.Put(context.Background() /* keep the contexts separate */, key, "push")
			if pushErr != nil {
				return pushErr
			}
		}
		if _, err := txn.Get(ctx, key); err != nil {
			return err
		}
		return txn.Put(ctx, key, "val")
	})
	if txnErr != nil {
		t.Fatal(txnErr)
	}
	if attempts != 2 {
		t.Fatalf("expected 2 attempts, got: %d", attempts)
	}
	rootSpan.Finish()

	// Count the recorded spans whose operation names a heartbeat loop; exactly
	// one is expected.
	heartbeatLoops := 0
	for _, rec := range tracing.GetRecording(rootSpan) {
		if !strings.Contains(rec.Operation, "heartbeat loop") {
			continue
		}
		heartbeatLoops++
		if heartbeatLoops > 1 {
			t.Fatal("second heartbeat loop found")
		}
	}
	if heartbeatLoops == 0 {
		t.Fatal("no heartbeat loop found. Test rotted?")
	}
}
34 changes: 22 additions & 12 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type txnHeartbeater struct {

// stopper is the TxnCoordSender's stopper. Used to stop the heartbeat loop
// when quiescing.
stopper *stop.Stopper
stopper *stop.DelayedStopper

// asyncAbortCallbackLocked is called when the heartbeat loop shuts itself
// down because it has detected the transaction to be aborted. The intention
Expand Down Expand Up @@ -117,6 +117,8 @@ type txnHeartbeater struct {
// TODO(nvanbenschoten): Once we stop sending BeginTxn entirely (v2.3)
// we can get rid of this. For now, we keep it to ensure compatibility.
needBeginTxn bool

heartbeatTask stop.DelayedTask
}
}

Expand All @@ -130,7 +132,7 @@ func (h *txnHeartbeater) init(
heartbeatInterval time.Duration,
gatekeeper lockedSender,
metrics *TxnMetrics,
stopper *stop.Stopper,
stopper *stop.DelayedStopper,
asyncAbortCallbackLocked func(context.Context),
) {
h.stopper = stopper
Expand Down Expand Up @@ -267,6 +269,7 @@ func (h *txnHeartbeater) closeLocked() {
if h.mu.txnEnd == nil {
return
}
h.mu.heartbeatTask.Cancel()
close(h.mu.txnEnd)
h.mu.txnEnd = nil
}
Expand All @@ -288,23 +291,17 @@ func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
hbCtx := h.AnnotateCtx(context.Background())
hbCtx = opentracing.ContextWithSpan(hbCtx, opentracing.SpanFromContext(ctx))

return h.stopper.RunAsyncTask(
_, err := h.stopper.RunDelayedAsyncTask(
hbCtx, "kv.TxnCoordSender: heartbeat loop", func(ctx context.Context) {
h.heartbeatLoop(ctx)
})
}, h.heartbeatInterval, &h.mu.heartbeatTask)
return err
}

// heartbeatLoop periodically sends a HeartbeatTxn request to the transaction
// record, stopping in the event the transaction is aborted or committed after
// attempting to resolve the intents.
func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
var tickChan <-chan time.Time
{
ticker := time.NewTicker(h.heartbeatInterval)
tickChan = ticker.C
defer ticker.Stop()
}

var finalErr *roachpb.Error
defer func() {
h.mu.Lock()
Expand All @@ -317,7 +314,14 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
}
h.mu.Unlock()
}()

// heartbeat once immediately on start.
if !h.heartbeat(ctx) {
// This error we're generating here should not be seen by clients. Since
// the transaction is aborted, they should be rejected before they reach
// this interceptor.
finalErr = roachpb.NewErrorf("heartbeat failed fatally")
return
}
var closer <-chan struct{}
{
h.mu.Lock()
Expand All @@ -327,6 +331,12 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
return
}
}
var tickChan <-chan time.Time
{
ticker := time.NewTicker(h.heartbeatInterval)
tickChan = ticker.C
defer ticker.Stop()
}
// Loop with ticker for periodic heartbeats.
for {
select {
Expand Down
16 changes: 15 additions & 1 deletion pkg/util/stop/delayed_stopper.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package stop

import (
Expand Down Expand Up @@ -55,7 +69,7 @@ func (t *DelayedTask) Cancel() {
}

// RunDelayedAsyncTask queues a task for execution after the provided delay.
// An error is returned if the context is cancelled or the DelayedStopper is
// An error is returned if the context is canceled or the DelayedStopper is
// shutting down before the task can be queued. Otherwise the returned object
// can be used to cancel the task while it's queued. The method takes an optional
// DelayedTask pointer to allow clients to avoid an allocation by storing the
Expand Down

0 comments on commit 5127770

Please sign in to comment.