Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: make RaftTruncatedState unreplicated #34660

Merged
merged 5 commits into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-5</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-6</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {
return false, err
}
getReplicaInfo(rangeID).committedIndex = hs.Commit
case bytes.Equal(suffix, keys.LocalRaftTruncatedStateSuffix):
case bytes.Equal(suffix, keys.LocalRaftTruncatedStateLegacySuffix):
var trunc roachpb.RaftTruncatedState
if err := kv.Value.GetProto(&trunc); err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func tryRangeIDKey(kv engine.MVCCKeyValue) (string, error) {
case bytes.Equal(suffix, keys.LocalRaftTombstoneSuffix):
msg = &roachpb.RaftTombstone{}

case bytes.Equal(suffix, keys.LocalRaftTruncatedStateSuffix):
case bytes.Equal(suffix, keys.LocalRaftTruncatedStateLegacySuffix):
msg = &roachpb.RaftTruncatedState{}

case bytes.Equal(suffix, keys.LocalRangeLeaseSuffix):
Expand Down
5 changes: 3 additions & 2 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ var (
LocalRaftAppliedIndexLegacySuffix = []byte("rfta")
// LocalRaftTombstoneSuffix is the suffix for the raft tombstone.
LocalRaftTombstoneSuffix = []byte("rftb")
// LocalRaftTruncatedStateSuffix is the suffix for the RaftTruncatedState.
LocalRaftTruncatedStateSuffix = []byte("rftt")
// LocalRaftTruncatedStateLegacySuffix is the suffix for the legacy RaftTruncatedState.
// See VersionUnreplicatedRaftTruncatedState.
LocalRaftTruncatedStateLegacySuffix = []byte("rftt")
// LocalRangeLeaseSuffix is the suffix for a range lease.
LocalRangeLeaseSuffix = []byte("rll-")
// LocalLeaseAppliedIndexLegacySuffix is the suffix for the applied lease index.
Expand Down
23 changes: 17 additions & 6 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,10 @@ func LeaseAppliedIndexLegacyKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).LeaseAppliedIndexLegacyKey()
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateKey()
// RaftTruncatedStateLegacyKey builds the legacy system-local key for the
// RaftTruncatedState of the given range.
// See VersionUnreplicatedRaftTruncatedState.
func RaftTruncatedStateLegacyKey(rangeID roachpb.RangeID) roachpb.Key {
	prefixBuf := MakeRangeIDPrefixBuf(rangeID)
	return prefixBuf.RaftTruncatedStateLegacyKey()
}

// RangeFrozenStatusKey returns a system-local key for the frozen status.
Expand Down Expand Up @@ -314,6 +315,11 @@ func RaftTombstoneKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTombstoneKey()
}

// RaftTruncatedStateKey builds the system-local key for the RaftTruncatedState
// of the given range by delegating to the range-ID prefix buffer.
func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key {
	prefixBuf := MakeRangeIDPrefixBuf(rangeID)
	return prefixBuf.RaftTruncatedStateKey()
}

// RaftHardStateKey returns a system-local key for a Raft HardState.
func RaftHardStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftHardStateKey()
Expand Down Expand Up @@ -897,9 +903,9 @@ func (b RangeIDPrefixBuf) LeaseAppliedIndexLegacyKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalLeaseAppliedIndexLegacySuffix...)
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRaftTruncatedStateSuffix...)
// RaftTruncatedStateLegacyKey returns the legacy system-local key for a
// RaftTruncatedState: the replicated range-ID prefix followed by
// LocalRaftTruncatedStateLegacySuffix.
func (b RangeIDPrefixBuf) RaftTruncatedStateLegacyKey() roachpb.Key {
	key := b.replicatedPrefix()
	key = append(key, LocalRaftTruncatedStateLegacySuffix...)
	return key
}

// RangeFrozenStatusKey returns a system-local key for the frozen status.
Expand Down Expand Up @@ -935,6 +941,11 @@ func (b RangeIDPrefixBuf) RaftTombstoneKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftTombstoneSuffix...)
}

