diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 14e2904fc7d0..6a5633142141 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -189,8 +189,6 @@ go_test( "//pkg/kv/kvserver/protectedts", "//pkg/multitenant/tenantcapabilities", "//pkg/roachpb", - "//pkg/rpc", - "//pkg/rpc/nodedialer", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/security/username", @@ -235,7 +233,6 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", - "//pkg/util/circuit", "//pkg/util/ctxgroup", "//pkg/util/encoding/csv", "//pkg/util/envutil", diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 6f186ee77be2..d2ceecedb274 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -44,8 +44,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -73,7 +71,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/circuit" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" "github.com/cockroachdb/cockroach/pkg/util/ioctx" @@ -2939,106 +2936,6 @@ func TestExportImportRoundTrip(t *testing.T) { } } -// TestImportRetriesBreakerOpenFailure tests that errors resulting from open -// breakers on the coordinator node are retried. -func TestImportRetriesBreakerOpenFailure(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.UnderShort(t) - skip.UnderRace(t, "takes >1min under race") - - const nodes = 3 - numFiles := nodes + 2 - rowsPerFile := 1 - - ctx := context.Background() - tc := serverutils.StartCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ - DefaultTestTenant: base.TODOTestTenantDisabled, - ExternalIODir: datapathutils.TestDataPath(t, "csv")}}) - defer tc.Stopper().Stop(ctx) - - aboutToRunDSP := make(chan struct{}) - allowRunDSP := make(chan struct{}) - for i := 0; i < tc.NumServers(); i++ { - tc.Server(i).JobRegistry().(*jobs.Registry).TestingWrapResumerConstructor( - jobspb.TypeImport, - func(raw jobs.Resumer) jobs.Resumer { - r := raw.(*importResumer) - r.testingKnobs.beforeRunDSP = func() error { - select { - case aboutToRunDSP <- struct{}{}: - case <-time.After(testutils.DefaultSucceedsSoonDuration): - return errors.New("timed out on aboutToRunDSP") - } - select { - case <-allowRunDSP: - case <-time.After(testutils.DefaultSucceedsSoonDuration): - return errors.New("timed out on allowRunDSP") - } - return nil - } - return r - }) - } - - sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - setSmallIngestBufferSizes(t, sqlDB) - sqlDB.Exec(t, `CREATE TABLE t (a INT, b STRING)`) - var tableID int64 - sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 't'`).Scan(&tableID) - - g := ctxgroup.WithContext(ctx) - g.GoCtx(func(ctx context.Context) error { - testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerFile) - fileListStr := strings.Join(testFiles.files, ", ") - redactedFileListStr := strings.ReplaceAll(fileListStr, "?AWS_SESSION_TOKEN=secrets", "?AWS_SESSION_TOKEN=redacted") - query := fmt.Sprintf(`IMPORT INTO t (a, b) CSV DATA (%s)`, fileListStr) - sqlDB.Exec(t, query) - return jobutils.VerifySystemJob(t, sqlDB, 0, jobspb.TypeImport, jobs.StatusSucceeded, jobs.Record{ - Username: username.RootUserName(), - Description: fmt.Sprintf(`IMPORT INTO defaultdb.public.t(a, b) CSV DATA (%s)`, redactedFileListStr), - DescriptorIDs: []descpb.ID{descpb.ID(tableID)}, - }) - }) - - // On the first attempt, we trip the node 3 breaker between distsql planning - // and actually running the plan. - select { - case <-aboutToRunDSP: - case <-time.After(testutils.DefaultSucceedsSoonDuration): - t.Fatal("timed out on aboutToRunDSP") - } - { - b, ok := tc.Server(0).NodeDialer().(*nodedialer.Dialer).GetCircuitBreaker(roachpb.NodeID(3), rpc.DefaultClass) - require.True(t, ok) - undo := circuit.TestingSetTripped(b, errors.New("boom")) - defer undo() - } - - timeout := testutils.DefaultSucceedsSoonDuration - select { - case allowRunDSP <- struct{}{}: - case <-time.After(timeout): - t.Fatalf("timed out on allowRunDSP attempt 1") - } - - // The failure above should be retried. We expect this to succeed even if we - // don't reset the breaker because node 3 should no longer be included in - // the plan. - select { - case <-aboutToRunDSP: - case <-time.After(timeout): - t.Fatalf("timed out on aboutToRunDSP") - } - select { - case allowRunDSP <- struct{}{}: - case <-time.After(timeout): - t.Fatalf("timed out on allowRunDSP") - } - require.NoError(t, g.Wait()) -} - // TODO(adityamaru): Tests still need to be added incrementally as // relevant IMPORT INTO logic is added. Some of them include: // -> FK and constraint violation