107302: storage: add method to ingest external files, rename IngestExternalFiles r=RaduBerinde a=itsbilal

Requires cockroachdb/pebble#2753

This change renames the existing IngestExternalFiles method on storage.Engine to IngestLocalFiles, and adds a new IngestExternalFiles that ingests pebble.ExternalFile, for use with online restore.

Depends on cockroachdb/pebble#2753.
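For illustration, here is a rough sketch of the resulting storage.Engine surface, inferred from the call sites changed in this diff; the exact method set, signatures, and return types are assumptions rather than the actual interface definition.

```go
package storage // illustrative sketch only, not the real interface

import (
	"context"

	"github.com/cockroachdb/pebble"
)

// Engine (excerpt): assumed shape of the ingestion methods after this change.
type Engine interface {
	// IngestLocalFiles ingests SSTs from paths on the local filesystem
	// (previously named IngestExternalFiles).
	IngestLocalFiles(ctx context.Context, paths []string) error
	// IngestLocalFilesWithStats is like IngestLocalFiles but also returns
	// pebble's ingestion stats.
	IngestLocalFilesWithStats(
		ctx context.Context, paths []string,
	) (pebble.IngestOperationStats, error)
	// IngestExternalFiles ingests files that live in remote object storage,
	// described by pebble.ExternalFile, for use with online restore.
	IngestExternalFiles(
		ctx context.Context, external []pebble.ExternalFile,
	) (pebble.IngestOperationStats, error)
}
```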

Epic: none

Release note: None

108402: serverutils: remove ad-hoc code from StartNewTestCluster r=yuzefovich a=knz

This function is a convenience alias for NewTestCluster+Start. This should not contain custom logic specific to certain tests. Any custom logic should be conditional on testing knobs and put inside `(*testcluster.TestCluster).Start()` instead.

(The code removed here was mistakenly added in the wrong place in 70f85cd).
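As a rough sketch of the intended shape (the names and signatures below are assumptions, not the actual serverutils code), the function should reduce to a plain composition of the two calls:

```go
package serverutils // illustrative sketch only

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
)

// StartNewTestCluster is a convenience alias for NewTestCluster followed by
// Start, with no test-specific logic of its own. Any custom behavior should be
// gated on testing knobs inside (*testcluster.TestCluster).Start().
func StartNewTestCluster(
	t testing.TB, numNodes int, args base.TestClusterArgs,
) TestClusterInterface {
	cluster := NewTestCluster(t, numNodes, args)
	cluster.Start(t)
	return cluster
}
```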

Release note: None
Needed for #107986.
Epic: CRDB-18499

108446: kv: skip TestConstraintConformanceReportIntegration under deadlock r=erikgrinaker a=nvanbenschoten

Fixes #108430.

This commit avoids flakiness in `TestConstraintConformanceReportIntegration` by skipping the test under deadlock builds. It has been observed to run slowly and flake under stress, and we see the same kinds of behavior under deadlock builds.

Release note: None

108451: schemachanger: Refactor tests for concurrent schema changer behaviors r=Xiang-Gu a=Xiang-Gu

1. It cleans up some redundant tests of concurrent schema changer behavior and refactors them into a new, simpler, cleaner test.
2. It adds an integration-style test for concurrent schema change behavior: many schema changes run for an extended period of time, and the test asserts that all of them eventually succeed and that the descriptors end up in the expected state (a rough sketch of the idea follows).
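For flavor, a hedged sketch of what such an integration test can look like; the helpers runRandomSchemaChange and validateDescriptors are hypothetical placeholders standing in for the real test machinery, and the concrete structure of the added test may differ.

```go
package schemachanger_test // illustrative sketch only

import (
	"context"
	gosql "database/sql"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
)

// Hypothetical helpers standing in for the real test's machinery.
func runRandomSchemaChange(ctx context.Context, db *gosql.DB) error { return nil }
func validateDescriptors(t *testing.T, db *gosql.DB)                {}

func TestConcurrentSchemaChangesEventuallySucceed(t *testing.T) {
	ctx := context.Background()
	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(ctx)

	const workers = 8
	deadline := timeutil.Now().Add(30 * time.Second)

	g, gCtx := errgroup.WithContext(ctx)
	for i := 0; i < workers; i++ {
		g.Go(func() error {
			// Keep issuing random schema changes until the deadline; the
			// helper is expected to retry on anticipated concurrency errors.
			for timeutil.Now().Before(deadline) {
				if err := runRandomSchemaChange(gCtx, sqlDB); err != nil {
					return err
				}
			}
			return nil
		})
	}
	// Every schema change must eventually succeed...
	require.NoError(t, g.Wait())
	// ...and the descriptors must end up in the expected state.
	validateDescriptors(t, sqlDB)
}
```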

Fix #108140
Fix #107223

Epic: None
Release note: None

108492: kv: remove errSavepointInvalidAfterTxnRestart r=knz a=nvanbenschoten

This commit simplifies logic in `checkSavepointLocked`.

Epic: None
Release note: None

108497: sql: don't start default test tenant in MT admin function tests r=yuzefovich a=yuzefovich

These tests start multiple tenants themselves, so there is no need to create a default test tenant. Doing so is also confusing: the default test tenant and the first test tenant would share the same TenantID, effectively producing two SQL pod configurations for one tenant. Starting the default test tenant was enabled recently in c899661 when we enabled the CCL license, and we have seen at least one confusing failure that is possibly related to this.

Starting the default test tenant was originally added in cfa4375, but I don't see a good reason for it.
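A minimal sketch of the pattern the tests move to (assumed wiring; the real change is the one-line switch to base.TestControlsTenantsExplicitly in the diff further below): the test declares that it controls tenants explicitly and then starts exactly the tenants it needs.

```go
package sql_test // illustrative sketch only

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

func TestMultiTenantAdminFunctionSketch(t *testing.T) {
	ctx := context.Background()

	// Tell the harness that this test manages tenants explicitly, so no
	// default test tenant is started implicitly alongside the cluster.
	var args base.TestClusterArgs
	args.ServerArgs.DefaultTestTenant = base.TestControlsTenantsExplicitly
	tc := serverutils.StartNewTestCluster(t, 1, args)
	defer tc.Stopper().Stop(ctx)

	// Start a tenant with an explicit, unambiguous TenantID.
	_, tenantDB := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
		TenantID: roachpb.MustMakeTenantID(10),
	})
	defer tenantDB.Close()
}
```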

This PR is an opportunistic fix of #108081.

Fixes: #108081.

Release note: None

108502: kvstreamer: add more assertions to RequestsProvider.enqueue r=yuzefovich a=michae2

If we ever enqueue zero-length requests, it could cause a deadlock where the `workerCoordinator` is waiting for more requests and the enqueuer is waiting for results. Add assertions that we never do this.

Informs: #101823
Release note: None

108517: roachtest: pin liveness lease to live node in lease prefs test r=erikgrinaker a=kvoli

The lease preferences roachtest could occasionally fail if the liveness leaseholder was on a stopped node. We should address this issue properly; for now, pin the liveness lease to a live node to prevent flakes.

Informs: #108512
Resolves: #108425
Release note: None

Co-authored-by: Bilal Akhtar <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Xiang Gu <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Michael Erickson <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
8 people committed Aug 10, 2023
9 parents c0fbeb3 + d9573d2 + 6f3652a + 5975d2d + c02d1f3 + 7f6f9c3 + b661394 + 1e3cefe + 31b24e5 commit f194f92
Showing 22 changed files with 488 additions and 840 deletions.
8 changes: 8 additions & 0 deletions pkg/cmd/roachtest/tests/lease_preferences.go
@@ -252,6 +252,14 @@ func runLeasePreferences(
// enforcement.
require.NoError(t, WaitForReplication(ctx, t, conn, spec.replFactor))

// Set a lease preference for the liveness range, to be on n5. This test
// would occasionally fail due to the liveness heartbeat failures, when the
// liveness lease is on a stopped node. This is not ideal behavior, #108512.
configureZone(t, ctx, conn, "RANGE liveness", zoneConfig{
replicas: spec.replFactor,
leaseNode: 5,
})

t.L().Printf("setting lease preferences: %s", spec.preferences)
setLeasePreferences(ctx, spec.preferences)
t.L().Printf("waiting for initial lease preference conformance")
37 changes: 11 additions & 26 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// savepoint captures the state in the TxnCoordSender necessary to restore that
@@ -121,15 +122,8 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
}

sp := s.(*savepoint)
err := tc.checkSavepointLocked(sp)
err := tc.checkSavepointLocked(sp, "rollback to")
if err != nil {
if errors.Is(err, errSavepointInvalidAfterTxnRestart) {
err = kvpb.NewTransactionRetryWithProtoRefreshError(
"cannot rollback to savepoint after a transaction restart",
tc.mu.txn.ID,
tc.mu.txn,
)
}
return err
}

@@ -165,15 +159,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo
}

sp := s.(*savepoint)
err := tc.checkSavepointLocked(sp)
if errors.Is(err, errSavepointInvalidAfterTxnRestart) {
err = kvpb.NewTransactionRetryWithProtoRefreshError(
"cannot release savepoint after a transaction restart",
tc.mu.txn.ID,
tc.mu.txn,
)
}
return err
return tc.checkSavepointLocked(sp, "release")
}

type errSavepointOperationInErrorTxn struct{}
@@ -193,23 +179,22 @@ func (tc *TxnCoordSender) assertNotFinalized() error {
return nil
}

var errSavepointInvalidAfterTxnRestart = errors.New("savepoint invalid after transaction restart")

// checkSavepointLocked checks whether the provided savepoint is still valid.
// Returns errSavepointInvalidAfterTxnRestart if the savepoint is not an
// Returns a TransactionRetryWithProtoRefreshError if the savepoint is not an
// "initial" one and the transaction has restarted since the savepoint was
// created.
func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint) error {
func (tc *TxnCoordSender) checkSavepointLocked(s *savepoint, op redact.SafeString) error {
// Only savepoints taken before any activity are allowed to be used after a
// transaction restart.
if s.Initial() {
return nil
}
if s.txnID != tc.mu.txn.ID {
return errSavepointInvalidAfterTxnRestart
}
if s.epoch != tc.mu.txn.Epoch {
return errSavepointInvalidAfterTxnRestart
if s.txnID != tc.mu.txn.ID || s.epoch != tc.mu.txn.Epoch {
return kvpb.NewTransactionRetryWithProtoRefreshError(
redact.Sprintf("cannot %s savepoint after a transaction restart", op),
s.txnID,
tc.mu.txn,
)
}

if s.seqNum < 0 || s.seqNum > tc.interceptorAlloc.txnSeqNumAllocator.writeSeq {
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -264,6 +264,9 @@ func (p *outOfOrderRequestsProvider) enqueue(requests []singleRangeBatch) {
if len(p.requests) > 0 {
panic(errors.AssertionFailedf("outOfOrderRequestsProvider has old requests in enqueue"))
}
if len(requests) == 0 {
panic(errors.AssertionFailedf("outOfOrderRequestsProvider enqueuing zero requests"))
}
p.requests = requests
p.hasWork.Signal()
}
@@ -388,6 +391,9 @@ func (p *inOrderRequestsProvider) enqueue(requests []singleRangeBatch) {
if len(p.requests) > 0 {
panic(errors.AssertionFailedf("inOrderRequestsProvider has old requests in enqueue"))
}
if len(requests) == 0 {
panic(errors.AssertionFailedf("inOrderRequestsProvider enqueuing zero requests"))
}
p.requests = requests
p.heapInit()
p.hasWork.Signal()
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -224,6 +224,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//objstorage",
"@com_github_cockroachdb_pebble//objstorage/remote",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -1249,7 +1249,7 @@ func TestEvalAddSSTable(t *testing.T) {
} else {
require.NotNil(t, result.Replicated.AddSSTable)
require.NoError(t, fs.WriteFile(engine, "sst", result.Replicated.AddSSTable.Data))
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))
}

var expect kvs
@@ -1652,7 +1652,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
require.NoError(t, err)

require.NoError(t, fs.WriteFile(engine, "sst", sst))
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))

statsEvaled := statsBefore
statsEvaled.Add(*cArgs.Stats)
45 changes: 30 additions & 15 deletions pkg/kv/kvserver/replica_proposal.go
@@ -37,6 +37,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/redact"
"github.com/kr/pretty"
"golang.org/x/time/rate"
@@ -628,19 +630,32 @@ func addSSTablePreApply(
sst.Span,
sst.RemoteFileLoc,
)
// TODO(bilal): replace this with the real ingest.
/*
start := storage.EngineKey{Key: sst.Span.Key}
end := storage.EngineKey{Key: sst.Span.EndKey}
externalFile := pebble.ExternalFile{
Locator: shared.Locator(sst.RemoteFileLoc),
ObjName: sst.RemoteFilePath,
Size: sst.BackingFileSize,
SmallestUserKey: start.Encode(),
LargestUserKey: end.Encode(),
}*/
log.Fatalf(ctx, "Unsupported IngestRemoteFile")
start := storage.EngineKey{Key: sst.Span.Key}
end := storage.EngineKey{Key: sst.Span.EndKey}
externalFile := pebble.ExternalFile{
Locator: remote.Locator(sst.RemoteFileLoc),
ObjName: sst.RemoteFilePath,
Size: sst.BackingFileSize,
SmallestUserKey: start.Encode(),
LargestUserKey: end.Encode(),
}
tBegin := timeutil.Now()
defer func() {
if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() {
log.Infof(ctx,
"ingesting SST of size %s at index %d took %.2fs",
humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(),
)
}
}()

_, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile})
if ingestErr != nil {
log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr)
}
// Adding without modification succeeded, no copy necessary.
log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath)
return false
}
checksum := util.CRC32(sst.Data)

@@ -685,7 +700,7 @@ func addSSTablePreApply(
}

// Regular path - we made a hard link, so we can ingest the hard link now.
ingestErr := env.eng.IngestExternalFiles(ctx, []string{ingestPath})
ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath})
if ingestErr != nil {
log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr)
}
@@ -726,7 +741,7 @@ func ingestViaCopy(
if err := kvserverbase.WriteFileSyncing(ctx, ingestPath, sst.Data, eng, 0600, st, limiter); err != nil {
return errors.Wrapf(err, "while ingesting %s", ingestPath)
}
if err := eng.IngestExternalFiles(ctx, []string{ingestPath}); err != nil {
if err := eng.IngestLocalFiles(ctx, []string{ingestPath}); err != nil {
return errors.Wrapf(err, "while ingesting %s", ingestPath)
}
log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
@@ -578,7 +578,7 @@ func (r *Replica) applySnapshot(
// TODO: separate ingestions for log and statemachine engine. See:
//
// https://github.com/cockroachdb/cockroach/issues/93251
r.store.TODOEngine().IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
if r.store.cfg.KVAdmissionController != nil {
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/reports/reporter_test.go
@@ -48,6 +48,8 @@ func TestConstraintConformanceReportIntegration(t *testing.T) {
// don't make progress.
skip.UnderStressRace(t)
skip.UnderRace(t, "takes >1min under race")
// Similarly, skip the test under deadlock builds.
skip.UnderDeadlock(t, "takes >1min under deadlock")

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 5, base.TestClusterArgs{
1 change: 1 addition & 0 deletions pkg/sql/create_as_test.go
@@ -325,6 +325,7 @@ AND status != 'succeeded'`

func TestFormat(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testCases := []struct {
sql string
1 change: 1 addition & 0 deletions pkg/sql/create_function_test.go
@@ -151,6 +151,7 @@ SELECT nextval(105:::REGCLASS);`,

func TestVersionGatingUDFInCheckConstraints(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

t.Run("new_schema_changer_version_enabled", func(t *testing.T) {
params, _ := createTestServerParams()
1 change: 1 addition & 0 deletions pkg/sql/logictest/logic.go
@@ -1494,6 +1494,7 @@ func (t *logicTest) newCluster(
stats.DefaultRefreshInterval = time.Millisecond

t.cluster = serverutils.StartNewTestCluster(t.rootT, cfg.NumNodes, params)
t.purgeZoneConfig()
if cfg.UseFakeSpanResolver {
// We need to update the DistSQL span resolver with the fake resolver.
// Note that DistSQL was disabled in makeClusterSetting above, so we
2 changes: 1 addition & 1 deletion pkg/sql/multitenant_admin_function_test.go
@@ -246,7 +246,7 @@ func (tc testCase) runTest(
if numNodes == 0 {
numNodes = 1
}
cfg.ServerArgs.DefaultTestTenant = base.TestTenantProbabilistic
cfg.ServerArgs.DefaultTestTenant = base.TestControlsTenantsExplicitly
testCluster := serverutils.StartNewTestCluster(t, numNodes, cfg.TestClusterArgs)
defer testCluster.Stopper().Stop(ctx)

1 change: 1 addition & 0 deletions pkg/sql/schema_changer_test.go
@@ -7586,6 +7586,7 @@ CREATE TABLE t.test (pk INT PRIMARY KEY, v INT);
// to conclude. If the locks were not dropped, a deadlock could occur.
func TestConcurrentSchemaChangesDoNotDeadlock(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
10 changes: 4 additions & 6 deletions pkg/sql/schemachanger/BUILD.bazel
@@ -39,13 +39,14 @@ go_test(
"//pkg/ccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/execinfra",
"//pkg/sql/pgwire/pgcode",
@@ -54,24 +55,21 @@ go_test(
"//pkg/sql/schemachanger/scop",
"//pkg/sql/schemachanger/scplan",
"//pkg/sql/schemachanger/sctest", # keep
"//pkg/sql/sqltestutils",
"//pkg/sql/sessiondatapb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//errorspb",
"@com_github_lib_pq//:pq",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)
