diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index c4adda2b487d..0549527a5528 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -343,7 +343,7 @@ type TxnCoordSenderFactory struct { clock *hlc.Clock heartbeatInterval time.Duration linearizable bool // enables linearizable behavior - stopper *stop.Stopper + stopper *stop.DelayedStopper metrics TxnMetrics testingKnobs ClientTestingKnobs @@ -377,7 +377,7 @@ func NewTxnCoordSenderFactory( st: cfg.Settings, wrapped: wrapped, clock: cfg.Clock, - stopper: cfg.Stopper, + stopper: stop.NewDelayedStopper(cfg.Stopper), linearizable: cfg.Linearizable, heartbeatInterval: cfg.HeartbeatInterval, metrics: cfg.Metrics, diff --git a/pkg/kv/txn_coord_sender_server_test.go b/pkg/kv/txn_coord_sender_server_test.go index ad48bbc0ef36..ac46259b1c9e 100644 --- a/pkg/kv/txn_coord_sender_server_test.go +++ b/pkg/kv/txn_coord_sender_server_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "reflect" - "strings" "sync/atomic" "testing" "time" @@ -34,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/opentracing/opentracing-go" ) // Test that a transaction gets cleaned up when the heartbeat loop finds out @@ -139,61 +137,3 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) { t.Fatalf("expected aborted error, got: %s", err) } } - -// Test that, when a transaction restarts, we don't get a second heartbeat loop -// for it. This bug happened in the past. -// -// The test traces the restarting transaction and looks in it to see how many -// times a heartbeat loop was started. -func TestNoDuplicateHeartbeatLoops(t *testing.T) { - defer leaktest.AfterTest(t)() - - s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) - ctx := context.Background() - defer s.Stopper().Stop(ctx) - - key := roachpb.Key("a") - - tracer := tracing.NewTracer() - sp := tracer.StartSpan("test", tracing.Recordable) - tracing.StartRecording(sp, tracing.SingleNodeRecording) - txnCtx := opentracing.ContextWithSpan(context.Background(), sp) - - push := func(ctx context.Context, key roachpb.Key) error { - return db.Put(ctx, key, "push") - } - - var attempts int - err := db.Txn(txnCtx, func(ctx context.Context, txn *client.Txn) error { - attempts++ - if attempts == 1 { - if err := push(context.Background() /* keep the contexts separate */, key); err != nil { - return err - } - } - if _, err := txn.Get(ctx, key); err != nil { - return err - } - return txn.Put(ctx, key, "val") - }) - if err != nil { - t.Fatal(err) - } - if attempts != 2 { - t.Fatalf("expected 2 attempts, got: %d", attempts) - } - sp.Finish() - recording := tracing.GetRecording(sp) - var foundHeartbeatLoop bool - for _, sp := range recording { - if strings.Contains(sp.Operation, "heartbeat loop") { - if foundHeartbeatLoop { - t.Fatal("second heartbeat loop found") - } - foundHeartbeatLoop = true - } - } - if !foundHeartbeatLoop { - t.Fatal("no heartbeat loop found. Test rotted?") - } -} diff --git a/pkg/kv/txn_interceptor_heartbeater.go b/pkg/kv/txn_interceptor_heartbeater.go index 7605b1ffda17..0134dfe0daeb 100644 --- a/pkg/kv/txn_interceptor_heartbeater.go +++ b/pkg/kv/txn_interceptor_heartbeater.go @@ -79,7 +79,7 @@ type txnHeartbeater struct { // stopper is the TxnCoordSender's stopper. Used to stop the heartbeat loop // when quiescing. - stopper *stop.Stopper + stopper *stop.DelayedStopper // asyncAbortCallbackLocked is called when the heartbeat loop shuts itself // down because it has detected the transaction to be aborted. The intention @@ -117,6 +117,8 @@ type txnHeartbeater struct { // TODO(nvanbenschoten): Once we stop sending BeginTxn entirely (v2.3) // we can get rid of this. For now, we keep it to ensure compatibility. needBeginTxn bool + + heartbeatTask stop.DelayedTask } } @@ -130,7 +132,7 @@ func (h *txnHeartbeater) init( heartbeatInterval time.Duration, gatekeeper lockedSender, metrics *TxnMetrics, - stopper *stop.Stopper, + stopper *stop.DelayedStopper, asyncAbortCallbackLocked func(context.Context), ) { h.stopper = stopper @@ -267,6 +269,7 @@ func (h *txnHeartbeater) closeLocked() { if h.mu.txnEnd == nil { return } + h.mu.heartbeatTask.Cancel() close(h.mu.txnEnd) h.mu.txnEnd = nil } @@ -288,23 +291,17 @@ func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error { hbCtx := h.AnnotateCtx(context.Background()) hbCtx = opentracing.ContextWithSpan(hbCtx, opentracing.SpanFromContext(ctx)) - return h.stopper.RunAsyncTask( + _, err := h.stopper.RunDelayedAsyncTask( hbCtx, "kv.TxnCoordSender: heartbeat loop", func(ctx context.Context) { h.heartbeatLoop(ctx) - }) + }, h.heartbeatInterval, &h.mu.heartbeatTask) + return err } // heartbeatLoop periodically sends a HeartbeatTxn request to the transaction // record, stopping in the event the transaction is aborted or committed after // attempting to resolve the intents. func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) { - var tickChan <-chan time.Time - { - ticker := time.NewTicker(h.heartbeatInterval) - tickChan = ticker.C - defer ticker.Stop() - } - var finalErr *roachpb.Error defer func() { h.mu.Lock() @@ -317,7 +314,14 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) { } h.mu.Unlock() }() - + // heartbeat once immediately on start. + if !h.heartbeat(ctx) { + // This error we're generating here should not be seen by clients. Since + // the transaction is aborted, they should be rejected before they reach + // this interceptor. + finalErr = roachpb.NewErrorf("heartbeat failed fatally") + return + } var closer <-chan struct{} { h.mu.Lock() @@ -327,6 +331,12 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) { return } } + var tickChan <-chan time.Time + { + ticker := time.NewTicker(h.heartbeatInterval) + tickChan = ticker.C + defer ticker.Stop() + } // Loop with ticker for periodic heartbeats. for { select { diff --git a/pkg/util/stop/delayed_stopper.go b/pkg/util/stop/delayed_stopper.go index 5f0907780545..035d31b1ef82 100644 --- a/pkg/util/stop/delayed_stopper.go +++ b/pkg/util/stop/delayed_stopper.go @@ -1,3 +1,17 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + package stop import ( @@ -55,7 +69,7 @@ func (t *DelayedTask) Cancel() { } // RunDelayedAsyncTask queues a task for executaion after the provided delay. -// An error is returned if the context is cancelled or the DelayedStopper is +// An error is returned if the context is canceled or the DelayedStopper is // shutting down before the task can be queued. Otherwise the returned object // can be used to cancel the task while its queued. The method task an optional // DelayedTask pointer to allow clients to avoid an allocation by storing the