Skip to content

Commit

Permalink
kvserver: disconnect rangefeeds on AddSSTable
Browse files Browse the repository at this point in the history
This patch disconnects rangefeeds on `AddSSTable` application with
`REASON_ADDSSTABLE`. This allows consumers to reconnect and run a
catchup scan to see the new writes.

The rangefeed is only disconnected when `WriteAtRequestTimestamp` is
enabled for the `AddSSTable` request. Otherwise the SSTable may write
below the closed timestamp, and so a catchup scan may not see the
changes anyway.

Since we have to disconnect the rangefeed even for `IngestAsWrites`,
which does not produce a replicated `AddSSTable` result (there is no SST
to link), a new field `ReplicatedEvalResult.DisconnectRangeFeed` is
added as a general mechanism to disconnect the rangefeed. This is not
used elsewhere, to retain backwards compatibility with 21.2 followers,
but `WriteAtRequestTimestamp` requires a 22.1 `MVCCAddSSTable` version
gate and can therefore safely rely on this field.

Release note: None
  • Loading branch information
erikgrinaker committed Nov 25, 2021
1 parent ebd9bed commit 6f2891d
Show file tree
Hide file tree
Showing 11 changed files with 570 additions and 302 deletions.
24 changes: 23 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ func EvalAddSSTable(

ms.Add(stats)

// With WriteAtRequestTimestamp, we respect the closed timestamp, so we
// disconnect the rangefeed to allow consumers to run a catchup scan covering
// the new entries. We don't do this otherwise, since the consumer wouldn't
// see any writes below the closed timestamp anyway.
var rangeFeedErr *roachpb.RangeFeedRetryError
if args.WriteAtRequestTimestamp {
rangeFeedErr = &roachpb.RangeFeedRetryError{
Reason: roachpb.RangeFeedRetryError_REASON_ADDSSTABLE,
}
}

if args.IngestAsWrites {
span.RecordStructured(&types.StringValue{Value: fmt.Sprintf("ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))})
log.VEventf(ctx, 2, "ingesting SST (%d keys/%d bytes) via regular write batch", stats.KeyCount, len(sst))
Expand All @@ -242,7 +253,17 @@ func EvalAddSSTable(
}
sstIter.Next()
}
return result.Result{}, nil
// We could emit logical ops for the rangefeed here. However, that would
// require using MVCCful storage.MVCCPut() operations above, which change
// the semantics and performance of IngestAsWrites to check for and respect
// key conflicts (i.e. don't write below existing keys). Since we want
// identical behavior as a regular AddSSTable, including blind writes, we
// extend that to disconnecting the rangefeed as well.
return result.Result{
Replicated: kvserverpb.ReplicatedEvalResult{
DisconnectRangeFeed: rangeFeedErr,
},
}, nil
}

return result.Result{
Expand All @@ -251,6 +272,7 @@ func EvalAddSSTable(
Data: sst,
CRC32: util.CRC32(sst),
},
DisconnectRangeFeed: rangeFeedErr,
},
}, nil
}
77 changes: 77 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,83 @@ func TestEvalAddSSTable(t *testing.T) {
})
}

// TestEvalAddSSTableRangefeed tests EvalAddSSTable rangefeed-related
// behavior.
func TestEvalAddSSTableRangefeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testcases := map[string]struct {
sst []mvccKV
atReqTS bool // WriteAtRequestTimestamp
asWrites bool // IngestAsWrites
expectDisconnect bool
expectLogicalOps []enginepb.MVCCLogicalOp
}{
"Default does not affect rangefeed": {
sst: []mvccKV{{"a", 1, "a1"}},
expectDisconnect: false,
expectLogicalOps: nil,
},
"WriteAtRequestTimestamp disconnects rangefeed": {
sst: []mvccKV{{"a", 1, "a1"}},
atReqTS: true,
expectDisconnect: true,
},
"IngestAsWrites alone does not affect rangefeed": {
sst: []mvccKV{{"a", 1, "a1"}},
asWrites: true,
expectDisconnect: false,
expectLogicalOps: nil,
},
"IngestAsWrites and WriteAtRequestTimestamp disconnects rangefeed without logical ops": {
sst: []mvccKV{{"a", 1, "a1"}},
asWrites: true,
atReqTS: true,
expectDisconnect: true,
expectLogicalOps: nil,
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
ctx := context.Background()

engine := storage.NewDefaultInMemForTesting()
defer engine.Close()
opLogger := storage.NewOpLoggerBatch(engine.NewBatch())

// Build and add SST.
sst, start, end := makeSST(t, tc.sst)
result, err := batcheval.EvalAddSSTable(ctx, opLogger, batcheval.CommandArgs{
EvalCtx: (&batcheval.MockEvalCtx{ClusterSettings: st}).EvalContext(),
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: 10},
},
Stats: &enginepb.MVCCStats{},
Args: &roachpb.AddSSTableRequest{
RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end},
Data: sst,
MVCCStats: sstStats(t, sst),
WriteAtRequestTimestamp: tc.atReqTS,
IngestAsWrites: tc.asWrites,
},
}, &roachpb.AddSSTableResponse{})
require.NoError(t, err)

if tc.expectDisconnect {
require.NotNil(t, result.Replicated.DisconnectRangeFeed)
require.Equal(t, roachpb.RangeFeedRetryError_REASON_ADDSSTABLE, result.Replicated.DisconnectRangeFeed.Reason)
} else if result.Replicated.AddSSTable != nil {
require.Nil(t, result.Replicated.DisconnectRangeFeed)
}

require.Equal(t, tc.expectLogicalOps, opLogger.LogicalOps())
})
}
}

// TestDBAddSSTable tests application of an SST to a database, both in-memory
// and on disk.
func TestDBAddSSTable(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Replicated.PriorReadSummary = nil

if p.Replicated.DisconnectRangeFeed == nil {
p.Replicated.DisconnectRangeFeed = q.Replicated.DisconnectRangeFeed
}
q.Replicated.DisconnectRangeFeed = nil

if p.Local.EncounteredIntents == nil {
p.Local.EncounteredIntents = q.Local.EncounteredIntents
} else {
Expand Down
Loading

0 comments on commit 6f2891d

Please sign in to comment.