diff --git a/build/teamcity/cockroach/ci/builds/build_docker_image.sh b/build/teamcity/cockroach/ci/builds/build_docker_image.sh
index 0cc6c0f96cfc..30b6c74ffdd3 100755
--- a/build/teamcity/cockroach/ci/builds/build_docker_image.sh
+++ b/build/teamcity/cockroach/ci/builds/build_docker_image.sh
@@ -36,15 +36,12 @@ docker_tag="cockroachdb/cockroach-ci"
 docker build \
   --no-cache \
   --tag="$docker_tag" \
+  --memory 30g \
+  --memory-swap -1 \
   build/deploy
 docker save "$docker_tag" | gzip > "${artifacts}/${docker_image_tar_name}".gz
 cp upstream_artifacts/cockroach "${artifacts}"/cockroach
-docker_fsnotify_dir="$(dirname "${0}")/docker-fsnotify"
-cd $docker_fsnotify_dir && go build
-cp ./docker-fsnotify "${artifacts}"/docker-fsnotify
-
 tc_end_block "Build and save docker image to artifacts"
-
diff --git a/build/teamcity/cockroach/ci/builds/docker-fsnotify/BUILD.bazel b/build/teamcity/cockroach/ci/builds/docker-fsnotify/BUILD.bazel
deleted file mode 100644
index 5f69c3b2c990..000000000000
--- a/build/teamcity/cockroach/ci/builds/docker-fsnotify/BUILD.bazel
+++ /dev/null
@@ -1,15 +0,0 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
-
-go_library(
-    name = "docker-fsnotify_lib",
-    srcs = ["ListenFileCreation.go"],
-    importpath = "github.com/cockroachdb/cockroach/build/teamcity/cockroach/ci/builds/docker-fsnotify",
-    visibility = ["//visibility:private"],
-    deps = ["@com_github_fsnotify_fsnotify//:fsnotify"],
-)
-
-go_binary(
-    name = "docker-fsnotify",
-    embed = [":docker-fsnotify_lib"],
-    visibility = ["//visibility:public"],
-)
diff --git a/build/teamcity/cockroach/ci/tests/docker_image.sh b/build/teamcity/cockroach/ci/tests/docker_image.sh
new file mode 100755
index 000000000000..badb742d3f63
--- /dev/null
+++ b/build/teamcity/cockroach/ci/tests/docker_image.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"
+source "$dir/teamcity-support.sh"
+
+tc_prepare
+
+tc_start_block "Run docker image tests"
+
+bazel run \
+  //pkg/testutils/docker:docker_test \
+  --config=crosslinux --config=test \
+  --test_timeout=3000
+
+tc_end_block "Run docker image tests"
diff --git a/build/teamcity/cockroach/nightlies/compose.sh b/build/teamcity/cockroach/nightlies/compose.sh
index 4039397bd3c3..da364dde1468 100755
--- a/build/teamcity/cockroach/nightlies/compose.sh
+++ b/build/teamcity/cockroach/nightlies/compose.sh
@@ -10,9 +10,16 @@ tc_start_block "Run compose tests"
 
 bazel build //pkg/cmd/bazci --config=ci
 BAZCI=$(bazel info bazel-bin --config=ci)/pkg/cmd/bazci/bazci_/bazci
-$BAZCI run --config=crosslinux --config=test --config=with_ui --artifacts_dir=$PWD/artifacts \
+bazel build //pkg/cmd/cockroach //pkg/compose/compare/compare:compare_test --config=ci --config=crosslinux --config=test --config=with_ui
+CROSSBIN=$(bazel info bazel-bin --config=ci --config=crosslinux --config=test --config=with_ui)
+COCKROACH=$CROSSBIN/pkg/cmd/cockroach/cockroach_/cockroach
+COMPAREBIN=$(bazel run //pkg/compose/compare/compare:compare_test --config=ci --config=crosslinux --config=test --config=with_ui --run_under=realpath | grep '^/' | tail -n1)
+
+$BAZCI run --config=ci --config=test --artifacts_dir=$PWD/artifacts \
     //pkg/compose:compose_test -- \
     --test_env=GO_TEST_WRAP_TESTV=1 \
+    --test_arg -cockroach --test_arg $COCKROACH \
+    --test_arg -compare --test_arg $COMPAREBIN \
     --test_timeout=1800
 
 tc_end_block "Run compose tests"
diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go
index 544900e7f5fb..a4acc8557a39 100644
--- a/pkg/ccl/changefeedccl/changefeed.go
+++ b/pkg/ccl/changefeedccl/changefeed.go
@@ -15,15 +15,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
 	"github.com/cockroachdb/cockroach/pkg/keys"
-	"github.com/cockroachdb/cockroach/pkg/kv"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
-	"github.com/cockroachdb/errors"
 )
 
 const (
@@ -46,32 +43,30 @@ func emitResolvedTimestamp(
 	return nil
 }
 
+func shouldProtectTimestamps(codec keys.SQLCodec) bool {
+	// TODO(smiskin): Remove this restriction once tenant based pts are enabled
+	return codec.ForSystemTenant()
+}
+
 // createProtectedTimestampRecord will create a record to protect the spans for
 // this changefeed at the resolved timestamp. The progress struct will be
 // updated to refer to this new protected timestamp record.
 func createProtectedTimestampRecord(
 	ctx context.Context,
 	codec keys.SQLCodec,
-	pts protectedts.Storage,
-	txn *kv.Txn,
 	jobID jobspb.JobID,
 	targets jobspb.ChangefeedTargets,
 	resolved hlc.Timestamp,
 	progress *jobspb.ChangefeedProgress,
-) error {
-	if !codec.ForSystemTenant() {
-		return errors.AssertionFailedf("createProtectedTimestampRecord called on tenant-based changefeed")
-	}
-
+) *ptpb.Record {
 	progress.ProtectedTimestampRecord = uuid.MakeV4()
-	log.VEventf(ctx, 2, "creating protected timestamp %v at %v",
-		progress.ProtectedTimestampRecord, resolved)
 	deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
 	targetToProtect := makeTargetToProtect(targets)
-	rec := jobsprotectedts.MakeRecord(
+
+	log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved)
+	return jobsprotectedts.MakeRecord(
 		progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect,
 		jobsprotectedts.Jobs, targetToProtect)
-	return pts.Protect(ctx, txn, rec)
 }
 
 func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 5f8932193d0c..539d9131a32c 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -23,7 +23,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -913,6 +912,10 @@ type changeFrontier struct {
 	// slowLogEveryN rate-limits the logging of slow spans
 	slowLogEveryN log.EveryN
 
+	// lastProtectedTimestampUpdate is the last time the protected timestamp
+	// record was updated to the frontier's highwater mark.
+	lastProtectedTimestampUpdate time.Time
+
 	// js, if non-nil, is called to checkpoint the changefeed's
 	// progress in the corresponding system job entry.
 	js *jobState
@@ -1219,13 +1222,6 @@ func (cf *changeFrontier) closeMetrics() {
 	cf.metrics.mu.Unlock()
 }
 
-// shouldProtectBoundaries checks the job's spec to determine whether it should
-// install protected timestamps when encountering scan boundaries.
-func (cf *changeFrontier) shouldProtectBoundaries() bool {
-	policy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
-	return policy == changefeedbase.OptSchemaChangePolicyBackfill
-}
-
 // Next is part of the RowSource interface.
 func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 	for cf.State == execinfra.StateRunning {
@@ -1317,7 +1313,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
 		return err
 	}
 
-	isBehind := cf.maybeLogBehindSpan(frontierChanged)
+	cf.maybeLogBehindSpan(frontierChanged)
 
 	// If frontier changed, we emit resolved timestamp.
 	emitResolved := frontierChanged
@@ -1327,7 +1323,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
 	// have no distributed state whatsoever. Because of this they also do not
 	// use protected timestamps.
 	if cf.js != nil {
-		checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged, isBehind)
+		checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
 		if err != nil {
 			return err
 		}
@@ -1354,7 +1350,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
 }
 
 func (cf *changeFrontier) maybeCheckpointJob(
-	resolvedSpan jobspb.ResolvedSpan, frontierChanged, isBehind bool,
+	resolvedSpan jobspb.ResolvedSpan, frontierChanged bool,
 ) (bool, error) {
 	// When in a Backfill, the frontier remains unchanged at the backfill boundary
 	// as we receive spans from the scan request at the Backfill Timestamp
@@ -1376,11 +1372,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
 		!inBackfill && (cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged))
 
 	if updateCheckpoint || updateHighWater {
-		manageProtected := updateHighWater
 		checkpointStart := timeutil.Now()
-		if err := cf.checkpointJobProgress(
-			cf.frontier.Frontier(), manageProtected, checkpoint, isBehind,
-		); err != nil {
+		if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
 			return false, err
 		}
 		cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
@@ -1390,16 +1383,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
 	return false, nil
 }
 
-// checkpointJobProgress checkpoints a changefeed-level job information.
-// In addition, if 'manageProtected' is true, which only happens when frontier advanced,
-// this method manages the protected timestamp state.
-// The isBehind argument is used to determine whether an existing protected timestamp
-// should be released.
 func (cf *changeFrontier) checkpointJobProgress(
-	frontier hlc.Timestamp,
-	manageProtected bool,
-	checkpoint jobspb.ChangefeedProgress_Checkpoint,
-	isBehind bool,
+	frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (err error) {
 	updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency
 	if updateRunStatus {
@@ -1420,16 +1405,15 @@ func (cf *changeFrontier) checkpointJobProgress(
 				HighWater: &frontier,
 			}
 
-			// Manage protected timestamps.
 			changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
-			if manageProtected {
-				if err := cf.manageProtectedTimestamps(cf.Ctx, changefeedProgress, txn, frontier, isBehind); err != nil {
-					return err
+			changefeedProgress.Checkpoint = &checkpoint
+
+			if shouldProtectTimestamps(cf.flowCtx.Codec()) {
+				if err := cf.manageProtectedTimestamps(cf.Ctx, txn, changefeedProgress); err != nil {
+					log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
 				}
 			}
 
-			changefeedProgress.Checkpoint = &checkpoint
-
 			if updateRunStatus {
 				md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
 			}
@@ -1448,77 +1432,40 @@ func (cf *changeFrontier) checkpointJobProgress(
 	})
 }
 
-// manageProtectedTimestamps is called when the resolved timestamp is being
-// checkpointed. The changeFrontier always checkpoints resolved timestamps
-// which occur at scan boundaries. It releases previously protected timestamps
-// if the changefeed is not behind. See maybeLogBehindSpan for details on the
-// behind calculation.
-//
-// Note that this function is never called for sinkless changefeeds as they have
-// no corresponding job and thus no corresponding distributed state on which to
-// attach protected timestamp information.
-//
-// TODO(ajwerner): Adopt protected timestamps for sinkless changefeeds,
-// perhaps by using whatever mechanism is eventually built to protect
-// data for long-running SQL transactions. There's some discussion of this
-// use case in the protected timestamps RFC.
+// manageProtectedTimestamps periodically advances the protected timestamp for
+// the changefeed's targets to the current highwater mark. The record is
+// cleared during changefeedResumer.OnFailOrCancel.
 func (cf *changeFrontier) manageProtectedTimestamps(
-	ctx context.Context,
-	progress *jobspb.ChangefeedProgress,
-	txn *kv.Txn,
-	resolved hlc.Timestamp,
-	isBehind bool,
+	ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
-	pts := cf.flowCtx.Cfg.ProtectedTimestampProvider
-	if err := cf.maybeReleaseProtectedTimestamp(ctx, progress, pts, txn, isBehind); err != nil {
-		return err
-	}
-	return cf.maybeProtectTimestamp(ctx, progress, pts, txn, resolved)
-}
-
-// maybeReleaseProtectedTimestamp will release the current protected timestamp
-// if either the resolved timestamp is close to the present or we've reached
-// a new schemaChangeBoundary which will be protected.
-func (cf *changeFrontier) maybeReleaseProtectedTimestamp(
-	ctx context.Context,
-	progress *jobspb.ChangefeedProgress,
-	pts protectedts.Storage,
-	txn *kv.Txn,
-	isBehind bool,
-) error {
-	if progress.ProtectedTimestampRecord == uuid.Nil {
-		return nil
-	}
-	if !cf.frontier.schemaChangeBoundaryReached() && isBehind {
-		log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind")
+	ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
+	if timeutil.Since(cf.lastProtectedTimestampUpdate) < ptsUpdateInterval {
 		return nil
 	}
-	log.VEventf(ctx, 2, "releasing protected timestamp %v",
-		progress.ProtectedTimestampRecord)
-	if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil {
-		return err
+	cf.lastProtectedTimestampUpdate = timeutil.Now()
+
+	pts := cf.flowCtx.Cfg.ProtectedTimestampProvider
+
+	// Create / advance the protected timestamp record to the highwater mark.
+	highWater := cf.frontier.Frontier()
+	if highWater.Less(cf.highWaterAtStart) {
+		highWater = cf.highWaterAtStart
 	}
-	progress.ProtectedTimestampRecord = uuid.Nil
-	return nil
-}
 
-// maybeProtectTimestamp creates a new protected timestamp when the
-// changeFrontier reaches a scanBoundary and the schemaChangePolicy indicates
-// that we should perform a backfill (see cf.shouldProtectBoundaries()).
-func (cf *changeFrontier) maybeProtectTimestamp(
-	ctx context.Context,
-	progress *jobspb.ChangefeedProgress,
-	pts protectedts.Storage,
-	txn *kv.Txn,
-	resolved hlc.Timestamp,
-) error {
-	if cf.isSinkless() || cf.isTenant() || !cf.frontier.schemaChangeBoundaryReached() || !cf.shouldProtectBoundaries() {
-		return nil
+	recordID := progress.ProtectedTimestampRecord
+	if recordID == uuid.Nil {
+		ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
+		if err := pts.Protect(ctx, txn, ptr); err != nil {
+			return err
+		}
+	} else {
+		log.VEventf(ctx, 2, "updating protected timestamp %v at %v", recordID, highWater)
+		if err := pts.UpdateTimestamp(ctx, txn, recordID, highWater); err != nil {
+			return err
+		}
 	}
 
-	jobID := cf.spec.JobID
-	targets := cf.spec.Feed.Targets
-	return createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), pts, txn, jobID, targets, resolved, progress)
+	return nil
 }
 
 func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
@@ -1600,12 +1547,6 @@ func (cf *changeFrontier) isSinkless() bool {
 	return cf.spec.JobID == 0
 }
 
-// isTenant() bool returns true if this changeFrontier is running on a
-// tenant.
-func (cf *changeFrontier) isTenant() bool {
-	return !cf.flowCtx.Codec().ForSystemTenant()
-}
-
 // type to make embedding span.Frontier in schemaChangeFrontier convenient.
 type spanFrontier struct {
 	*span.Frontier
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index 27577ba98815..b88eca4886c3 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -24,7 +24,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/featureflag"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
-	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
@@ -317,29 +316,24 @@ func changefeedPlanHook(
 			return err
 		}
 
-		// The below block creates the job and if there's an initial scan, protects
-		// the data required for that scan. We protect the data here rather than in
+		// The below block creates the job and protects the data required for the
+		// changefeed to function from being garbage collected even if the
+		// changefeed lags behind the gcttl. We protect the data here rather than in
 		// Resume to shorten the window that data may be GC'd. The protected
-		// timestamps are removed and created during the execution of the changefeed
-		// by the changeFrontier when checkpointing progress. Additionally protected
-		// timestamps are removed in OnFailOrCancel. See the comment on
-		// changeFrontier.manageProtectedTimestamps for more details on the handling of
-		// protected timestamps.
+		// timestamps are updated to the highwater mark periodically during the
+		// execution of the changefeed by the changeFrontier. Protected timestamps
+		// are removed in OnFailOrCancel. See
+		// changeFrontier.manageProtectedTimestamps for more details on the handling
+		// of protected timestamps.
 		var sj *jobs.StartableJob
 		jobID := p.ExecCfg().JobRegistry.MakeJobID()
 		{
-
-			var protectedTimestampID uuid.UUID
 			var ptr *ptpb.Record
-
-			shouldProtectTimestamp := initialScanFromOptions(details.Opts) && p.ExecCfg().Codec.ForSystemTenant()
-			if shouldProtectTimestamp {
-				protectedTimestampID = uuid.MakeV4()
-				deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
-				targetToProtect := makeTargetToProtect(details.Targets)
-				progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
-				ptr = jobsprotectedts.MakeRecord(protectedTimestampID, int64(jobID), statementTime,
-					deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
+			var protectedTimestampID uuid.UUID
+			codec := p.ExecCfg().Codec
+			if shouldProtectTimestamps(codec) {
+				ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed())
+				protectedTimestampID = ptr.ID.GetUUID()
 			}
 
 			jr := jobs.Record{
@@ -900,38 +894,27 @@ func (b *changefeedResumer) maybeCleanUpProtectedTimestamp(
 var _ jobs.PauseRequester = (*changefeedResumer)(nil)
 
 // OnPauseRequest implements jobs.PauseRequester. If this changefeed is being
-// paused, we want to install a protected timestamp at the most recent high
-// watermark if there isn't already one.
+// paused, we may want to clear the protected timestamp record.
 func (b *changefeedResumer) OnPauseRequest(
 	ctx context.Context, jobExec interface{}, txn *kv.Txn, progress *jobspb.Progress,
 ) error {
 	details := b.job.Details().(jobspb.ChangefeedDetails)
-	if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
-		return nil
-	}
 
 	cp := progress.GetChangefeed()
+	execCfg := jobExec.(sql.JobExecContext).ExecCfg()
 
-	// If we already have a protected timestamp record, keep it where it is.
-	if cp.ProtectedTimestampRecord != uuid.Nil {
-		return nil
-	}
-
-	resolved := progress.GetHighWater()
-	if resolved == nil {
-		// This should only happen if the job was created in a version that did not
-		// use protected timestamps but has yet to checkpoint its high water.
-		// Changefeeds from older versions didn't get protected timestamps so it's
-		// fine to not protect this one. In newer versions changefeeds which perform
-		// an initial scan at the statement time (and don't have an initial high
-		// water) will have a protected timestamp.
-		return nil
+	if _, shouldProtect := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldProtect {
+		// Release the existing pts record so that a changefeed left paused does
+		// not hold up garbage collection indefinitely.
+		if cp.ProtectedTimestampRecord != uuid.Nil {
+			if err := execCfg.ProtectedTimestampProvider.Release(ctx, txn, cp.ProtectedTimestampRecord); err != nil {
+				log.Warningf(ctx, "failed to release protected timestamp %v: %v", cp.ProtectedTimestampRecord, err)
+			} else {
+				cp.ProtectedTimestampRecord = uuid.Nil
+			}
+		}
 	}
 
-	execCfg := jobExec.(sql.JobExecContext).ExecCfg()
-	pts := execCfg.ProtectedTimestampProvider
-	return createProtectedTimestampRecord(ctx, execCfg.Codec, pts, txn, b.job.ID(),
-		details.Targets, *resolved, cp)
+	return nil
 }
 
 // getQualifiedTableName returns the database-qualified name of the table
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 7be60d2a2e17..fe7b608ec1f0 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -3663,6 +3663,75 @@ func TestChangefeedPauseUnpauseCursorAndInitialScan(t *testing.T) {
 	t.Run(`pubsub`, pubsubTest(testFn))
 }
 
+func TestChangefeedUpdateProtectedTimestamp(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
+		ptsInterval := 50 * time.Millisecond
+		changefeedbase.ProtectTimestampInterval.Override(
+			context.Background(), &f.Server().ClusterSettings().SV, ptsInterval)
+
+		sqlDB := sqlutils.MakeSQLRunner(db)
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
+		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`)
+		defer closeFeed(t, foo)
+
+		fooDesc := desctestutils.TestingGetPublicTableDescriptor(
+			f.Server().DB(), keys.SystemSQLCodec, "d", "foo")
+		tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
+		ptsProvider := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
+
+		var tableID int
+		sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+
+			`WHERE name = 'foo' AND database_name = current_database()`).
+			Scan(&tableID)
+
+		getTablePtsRecord := func() *ptpb.Record {
+			var r *ptpb.Record
+			require.NoError(t, ptsProvider.Refresh(context.Background(), f.Server().Clock().Now()))
+			ptsProvider.Iterate(context.Background(), tableSpan.Key, tableSpan.EndKey, func(record *ptpb.Record) (wantMore bool) {
+				r = record
+				return false
+			})
+
+			expectedKeys := map[string]struct{}{
+				string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))):        {},
+				string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {},
+			}
+			require.Equal(t, len(r.DeprecatedSpans), len(expectedKeys))
+			for _, s := range r.DeprecatedSpans {
+				require.Contains(t, expectedKeys, string(s.Key))
+			}
+			return r
+		}
+
+		// Wait for the changefeed to progress, then return the first resolved
+		// timestamp that lands after the target wait time.
+		waitAndDrainResolved := func(ts time.Duration) hlc.Timestamp {
+			targetTs := timeutil.Now().Add(ts)
+			for {
+				resolvedTs, _ := expectResolvedTimestamp(t, foo)
+				if resolvedTs.GoTime().UnixNano() > targetTs.UnixNano() {
+					return resolvedTs
+				}
+			}
+		}
+
+		// Observe the protected timestamp advancing along with resolved
+		// timestamps.
+		for i := 0; i < 5; i++ {
+			// Progress the changefeed and allow time for a pts record to be laid down.
+			nextResolved := waitAndDrainResolved(100 * time.Millisecond)
+			time.Sleep(2 * ptsInterval)
+			rec := getTablePtsRecord()
+			require.LessOrEqual(t, nextResolved.GoTime().UnixNano(), rec.Timestamp.GoTime().UnixNano())
+		}
+	}
+
+	t.Run(`enterprise`, enterpriseTest(testFn, feedTestNoTenants))
+	t.Run(`kafka`, kafkaTest(testFn, feedTestNoTenants))
+	t.Run(`webhook`, webhookTest(testFn, feedTestNoTenants))
+}
+
 func TestChangefeedProtectedTimestamps(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -3766,11 +3835,24 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
 		`WHERE name = 'foo' AND database_name = current_database()`).
 			Scan(&tableID)
 
+		changefeedbase.ProtectTimestampInterval.Override(
+			context.Background(), &f.Server().ClusterSettings().SV, 100*time.Millisecond)
+
 		ptp := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
 		getPtsRec := mkGetPtsRec(t, ptp, f.Server().Clock())
 		waitForRecord := mkWaitForRecordCond(t, getPtsRec, mkCheckRecord(t, tableID))
 		waitForNoRecord := mkWaitForRecordCond(t, getPtsRec, checkNoRecord)
 		waitForBlocked := requestBlockedScan()
+		waitForRecordAdvanced := func(ts hlc.Timestamp) {
+			check := func(ptr *ptpb.Record) error {
+				if ptr != nil && !ptr.Timestamp.LessEq(ts) {
+					return nil
+				}
+				return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, ptr.Timestamp)
+			}
+
+			mkWaitForRecordCond(t, getPtsRec, check)()
+		}
 
 		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved`)
 		defer closeFeed(t, foo)
@@ -3787,8 +3869,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
 			`foo: [7]->{"after": {"a": 7, "b": "d"}}`,
 			`foo: [8]->{"after": {"a": 8, "b": "e"}}`,
 		})
-		expectResolvedTimestamp(t, foo)
-		waitForNoRecord()
+		resolved, _ := expectResolvedTimestamp(t, foo)
+		waitForRecordAdvanced(resolved)
 	}
 
 	{
@@ -3806,8 +3888,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
 			`foo: [7]->{"after": {"a": 7, "b": "d", "c": 1}}`,
 			`foo: [8]->{"after": {"a": 8, "b": "e", "c": 1}}`,
 		})
-		expectResolvedTimestamp(t, foo)
-		waitForNoRecord()
+		resolved, _ := expectResolvedTimestamp(t, foo)
+		waitForRecordAdvanced(resolved)
 	}
 
 	{
@@ -3878,7 +3960,7 @@ func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
 					r, err = pts.GetRecord(ctx, txn, details.ProtectedTimestampRecord)
 					return err
 				}))
-				require.Equal(t, r.Timestamp, *progress.GetHighWater())
+				require.True(t, r.Timestamp.LessEq(*progress.GetHighWater()))
 			} else {
 				require.Equal(t, uuid.Nil, details.ProtectedTimestampRecord)
 			}
@@ -3888,21 +3970,26 @@ func TestChangefeedProtectedTimestampOnPause(t *testing.T) {
 			// the changefeed has caught up.
 			require.NoError(t, feedJob.Resume())
 			testutils.SucceedsSoon(t, func() error {
-				expectResolvedTimestamp(t, foo)
+				resolvedTs, _ := expectResolvedTimestamp(t, foo)
 				j, err := jr.LoadJob(ctx, feedJob.JobID())
 				require.NoError(t, err)
 				details := j.Progress().Details.(*jobspb.Progress_Changefeed).Changefeed
-				if details.ProtectedTimestampRecord != uuid.Nil {
-					return fmt.Errorf("expected no protected timestamp record")
-				}
-				return nil
+
+				err = serverCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
+					r, err := pts.GetRecord(ctx, txn, details.ProtectedTimestampRecord)
+					if err != nil || r.Timestamp.Less(resolvedTs) {
+						return fmt.Errorf("expected protected timestamp record %v to have timestamp greater than %v", r, resolvedTs)
+					}
+					return nil
+				})
+				return err
 			})
 		}
 	}
 
 	testutils.RunTrueAndFalse(t, "protect_on_pause", func(t *testing.T, shouldPause bool) {
 		t.Run(`enterprise`, enterpriseTest(testFn(shouldPause), feedTestNoTenants))
 		t.Run(`cloudstorage`, cloudStorageTest(testFn(shouldPause), feedTestNoTenants))
 		t.Run(`kafka`, kafkaTest(testFn(shouldPause), feedTestNoTenants))
 		t.Run(`webhook`, webhookTest(testFn(shouldPause), feedTestNoTenants))
 		t.Run(`pubsub`, pubsubTest(testFn(shouldPause), feedTestNoTenants))
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index 4651dac53413..39fa3ba1250c 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -167,3 +167,12 @@ var EventMemoryMultiplier = settings.RegisterFloatSetting(
 		return nil
 	},
 )
+
+// ProtectTimestampInterval controls the frequency of protected timestamp record updates.
+var ProtectTimestampInterval = settings.RegisterDurationSetting(
+	settings.TenantWritable,
+	"changefeed.protect_timestamp_interval",
+	"controls how often the changefeed forwards its protected timestamp to the resolved timestamp",
+	10*time.Minute,
+	settings.PositiveDuration,
+)
diff --git a/pkg/ccl/logictestccl/testdata/logic_test/tenant_settings b/pkg/ccl/logictestccl/testdata/logic_test/tenant_settings
index 794f06cc8a05..00de3680b0e5 100644
--- a/pkg/ccl/logictestccl/testdata/logic_test/tenant_settings
+++ b/pkg/ccl/logictestccl/testdata/logic_test/tenant_settings
@@ -101,3 +101,21 @@ query T retry
 SHOW CLUSTER SETTING kv.protectedts.reconciliation.interval
 ----
 00:00:45
+
+user host-cluster-root
+
+# Verify that destroying a tenant cleans up any tenant-specific overrides.
+statement ok
+SELECT crdb_internal.create_tenant(1234)
+
+# TODO(radu): replace with ALTER TENANT when it's available.
+statement ok
+INSERT INTO system.tenant_settings (tenant_id, name, value, value_type) VALUES (1234, 'sql.notices.enabled', 'true', 'b')
+
+statement ok
+SELECT crdb_internal.destroy_tenant(1234, true)
+
+query I
+SELECT count(*) FROM system.tenant_settings WHERE tenant_id = 1234
+----
+0
\ No newline at end of file
diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel
index 2baa8c209e0a..dd44608c671c 100644
--- a/pkg/ccl/serverccl/BUILD.bazel
+++ b/pkg/ccl/serverccl/BUILD.bazel
@@ -36,6 +36,7 @@ go_test(
         "//pkg/sql",
         "//pkg/sql/distsql",
         "//pkg/sql/tests",
+        "//pkg/testutils",
        "//pkg/testutils/serverutils",
         "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
@@ -43,6 +44,7 @@ go_test(
         "//pkg/util/envutil",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/protoutil",
         "//pkg/util/randutil",
         "//pkg/util/timeutil",
         "@com_github_elastic_gosigar//:gosigar",
diff --git a/pkg/ccl/serverccl/admin_test.go b/pkg/ccl/serverccl/admin_test.go
index a0e3e063caed..29508142bbda 100644
--- a/pkg/ccl/serverccl/admin_test.go
+++ b/pkg/ccl/serverccl/admin_test.go
@@ -10,21 +10,36 @@ package serverccl
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"strings"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/stretchr/testify/require"
 )
 
 var adminPrefix = "/_admin/v1/"
 
+func getAdminJSONProto(
+	ts serverutils.TestServerInterface, path string, response protoutil.Message,
+) error {
+	return getAdminJSONProtoWithAdminOption(ts, path, response, true)
+}
+
+func getAdminJSONProtoWithAdminOption(
+	ts serverutils.TestServerInterface, path string, response protoutil.Message, isAdmin bool,
+) error {
+	return serverutils.GetJSONProtoWithAdminOption(ts, adminPrefix+path, response, isAdmin)
+}
+
 // TestAdminAPIDataDistributionPartitioning partitions a table and verifies
 // that we see all zone configs (#27718).
 func TestAdminAPIDataDistributionPartitioning(t *testing.T) {
@@ -64,9 +79,8 @@ func TestAdminAPIDataDistributionPartitioning(t *testing.T) {
 	}
 
 	var resp serverpb.DataDistributionResponse
-	if err := serverutils.GetJSONProto(firstServer, adminPrefix+"data_distribution", &resp); err != nil {
-		t.Fatal(err)
-	}
+	err := serverutils.GetJSONProto(firstServer, adminPrefix+"data_distribution", &resp)
+	require.NoError(t, err)
 
 	actualZoneConfigNames := map[string]struct{}{}
 	for name := range resp.ZoneConfigs {
@@ -93,3 +107,43 @@ func TestAdminAPIChartCatalog(t *testing.T) {
 	err := serverutils.GetJSONProto(firstServer, adminPrefix+"chartcatalog", &resp)
 	require.NoError(t, err)
 }
+
+func TestAdminAPIJobs(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	dir, dirCleanupFn := testutils.TempDir(t)
+	defer dirCleanupFn()
+	s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
+	defer s.Stopper().Stop(context.Background())
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+
+	sqlDB.Exec(t, `BACKUP INTO 'nodelocal://0/backup/1?AWS_SECRET_ACCESS_KEY=neverappears'`)
+
+	var jobsRes serverpb.JobsResponse
+	err := getAdminJSONProto(s, "jobs", &jobsRes)
+	require.NoError(t, err)
+
+	var backups []serverpb.JobResponse
+	for _, job := range jobsRes.Jobs {
+		if job.Type != "BACKUP" {
+			continue
+		}
+		backups = append(backups, job)
+	}
+
+	if len(backups) != 1 {
+		t.Fatalf("expected 1 BACKUP job, got %d", len(backups))
+	}
+
+	jobID := backups[0].ID
+
+	var jobRes serverpb.JobResponse
+	path := fmt.Sprintf("jobs/%v", jobID)
+	err = getAdminJSONProto(s, path, &jobRes)
+	require.NoError(t, err)
+
+	require.Equal(t, backups[0], jobRes)
+}
diff --git a/pkg/cmd/dev/build.go b/pkg/cmd/dev/build.go
index 56a0e11270a8..23d0d8d84443 100644
--- a/pkg/cmd/dev/build.go
+++ b/pkg/cmd/dev/build.go
@@ -292,7 +292,7 @@ func (d *dev) getBasicBuildArgs(
 		typ := fields[0]
 		args = append(args, fullTargetName)
 		buildTargets = append(buildTargets, buildTarget{fullName: fullTargetName, kind: typ})
-		if typ == "go_test" {
+		if typ == "go_test" || typ == "go_transition_test" {
 			shouldBuildWithTestConfig = true
 		}
 	}
diff --git a/pkg/cmd/dev/compose.go b/pkg/cmd/dev/compose.go
index 470ec7179033..5adbc3f6f577 100644
--- a/pkg/cmd/dev/compose.go
+++ b/pkg/cmd/dev/compose.go
@@ -12,6 +12,7 @@ package main
 
 import (
 	"fmt"
+	"path/filepath"
 
 	"github.com/spf13/cobra"
 )
@@ -27,6 +28,7 @@ func makeComposeCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.C
 	}
 	addCommonBuildFlags(composeCmd)
 	addCommonTestFlags(composeCmd)
+	composeCmd.Flags().String(volumeFlag, "bzlhome", "the Docker volume to use as the container home directory (only used for cross builds)")
 	return composeCmd
 }
 
@@ -38,6 +40,23 @@ func (d *dev) compose(cmd *cobra.Command, _ []string) error {
 		timeout = mustGetFlagDuration(cmd, timeoutFlag)
 	)
 
+	crossArgs, targets, err := d.getBasicBuildArgs(ctx, []string{"//pkg/cmd/cockroach:cockroach", "//pkg/compose/compare/compare:compare_test"})
+	if err != nil {
+		return err
+	}
+	volume := mustGetFlagString(cmd, volumeFlag)
+	err = d.crossBuild(ctx, crossArgs, targets, "crosslinux", volume)
+	if err != nil {
+		return err
+	}
+
+	workspace, err := d.getWorkspace(ctx)
+	if err != nil {
+		return err
+	}
+	cockroachBin := filepath.Join(workspace, "artifacts", "cockroach")
+	compareBin := filepath.Join(workspace, "artifacts", "compare_test")
+
 	var args []string
 	args = append(args, "run", "//pkg/compose:compose_test", "--config=test")
 	if numCPUs != 0 {
@@ -53,6 +72,9 @@ func (d *dev) compose(cmd *cobra.Command, _ []string) error {
 		args = append(args, fmt.Sprintf("--test_timeout=%d", int(timeout.Seconds())))
 	}
 
+	args = append(args, "--test_arg", "-cockroach", "--test_arg", cockroachBin)
+	args = append(args, "--test_arg", "-compare", "--test_arg", compareBin)
+
 	logCommand("bazel", args...)
 	return d.exec.CommandContextInheritingStdStreams(ctx, "bazel", args...)
 }
diff --git a/pkg/cmd/dev/testdata/datadriven/compose b/pkg/cmd/dev/testdata/datadriven/compose
deleted file mode 100644
index a419588c2064..000000000000
--- a/pkg/cmd/dev/testdata/datadriven/compose
+++ /dev/null
@@ -1,9 +0,0 @@
-exec
-dev compose
-----
-bazel run //pkg/compose:compose_test --config=test
-
-exec
-dev compose --cpus 12 --short --timeout 1m -f TestComposeCompare
-----
-bazel run //pkg/compose:compose_test --config=test --local_cpu_resources=12 --test_filter=TestComposeCompare --test_arg -test.short --test_timeout=60
diff --git a/pkg/compose/compose_test.go b/pkg/compose/compose_test.go
index 02eb5e0e480b..5ca8d834aadb 100644
--- a/pkg/compose/compose_test.go
+++ b/pkg/compose/compose_test.go
@@ -35,6 +35,8 @@ var (
 	flagEach      = flag.Duration("each", 10*time.Minute, "individual test timeout")
 	flagTests     = flag.String("tests", ".", "tests within docker compose to run")
 	flagArtifacts = flag.String("artifacts", "", "artifact directory")
+	flagCockroach = flag.String("cockroach", "", "path to the cockroach executable")
+	flagCompare   = flag.String("compare", "", "path to the compare test (only valid for bazel-driven test)")
 )
 
 func copyBin(src, dst string) error {
@@ -60,13 +62,11 @@ func TestComposeCompare(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		origCockroachBin, found := bazel.FindBinary("pkg/cmd/cockroach", "cockroach")
-		if !found {
-			t.Fatal("could not find cockroach binary")
+		if *flagCockroach == "" {
+			t.Fatal("-cockroach not set")
 		}
-		origCompareBin, found := bazel.FindBinary("pkg/compose/compare/compare", "compare_test")
-		if !found {
-			t.Fatal("could not find compare_test binary")
+		if *flagCompare == "" {
+			t.Fatal("-compare not set")
 		}
 		// These binaries are going to be mounted as volumes when we
 		// start up docker-compose, but the files themselves will be
@@ -79,11 +79,11 @@ func TestComposeCompare(t *testing.T) {
 		defer func() { _ = os.RemoveAll(composeBinsDir) }()
 		compareDir = composeBinsDir
 		cockroachBin = filepath.Join(composeBinsDir, "cockroach")
-		err = copyBin(origCockroachBin, cockroachBin)
+		err = copyBin(*flagCockroach, cockroachBin)
 		if err != nil {
 			t.Fatal(err)
 		}
-		err = copyBin(origCompareBin, filepath.Join(composeBinsDir, "compare.test"))
+		err = copyBin(*flagCompare, filepath.Join(composeBinsDir, "compare.test"))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -94,7 +94,13 @@ func TestComposeCompare(t *testing.T) {
 			}
 		}
 	} else {
-		cockroachBin = "../../../cockroach-linux-2.6.32-gnu-amd64"
+		if *flagCompare != "" {
+			t.Fatal("should not set -compare unless test is driven with Bazel")
+		}
+		cockroachBin = *flagCockroach
+		if cockroachBin == "" {
+			cockroachBin = "../../../cockroach-linux-2.6.32-gnu-amd64"
+		}
 		compareDir = "./compare"
 		dockerComposeYml = filepath.Join("compare", "docker-compose.yml")
 		if *flagArtifacts == "" {
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index 822a90574c0d..5be0801cc5d5 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -296,6 +296,8 @@ func max(a, b int64) int64 {
 
 // NewStreamer creates a new Streamer.
 //
+// txn must be a LeafTxn.
+//
 // limitBytes determines the maximum amount of memory this Streamer is allowed
 // to use (i.e. it'll be used lazily, as needed). The more memory it has, the
 // higher its internal concurrency and throughput.
@@ -315,6 +317,9 @@ func NewStreamer(
 	limitBytes int64,
 	acc *mon.BoundAccount,
 ) *Streamer {
+	if txn.Type() != kv.LeafTxn {
+		panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
+	}
 	s := &Streamer{
 		distSender: distSender,
 		stopper:    stopper,
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go
index 0862bab38c4d..3005984aaebc 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -104,6 +104,20 @@ func TestStreamerLimitations(t *testing.T) {
 		// responded to.
 		require.Error(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
 	})
+
+	t.Run("unexpected RootTxn", func(t *testing.T) {
+		require.Panics(t, func() {
+			NewStreamer(
+				s.DistSenderI().(*kvcoord.DistSender),
+				s.Stopper(),
+				kv.NewTxn(ctx, s.DB(), s.NodeID()),
+				cluster.MakeTestingClusterSettings(),
+				lock.WaitPolicy(0),
+				math.MaxInt64, /* limitBytes */
+				nil,           /* acc */
+			)
+		})
+	})
 }
 
 // TestLargeKeys verifies that the Streamer successfully completes the queries
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index 08ae4e5dd8e8..849f6a23b2ef 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -2054,13 +2054,12 @@ func (s *adminServer) jobHelper(
 	ctx context.Context, request *serverpb.JobRequest, userName security.SQLUsername,
 ) (_ *serverpb.JobResponse, retErr error) {
 	const query = `
-	        SELECT job_id, job_type, description, statement, user_name, descriptor_ids, status,
-						 running_status, created, started, finished, modified,
-						 fraction_completed, high_water_timestamp, error, last_run,
-						 next_run, num_runs
-        FROM crdb_internal.jobs
-        WHERE job_id = $1`
-
+        SELECT job_id, job_type, description, statement, user_name, descriptor_ids, status,
+               running_status, created, started, finished, modified,
+               fraction_completed, high_water_timestamp, error, last_run,
+               next_run, num_runs, execution_events::string::bytes
+        FROM crdb_internal.jobs
+        WHERE job_id = $1`
 	row, cols, err := s.server.sqlServer.internalExecutor.QueryRowExWithCols(
 		ctx, "admin-job", nil,
 		sessiondata.InternalExecutorOverride{User: userName},
diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go
index a523bc0f8930..397eca79f0b4 100644
--- a/pkg/sql/colfetcher/index_join.go
+++ b/pkg/sql/colfetcher/index_join.go
@@ -18,6 +18,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/col/typeconv"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
@@ -461,7 +462,9 @@ func NewColIndexJoin(
 
 	memoryLimit := execinfra.GetWorkMemLimit(flowCtx)
 
-	useStreamer := row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings) && !spec.MaintainOrdering
+	useStreamer := flowCtx.Txn != nil && flowCtx.Txn.Type() == kv.LeafTxn &&
+		row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings) &&
+		!spec.MaintainOrdering
 	if useStreamer {
 		if streamerBudgetAcc == nil {
 			return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired")
diff --git a/pkg/sql/distsql/BUILD.bazel b/pkg/sql/distsql/BUILD.bazel
index e49e3e95f9eb..4330deeee7cd 100644
--- a/pkg/sql/distsql/BUILD.bazel
+++ b/pkg/sql/distsql/BUILD.bazel
@@ -17,6 +17,7 @@ go_library(
         "//pkg/sql/execinfrapb",
         "//pkg/sql/faketreeeval",
         "//pkg/sql/flowinfra",
+        "//pkg/sql/row",
         "//pkg/sql/rowflow",
         "//pkg/sql/sem/tree",
         "//pkg/sql/sessiondata",
diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go
index df3d8fd12f75..b465dad5e714 100644
--- a/pkg/sql/distsql/server.go
+++ b/pkg/sql/distsql/server.go
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
 	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
+	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowflow"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -410,13 +411,15 @@ func (ds *ServerImpl) setupFlow(
 	// localState.Txn. Otherwise, we create a txn based on the request's
 	// LeafTxnInputState.
 	useLeaf := false
-	for _, proc := range req.Flow.Processors {
-		if jr := proc.Core.JoinReader; jr != nil {
-			if !jr.MaintainOrdering && jr.IsIndexJoin() {
-				// Index joins when ordering doesn't have to be maintained are
-				// executed via the Streamer API that has concurrency.
-				useLeaf = true
-				break
+	if req.LeafTxnInputState != nil && row.CanUseStreamer(ctx, ds.Settings) {
+		for _, proc := range req.Flow.Processors {
+			if jr := proc.Core.JoinReader; jr != nil {
+				if !jr.MaintainOrdering && jr.IsIndexJoin() {
+					// Index joins when ordering doesn't have to be maintained
+					// are executed via the Streamer API that has concurrency.
+					useLeaf = true
+					break
+				}
 			}
 		}
 	}
@@ -524,7 +527,6 @@ type LocalState struct {
 	IsLocal bool
 
 	// HasConcurrency indicates whether the local flow uses multiple goroutines.
-	// It is set only if IsLocal is true.
 	HasConcurrency bool
 
 	// Txn is filled in on the gateway only. It is the RootTxn that the query is running in.
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index 39266ff5bb43..44547e258c39 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
+	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -452,9 +453,21 @@ func (dsp *DistSQLPlanner) Run(
 			localState.HasConcurrency = localState.HasConcurrency || execinfra.HasParallelProcessors(flow)
 		}
 	}
+
 	// Even if planCtx.isLocal is false (which is the case when we think it's
 	// worth distributing the query), we need to go through the processors to
 	// figure out whether any of them have concurrency.
 	//
+	// At the moment of writing, this is only relevant whenever the Streamer API
+	// might be used by some of the processors. The Streamer internally can have
+	// concurrency, so it expects to be given a LeafTxn. In order for that
+	// LeafTxn to be created later, during the flow setup, we need to populate
+	// leafInputState below, so we tell the localState that there is
+	// concurrency.
+	if row.CanUseStreamer(ctx, dsp.st) {
 		for _, proc := range plan.Processors {
-			if js := proc.Spec.Core.JoinReader; js != nil {
-				if !js.MaintainOrdering && js.IsIndexJoin() {
+			if jr := proc.Spec.Core.JoinReader; jr != nil {
+				if !jr.MaintainOrdering && jr.IsIndexJoin() {
 					// Index joins when ordering doesn't have to be maintained
 					// are executed via the Streamer API that has concurrency.
 					localState.HasConcurrency = true
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 0de195a3817c..1bbbae2ab115 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -16,6 +16,7 @@ import (
 	"sort"
 	"unsafe"
 
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -303,7 +304,9 @@ func newJoinReader(
 	if flowCtx.EvalCtx.SessionData().ParallelizeMultiKeyLookupJoinsEnabled {
 		shouldLimitBatches = false
 	}
-	tryStreamer := row.CanUseStreamer(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Settings) && !spec.MaintainOrdering
+	tryStreamer := flowCtx.Txn != nil && flowCtx.Txn.Type() == kv.LeafTxn &&
+		row.CanUseStreamer(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Settings) &&
+		!spec.MaintainOrdering
 
 	jr := &joinReader{
 		desc:        tableDesc,
diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go
index c3a10d330564..4ead4dca5249 100644
--- a/pkg/sql/tenant.go
+++ b/pkg/sql/tenant.go
@@ -448,6 +448,15 @@ func GCTenantSync(ctx context.Context, execCfg *ExecutorConfig, info *descpb.Ten
 		return errors.Wrapf(err, "deleting tenant %d usage", info.ID)
 	}
 
+	if execCfg.Settings.Version.IsActive(ctx, clusterversion.TenantSettingsTable) {
+		if _, err := execCfg.InternalExecutor.ExecEx(
+			ctx, "delete-tenant-settings", txn, sessiondata.NodeUserSessionDataOverride,
+			`DELETE FROM system.tenant_settings WHERE tenant_id = $1`, info.ID,
+		); err != nil {
+			return errors.Wrapf(err, "deleting tenant %d settings", info.ID)
+		}
+	}
+
 	if !execCfg.Settings.Version.IsActive(ctx, clusterversion.PreSeedTenantSpanConfigs) {
 		return nil
 	}
diff --git a/pkg/testutils/docker/BUILD.bazel b/pkg/testutils/docker/BUILD.bazel
new file mode 100644
index 000000000000..e562f80ae8ef
--- /dev/null
+++ b/pkg/testutils/docker/BUILD.bazel
@@ -0,0 +1,30 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+# keep
+go_test(
+    name = "docker_test",
+    srcs = ["single_node_docker_test.go"],
+    data = glob(["testdata/**"]) + [
+        "//pkg/testutils/docker:testdata",
+        "//pkg/testutils/docker/docker-fsnotify:docker-fsnotify",
+    ],
+    gotags = ["docker"],
+    deps = [
+        "//pkg/util/contextutil",
+        "//pkg/util/log",
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_docker_docker//api/types",
+        "@com_github_docker_docker//api/types/container",
+        "@com_github_docker_docker//api/types/filters",
+        "@com_github_docker_docker//client",
+        "@com_github_docker_docker//pkg/stdcopy",
+        "@com_github_docker_go_connections//nat",
+    ],
+)
+
+go_library(
+    name = "testutils_docker",
+    srcs = ["empty.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/testutils/docker",
+    visibility = ["//visibility:public"],
+)
diff --git a/pkg/testutils/docker/docker-fsnotify/BUILD.bazel b/pkg/testutils/docker/docker-fsnotify/BUILD.bazel
new file mode 100644
index 000000000000..d1e759f8ef5a
--- /dev/null
+++ b/pkg/testutils/docker/docker-fsnotify/BUILD.bazel
@@ -0,0 +1,20 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+    name = "docker-fsnotify_lib",
+    srcs = ["listen_file_creation.go"],
+    importpath = "github.com/cockroachdb/cockroach/pkg/testutils/docker/docker-fsnotify",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/util/log",  # keep
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_fsnotify_fsnotify//:fsnotify",
+    ],
+)
+
+go_binary(
+    name = "docker-fsnotify",
+    out = "docker-fsnotify-bin",
+    embed = [":docker-fsnotify_lib"],
+    visibility = ["//visibility:public"],
+)
diff --git a/build/teamcity/cockroach/ci/builds/docker-fsnotify/ListenFileCreation.go b/pkg/testutils/docker/docker-fsnotify/listen_file_creation.go
similarity index 66%
rename from build/teamcity/cockroach/ci/builds/docker-fsnotify/ListenFileCreation.go
rename to pkg/testutils/docker/docker-fsnotify/listen_file_creation.go
index 9c794c715809..10ef0e1bc193 100644
--- a/build/teamcity/cockroach/ci/builds/docker-fsnotify/ListenFileCreation.go
+++ b/pkg/testutils/docker/docker-fsnotify/listen_file_creation.go
@@ -8,20 +8,19 @@
 // by the Apache License, Version 2.0, included in the file
 // licenses/APL.txt.
 
-// Usage: go run ./ListenFileChange.go parent_folder_path file_name [timeout_duration]
+// Usage: go run ./listen_file_creation.go parent_folder_path file_name [timeout_duration]
 
 package main
 
 import (
-	"errors"
 	"fmt"
-	"log"
 	"os"
 	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/cockroachdb/errors"
 	"github.com/fsnotify/fsnotify"
 )
 
@@ -30,12 +29,14 @@ type result struct {
 	err error
 }
 
-const defaultTimeout = 30
+const defaultTimeout = 30 * time.Second
 
 func main() {
-	if len(os.Args) < 2 {
-		panic(fmt.Errorf("must provide the folder to watch and the file to listen to"))
+	if len(os.Args) < 3 {
+		panic(errors.Wrap(
+			fmt.Errorf("must provide the folder to watch and the file to listen to"),
+			"fail to run fsnotify to listen to file creation"),
+		)
 	}
 
 	var err error
@@ -45,19 +46,25 @@ func main() {
 
 	timeout := defaultTimeout
 
 	if len(os.Args) > 3 {
-		timeoutArg := os.Args[3]
-		timeout, err = strconv.Atoi(timeoutArg)
+		timeoutVal, err := strconv.Atoi(os.Args[3])
 		if err != nil {
-			panic(fmt.Errorf("timeout argument must be an integer: %v", err))
+			panic(errors.Wrap(err, "timeout argument must be an integer"))
 		}
+		timeout = time.Duration(timeoutVal) * time.Second
 	}
 
 	watcher, err := fsnotify.NewWatcher()
 	if err != nil {
-		log.Fatal(err)
+		panic(errors.Wrap(err, "cannot create new fsnotify file watcher"))
 	}
-	defer watcher.Close()
+	defer func() {
+		if err := watcher.Close(); err != nil {
+			panic(errors.Wrap(err, "error closing the file watcher in docker-fsnotify"))
+		}
+	}()
 
 	done := make(chan result)
 
@@ -102,12 +109,10 @@ func main() {
 		if res.finished && res.err == nil {
 			fmt.Println("finished")
 		} else {
-			fmt.Printf("error: %v", res.err)
+			fmt.Printf("error in docker-fsnotify: %v", res.err)
 		}
-	case <-time.After(time.Duration(timeout) * time.Second):
-		fmt.Printf("timeout for %d second", timeout)
+	case <-time.After(timeout):
+		fmt.Printf("timeout for %s", timeout)
 	}
-
-	return
 }
diff --git a/pkg/testutils/docker/empty.go b/pkg/testutils/docker/empty.go
new file mode 100644
index 000000000000..b1971832c8f9
--- /dev/null
+++ b/pkg/testutils/docker/empty.go
@@ -0,0 +1,13 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package docker
+
+// This file is here so go test always finds at least one file.
diff --git a/pkg/testutils/docker/single_node_docker_test.go b/pkg/testutils/docker/single_node_docker_test.go
new file mode 100644
index 000000000000..bd8db59f9395
--- /dev/null
+++ b/pkg/testutils/docker/single_node_docker_test.go
@@ -0,0 +1,556 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+//go:build docker
+// +build docker
+
+package docker
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"math"
+	"os"
+	"path/filepath"
+	"regexp"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/api/types/container"
+	"github.com/docker/docker/api/types/filters"
+	"github.com/docker/docker/client"
+	"github.com/docker/docker/pkg/stdcopy"
+	"github.com/docker/go-connections/nat"
+)
+
+const fsnotifyBinName = "docker-fsnotify-bin"
+
+// sqlQuery consists of a sql query and the expected result.
+type sqlQuery struct {
+	query          string
+	expectedResult string
+}
+
+// runContainerArgs are equivalent to arguments passed to a `docker run`
+// command.
+type runContainerArgs struct {
+	// envSetting is to set the environment variables for the docker container.
+	envSetting []string
+	// volSetting is to set how local directories will be mounted to the container.
+	volSetting []string
+	// cmd is the command to run when starting the container.
+	cmd []string
+}
+
+// singleNodeDockerTest consists of two main parts: start the container with
+// a single-node cockroach server using runContainerArgs,
+// and execute sql queries in this running container.
+type singleNodeDockerTest struct {
+	testName         string
+	runContainerArgs runContainerArgs
+	containerName    string
+	// sqlOpts are arguments passed to a `cockroach sql` command.
+	sqlOpts []string
+	// sqlQueries are queries to run in this container, and their expected results.
+	sqlQueries []sqlQuery
+}
+
+func TestSingleNodeDocker(t *testing.T) {
+	ctx := context.Background()
+	pwd, err := os.Getwd()
+	if err != nil {
+		t.Fatal(errors.NewAssertionErrorWithWrappedErrf(err, "cannot get pwd"))
+	}
+
+	fsnotifyPath := filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(filepath.Dir(filepath.Dir(filepath.Dir(pwd)))))), "docker-fsnotify")
+
+	var dockerTests = []singleNodeDockerTest{
+		{
+			testName:      "single-node-secure-mode",
+			containerName: "roach1",
+			runContainerArgs: runContainerArgs{
+				envSetting: []string{
+					"COCKROACH_DATABASE=mydb",
+					"COCKROACH_USER=myuser",
+					"COCKROACH_PASSWORD=23333",
+				},
+				volSetting: []string{
+					fmt.Sprintf("%s/testdata/single-node-test/docker-entrypoint-initdb.d/:/docker-entrypoint-initdb.d", pwd),
+					fmt.Sprintf("%s/docker-fsnotify-bin:/cockroach/docker-fsnotify", fsnotifyPath),
+				},
+				cmd: []string{"start-single-node", "--certs-dir=certs"},
+			},
+			sqlOpts: []string{
+				"--format=csv",
+				"--certs-dir=certs",
+				"--user=myuser",
+				"--url=postgresql://myuser:23333@127.0.0.1:26257/mydb?sslcert=certs%2Fclient.myuser.crt&sslkey=certs%2Fclient.myuser.key&sslmode=verify-full&sslrootcert=certs%2Fca.crt",
+			},
+			sqlQueries: []sqlQuery{
+				{"SELECT current_user", "current_user\nmyuser"},
+				{"SELECT current_database()", "current_database\nmydb"},
+				{"CREATE TABLE hello (X INT)", "CREATE TABLE"},
+				{"INSERT INTO hello VALUES (1), (2), (3)", "INSERT 3"},
+				{"SELECT * FROM hello", "x\n1\n2\n3"},
+				{"SELECT * FROM bello", "id,name\n1,a\n2,b\n3,c"},
+			},
+		},
+		{
+			testName:      "single-node-insecure-mode",
+			containerName: "roach2",
+			runContainerArgs: runContainerArgs{
+				envSetting: []string{
+					"COCKROACH_DATABASE=mydb",
+				},
+				volSetting: []string{
+					fmt.Sprintf("%s/testdata/single-node-test/docker-entrypoint-initdb.d/:/docker-entrypoint-initdb.d", pwd),
+					fmt.Sprintf("%s/docker-fsnotify-bin:/cockroach/docker-fsnotify", fsnotifyPath),
+				},
+				cmd: []string{"start-single-node", "--insecure"},
+			},
+			sqlOpts: []string{
+				"--format=csv",
+				"--insecure",
+				"--database=mydb",
+			},
+			sqlQueries: []sqlQuery{
+				{"SELECT current_user", "current_user\nroot"},
+				{"SELECT current_database()", "current_database\nmydb"},
+				{"CREATE TABLE hello (X INT)", "CREATE TABLE"},
+				{"INSERT INTO hello VALUES (1), (2), (3)", "INSERT 3"},
+				{"SELECT * FROM hello", "x\n1\n2\n3"},
+				{"SELECT * FROM bello", "id,name\n1,a\n2,b\n3,c"},
+			},
+		},
+	}
+
+	cl, err := client.NewClientWithOpts(client.FromEnv)
+	if err != nil {
+		t.Fatal(err)
+	}
+	cl.NegotiateAPIVersion(ctx)
+
+	dn := dockerNode{
+		cl: cl,
+	}
+
+	if err := removeLocalData(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := contextutil.RunWithTimeout(
+		ctx,
+		"remove all containers using current image",
+		defaultTimeout,
+		func(ctx context.Context) error {
+			return dn.removeAllContainers(ctx)
+		}); err != nil {
+		t.Errorf("%v", err)
+	}
+
+	for _, test := range dockerTests {
+		t.Run(test.testName, func(t *testing.T) {
+
+			if err := contextutil.RunWithTimeout(
+				ctx,
+				"start container",
+				defaultTimeout,
+				func(ctx context.Context) error {
+					return dn.startContainer(
+						ctx,
+						test.containerName,
+						test.runContainerArgs.envSetting,
+						test.runContainerArgs.volSetting,
+						test.runContainerArgs.cmd,
+					)
+				},
+			); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := contextutil.RunWithTimeout(
+				ctx,
+				"wait for the server to fully start up",
+				serverStartTimeout,
+				func(ctx context.Context) error {
+					return dn.waitServerStarts(ctx)
+				},
+			); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := contextutil.RunWithTimeout(
+				ctx,
+				"show log",
+				defaultTimeout,
+				func(ctx context.Context) error {
+					return dn.showContainerLog(ctx, fmt.Sprintf("%s.log", test.testName))
+				},
+			); err != nil {
+				log.Warningf(ctx, "cannot show container log: %v", err)
+			}
+
+			for _, qe := range test.sqlQueries {
+				query := qe.query
+				expected := qe.expectedResult
+
+				if err := contextutil.RunWithTimeout(
+					ctx,
+					fmt.Sprintf("execute command \"%s\"", query),
+					defaultTimeout,
+					func(ctx context.Context) error {
+						resp, err := dn.execSQLQuery(ctx, query, test.sqlOpts)
+						if err != nil {
+							return err
+						}
+						cleanedOutput, err := cleanQueryResult(resp.stdOut)
+						if err != nil {
+							return err
+						}
+						if cleanedOutput != expected {
+							return fmt.Errorf("executing %s, expected:\n%#v\ngot:\n%#v", query, expected, cleanedOutput)
+						}
+						return nil
+					},
+				); err != nil {
+					t.Errorf("%v", err)
+				}
+			}
+
+			if err := contextutil.RunWithTimeout(
+				ctx,
+				"remove current container",
+				defaultTimeout,
+				func(ctx context.Context) error {
+					return dn.rmContainer(ctx)
+				},
+			); err != nil {
+				t.Errorf("%v", err)
+			}
+		})
+	}
+}
+
+const (
+	imageName           = "cockroachdb/cockroach-ci:latest"
+	defaultTimeout      = 10 * time.Second
+	serverStartTimeout  = 80 * time.Second
+	listenURLFile       = "demoFile"
+	cockroachEntrypoint = "./cockroach"
+	hostPort            = "8080"
+	cockroachPort       = "26257"
+	hostIP              = "127.0.0.1"
+)
+
+type dockerNode struct {
+	cl     client.APIClient
+	contID string
+}
+
+// removeLocalData removes the existing database saved in cockroach-data.
+func removeLocalData() error {
+	err := os.RemoveAll("./cockroach-data")
+	if err != nil {
+		return errors.Wrap(err, "cannot remove local data")
+	}
+	return nil
+}
+
+// showContainerLog outputs the container's logs to logFileName and stderr.
+func (dn *dockerNode) showContainerLog(ctx context.Context, logFileName string) error {
+	cmdLog, err := os.Create(logFileName)
+	if err != nil {
+		return errors.Wrap(err, "cannot create log file")
+	}
+	out := io.MultiWriter(cmdLog, os.Stderr)
+
+	rc, err := dn.cl.ContainerLogs(ctx, dn.contID, types.ContainerLogsOptions{
+		ShowStdout: true,
+		ShowStderr: true,
+	})
+	if err != nil {
+		return errors.Wrap(err, "cannot read docker logs")
+	}
+
+	// The docker log output is not quite plaintext: each line has a prefix
+	// consisting of a one-byte stream descriptor (stdout vs stderr), three
+	// bytes of padding, and a four-byte big-endian payload length. We could
+	// use this to disentangle stdout and stderr if we wanted to output them
+	// into separate streams, but we don't really care.
+	for {
+		var header uint64
+		if err := binary.Read(rc, binary.BigEndian, &header); err == io.EOF {
+			break
+		} else if err != nil {
+			return err
+		}
+		size := header & math.MaxUint32
+		if _, err := io.CopyN(out, rc, int64(size)); err != nil {
+			return err
+		}
+	}
+
+	if err := rc.Close(); err != nil {
+		return errors.Wrap(err, "cannot close docker log")
+	}
+
+	return nil
+}
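+
+// For illustration only: given the header layout described in
+// showContainerLog above, a 6-byte stdout payload "hello\n" arrives on the
+// wire as
+//
+//	0x01 0x00 0x00 0x00  0x00 0x00 0x00 0x06  'h' 'e' 'l' 'l' 'o' '\n'
+//
+// i.e. stream byte 1 (stdout; stderr is 2), three zero bytes of padding,
+// the payload length as a big-endian uint32, and then the payload itself.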
+
+// startContainer starts a container with the given settings for environment
+// variables, mounted volumes, and the command to run.
+func (dn *dockerNode) startContainer(
+	ctx context.Context, containerName string, envSetting []string, volSetting []string, cmd []string,
+) error {
+	containerConfig := container.Config{
+		Hostname:     containerName,
+		Image:        imageName,
+		Env:          envSetting,
+		ExposedPorts: nat.PortSet{hostPort: struct{}{}, cockroachPort: struct{}{}},
+		Cmd:          append(cmd, fmt.Sprintf("--listening-url-file=%s", listenURLFile)),
+	}
+
+	hostConfig := container.HostConfig{
+		Binds: volSetting,
+		PortBindings: map[nat.Port][]nat.PortBinding{
+			nat.Port(hostPort):      {{HostIP: hostIP, HostPort: hostPort}},
+			nat.Port(cockroachPort): {{HostIP: hostIP, HostPort: cockroachPort}},
+		},
+	}
+
+	resp, err := dn.cl.ContainerCreate(
+		ctx,
+		&containerConfig,
+		&hostConfig,
+		nil,
+		nil,
+		containerName,
+	)
+	if err != nil {
+		return errors.Wrap(err, "cannot create container")
+	}
+
+	dn.contID = resp.ID
+
+	if err := dn.cl.ContainerStart(ctx, dn.contID,
+		types.ContainerStartOptions{}); err != nil {
+		return errors.Wrap(err, "cannot start container")
+	}
+
+	return nil
+}
+
+// removeAllContainers forcibly removes all containers, running or not, that
+// are based on the cockroach-ci docker image.
+func (dn *dockerNode) removeAllContainers(ctx context.Context) error {
+	filter := filters.NewArgs(filters.Arg("ancestor", imageName))
+	conts, err := dn.cl.ContainerList(ctx,
+		types.ContainerListOptions{All: true, Filters: filter})
+	if err != nil {
+		return errors.Wrapf(
+			err,
+			"cannot list all containers on docker image %s",
+			imageName,
+		)
+	}
+	for _, cont := range conts {
+		err := dn.cl.ContainerRemove(ctx, cont.ID,
+			types.ContainerRemoveOptions{Force: true})
+		if err != nil {
+			return errors.Wrapf(err, "cannot remove container %s", cont.Names)
+		}
+	}
+	return nil
+}
+
+type execResult struct {
+	stdOut   string
+	stdErr   string
+	exitCode int
+}
+
+// InspectExecResp inspects the result of a docker command execution, saves
+// its stdout, stderr, and exit code to an execResult, and returns that
+// execResult together with a possible error.
+func (dn *dockerNode) InspectExecResp(ctx context.Context, execID string) (execResult, error) {
+	var execRes execResult
+	resp, err := dn.cl.ContainerExecAttach(ctx, execID, types.ExecStartCheck{})
+	if err != nil {
+		return execResult{}, err
+	}
+	defer resp.Close()
+
+	var outBuf, errBuf bytes.Buffer
+	outputDone := make(chan error)
+
+	go func() {
+		// stdcopy.StdCopy demultiplexes the attached stream into stdout
+		// and stderr.
+		_, err := stdcopy.StdCopy(&outBuf, &errBuf, resp.Reader)
+		outputDone <- err
+	}()
+
+	select {
+	case err := <-outputDone:
+		if err != nil {
+			return execRes, err
+		}
+	case <-ctx.Done():
+		return execRes, ctx.Err()
+	}
+
+	stdout, err := ioutil.ReadAll(&outBuf)
+	if err != nil {
+		return execRes, err
+	}
+	stderr, err := ioutil.ReadAll(&errBuf)
+	if err != nil {
+		return execRes, err
+	}
+
+	res, err := dn.cl.ContainerExecInspect(ctx, execID)
+	if err != nil {
+		return execRes, err
+	}
+
+	execRes.exitCode = res.ExitCode
+	execRes.stdOut = string(stdout)
+	execRes.stdErr = string(stderr)
+	return execRes, nil
+}
+
+// execCommand executes a command in the current container and returns the
+// execution result and a possible error.
+func (dn *dockerNode) execCommand(
+	ctx context.Context, cmd []string, workingDir string,
+) (*execResult, error) {
+	execID, err := dn.cl.ContainerExecCreate(ctx, dn.contID, types.ExecConfig{
+		User:         "root",
+		AttachStderr: true,
+		AttachStdout: true,
+		Tty:          true,
+		Cmd:          cmd,
+		WorkingDir:   workingDir,
+	})
+	if err != nil {
+		return nil, errors.Wrapf(
+			err,
+			"cannot create command \"%s\"",
+			strings.Join(cmd, " "),
+		)
+	}
+
+	res, err := dn.InspectExecResp(ctx, execID.ID)
+	if err != nil {
+		return nil, errors.Wrapf(
+			err,
+			"cannot execute command \"%s\"",
+			strings.Join(cmd, " "),
+		)
+	}
+
+	if res.exitCode != 0 {
+		return &res, errors.Wrapf(
+			errors.Newf("%s", res.stdErr),
+			"command \"%s\" exited with code %d:\n %+v",
+			strings.Join(cmd, " "),
+			res.exitCode,
+			res,
+		)
+	}
+
+	return &res, nil
+}
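+
+// For illustration only: waitServerStarts below shells out to the prebuilt
+// docker-fsnotify binary mounted into the container. A hypothetical sketch
+// of what such a watcher does, assuming github.com/fsnotify/fsnotify (the
+// actual binary may differ in detail; watchDir, targetFile, and timeout
+// stand for its three command-line arguments):
+//
+//	watcher, err := fsnotify.NewWatcher()
+//	if err != nil {
+//		return err
+//	}
+//	defer watcher.Close()
+//	if err := watcher.Add(watchDir); err != nil {
+//		return err
+//	}
+//	for {
+//		select {
+//		case ev := <-watcher.Events:
+//			if filepath.Base(ev.Name) == targetFile && ev.Op&fsnotify.Write != 0 {
+//				fmt.Println("finished") // the sentinel waitServerStarts looks for
+//				return nil
+//			}
+//		case <-time.After(timeout):
+//			fmt.Println("timeout")
+//			return nil
+//		}
+//	}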
+
+// waitServerStarts waits until the server has truly started or the timeout
+// elapses, whichever comes first. It does so by watching listenURLFile until
+// that file has been written and closed. This is because, as of #70238, the
+// improved init process for a single-node server starts the server, runs the
+// init scripts, and then restarts the server. We therefore don't consider
+// the server fully started until it has been successfully restarted, at
+// which point the url is written to listenURLFile.
+func (dn *dockerNode) waitServerStarts(ctx context.Context) error {
+	var res *execResult
+	var err error
+
+	// Run the binary that listens to the /cockroach folder until the
+	// initialization process has finished or the timeout elapses.
+	res, err = dn.execCommand(ctx, []string{
+		"./docker-fsnotify",
+		"/cockroach",
+		listenURLFile,
+		strconv.Itoa(int(serverStartTimeout.Seconds())),
+	}, "/cockroach")
+	if err != nil {
+		return errors.Wrapf(err, "cannot run fsnotify to listen to %s:\nres:%#v", listenURLFile, res)
+	}
+
+	if strings.Contains(res.stdOut, "finished\r\n") {
+		return nil
+	}
+	return errors.Wrap(errors.Newf("%s", res), "error waiting for the server to start")
+}
+
+// execSQLQuery executes a sql query and returns the server's output and a
+// possible error.
+func (dn *dockerNode) execSQLQuery(
+	ctx context.Context, sqlQuery string, sqlQueryOpts []string,
+) (*execResult, error) {
+	query := append([]string{cockroachEntrypoint, "sql", "-e", sqlQuery},
+		sqlQueryOpts...,
+	)
+
+	res, err := dn.execCommand(ctx, query, "/cockroach")
+	if err != nil {
+		return nil, errors.Wrapf(err, "error executing query \"%s\"", sqlQuery)
+	}
+
+	return res, nil
+}
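+
+// For illustration, a hypothetical call against a container started in
+// insecure mode (mirroring the test table in TestSingleNodeDocker):
+//
+//	res, err := dn.execSQLQuery(ctx, "SELECT 1", []string{"--format=csv", "--insecure"})
+//
+// On success, res.stdOut holds the CSV-formatted rows followed by a
+// "Time: ..." footer, which cleanQueryResult below strips off.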
+
+// rmContainer performs a forced deletion of the current container.
+func (dn *dockerNode) rmContainer(ctx context.Context) error {
+	if err := dn.cl.ContainerRemove(ctx, dn.contID, types.ContainerRemoveOptions{
+		Force: true,
+	}); err != nil {
+		return errors.Wrapf(err, "cannot remove container %s", dn.contID)
+	}
+	dn.contID = ""
+	return nil
+}
+
+// cleanQueryResult parses the result of a sql query into a cleaner format,
+// e.g.
+// "id,name\r\n1,a\r\n2,b\r\n3,c\r\n\r\n\r\nTime: 11ms\r\n\r\n"
+// => "id,name\n1,a\n2,b\n3,c"
+func cleanQueryResult(queryRes string) (string, error) {
+	formatted := strings.ReplaceAll(queryRes, "\r\n", "\n")
+	r := regexp.MustCompile(`([\s\S]+)\n{3}Time:.+`)
+	res := r.FindStringSubmatch(formatted)
+	if len(res) < 2 {
+		return "", errors.Wrap(errors.Newf("%s", queryRes), "cannot parse the query result")
+	}
+	return res[1], nil
+}
diff --git a/pkg/testutils/docker/testdata/single-node-test/docker-entrypoint-initdb.d/test1.sql b/pkg/testutils/docker/testdata/single-node-test/docker-entrypoint-initdb.d/test1.sql
new file mode 100755
index 000000000000..79cb977873b7
--- /dev/null
+++ b/pkg/testutils/docker/testdata/single-node-test/docker-entrypoint-initdb.d/test1.sql
@@ -0,0 +1,8 @@
+USE mydb;
+
+CREATE TABLE bello (
+  id INT UNIQUE,
+  name varchar(12)
+);
+
+INSERT INTO bello (id, name) values (1, 'a'), (2, 'b'), (3, 'c');
diff --git a/pkg/testutils/docker/testdata/single-node-test/docker-entrypoint-initdb.d/test2.sql b/pkg/testutils/docker/testdata/single-node-test/docker-entrypoint-initdb.d/test2.sql
new file mode 100755
index 000000000000..c47d942bffd3
--- /dev/null
+++ b/pkg/testutils/docker/testdata/single-node-test/docker-entrypoint-initdb.d/test2.sql
@@ -0,0 +1,7 @@
+CREATE TABLE donut (
+  id INT UNIQUE,
+  name varchar(12)
+);
+
+
+INSERT INTO donut (id, name) values (1, 'a'), (2, 'b'), (3, 'c');