Skip to content

Commit

Permalink
kvserver: log receiver snapshot trace on context errors
Browse files Browse the repository at this point in the history
In #106363, the receiver side of snapshots was modified to log the trace
of the current context when context errors (such as timeouts or
client-side context cancellation) occur. This refactors that change to
log these traces whenever any context-based error occurs
during receiver-side handling of the MultiRaft/RaftSnapshot streaming RPC.
This includes errors while receiving requests or sending responses from
the server side of a snapshot - i.e. any time that the traces cannot be
collected and returned to the client.

Additionally, tests for these scenarios have been added.

Part of: #105820

Release note: None
  • Loading branch information
AlexTalks committed Jul 14, 2023
1 parent a6dbdee commit ee6e8c1
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4813,7 +4813,7 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
1: {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(header *kvserverpb.SnapshotRequest_Header) error {
ReceiveSnapshot: func(_ context.Context, header *kvserverpb.SnapshotRequest_Header) error {
val := delaySnapshotTrap.Load()
if val != nil {
fn := val.(func() error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.DisableRaftSnapshotQueue = true // we'll control it ourselves
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
// We'll want a signal for when the snapshot was received by the sender.
once.Do(func() { close(blockUntilSnapshotCh) })

Expand Down
185 changes: 185 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math"
"math/rand"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -58,10 +59,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -1460,6 +1464,187 @@ func (c fakeSnapshotStream) Send(request *kvserverpb.SnapshotResponse) error {
return nil
}

// snapshotTestSignals bundles the channels that TestReceiveSnapshotLogging uses
// to coordinate the test goroutine (sender side) with the testing knobs that
// run on the snapshot receiver.
type snapshotTestSignals struct {
	// Channels the receiver-side knobs block on.

	// receiveErrCh supplies the error returned by the ReceiveSnapshot knob; a
	// closed channel makes the knob return nil without blocking.
	receiveErrCh chan error
	// batchReceiveReadyCh holds the BeforeRecvAcceptedSnapshot knob until the
	// test sends on or closes it.
	batchReceiveReadyCh chan struct{}

	// Channels the sender (test) side waits on.

	// svrContextDone is the Done channel of the context handed to the
	// ReceiveSnapshot knob; it is nil until that knob has run.
	svrContextDone <-chan struct{}

	// receiveStartedCh is closed when the ReceiveSnapshot knob is entered,
	// batchReceiveStartedCh is sent to when BeforeRecvAcceptedSnapshot is
	// entered, and receiverDoneCh is sent to by the HandleSnapshotDone knob.
	receiveStartedCh, batchReceiveStartedCh, receiverDoneCh chan struct{}
}

// TestReceiveSnapshotLogging tests that a snapshot receiver properly captures
// the collected tracing spans in the last response, or logs the span if the
// context is cancelled from the client side.
//
// Each subtest starts its own three-node cluster, adds a voter on the receiver
// node (which triggers a snapshot), and uses the channels in
// snapshotTestSignals to decide when, if ever, the sender's context is
// cancelled relative to the receiver's progress.
func TestReceiveSnapshotLogging(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const senderNodeIdx = 0
const receiverNodeIdx = 1
// Marker event emitted on the receiver; the test later searches for it in
// either the receiver's log entry or the sender's trace recording.
const dummyEventMsg = "test receive snapshot logging - dummy event"

// setupTest starts a manually replicated three-node cluster with the raft
// snapshot queue disabled, installs the signalling knobs on the receiver
// node, and returns a scratch range descriptor to replicate.
setupTest := func(t *testing.T) (context.Context, *testcluster.TestCluster, *roachpb.RangeDescriptor, *snapshotTestSignals) {
ctx := context.Background()

signals := &snapshotTestSignals{
receiveErrCh: make(chan error),
batchReceiveReadyCh: make(chan struct{}),

// Populated by the ReceiveSnapshot knob once the receiver's context exists.
svrContextDone: nil,
receiveStartedCh: make(chan struct{}),
batchReceiveStartedCh: make(chan struct{}),
// Buffered so the HandleSnapshotDone knob never blocks on the test.
receiverDoneCh: make(chan struct{}, 1),
}

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableRaftSnapshotQueue: true,
},
},
},
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: map[int]base.TestServerArgs{
receiverNodeIdx: {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableRaftSnapshotQueue: true,
ThrottleEmptySnapshots: true,
// Records the marker event on the receiver's context, publishes
// that context's Done channel, announces that the header
// arrived, then blocks until the test supplies the knob's
// return value (nil when receiveErrCh has been closed).
ReceiveSnapshot: func(ctx context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
t.Logf("incoming snapshot on n2")
log.Event(ctx, dummyEventMsg)
signals.svrContextDone = ctx.Done()
close(signals.receiveStartedCh)
return <-signals.receiveErrCh
},
// Announces that batch receipt is starting (unbuffered send, so
// a test goroutine must be receiving) and then waits to be
// released by the test.
BeforeRecvAcceptedSnapshot: func() {
t.Logf("receiving on n2")
signals.batchReceiveStartedCh <- struct{}{}
<-signals.batchReceiveReadyCh
},
// Tells the test that the receiver's snapshot handler is done.
HandleSnapshotDone: func() {
t.Logf("receiver on n2 completed")
signals.receiverDoneCh <- struct{}{}
},
},
},
},
},
})

