Skip to content

Commit

Permalink
kv/storage: add cluster version gates for local timestamp
Browse files Browse the repository at this point in the history
This commit adds a cluster version gate and a cluster setting for
local timestamps, to assist with their migration into an existing
cluster.

This fixes mixed-version clusters' interaction with local timestamps.
  • Loading branch information
nvanbenschoten committed Apr 15, 2022
1 parent b40e44b commit fb24b1a
Show file tree
Hide file tree
Showing 11 changed files with 3,040 additions and 22 deletions.
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/acceptance.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func registerAcceptance(r registry.Registry) {
},
{
name: "version-upgrade",
skip: "WIP: unskip when version checks are added to local_timestamp writes",
fn: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runVersionUpgrade(ctx, t, c)
},
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,10 @@ func (s spanSetWriter) LogLogicalOp(
s.w.LogLogicalOp(op, details)
}

func (s spanSetWriter) ShouldWriteLocalTimestamps(ctx context.Context) bool {
return s.w.ShouldWriteLocalTimestamps(ctx)
}

// ReadWriter is used outside of the spanset package internally, in ccl.
type ReadWriter struct {
spanSetReader
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,11 @@ type Writer interface {
//
// It is safe to modify the contents of the arguments after it returns.
SingleClearEngineKey(key EngineKey) error

// ShouldWriteLocalTimestamps is only for internal use in the storage package.
// This method is temporary, to handle the transition from clusters where not
// all nodes understand local timestamps.
ShouldWriteLocalTimestamps(ctx context.Context) bool
}

// ReadWriter is the read/write interface to an engine's data.
Expand Down
5 changes: 2 additions & 3 deletions pkg/storage/intent_reader_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ type intentDemuxWriter struct {
w Writer
}

func wrapIntentWriter(ctx context.Context, w Writer) intentDemuxWriter {
idw := intentDemuxWriter{w: w}
return idw
func wrapIntentWriter(w Writer) intentDemuxWriter {
return intentDemuxWriter{w: w}
}

// ClearIntent has the same behavior as Writer.ClearIntent. buf is used as
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/intent_reader_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestIntentDemuxWriter(t *testing.T) {
// This is a low-level test that explicitly wraps the writer, so it
// doesn't matter how the original call to createTestPebbleEngine
// behaved in terms of separated intents config.
w = wrapIntentWriter(context.Background(), &pw)
w = wrapIntentWriter(&pw)
return ""
case "put-intent":
pw.reset()
Expand Down
16 changes: 12 additions & 4 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1597,7 +1597,11 @@ func mvccPutInternal(
}
buf.newMeta.Txn = txnMeta
buf.newMeta.Timestamp = writeTimestamp.ToLegacyTimestamp()
buf.newMeta.SetLocalTimestamp(localTimestamp, &buf.newLocalTs)
if writer.ShouldWriteLocalTimestamps(ctx) {
buf.newMeta.SetLocalTimestamp(localTimestamp, &buf.newLocalTs)
} else {
buf.newMeta.LocalTimestamp = nil
}
}
newMeta := &buf.newMeta

Expand Down Expand Up @@ -3084,9 +3088,13 @@ func mvccResolveWriteIntent(
// resolver provides a clock observation from this node that was captured
// while the transaction was still pending, in which case it can be advanced
// to the observed timestamp.
localTs := latestKey.LocalTimestamp
localTs.Forward(intent.ClockWhilePending.Timestamp)
buf.newMeta.SetLocalTimestamp(localTs, &buf.newLocalTs)
if rw.ShouldWriteLocalTimestamps(ctx) {
localTs := latestKey.LocalTimestamp
localTs.Forward(intent.ClockWhilePending.Timestamp)
buf.newMeta.SetLocalTimestamp(localTs, &buf.newLocalTs)
} else {
buf.newMeta.LocalTimestamp = nil
}

// Update or remove the metadata key.
var metaKeySize, metaValSize int64
Expand Down
13 changes: 5 additions & 8 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand Down Expand Up @@ -99,18 +98,16 @@ func TestMVCCHistories(t *testing.T) {

datadriven.Walk(t, testutils.TestDataPath(t, "mvcc_histories"), func(t *testing.T, path string) {
// We start from a clean slate in every test file.
engine, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */),
func(cfg *engineConfig) error {
// Latest cluster version, since these tests are not ones where we
// are examining differences related to separated intents.
cfg.Settings = cluster.MakeTestingClusterSettings()
return nil
})
engine, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */))
if err != nil {
t.Fatal(err)
}
defer engine.Close()

if strings.Contains(path, "_disable_local_timestamps") {
localTimestampsEnabled.Override(ctx, &engine.settings.SV, false)
}

reportDataEntries := func(buf *redact.StringBuilder) error {
hasData := false
err := engine.MVCCIterate(span.Key, span.EndKey, MVCCKeyAndIntentsIterKind, func(r MVCCKeyValue) error {
Expand Down
49 changes: 46 additions & 3 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (*Pebble, error) {
p.makeMetricEtcEventListener(ctx),
)
p.eventListener = &cfg.Opts.EventListener
p.wrappedIntentWriter = wrapIntentWriter(ctx, p)
p.wrappedIntentWriter = wrapIntentWriter(p)

// Read the current store cluster version.
storeClusterVersion, err := getMinVersion(unencryptedFS, cfg.Dir)
Expand Down Expand Up @@ -1298,6 +1298,45 @@ func (p *Pebble) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetails
// No-op. Logical logging disabled.
}

// localTimestampsEnabled controls whether local timestamps are written on MVCC
// keys. A true setting is also gated on clusterversion.TODO. After all nodes in
// a cluster are at or beyond clusterversion.TODO, different nodes will see the
// version state transition at different times. Nodes that have not yet seen the
// transition may remove the local timestamp from an intent that has one during
// intent resolution. This will not cause problems.
//
// TODO(nvanbenschoten): remove this cluster setting and its associated plumbing
// when removing the cluster version, once we're confident in the efficacy and
// stability of local timestamps.
var localTimestampsEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"storage.transaction.local_timestamps.enabled",
"if enabled, MVCC keys will be written with local timestamps",
true,
)

func shouldWriteLocalTimestamps(ctx context.Context, settings *cluster.Settings) bool {
if !localTimestampsEnabled.Get(&settings.SV) {
// Not enabled.
return false
}
ver := settings.Version.ActiveVersionOrEmpty(ctx)
if ver == (clusterversion.ClusterVersion{}) {
// Some tests fail to configure settings. In these cases, assume that it
// is safe to write local timestamps.
return true
}
// TODO(nvanbenschoten): add a new cluster version when the time comes.
return ver.IsActive(clusterversion.SeedSpanCountTable)
}

// ShouldWriteLocalTimestamps implements the Writer interface.
func (p *Pebble) ShouldWriteLocalTimestamps(ctx context.Context) bool {
// This is not fast. Pebble should not be used by writers that want
// performance. They should use pebbleBatch.
return shouldWriteLocalTimestamps(ctx, p.settings)
}

// Attrs implements the Engine interface.
func (p *Pebble) Attrs() roachpb.Attributes {
return p.attrs
Expand Down Expand Up @@ -1534,7 +1573,7 @@ func (p *Pebble) GetAuxiliaryDir() string {

// NewBatch implements the Engine interface.
func (p *Pebble) NewBatch() Batch {
return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */)
return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings)
}

// NewReadOnly implements the Engine interface.
Expand All @@ -1544,7 +1583,7 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter {

// NewUnindexedBatch implements the Engine interface.
func (p *Pebble) NewUnindexedBatch(writeOnly bool) Batch {
return newPebbleBatch(p.db, p.db.NewBatch(), writeOnly)
return newPebbleBatch(p.db, p.db.NewBatch(), writeOnly, p.settings)
}

// NewSnapshot implements the Engine interface.
Expand Down Expand Up @@ -2104,6 +2143,10 @@ func (p *pebbleReadOnly) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO
panic("not implemented")
}

func (p *pebbleReadOnly) ShouldWriteLocalTimestamps(ctx context.Context) bool {
panic("not implemented")
}

// pebbleSnapshot represents a snapshot created using Pebble.NewSnapshot().
type pebbleSnapshot struct {
snapshot *pebble.Snapshot
Expand Down
17 changes: 15 additions & 2 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -56,6 +57,8 @@ type pebbleBatch struct {
wrappedIntentWriter intentDemuxWriter
// scratch space for wrappedIntentWriter.
scratch []byte

shouldWriteLocalTimestamps bool
}

var _ Batch = &pebbleBatch{}
Expand All @@ -67,7 +70,9 @@ var pebbleBatchPool = sync.Pool{
}

// Instantiates a new pebbleBatch.
func newPebbleBatch(db *pebble.DB, batch *pebble.Batch, writeOnly bool) *pebbleBatch {
func newPebbleBatch(
db *pebble.DB, batch *pebble.Batch, writeOnly bool, settings *cluster.Settings,
) *pebbleBatch {
pb := pebbleBatchPool.Get().(*pebbleBatch)
*pb = pebbleBatch{
db: db,
Expand All @@ -94,8 +99,11 @@ func newPebbleBatch(db *pebble.DB, batch *pebble.Batch, writeOnly bool) *pebbleB
reusable: true,
},
writeOnly: writeOnly,

// pebbleBatch is short-lived, so cache the value for performance.
shouldWriteLocalTimestamps: shouldWriteLocalTimestamps(context.Background(), settings),
}
pb.wrappedIntentWriter = wrapIntentWriter(context.Background(), pb)
pb.wrappedIntentWriter = wrapIntentWriter(pb)
return pb
}

Expand Down Expand Up @@ -528,3 +536,8 @@ func (p *pebbleBatch) Repr() []byte {
copy(reprCopy, repr)
return reprCopy
}

// ShouldWriteLocalTimestamps implements the Writer interface.
func (p *pebbleBatch) ShouldWriteLocalTimestamps(ctx context.Context) bool {
return p.shouldWriteLocalTimestamps
}
5 changes: 5 additions & 0 deletions pkg/storage/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ func (fw *SSTWriter) Close() {
fw.fw = nil
}

// ShouldWriteLocalTimestamps implements the Writer interface.
func (fw *SSTWriter) ShouldWriteLocalTimestamps(context.Context) bool {
return false
}

// MemFile is a file-like struct that buffers all data written to it in memory.
// Implements the writeCloseSyncer interface and is intended for use with
// SSTWriter.
Expand Down
Loading

0 comments on commit fb24b1a

Please sign in to comment.