// RaftTruncatedStateKey returns the system-local key for a RaftTruncatedState:
// the unreplicated range-ID prefix followed by
// LocalRaftTruncatedStateLegacySuffix. The suffix bytes are the same ones the
// legacy key uses; only the prefix differs.
func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key {
	key := b.unreplicatedPrefix()
	key = append(key, LocalRaftTruncatedStateLegacySuffix...)
	return key
}

// RaftHardStateKey returns a system-local key for a Raft HardState.
func (b RangeIDPrefixBuf) RaftHardStateKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftHardStateSuffix...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestKeyAddressError(t *testing.T) {
AbortSpanKey(0, uuid.MakeV4()),
RaftTombstoneKey(0),
RaftAppliedIndexLegacyKey(0),
RaftTruncatedStateKey(0),
RaftTruncatedStateLegacyKey(0),
RangeLeaseKey(0),
RangeStatsLegacyKey(0),
RaftHardStateKey(0),
Expand Down
39 changes: 4 additions & 35 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ var (
ppFunc: raftLogKeyPrint,
psFunc: raftLogKeyParse,
},
{name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateSuffix},
{name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix},
{name: "RaftLastIndex", suffix: LocalRaftLastIndexSuffix},
{name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix},
{name: "RangeLastVerificationTimestamp", suffix: LocalRangeLastVerificationTimestampSuffixDeprecated},
Expand Down Expand Up @@ -563,40 +563,9 @@ func prettyPrintInternal(valDirs []encoding.Direction, key roachpb.Key, quoteRaw
return str
}

// PrettyPrint prints the key in a human readable format:
//
// Key format Key value
// /Local/... "\x01"+...
// /Store/... "\x01s"+...
// /RangeID/... "\x01s"+[rangeid]
// /[rangeid]/AbortSpan/[id] "\x01s"+[rangeid]+"abc-"+[id]
// /[rangeid]/Lease "\x01s"+[rangeid]+"rfll"
// /[rangeid]/RaftTombstone "\x01s"+[rangeid]+"rftb"
// /[rangeid]/RaftHardState "\x01s"+[rangeid]+"rfth"
// /[rangeid]/RaftAppliedIndex "\x01s"+[rangeid]+"rfta"
// /[rangeid]/RaftLog/logIndex:[logIndex] "\x01s"+[rangeid]+"rftl"+[logIndex]
// /[rangeid]/RaftTruncatedState "\x01s"+[rangeid]+"rftt"
// /[rangeid]/RaftLastIndex "\x01s"+[rangeid]+"rfti"
// /[rangeid]/RangeLastReplicaGCTimestamp "\x01s"+[rangeid]+"rlrt"
// /[rangeid]/RangeLastVerificationTimestamp "\x01s"+[rangeid]+"rlvt"
// /[rangeid]/RangeStats "\x01s"+[rangeid]+"stat"
// /Range/... "\x01k"+...
// [key]/RangeDescriptor "\x01k"+[key]+"rdsc"
// [key]/Transaction/[id] "\x01k"+[key]+"txn-"+[txn-id]
// [key]/QueueLastProcessed/[queue] "\x01k"+[key]+"qlpt"+[queue]
// /Local/Max "\x02"
//
// /Meta1/[key] "\x02"+[key]
// /Meta2/[key] "\x03"+[key]
// /System/... "\x04"
// /NodeLiveness/[key] "\x04\0x00liveness-"+[key]
// /StatusNode/[key] "\x04status-node-"+[key]
// /System/Max "\x05"
//
// /Table/[key] [key]
//
// /Min ""
// /Max "\xff\xff"
// PrettyPrint prints the key in a human readable format, see TestPrettyPrint.
// The output does not indicate whether a key is part of the replicated or
// unreplicated keyspace.
//
// valDirs correspond to the encoding direction of each encoded value in key.
// For example, table keys could have column values encoded in ascending or
Expand Down
8 changes: 7 additions & 1 deletion pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func TestPrettyPrint(t *testing.T) {
{RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState"},
{RaftAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftAppliedIndex"},
{LeaseAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LeaseAppliedIndex"},
{RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"},
{RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"},
{RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState"},
{RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease"},
{RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats"},
{RangeTxnSpanGCThresholdKey(roachpb.RangeID(1000001)), `/Local/RangeID/1000001/r/RangeTxnSpanGCThreshold`},
Expand Down Expand Up @@ -184,6 +185,11 @@ func TestPrettyPrint(t *testing.T) {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
keyInfo := MassagePrettyPrintedSpanForTest(PrettyPrint(nil /* valDirs */, test.key), nil)
exp := MassagePrettyPrintedSpanForTest(test.exp, nil)
t.Logf(`---- test case #%d:
input: %q
output: %s
exp: %s
`, i+1, []byte(test.key), keyInfo, exp)
if exp != keyInfo {
t.Errorf("%d: expected %s, got %s", i, exp, keyInfo)
}
Expand Down
172 changes: 172 additions & 0 deletions pkg/server/version_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,20 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

type testClusterWithHelpers struct {
Expand Down Expand Up @@ -191,6 +195,174 @@ func TestClusterVersionPersistedOnJoin(t *testing.T) {
}
}

// TestClusterVersionUnreplicatedRaftTruncatedState exercises the
// VersionUnreplicatedRaftTruncatedState migration in as much detail as possible
// in a unit test.
//
// It starts a four node cluster with a pre-migration version and upgrades into
// the new version while traffic and scattering are active, verifying that the
// truncated states are rewritten.
//
// The test has three phases:
//  1. At the old version: write data, relocate ranges, and check that no
//     replica has a TruncatedState under the unreplicated key.
//  2. Bump the cluster version, then write and relocate again.
//  3. For every replica, wait until the legacy key is gone and the
//     unreplicated key is present, forcing log truncations where the legacy
//     key is still found.
func TestClusterVersionUnreplicatedRaftTruncatedState(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

dir, finish := testutils.TempDir(t)
defer finish()

// oldVersion is the cluster version key immediately preceding the migration.
oldVersion := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState - 1)
oldVersionS := oldVersion.String()
newVersionS := cluster.VersionByKey(cluster.VersionUnreplicatedRaftTruncatedState).String()

// Four node cluster in which all versions support newVersion (i.e. would in
// principle upgrade to it) but are bootstrapped at oldVersion.
versions := [][2]string{
{oldVersionS, newVersionS},
{oldVersionS, newVersionS},
{oldVersionS, newVersionS},
{oldVersionS, newVersionS},
}

bootstrapVersion := cluster.ClusterVersion{Version: oldVersion}

// NOTE(review): judging by the knob names, the stores are bootstrapped at
// oldVersion and the automatic upgrade is turned off so that the version only
// changes through the explicit setVersion call below; confirm against the
// knob definitions.
knobs := base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
BootstrapVersion: &bootstrapVersion,
},
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
},
}

tc := setupMixedCluster(t, knobs, versions, dir)
defer tc.TestCluster.Stopper().Stop(ctx)

// Create the test table and split it at ids 1 through 9.
if _, err := tc.ServerConn(0).Exec(`
CREATE TABLE kv (id INT PRIMARY KEY, v INT);
ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1, 9) AS g(i);
`); err != nil {
t.Fatal(err)
}

// scatter relocates the ranges at ids 0 through 9 to the single store
// i%len(versions)+1. Relocation errors are only logged; they do not fail the
// test.
scatter := func() {
t.Helper()
if _, err := tc.ServerConn(0).Exec(
`ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i%$1+1], i FROM generate_series(0, 9) AS g(i)`, len(versions),
); err != nil {
t.Log(err)
}
}

// n counts the upserts issued so far; insert uses it both to pick the row
// (n%2, i.e. id 0 or 1) and as the value written.
var n int
insert := func() {
t.Helper()
n++
// Write only to a subset of our ranges to guarantee log truncations there.
_, err := tc.ServerConn(0).Exec(`UPSERT INTO kv VALUES($1, $2)`, n%2, n)
if err != nil {
t.Fatal(err)
}
}

// Phase 1: generate load and move replicas around while still at oldVersion.
for i := 0; i < 500; i++ {
insert()
}
scatter()

// Before the upgrade, no replica on any store may have a TruncatedState stored
// under the unreplicated key.
for _, server := range tc.Servers {
assert.NoError(t, server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error {
s.VisitReplicas(func(r *storage.Replica) bool {
key := keys.RaftTruncatedStateKey(r.RangeID)
var truncState roachpb.RaftTruncatedState
found, err := engine.MVCCGetProto(
context.Background(), s.Engine(), key,
hlc.Timestamp{}, &truncState, engine.MVCCGetOptions{},
)
if err != nil {
t.Fatal(err)
}
if found {
t.Errorf("unexpectedly found unreplicated TruncatedState at %s", key)
}
return true // want more
})
return nil
}))
}

// Sanity check: the cluster must still report the old version.
if v := tc.getVersionFromSelect(0); v != oldVersionS {
t.Fatalf("running %s, wanted %s", v, oldVersionS)
}

// Phase 2: bump the cluster version, then generate load and relocate again.
assert.NoError(t, tc.setVersion(0, newVersionS))
for i := 0; i < 500; i++ {
insert()
}
scatter()

// Phase 3: on every store, each replica must end up with only the
// unreplicated TruncatedState key. The closure returns an error while a
// legacy key is still present (presumably causing SucceedsSoon to retry).
for _, server := range tc.Servers {
testutils.SucceedsSoon(t, func() error {
err := server.GetStores().(*storage.Stores).VisitStores(func(s *storage.Store) error {
// We scattered and so old copies of replicas may be lying around.
// If we're not proactive about removing them, the test gets pretty
// slow because those replicas aren't caught up any more.
s.MustForceReplicaGCScanAndProcess()
// err accumulates a description of every replica on this store that
// still has the legacy key; it is what VisitStores' callback returns.
var err error
s.VisitReplicas(func(r *storage.Replica) bool {
// Read both keys from one engine snapshot, closed when this
// per-replica callback returns.
snap := s.Engine().NewSnapshot()
defer snap.Close()

keyLegacy := keys.RaftTruncatedStateLegacyKey(r.RangeID)
keyUnreplicated := keys.RaftTruncatedStateKey(r.RangeID)

// Only existence matters here, so no destination proto is passed.
if found, innerErr := engine.MVCCGetProto(
context.Background(), snap, keyLegacy,
hlc.Timestamp{}, nil, engine.MVCCGetOptions{},
); innerErr != nil {
t.Fatal(innerErr)
} else if found {
// Record the failure, wrapping the accumulated error with this
// replica's description.
if err == nil {
err = errors.New("found legacy TruncatedState")
}
err = errors.Wrap(err, r.String())

// Force a log truncation to prove that this rectifies
// the situation.
status := r.RaftStatus()
if status != nil {
// Truncate the log at the currently committed index.
desc := r.Desc()
truncate := &roachpb.TruncateLogRequest{}
truncate.Key = desc.StartKey.AsRawKey()
truncate.RangeID = desc.RangeID
truncate.Index = status.HardState.Commit
var ba roachpb.BatchRequest
ba.RangeID = r.RangeID
ba.Add(truncate)
// This err shadows the accumulator above and is scoped to the if
// statement.
if _, err := s.DB().NonTransactionalSender().Send(ctx, ba); err != nil {
t.Fatal(err)
}
}
return true // want more
}

// The legacy key is absent, so the unreplicated key must exist.
if found, err := engine.MVCCGetProto(
context.Background(), snap, keyUnreplicated,
hlc.Timestamp{}, nil, engine.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
} else if !found {
// We can't have neither of the keys present.
t.Fatalf("%s: unexpectedly did not find unreplicated TruncatedState at %s", r, keyUnreplicated)
}

return true // want more
})
return err
})
return err
})
}
}

func TestClusterVersionUpgrade(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
Loading