34296: storage: improve message on slow Raft proposal r=petermattis a=tbg

Touches #33007.

Release note: None

34589: importccl: fix flaky test TestImportCSVStmt r=rytaft a=rytaft

`TestImportCSVStmt` tests that `IMPORT` jobs appear in a certain order
in the `system.jobs` table. Automatic statistics were causing this
test to be flaky since `CreateStats` jobs were present in the jobs
table as well, in an unpredictable order. This commit fixes the problem
by only selecting `IMPORT` jobs from the jobs table.
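
As a minimal sketch of why filtering by type makes the ordering stable (the `job` struct and `nthJobOfType` helper are hypothetical stand-ins, not the actual `jobutils` API):

```go
package main

import "fmt"

// job is a hypothetical, simplified stand-in for a row of system.jobs.
type job struct {
	ID   int
	Type string
}

// nthJobOfType returns the offset-th job (0-based) of the given type.
// Jobs of every other type are skipped, so interleaved CreateStats jobs
// cannot shift the position of the IMPORT jobs.
func nthJobOfType(jobs []job, typ string, offset int) (job, bool) {
	seen := 0
	for _, j := range jobs {
		if j.Type != typ {
			continue
		}
		if seen == offset {
			return j, true
		}
		seen++
	}
	return job{}, false
}

func main() {
	// CREATE STATS jobs may land anywhere between the IMPORT jobs.
	jobs := []job{
		{1, "IMPORT"}, {2, "CREATE STATS"}, {3, "IMPORT"},
		{4, "CREATE STATS"}, {5, "IMPORT"},
	}
	j, ok := nthJobOfType(jobs, "IMPORT", 1)
	fmt.Println(j.ID, ok) // prints "3 true"
}
```

With this kind of filtering, an offset counts only jobs of the requested type, which is presumably why the test can drop its `baseNumJobs` bookkeeping.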

Fixes #34568

Release note: None

34660: storage: make RaftTruncatedState unreplicated r=bdarnell a=tbg

This isn't 100% polished yet, but it is generally ready for review.

----

See #34287.

Today, Raft (or preemptive) snapshots include the past Raft log, that is,
log entries which are already reflected in the state of the snapshot.
Fundamentally, this is because we have historically used a replicated
TruncatedState.

TruncatedState essentially tells us what the first index in the log is
(though it also includes a Term). If the TruncatedState cannot diverge
across replicas, we *must* send the whole log in snapshots, as the first
log index must match what the TruncatedState claims it is.

The Raft log is typically, but not necessarily, small. Log truncations are
driven by a queue and use a complex decision process. That decision process
can be faulty and, even if it isn't, the queue could be held up. Besides,
even when the Raft log contains only very few entries, these entries may be
quite large (see SSTable ingestion during RESTORE).

All of this means that we don't want to be forced to send the Raft log as
part of snapshots, which in turn requires the TruncatedState to be
unreplicated.

This change migrates the TruncatedState into unreplicated keyspace. It does
not yet allow snapshots to avoid sending the past Raft log, but that is a
relatively straightforward follow-up change.

VersionUnreplicatedRaftTruncatedState, when active, moves the truncated
state into unreplicated keyspace on log truncations.
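
A rough sketch of the two key locations, assuming a simplified, human-readable rendering (the real encoding lives in pkg/keys and differs; the function names and string format here are illustrative only). Both keys share the `rftt` suffix and differ only in the replicated (`r`) versus unreplicated (`u`) infix:

```go
package main

import "fmt"

// Hypothetical, simplified rendering of range-ID-local keys. Only the
// "rftt" suffix and the r/u distinction are taken from the change itself.
const (
	replicatedInfix      = "r"
	unreplicatedInfix    = "u"
	truncatedStateSuffix = "rftt"
)

// rangeIDKey renders a readable key for the given range, infix, and suffix.
func rangeIDKey(rangeID int64, infix, suffix string) string {
	return fmt.Sprintf("/Local/RangeID/%d/%s/%s", rangeID, infix, suffix)
}

// legacyTruncatedStateKey is the old location in replicated keyspace.
func legacyTruncatedStateKey(rangeID int64) string {
	return rangeIDKey(rangeID, replicatedInfix, truncatedStateSuffix)
}

// truncatedStateKey is the new location in unreplicated keyspace.
func truncatedStateKey(rangeID int64) string {
	return rangeIDKey(rangeID, unreplicatedInfix, truncatedStateSuffix)
}

func main() {
	fmt.Println(legacyTruncatedStateKey(7)) // /Local/RangeID/7/r/rftt
	fmt.Println(truncatedStateKey(7))       // /Local/RangeID/7/u/rftt
}
```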

The migration works as follows:

1. At any log position, the replicas of a Range either use the new
(unreplicated) key or the old one, and exactly one of them exists.

2. When a log truncation evaluates under the new cluster version, it
initiates the migration by deleting the old key. Under the old cluster
version, it behaves like today, updating the replicated truncated state.

3. The deletion signals new code downstream of Raft and triggers a write to
the new, unreplicated, key (atomic with the deletion of the old key).

4. Future log truncations don't write any replicated data any more, but
(like before) send along the TruncatedState which is written downstream of
Raft atomically with the deletion of the log entries. This actually uses
the same code as 3. What's new is that the truncated state needs to be
verified before replacing a previous one. If replicas disagree about their
truncated state, it's possible for replica X at FirstIndex=100 to apply a
truncated state update that sets FirstIndex to, say, 50 (proposed by a
replica with a "longer" historical log). In that case, the truncated state
update must be ignored (this is straightforward downstream-of-Raft code).

5. When a split trigger evaluates, it seeds the RHS with the legacy key iff
the LHS uses the legacy key, and the unreplicated key otherwise. This makes
sure that the invariant that all replicas agree on the state of the
migration is upheld.

6. When a snapshot is applied, the receiver is told whether the snapshot
contains a legacy key. If not, it writes the truncated state (which is part
of the snapshot metadata) in its unreplicated version. Otherwise it doesn't
have to do anything (the range will migrate later).

The following diagram visualizes the above. Note that it abuses sequence
diagrams to get a nice layout; the vertical lines belonging to NewState and
OldState don't imply any particular ordering of operations.

```
┌────────┐                            ┌────────┐
│OldState│                            │NewState│
└───┬────┘                            └───┬────┘
   │                        Bootstrap under old version
   │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
─ ─
   │                                     │
   │                                     │     Bootstrap under new version
   │                                     │ <─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
   │                                     │
   │─ ─ ┐
   │    | Log truncation under old version
   │< ─ ┘
   │                                     │
   │─ ─ ┐                                │
   │    | Snapshot                       │
   │< ─ ┘                                │
   │                                     │
   │                                     │─ ─ ┐
   │                                     │    | Snapshot
   │                                     │< ─ ┘
   │                                     │
   │   Log truncation under new version  │
   │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─>│
   │                                     │
   │                                     │─ ─ ┐
   │                                     │    | Log truncation under new version
   │                                     │< ─ ┘
   │                                     │
   │                                     │─ ─ ┐
   │                                     │    | Log truncation under old version
   │                                     │< ─ ┘ (necessarily running new binary)
```
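
The verification described in step 4 can be sketched roughly as follows. This is an in-memory illustration, not the actual downstream-of-Raft code: the `replica` type and `applyTruncation` are hypothetical, and the sketch assumes the first log index is the truncated index plus one.

```go
package main

import "fmt"

// truncatedState mirrors the two fields mentioned above: an index and a term.
type truncatedState struct {
	Index uint64 // last log index that has been truncated away (assumption)
	Term  uint64
}

// replica is a hypothetical in-memory stand-in for one replica's local state.
type replica struct {
	truncated truncatedState
	log       map[uint64]string // remaining log entries keyed by index
}

// firstIndex is the first index still present in the log.
func (r *replica) firstIndex() uint64 { return r.truncated.Index + 1 }

// applyTruncation applies a proposed truncated state. It returns false and
// changes nothing if the proposal would not advance the local state, which
// can happen when the proposer had a "longer" historical log.
func (r *replica) applyTruncation(proposed truncatedState) bool {
	if proposed.Index <= r.truncated.Index {
		return false
	}
	// Delete the truncated entries together with the state update.
	for idx := r.firstIndex(); idx <= proposed.Index; idx++ {
		delete(r.log, idx)
	}
	r.truncated = proposed
	return true
}

func main() {
	r := &replica{
		truncated: truncatedState{Index: 99, Term: 5},
		log:       map[uint64]string{},
	}
	for idx := uint64(100); idx < 110; idx++ {
		r.log[idx] = fmt.Sprintf("entry-%d", idx)
	}
	// A replica at FirstIndex=100 ignores an update that would set it to 50.
	fmt.Println(r.applyTruncation(truncatedState{Index: 49, Term: 3}), r.firstIndex())
	// An update that moves the first index forward is applied.
	fmt.Println(r.applyTruncation(truncatedState{Index: 104, Term: 5}), r.firstIndex(), len(r.log))
}
```

In the real system the state write and the log deletion happen atomically in one batch; the sketch only models the ordering check.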

Release note: None

34762: distsqlplan: fix error in union planning r=jordanlewis a=jordanlewis

Previously, if two inputs to a UNION ALL had identical post-processing
except for renders, further post-processing on top of that UNION ALL
could invalidate the plan and cause errors or crashes.

Fixes #34437.

Release note (bug fix): fix a planning crash during UNION ALL operations
that had projections, filters or renders directly on top of the UNION
ALL in some cases.

34767: sql: reuse already allocated memory for the cache in a row container r=yuzefovich a=yuzefovich

Previously, we would always allocate new memory for every row that
we put in the cache of DiskBackedIndexedRowContainer and simply
discard the memory underlying the row that we remove from the cache.
Now, we're reusing that memory.
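
The general idea can be sketched as below. This is an illustration with a hypothetical fixed-size FIFO `rowCache`, not the actual DiskBackedIndexedRowContainer code: the evicted row's backing array is reused for the incoming row instead of being dropped.

```go
package main

import "fmt"

type row []int64

// rowCache is a hypothetical fixed-capacity FIFO cache of rows.
// capacity is assumed to be positive.
type rowCache struct {
	slots    []row
	capacity int
	next     int // slot to overwrite once the cache is full
}

// add copies r into the cache. While the cache is filling up, a fresh copy
// is allocated; afterwards the evicted row's memory is reused for the copy.
func (c *rowCache) add(r row) {
	if len(c.slots) < c.capacity {
		c.slots = append(c.slots, append(row(nil), r...))
		return
	}
	// Truncate the evicted row to length zero and append into it, which
	// reuses its backing array whenever the capacity is sufficient.
	c.slots[c.next] = append(c.slots[c.next][:0], r...)
	c.next = (c.next + 1) % c.capacity
}

func main() {
	c := &rowCache{capacity: 2}
	c.add(row{1, 2})
	c.add(row{3, 4})
	before := &c.slots[0][0]
	c.add(row{5, 6}) // evicts {1, 2} and reuses its memory
	fmt.Println(c.slots[0], before == &c.slots[0][0])
}
```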

Release note: None

34779: opt: add stats to tpch xform test r=justinj a=justinj

Since we have stats by default now, this should be the default testing
mechanism. I left in tpch-no-stats since we also have that for tpcc,
just for safety.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Justin Jaffray <[email protected]>
6 people committed Feb 11, 2019
7 parents 235c81c + 0c36412 + db5f5db + d0aa09e + 471df55 + a132513 + 828b925 commit d93d63a
Showing 45 changed files with 4,708 additions and 663 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -95,6 +95,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-5</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-6</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
7 changes: 2 additions & 5 deletions pkg/ccl/backupccl/backup_test.go
@@ -435,9 +435,6 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
conn := sqlDB.DB.(*gosql.DB)
defer cleanupFn()

// Get the number of existing jobs.
baseNumJobs := jobutils.GetSystemJobsCount(t, sqlDB)

sanitizedIncDir := localFoo + "/inc"
incDir := sanitizedIncDir + "?secretCredentialsHere"

@@ -462,7 +459,7 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
sqlDB.Exec(t, `SET DATABASE = data`)

sqlDB.Exec(t, `BACKUP TABLE bank TO $1 INCREMENTAL FROM $2`, incDir, fullDir)
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+1, jobspb.TypeBackup, jobs.StatusSucceeded, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, 1, jobspb.TypeBackup, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(
`BACKUP TABLE bank TO '%s' INCREMENTAL FROM '%s'`,
@@ -477,7 +474,7 @@ func TestBackupRestoreSystemJobs(t *testing.T) {
}

sqlDB.Exec(t, `RESTORE TABLE bank FROM $1, $2 WITH OPTIONS ('into_db'='restoredb')`, fullDir, incDir)
if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+2, jobspb.TypeRestore, jobs.StatusSucceeded, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, 0, jobspb.TypeRestore, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(
`RESTORE TABLE bank FROM '%s', '%s' WITH into_db = 'restoredb'`,
23 changes: 12 additions & 11 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -867,13 +867,18 @@ func TestImportCSVStmt(t *testing.T) {
if testing.Short() {
t.Skip("short")
}
t.Skip(`#34568`)

const (
nodes = 3
numFiles = nodes + 2
rowsPerFile = 1000
)
const nodes = 3

numFiles := nodes + 2
rowsPerFile := 1000
if util.RaceEnabled {
// This test takes a while with the race detector, so reduce the number of
// files and rows per file in an attempt to speed it up.
numFiles = nodes
rowsPerFile = 16
}

ctx := context.Background()
dir, cleanup := testutils.TempDir(t)
defer cleanup()
@@ -897,9 +902,6 @@ func TestImportCSVStmt(t *testing.T) {
t.Fatal(err)
}

// Get the number of existing jobs.
baseNumJobs := jobutils.GetSystemJobsCount(t, sqlDB)

if err := ioutil.WriteFile(filepath.Join(dir, "empty.csv"), nil, 0666); err != nil {
t.Fatal(err)
}
@@ -1174,7 +1176,7 @@ func TestImportCSVStmt(t *testing.T) {
}
jobPrefix += `t (a INT8 PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s)`

if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+testNum, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{
if err := jobutils.VerifySystemJob(t, sqlDB, testNum, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: fmt.Sprintf(jobPrefix+tc.jobOpts, strings.Join(tc.files, ", ")),
}); err != nil {
@@ -1188,7 +1190,6 @@ func TestImportCSVStmt(t *testing.T) {
t, "does not exist",
`SELECT count(*) FROM t`,
)
testNum++
sqlDB.QueryRow(
t, `RESTORE csv.* FROM $1 WITH into_db = $2`, backupPath, intodb,
).Scan(
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
@@ -665,7 +665,7 @@ func runDebugCheckStoreRaft(ctx context.Context, db *engine.RocksDB) error {
return false, err
}
getReplicaInfo(rangeID).committedIndex = hs.Commit
case bytes.Equal(suffix, keys.LocalRaftTruncatedStateSuffix):
case bytes.Equal(suffix, keys.LocalRaftTruncatedStateLegacySuffix):
var trunc roachpb.RaftTruncatedState
if err := kv.Value.GetProto(&trunc); err != nil {
return false, err
2 changes: 1 addition & 1 deletion pkg/cli/debug/print.go
@@ -186,7 +186,7 @@ func tryRangeIDKey(kv engine.MVCCKeyValue) (string, error) {
case bytes.Equal(suffix, keys.LocalRaftTombstoneSuffix):
msg = &roachpb.RaftTombstone{}

case bytes.Equal(suffix, keys.LocalRaftTruncatedStateSuffix):
case bytes.Equal(suffix, keys.LocalRaftTruncatedStateLegacySuffix):
msg = &roachpb.RaftTruncatedState{}

case bytes.Equal(suffix, keys.LocalRangeLeaseSuffix):
5 changes: 3 additions & 2 deletions pkg/keys/constants.go
@@ -121,8 +121,9 @@ var (
LocalRaftAppliedIndexLegacySuffix = []byte("rfta")
// LocalRaftTombstoneSuffix is the suffix for the raft tombstone.
LocalRaftTombstoneSuffix = []byte("rftb")
// LocalRaftTruncatedStateSuffix is the suffix for the RaftTruncatedState.
LocalRaftTruncatedStateSuffix = []byte("rftt")
// LocalRaftTruncatedStateLegacySuffix is the suffix for the legacy RaftTruncatedState.
// See VersionUnreplicatedRaftTruncatedState.
LocalRaftTruncatedStateLegacySuffix = []byte("rftt")
// LocalRangeLeaseSuffix is the suffix for a range lease.
LocalRangeLeaseSuffix = []byte("rll-")
// LocalLeaseAppliedIndexLegacySuffix is the suffix for the applied lease index.
23 changes: 17 additions & 6 deletions pkg/keys/keys.go
@@ -252,9 +252,10 @@ func LeaseAppliedIndexLegacyKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).LeaseAppliedIndexLegacyKey()
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateKey()
// RaftTruncatedStateLegacyKey returns a system-local key for a RaftTruncatedState.
// See VersionUnreplicatedRaftTruncatedState.
func RaftTruncatedStateLegacyKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateLegacyKey()
}

// RangeFrozenStatusKey returns a system-local key for the frozen status.
@@ -314,6 +315,11 @@ func RaftTombstoneKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTombstoneKey()
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func RaftTruncatedStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftTruncatedStateKey()
}

// RaftHardStateKey returns a system-local key for a Raft HardState.
func RaftHardStateKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftHardStateKey()
@@ -897,9 +903,9 @@ func (b RangeIDPrefixBuf) LeaseAppliedIndexLegacyKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalLeaseAppliedIndexLegacySuffix...)
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRaftTruncatedStateSuffix...)
// RaftTruncatedStateLegacyKey returns a system-local key for a RaftTruncatedState.
func (b RangeIDPrefixBuf) RaftTruncatedStateLegacyKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRaftTruncatedStateLegacySuffix...)
}

// RangeFrozenStatusKey returns a system-local key for the frozen status.
@@ -935,6 +941,11 @@ func (b RangeIDPrefixBuf) RaftTombstoneKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftTombstoneSuffix...)
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func (b RangeIDPrefixBuf) RaftTruncatedStateKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftTruncatedStateLegacySuffix...)
}

// RaftHardStateKey returns a system-local key for a Raft HardState.
func (b RangeIDPrefixBuf) RaftHardStateKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftHardStateSuffix...)
2 changes: 1 addition & 1 deletion pkg/keys/keys_test.go
@@ -146,7 +146,7 @@ func TestKeyAddressError(t *testing.T) {
AbortSpanKey(0, uuid.MakeV4()),
RaftTombstoneKey(0),
RaftAppliedIndexLegacyKey(0),
RaftTruncatedStateKey(0),
RaftTruncatedStateLegacyKey(0),
RangeLeaseKey(0),
RangeStatsLegacyKey(0),
RaftHardStateKey(0),
39 changes: 4 additions & 35 deletions pkg/keys/printer.go
@@ -150,7 +150,7 @@ var (
ppFunc: raftLogKeyPrint,
psFunc: raftLogKeyParse,
},
{name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateSuffix},
{name: "RaftTruncatedState", suffix: LocalRaftTruncatedStateLegacySuffix},
{name: "RaftLastIndex", suffix: LocalRaftLastIndexSuffix},
{name: "RangeLastReplicaGCTimestamp", suffix: LocalRangeLastReplicaGCTimestampSuffix},
{name: "RangeLastVerificationTimestamp", suffix: LocalRangeLastVerificationTimestampSuffixDeprecated},
@@ -563,40 +563,9 @@ func prettyPrintInternal(valDirs []encoding.Direction, key roachpb.Key, quoteRaw
return str
}

// PrettyPrint prints the key in a human readable format:
//
// Key format Key value
// /Local/... "\x01"+...
// /Store/... "\x01s"+...
// /RangeID/... "\x01s"+[rangeid]
// /[rangeid]/AbortSpan/[id] "\x01s"+[rangeid]+"abc-"+[id]
// /[rangeid]/Lease "\x01s"+[rangeid]+"rfll"
// /[rangeid]/RaftTombstone "\x01s"+[rangeid]+"rftb"
// /[rangeid]/RaftHardState "\x01s"+[rangeid]+"rfth"
// /[rangeid]/RaftAppliedIndex "\x01s"+[rangeid]+"rfta"
// /[rangeid]/RaftLog/logIndex:[logIndex] "\x01s"+[rangeid]+"rftl"+[logIndex]
// /[rangeid]/RaftTruncatedState "\x01s"+[rangeid]+"rftt"
// /[rangeid]/RaftLastIndex "\x01s"+[rangeid]+"rfti"
// /[rangeid]/RangeLastReplicaGCTimestamp "\x01s"+[rangeid]+"rlrt"
// /[rangeid]/RangeLastVerificationTimestamp "\x01s"+[rangeid]+"rlvt"
// /[rangeid]/RangeStats "\x01s"+[rangeid]+"stat"
// /Range/... "\x01k"+...
// [key]/RangeDescriptor "\x01k"+[key]+"rdsc"
// [key]/Transaction/[id] "\x01k"+[key]+"txn-"+[txn-id]
// [key]/QueueLastProcessed/[queue] "\x01k"+[key]+"qlpt"+[queue]
// /Local/Max "\x02"
//
// /Meta1/[key] "\x02"+[key]
// /Meta2/[key] "\x03"+[key]
// /System/... "\x04"
// /NodeLiveness/[key] "\x04\0x00liveness-"+[key]
// /StatusNode/[key] "\x04status-node-"+[key]
// /System/Max "\x05"
//
// /Table/[key] [key]
//
// /Min ""
// /Max "\xff\xff"
// PrettyPrint prints the key in a human readable format, see TestPrettyPrint.
// The output does not indicate whether a key is part of the replicated or un-
// replicated keyspace.
//
// valDirs correspond to the encoding direction of each encoded value in key.
// For example, table keys could have column values encoded in ascending or
8 changes: 7 additions & 1 deletion pkg/keys/printer_test.go
@@ -58,7 +58,8 @@ func TestPrettyPrint(t *testing.T) {
{RangeAppliedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeAppliedState"},
{RaftAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftAppliedIndex"},
{LeaseAppliedIndexLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/LeaseAppliedIndex"},
{RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"},
{RaftTruncatedStateLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTruncatedState"},
{RaftTruncatedStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftTruncatedState"},
{RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease"},
{RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats"},
{RangeTxnSpanGCThresholdKey(roachpb.RangeID(1000001)), `/Local/RangeID/1000001/r/RangeTxnSpanGCThreshold`},
@@ -184,6 +185,11 @@ func TestPrettyPrint(t *testing.T) {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
keyInfo := MassagePrettyPrintedSpanForTest(PrettyPrint(nil /* valDirs */, test.key), nil)
exp := MassagePrettyPrintedSpanForTest(test.exp, nil)
t.Logf(`---- test case #%d:
input: %q
output: %s
exp: %s
`, i+1, []byte(test.key), keyInfo, exp)
if exp != keyInfo {
t.Errorf("%d: expected %s, got %s", i, exp, keyInfo)
}