_, scratchRange, err := tc.Servers[0].ScratchRangeEx()
require.NoError(t, err)

return ctx, tc, &scratchRange, signals
}

// snapshotAndValidateLogs adds a voter for rngDesc on the receiver node,
// waits for the receiver's handler to finish, and then verifies where the
// receiver's trace ended up: in the receiver's log (expectTraceOnSender ==
// false) or in the sender's span recording (expectTraceOnSender == true).
// It returns the error from ChangeReplicas for the caller to assert on.
snapshotAndValidateLogs := func(t *testing.T, ctx context.Context, tc *testcluster.TestCluster, rngDesc *roachpb.RangeDescriptor, signals *snapshotTestSignals, expectTraceOnSender bool) error {
t.Helper()

repl := tc.GetFirstStoreFromServer(t, senderNodeIdx).LookupReplica(rngDesc.StartKey)
chgs := kvpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(receiverNodeIdx))

testStartTs := timeutil.Now()
_, pErr := repl.ChangeReplicas(ctx, rngDesc, kvserverpb.SnapshotRequest_REBALANCE, kvserverpb.ReasonRangeUnderReplicated, "", chgs)

// When ready, flush logs and check messages from store_raft.go since
// call to repl.ChangeReplicas(..).
<-signals.receiverDoneCh
log.Flush()
// NOTE(review): the literal 100 looks like a cap on the number of entries
// fetched; confirm against log.FetchEntriesFromFiles.
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`store_raft\.go`), log.WithMarkedSensitiveData)
require.NoError(t, err)

errRegexp, err := regexp.Compile(`incoming snapshot stream failed with error`)
require.NoError(t, err)
foundEntry := false
// entry is declared outside the loop so the matching entry is still
// available after the break.
var entry logpb.Entry
for _, entry = range entries {
if errRegexp.MatchString(entry.Message) {
foundEntry = true
break
}
}
// The trace is expected in exactly one place: the receiver's log or the
// sender's recording.
expectTraceOnReceiver := !expectTraceOnSender
require.Equal(t, expectTraceOnReceiver, foundEntry)
if expectTraceOnReceiver {
require.Contains(t, entry.Message, dummyEventMsg)
}

// Check that receiver traces were imported in sender's context on success.
clientTraces := tracing.SpanFromContext(ctx).GetConfiguredRecording()
_, receiverTraceFound := clientTraces.FindLogMessage(dummyEventMsg)
require.Equal(t, expectTraceOnSender, receiverTraceFound)

return pErr
}

