Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
84208: Update activerecord.go v7 support r=rafiss a=gemma-shay

cockroachdb/docs#14485
[DOC-4878](https://cockroachlabs.atlassian.net/browse/DOC-4878)
cc `@kernfeld-cockroach` 

84214: storage: handle range keys in readAsOfIterator r=erikgrinaker a=msbutler

Previously, the readAsOfIterator used in RESTORE could not handle range keys.
This PR implements the new SimpleMVCCIterator methods that handle range keys.
Further, this patch ensures the  readAsOfIterator skips over point keys
shadowed by range keys  at or below the caller's specified asOf timestamp.

Next, Backup needs to be tought about RangeKeys.

Informs #71155

Release note: none

84323: tracing: delete old field r=andreimatei a=andreimatei

The RecordedSpan.RedactableLogs. This field was unused since 22.1.

Release note: None

84742: sql/distsql: delete unused lazyInternalExecutor r=andreimatei a=andreimatei

Release note: None

84784: sql/sqlinstance/instancestorage: use CommitInBatch to optimize round-… r=ajwerner a=ajwerner

…trips

By using CommitInBatch we can hit the 1PC optimization and avoid a round-trip
to the leaseholder of the range in question.

Release note: None

84849: opt: clarify plan gist comment r=mgartner a=mgartner

Release note: None

84851: sql: fix recent leak of a context r=yuzefovich a=yuzefovich

Fixes: #84801.

Release note: None

84853: ccl/streamingccl/streamingest: skip TestTenantStreamingPauseResumeIngestion r=yuzefovich a=yuzefovich

Refs: #84414

Reason: flaky test

Generated by bin/skip-test.

Release justification: non-production code changes

Release note: None

84862: logcrash: fix test on arm r=dt a=dt

Release note: none.

Co-authored-by: Gemma Shay <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
8 people committed Jul 21, 2022
10 parents 9c263df + d270fa0 + 341a77f + 221ac55 + 0d036d4 + 066edbe + f61c7db + a4e457e + 3f744ce + 617c32a commit 79edfce
Show file tree
Hide file tree
Showing 15 changed files with 308 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) {

func TestTenantStreamingPauseResumeIngestion(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 84414, "flaky test")
defer log.Scope(t).Close(t)

// TODO(casper): now this has the same race issue with
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/activerecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (

var activerecordResultRegex = regexp.MustCompile(`^(?P<test>[^\s]+#[^\s]+) = (?P<timing>\d+\.\d+ s) = (?P<result>.)$`)
var railsReleaseTagRegex = regexp.MustCompile(`^v(?P<major>\d+)\.(?P<minor>\d+)\.(?P<point>\d+)\.?(?P<subpoint>\d*)$`)
var supportedRailsVersion = "6.1.6"
var activerecordAdapterVersion = "v6.1.10"
var supportedRailsVersion = "7.0.3"
var activerecordAdapterVersion = "v7.0.0"

// This test runs activerecord's full test suite against a single cockroach node.

Expand Down
10 changes: 2 additions & 8 deletions pkg/server/node_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package server
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

Expand All @@ -22,7 +21,8 @@ const TraceRedactedMarker = redact.RedactableString("verbose trace message redac

// redactRecordingForTenant redacts the sensitive parts of log messages in the
// recording if the tenant to which this recording is intended is not the system
// tenant (the system tenant gets an. See https://github.com/cockroachdb/cockroach/issues/70407.
// tenant (the system tenant gets an unredacted trace).
// See https://github.com/cockroachdb/cockroach/issues/70407.
// The recording is modified in place.
//
// tenID is the tenant that will receive this recording.
Expand All @@ -36,12 +36,6 @@ func redactRecordingForTenant(tenID roachpb.TenantID, rec tracingpb.Recording) e
sp.TagGroups = nil
for j := range sp.Logs {
record := &sp.Logs[j]
if record.Message != "" && !sp.RedactableLogs {
// If Message is set, the record should have been produced by a 22.1
// node that also sets RedactableLogs.
return errors.AssertionFailedf(
"recording has non-redactable span with the Message field set: %s", sp)
}
record.Message = record.Message.Redact()
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/server/node_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func TestRedactRecordingForTenant(t *testing.T) {
TagGroups []tracingpb.TagGroup
StartTime time.Time
Duration time.Duration
RedactableLogs bool
Logs []tracingpb.LogRecord
Verbose bool
RecordingMode tracingpb.RecordingMode
Expand Down
24 changes: 13 additions & 11 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,6 @@ func (ex *connExecutor) execStmtInOpenState(
ast := parserStmt.AST
ctx = withStatement(ctx, ast)

var cancelQuery context.CancelFunc
ctx, cancelQuery = contextutil.WithCancel(ctx)

makeErrEvent := func(err error) (fsm.Event, fsm.EventPayload, error) {
ev, payload := ex.makeErrEvent(err, ast)
return ev, payload, nil
Expand Down Expand Up @@ -327,10 +324,9 @@ func (ex *connExecutor) execStmtInOpenState(
ex.planner.EvalContext().Placeholders = pinfo
}

var cancelQuery context.CancelFunc
ctx, cancelQuery = contextutil.WithCancel(ctx)
ex.addActiveQuery(ast, formatWithPlaceholders(ast, ex.planner.EvalContext()), queryID, cancelQuery)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
}

// Make sure that we always unregister the query. It also deals with
// overwriting res.Error to a more user-friendly message in case of query
Expand All @@ -343,10 +339,6 @@ func (ex *connExecutor) execStmtInOpenState(
<-doneAfterFunc
}
}
ex.removeActiveQuery(queryID, ast)
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
}

// Detect context cancelation and overwrite whatever error might have been
// set on the result before. The idea is that once the query's context is
Expand All @@ -365,6 +357,12 @@ func (ex *connExecutor) execStmtInOpenState(
retPayload = eventNonRetriableErrPayload{err: cancelchecker.QueryCanceledError}
}

ex.removeActiveQuery(queryID, ast)
cancelQuery()
if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Dec(1)
}

// If the query timed out, we intercept the error, payload, and event here
// for the same reasons we intercept them for canceled queries above.
// Overriding queries with a QueryTimedOut error needs to happen after
Expand All @@ -386,6 +384,10 @@ func (ex *connExecutor) execStmtInOpenState(
}
}(ctx, res)

if ex.executorType != executorTypeInternal {
ex.metrics.EngineMetrics.SQLActiveStatements.Inc(1)
}

p := &ex.planner
stmtTS := ex.server.cfg.Clock.PhysicalTime()
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
Expand Down Expand Up @@ -505,7 +507,7 @@ func (ex *connExecutor) execStmtInOpenState(
timeoutTicker = time.AfterFunc(
timerDuration,
func() {
ex.cancelQuery(queryID)
cancelQuery()
queryTimedOut = true
doneAfterFunc <- struct{}{}
})
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_library(
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqltelemetry",
"//pkg/sql/sqlutil",
"//pkg/util/envutil",
"//pkg/util/log",
"//pkg/util/mon",
Expand Down
41 changes: 0 additions & 41 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package distsql
import (
"context"
"io"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -34,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -683,42 +681,3 @@ func (ds *ServerImpl) FlowStream(stream execinfrapb.DistSQL_FlowStreamServer) er
}
return err
}

// lazyInternalExecutor is a tree.InternalExecutor that initializes
// itself only on the first call to QueryRow.
type lazyInternalExecutor struct {
// Set when an internal executor has been initialized.
sqlutil.InternalExecutor

// Used for initializing the internal executor exactly once.
once sync.Once

// newInternalExecutor must be set when instantiating a lazyInternalExecutor,
// it provides an internal executor to use when necessary.
newInternalExecutor func() sqlutil.InternalExecutor
}

var _ sqlutil.InternalExecutor = &lazyInternalExecutor{}

func (ie *lazyInternalExecutor) QueryRowEx(
ctx context.Context,
opName string,
txn *kv.Txn,
opts sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (tree.Datums, error) {
ie.once.Do(func() {
ie.InternalExecutor = ie.newInternalExecutor()
})
return ie.InternalExecutor.QueryRowEx(ctx, opName, txn, opts, stmt, qargs...)
}

func (ie *lazyInternalExecutor) QueryRow(
ctx context.Context, opName string, txn *kv.Txn, stmt string, qargs ...interface{},
) (tree.Datums, error) {
ie.once.Do(func() {
ie.InternalExecutor = ie.newInternalExecutor()
})
return ie.InternalExecutor.QueryRow(ctx, opName, txn, stmt, qargs...)
}
12 changes: 8 additions & 4 deletions pkg/sql/opt/exec/explain/plan_gist_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ import (

func init() {
if numOperators != 58 {
// If this error occurs please make sure the new op is the last one in order
// to not invalidate existing plan gists/hashes. If we are just adding an
// operator at the end there's no need to update version below and we can
// just bump the hardcoded literal here.
// This error occurs when an operator has been added or removed in
// pkg/sql/opt/exec/explain/factory.opt. If an operator is added at the
// end of factory.opt, simply adjust the hardcoded value above. If an
// operator is removed or added anywhere else in factory.opt, increment
// gistVersion below. Note that we currently do not have a mechanism for
// decoding gists of older versions. This means that if gistVersion is
// incremented in a release, upgrading a cluster to that release will
// cause decoding errors for any previously generated plan gists.
panic(errors.AssertionFailedf("Operator field changed (%d), please update check and consider incrementing version", numOperators))
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/sqlinstance/instancestorage/instancestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func (s *Storage) CreateInstance(
log.Warningf(ctx, "failed to encode row for instance id %d: %v", instanceID, err)
return err
}
return txn.Put(ctx, row.Key, row.Value)
b := txn.NewBatch()
b.Put(row.Key, row.Value)
return txn.CommitInBatch(ctx, b)
})

if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ var commands = map[string]cmd{

"iter_new": {typReadOnly, cmdIterNew},
"iter_new_incremental": {typReadOnly, cmdIterNewIncremental}, // MVCCIncrementalIterator
"iter_new_read_as_of": {typReadOnly, cmdIterNewReadAsOf}, // readAsOfIterator
"iter_seek_ge": {typReadOnly, cmdIterSeekGE},
"iter_seek_lt": {typReadOnly, cmdIterSeekLT},
"iter_seek_intent_ge": {typReadOnly, cmdIterSeekIntentGE},
Expand Down Expand Up @@ -1393,6 +1394,29 @@ func cmdIterNewIncremental(e *evalCtx) error {
return nil
}

func cmdIterNewReadAsOf(e *evalCtx) error {
if e.iter != nil {
e.iter.Close()
}
var asOf hlc.Timestamp
if e.hasArg("asOfTs") {
asOf = e.getTsWithName("asOfTs")
}
opts := IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
RangeKeyMaskingBelow: asOf}
if e.hasArg("k") {
opts.LowerBound, opts.UpperBound = e.getKeyRange()
}
if len(opts.UpperBound) == 0 {
opts.UpperBound = keys.MaxKey
}
r, closer := metamorphicReader(e, "iter-reader")
iter := &iterWithCloser{r.NewMVCCIterator(MVCCKeyIterKind, opts), closer}
e.iter = NewReadAsOfIterator(iter, asOf)
return nil
}

func cmdIterSeekGE(e *evalCtx) error {
key := e.getKey()
ts := e.getTs(nil)
Expand Down
Loading

0 comments on commit 79edfce

Please sign in to comment.