Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
72586: bazel, docker: run docker image tests, and increase memory allocation to docker image in CI r=ZhouXing19 a=ZhouXing19

This PR is to add a test infrastructure for docker image in TeamCity using
bazel, and also increase the allocated memory to the `cockroach-ci` 
docker image in the CI.

It downloads the docker image tar from an upstream build configuration
(Build Docker Image), loads the docker image from the tar,
builds a docker container based on it, and runs SQL queries inside that
container.

Release note: None

76243: server: Fix regression on /jobs/:id page. r=benbardin a=benbardin

Job details page is currently broken on master. Looks like the bug was introduced in 931957c. I added a test to protect against this going forward!

Release note: None

76540: ci,dev: allow injecting cockroach binary into `compose` tests r=rail a=rickystewart

Same sort of thing we do in `acceptance`: build the `cockroach` and
`compare_test` binaries ahead of time and pass the locations to the
binaries as flags.

Release note: None

76605: changefeedccl: periodic pts record updates r=samiskin a=samiskin

Previously changefeeds only laid down protected timestamp records to
protect against either an ongoing backfill or the changefeed lagging
behind.  This is insufficient in cases such as if the gcttl is very
short, recurring errors retry the changefeed for too long, or in
upcoming work to enable serverless to shut down idle changefeeds.

This PR removes the manual PTS protection on backfills and begins an
async routine on the changeFrontier that updates the protected timestamp
record to the current highwater mark.

Fixes #76247

Release note (enterprise change): changefeeds running on tables with a
low gcttl will function more reliably due to protected timestamps being
maintained for the changefeed targets at the resolved timestamp of the
changefeed.  The frequency at which the protected timestamp is updated
to the resolved timestamp can be configured through the
`changefeed.protect_timestamp_interval` cluster setting. If the
changefeed lags too far behind such that storage of old data becomes an
issue, cancelling the changefeed will release the protected timestamps
and allow garbage collection to resume. If
`protect_data_from_gc_on_pause` is unset, pausing the changefeed will
release the existing protected timestamp record.


76641: sql: only create a LeafTxn for local flows if Streamer is enabled r=yuzefovich a=yuzefovich

Previously, during the flow setup stage we would always choose to create
a LeafTxn for local flows if there is a processor that might use
a Streamer API. This was the case even when the Streamer is disabled by
a cluster setting. Such behavior is a regression (since leaf txns don't
have a transparent span refresh mechanism and read spans have to be
collected with the metadata at the end of the execution), so this commit
fixes things by only using a LeafTxn if the Streamer is actually
enabled. Now, the users of the Streamer API are expected to check that
they do have a LeafTxn and only use the Streamer if so.

Additionally, this commit correctly populates `HasConcurrency` field for
the local flows even when the query might be distributed. The fix is
needed since the physical planning decision to distribute the query or
not is finalized after we decide which txn to use for the flow, which is
too late. Thus, whenever the Streamer is enabled and we find an index
join in the plan, we will make sure to use the LeafTxn (this was already
the case if the plan ended up being distributed).

Release note: None

76716: tenantsettings: delete overrides when removing tenant r=RaduBerinde a=RaduBerinde

Release note: None

Co-authored-by: Jane Xing <[email protected]>
Co-authored-by: Ben Bardin <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Shiranka Miskin <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
7 people committed Feb 18, 2022
7 parents 4b5277f + f894ce0 + a0a5bd0 + afbc568 + 54203d6 + 7b0ef97 + e81c2e7 commit 44d2361
Show file tree
Hide file tree
Showing 32 changed files with 1,048 additions and 246 deletions.
7 changes: 2 additions & 5 deletions build/teamcity/cockroach/ci/builds/build_docker_image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,12 @@ docker_tag="cockroachdb/cockroach-ci"
docker build \
--no-cache \
--tag="$docker_tag" \
--memory 30g \
--memory-swap -1 \
build/deploy

docker save "$docker_tag" | gzip > "${artifacts}/${docker_image_tar_name}".gz