// The sender cancels while the receiver is still handling the snapshot
// header; the receiver then fails with an error and must log its trace.
t.Run("cancel on header", func(t *testing.T) {
ctx, tc, scratchRange, signals := setupTest(t)
defer tc.Stopper().Stop(ctx)

ctx, sp := tracing.EnsureChildSpan(ctx, tc.GetFirstStoreFromServer(t, senderNodeIdx).GetStoreConfig().Tracer(),
t.Name(), tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

ctx, cancel := context.WithCancel(ctx)
go func() {
// Wait for the receiver to see the header, cancel the sender, wait for
// the receiver's context to be done, and only then let the knob
// return an error.
<-signals.receiveStartedCh
cancel()
<-signals.svrContextDone
time.Sleep(10 * time.Millisecond)
signals.receiveErrCh <- errors.Errorf("header is bad")
}()
err := snapshotAndValidateLogs(t, ctx, tc, scratchRange, signals, false /* expectTraceOnSender */)
require.Error(t, err)
})
// The header is accepted, and the sender cancels once the receiver has
// started receiving batches; the receiver must log its trace.
t.Run("cancel during receive", func(t *testing.T) {
ctx, tc, scratchRange, signals := setupTest(t)
defer tc.Stopper().Stop(ctx)

ctx, sp := tracing.EnsureChildSpan(ctx, tc.GetFirstStoreFromServer(t, senderNodeIdx).GetStoreConfig().Tracer(),
t.Name(), tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

ctx, cancel := context.WithCancel(ctx)
// A closed receiveErrCh makes the ReceiveSnapshot knob return nil at once.
close(signals.receiveErrCh)
go func() {
// Cancel only after batch receipt has begun, and release the blocked
// receiver only after its context is done.
<-signals.receiveStartedCh
<-signals.batchReceiveStartedCh
cancel()
<-signals.svrContextDone
time.Sleep(10 * time.Millisecond)
close(signals.batchReceiveReadyCh)
}()
err := snapshotAndValidateLogs(t, ctx, tc, scratchRange, signals, false /* expectTraceOnSender */)
require.Error(t, err)
})
// No cancellation: the snapshot succeeds and the receiver's trace must show
// up in the sender's recording rather than in the receiver's log.
t.Run("successful send", func(t *testing.T) {
ctx, tc, scratchRange, signals := setupTest(t)
defer tc.Stopper().Stop(ctx)

ctx, sp := tracing.EnsureChildSpan(ctx, tc.GetFirstStoreFromServer(t, senderNodeIdx).GetStoreConfig().Tracer(),
t.Name(), tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Closing both channels up front keeps the receiver knobs from blocking
// on them.
close(signals.receiveErrCh)
close(signals.batchReceiveReadyCh)
go func() {
// Drain the unbuffered batchReceiveStartedCh so the
// BeforeRecvAcceptedSnapshot knob's send can complete.
<-signals.receiveStartedCh
<-signals.batchReceiveStartedCh
}()
err := snapshotAndValidateLogs(t, ctx, tc, scratchRange, signals, true /* expectTraceOnSender */)
require.NoError(t, err)
})
}

// TestFailedSnapshotFillsReservation tests that failing to finish applying an
// incoming snapshot still cleans up the outstanding reservation that was made.
func TestFailedSnapshotFillsReservation(t *testing.T) {
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestAddReplicaViaLearner(t *testing.T) {
var receivedSnap int64
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
if atomic.CompareAndSwapInt64(&receivedSnap, 0, 1) {
close(blockUntilSnapshotCh)
} else {
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
activateBlocking := int64(1)
var count int64
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&activateBlocking) > 0 {
// Signal waitForRebalanceToBlockCh to indicate the testing knob was hit.
close(waitForRebalanceToBlockCh)
Expand All @@ -250,7 +250,7 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
ltk.storeKnobs.BeforeSendSnapshotThrottle = func() {
atomic.AddInt64(&count, 1)
}
ltk.storeKnobs.AfterSendSnapshotThrottle = func() {
ltk.storeKnobs.AfterSnapshotThrottle = func() {
atomic.AddInt64(&count, -1)
}
ctx := context.Background()
Expand Down Expand Up @@ -492,7 +492,7 @@ func TestDelegateSnapshotFails(t *testing.T) {
}

setupFn := func(t *testing.T,
receiveFunc func(*kvserverpb.SnapshotRequest_Header) error,
receiveFunc func(context.Context, *kvserverpb.SnapshotRequest_Header) error,
sendFunc func(*kvserverpb.DelegateSendSnapshotRequest),
processRaft func(roachpb.StoreID) bool,
) (
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestDelegateSnapshotFails(t *testing.T) {
var block atomic.Int32
tc, scratchKey := setupFn(
t,
func(h *kvserverpb.SnapshotRequest_Header) error {
func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
// TODO(abaptist): Remove this check once #96841 is fixed.
if h.SenderQueueName == kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE {
return nil
Expand Down Expand Up @@ -862,7 +862,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
runTest := func(t *testing.T, replicaType roachpb.ReplicaType) {
var rejectSnapshotErr atomic.Value // error
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
if err := rejectSnapshotErr.Load().(error); err != nil {
return err
}
Expand Down Expand Up @@ -1374,7 +1374,7 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.DisableRaftSnapshotQueue = true
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
select {
case <-blockSnapshotsCh:
case <-time.After(10 * time.Second):
Expand Down Expand Up @@ -1438,7 +1438,7 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
blockUntilSnapshotCh := make(chan struct{}, 2)
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserverpb.SnapshotRequest_Header) error {
ltk.storeKnobs.ReceiveSnapshot = func(_ context.Context, h *kvserverpb.SnapshotRequest_Header) error {
blockUntilSnapshotCh <- struct{}{}
<-blockSnapshotsCh
return nil
Expand Down Expand Up @@ -1991,7 +1991,7 @@ func TestMergeQueueDoesNotInterruptReplicationChange(t *testing.T) {
// Disable load-based splitting, so that the absence of sufficient
// QPS measurements do not prevent ranges from merging.
DisableLoadBasedSplitting: true,
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
ReceiveSnapshot: func(_ context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&activateSnapshotTestingKnob) == 1 {
// While the snapshot RPC should only happen once given
// that the cluster is running under manual replication,
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func TestReplicateQueueTracingOnError(t *testing.T) {
t, 4, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
ReceiveSnapshot: func(_ context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) == 1 {
return errors.Newf("boom")
}
Expand Down Expand Up @@ -967,7 +967,7 @@ func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
t, 4, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
ReceiveSnapshot: func(_ context.Context, _ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) == 1 {
return errors.Newf("boom")
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,22 @@ func (s *Store) HandleDelegatedSnapshot(
func (s *Store) HandleSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream SnapshotResponseStream,
) error {
// Testing hook: when set, it is deferred so that it runs as HandleSnapshot
// returns, after the snapshot task below has finished.
if fn := s.cfg.TestingKnobs.HandleSnapshotDone; fn != nil {
defer fn()
}
ctx = s.AnnotateCtx(ctx)
const name = "storage.Store: handle snapshot"
return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error {
s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1)

// NOTE(review): in this diff view the next line is the pre-change return
// (the hunk's single deletion); the statements after it are its
// replacement.
return s.receiveSnapshot(ctx, header, stream)
err := s.receiveSnapshot(ctx, header, stream)
// Only log when the failure coincides with a done context; the error is
// returned to the caller unchanged either way.
if err != nil && ctx.Err() != nil {
// Log trace of incoming snapshot on context cancellation (e.g.
// times out or caller goes away).
log.Infof(ctx, "incoming snapshot stream failed with error: %v\ntrace:\n%v",
err, tracing.SpanFromContext(ctx).GetConfiguredRecording())
}
return err
})
}

Expand Down
Loading

0 comments on commit ee6e8c1

Please sign in to comment.