cp upstream_artifacts/cockroach "${artifacts}"/cockroach

docker_fsnotify_dir="$(dirname "${0}")/docker-fsnotify"
cd $docker_fsnotify_dir && go build
cp ./docker-fsnotify "${artifacts}"/docker-fsnotify

tc_end_block "Build and save docker image to artifacts"

15 changes: 0 additions & 15 deletions build/teamcity/cockroach/ci/builds/docker-fsnotify/BUILD.bazel

This file was deleted.

17 changes: 17 additions & 0 deletions build/teamcity/cockroach/ci/tests/docker_image.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env bash

set -euo pipefail

dir="$(dirname $(dirname $(dirname $(dirname $(dirname "${0}")))))"
source "$dir/teamcity-support.sh"

tc_prepare

tc_start_block "Run docker image tests"

bazel run \
//pkg/testutils/docker:docker_test \
--config=crosslinux --config=test \
--test_timeout=3000

tc_end_block "Run docker image tests"
9 changes: 8 additions & 1 deletion build/teamcity/cockroach/nightlies/compose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,16 @@ tc_start_block "Run compose tests"
bazel build //pkg/cmd/bazci --config=ci
BAZCI=$(bazel info bazel-bin --config=ci)/pkg/cmd/bazci/bazci_/bazci

$BAZCI run --config=crosslinux --config=test --config=with_ui --artifacts_dir=$PWD/artifacts \
bazel build //pkg/cmd/cockroach //pkg/compose/compare/compare:compare_test --config=ci --config=crosslinux --config=test --config=with_ui
CROSSBIN=$(bazel info bazel-bin --config=ci --config=crosslinux --config=test --config=with_ui)
COCKROACH=$CROSSBIN/pkg/cmd/cockroach/cockroach_/cockroach
COMPAREBIN=$(bazel run //pkg/compose/compare/compare:compare_test --config=ci --config=crosslinux --config=test --config=with_ui --run_under=realpath | grep '^/' | tail -n1)

$BAZCI run --config=ci --config=test --artifacts_dir=$PWD/artifacts \
//pkg/compose:compose_test -- \
--test_env=GO_TEST_WRAP_TESTV=1 \
--test_arg -cockroach --test_arg $COCKROACH \
--test_arg -compare --test_arg $COMPAREBIN \
--test_timeout=1800

tc_end_block "Run compose tests"
23 changes: 9 additions & 14 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

const (
Expand All @@ -46,32 +43,30 @@ func emitResolvedTimestamp(
return nil
}

func shouldProtectTimestamps(codec keys.SQLCodec) bool {
// TODO(smiskin): Remove this restriction once tenant based pts are enabled
return codec.ForSystemTenant()
}

// createProtectedTimestampRecord will create a record to protect the spans for
// this changefeed at the resolved timestamp. The progress struct will be
// updated to refer to this new protected timestamp record.
func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
pts protectedts.Storage,
txn *kv.Txn,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) error {
if !codec.ForSystemTenant() {
return errors.AssertionFailedf("createProtectedTimestampRecord called on tenant-based changefeed")
}

) *ptpb.Record {
progress.ProtectedTimestampRecord = uuid.MakeV4()
log.VEventf(ctx, 2, "creating protected timestamp %v at %v",
progress.ProtectedTimestampRecord, resolved)
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
targetToProtect := makeTargetToProtect(targets)
rec := jobsprotectedts.MakeRecord(

log.VEventf(ctx, 2, "creating protected timestamp %v at %v", progress.ProtectedTimestampRecord, resolved)
return jobsprotectedts.MakeRecord(
progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect,
jobsprotectedts.Jobs, targetToProtect)
return pts.Protect(ctx, txn, rec)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
Expand Down
139 changes: 40 additions & 99 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -913,6 +912,10 @@ type changeFrontier struct {
// slowLogEveryN rate-limits the logging of slow spans
slowLogEveryN log.EveryN

// lastProtectedTimestampUpdate is the last time the protected timestamp
// record was updated to the frontier's highwater mark
lastProtectedTimestampUpdate time.Time

// js, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
js *jobState
Expand Down Expand Up @@ -1219,13 +1222,6 @@ func (cf *changeFrontier) closeMetrics() {
cf.metrics.mu.Unlock()
}

// shouldProtectBoundaries checks the job's spec to determine whether it should
// install protected timestamps when encountering scan boundaries.
func (cf *changeFrontier) shouldProtectBoundaries() bool {
policy := changefeedbase.SchemaChangePolicy(cf.spec.Feed.Opts[changefeedbase.OptSchemaChangePolicy])
return policy == changefeedbase.OptSchemaChangePolicyBackfill
}

// Next is part of the RowSource interface.
func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
for cf.State == execinfra.StateRunning {
Expand Down Expand Up @@ -1317,7 +1313,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
return err
}

isBehind := cf.maybeLogBehindSpan(frontierChanged)
cf.maybeLogBehindSpan(frontierChanged)

// If frontier changed, we emit resolved timestamp.
emitResolved := frontierChanged
Expand All @@ -1327,7 +1323,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// have no distributed state whatsoever. Because of this they also do not
// use protected timestamps.
if cf.js != nil {
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged, isBehind)
checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
return err
}
Expand All @@ -1354,7 +1350,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
}

func (cf *changeFrontier) maybeCheckpointJob(
resolvedSpan jobspb.ResolvedSpan, frontierChanged, isBehind bool,
resolvedSpan jobspb.ResolvedSpan, frontierChanged bool,
) (bool, error) {
// When in a Backfill, the frontier remains unchanged at the backfill boundary
// as we receive spans from the scan request at the Backfill Timestamp
Expand All @@ -1376,11 +1372,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
!inBackfill && (cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged))

if updateCheckpoint || updateHighWater {
manageProtected := updateHighWater
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(
cf.frontier.Frontier(), manageProtected, checkpoint, isBehind,
); err != nil {
if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
return false, err
}
cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart))
Expand All @@ -1390,16 +1383,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
return false, nil
}

// checkpointJobProgress checkpoints a changefeed-level job information.
// In addition, if 'manageProtected' is true, which only happens when frontier advanced,
// this method manages the protected timestamp state.
// The isBehind argument is used to determine whether an existing protected timestamp
// should be released.
func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp,
manageProtected bool,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
isBehind bool,
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (err error) {
updateRunStatus := timeutil.Since(cf.js.lastRunStatusUpdate) > runStatusUpdateFrequency
if updateRunStatus {
Expand All @@ -1420,16 +1405,15 @@ func (cf *changeFrontier) checkpointJobProgress(
HighWater: &frontier,
}

// Manage protected timestamps.
changefeedProgress := progress.Details.(*jobspb.Progress_Changefeed).Changefeed
if manageProtected {
if err := cf.manageProtectedTimestamps(cf.Ctx, changefeedProgress, txn, frontier, isBehind); err != nil {
return err
changefeedProgress.Checkpoint = &checkpoint

if shouldProtectTimestamps(cf.flowCtx.Codec()) {
if err := cf.manageProtectedTimestamps(cf.Ctx, txn, changefeedProgress); err != nil {
log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err)
}
}

changefeedProgress.Checkpoint = &checkpoint

if updateRunStatus {
md.Progress.RunningStatus = fmt.Sprintf("running: resolved=%s", frontier)
}
Expand All @@ -1448,77 +1432,40 @@ func (cf *changeFrontier) checkpointJobProgress(
})
}

// manageProtectedTimestamps is called when the resolved timestamp is being
// checkpointed. The changeFrontier always checkpoints resolved timestamps
// which occur at scan boundaries. It releases previously protected timestamps
// if the changefeed is not behind. See maybeLogBehindSpan for details on the
// behind calculation.
//
// Note that this function is never called for sinkless changefeeds as they have
// no corresponding job and thus no corresponding distributed state on which to
// attach protected timestamp information.
//
// TODO(ajwerner): Adopt protected timestamps for sinkless changefeeds,
// perhaps by using whatever mechanism is eventually built to protect
// data for long-running SQL transactions. There's some discussion of this
// use case in the protected timestamps RFC.
// manageProtectedTimestamps periodically advances the protected timestamp for
// the changefeed's targets to the current highwater mark. The record is
// cleared during changefeedResumer.OnFailOrCancel
func (cf *changeFrontier) manageProtectedTimestamps(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
txn *kv.Txn,
resolved hlc.Timestamp,
isBehind bool,
ctx context.Context, txn *kv.Txn, progress *jobspb.ChangefeedProgress,
) error {
pts := cf.flowCtx.Cfg.ProtectedTimestampProvider
if err := cf.maybeReleaseProtectedTimestamp(ctx, progress, pts, txn, isBehind); err != nil {
return err
}
return cf.maybeProtectTimestamp(ctx, progress, pts, txn, resolved)
}

// maybeReleaseProtectedTimestamp will release the current protected timestamp
// if either the resolved timestamp is close to the present or we've reached
// a new schemaChangeBoundary which will be protected.
func (cf *changeFrontier) maybeReleaseProtectedTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
isBehind bool,
) error {
if progress.ProtectedTimestampRecord == uuid.Nil {
return nil
}
if !cf.frontier.schemaChangeBoundaryReached() && isBehind {
log.VEventf(ctx, 2, "not releasing protected timestamp because changefeed is behind")
ptsUpdateInterval := changefeedbase.ProtectTimestampInterval.Get(&cf.flowCtx.Cfg.Settings.SV)
if timeutil.Since(cf.lastProtectedTimestampUpdate) < ptsUpdateInterval {
return nil
}
log.VEventf(ctx, 2, "releasing protected timestamp %v",
progress.ProtectedTimestampRecord)
if err := pts.Release(ctx, txn, progress.ProtectedTimestampRecord); err != nil {
return err
cf.lastProtectedTimestampUpdate = timeutil.Now()

pts := cf.flowCtx.Cfg.ProtectedTimestampProvider

// Create / advance the protected timestamp record to the highwater mark
highWater := cf.frontier.Frontier()
if highWater.Less(cf.highWaterAtStart) {
highWater = cf.highWaterAtStart
}
progress.ProtectedTimestampRecord = uuid.Nil
return nil
}

// maybeProtectTimestamp creates a new protected timestamp when the
// changeFrontier reaches a scanBoundary and the schemaChangePolicy indicates
// that we should perform a backfill (see cf.shouldProtectBoundaries()).
func (cf *changeFrontier) maybeProtectTimestamp(
ctx context.Context,
progress *jobspb.ChangefeedProgress,
pts protectedts.Storage,
txn *kv.Txn,
resolved hlc.Timestamp,
) error {
if cf.isSinkless() || cf.isTenant() || !cf.frontier.schemaChangeBoundaryReached() || !cf.shouldProtectBoundaries() {
return nil
recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
} else {
log.VEventf(ctx, 2, "updating protected timestamp %v at %v", recordID, highWater)
if err := pts.UpdateTimestamp(ctx, txn, recordID, highWater); err != nil {
return err
}
}

jobID := cf.spec.JobID
targets := cf.spec.Feed.Targets
return createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), pts, txn, jobID, targets, resolved, progress)
return nil
}

func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error {
Expand Down Expand Up @@ -1600,12 +1547,6 @@ func (cf *changeFrontier) isSinkless() bool {
return cf.spec.JobID == 0
}

// isTenant() bool returns true if this changeFrontier is running on a
// tenant.
func (cf *changeFrontier) isTenant() bool {
return !cf.flowCtx.Codec().ForSystemTenant()
}

// type to make embedding span.Frontier in schemaChangeFrontier convenient.
type spanFrontier struct {
*span.Frontier
Expand Down
Loading

0 comments on commit 44d2361

Please sign in to comment.