Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
119933: kv: add ability to verify pipelined replicated shared/exclusive locks  r=nvanbenschoten a=arulajmani

Previously, QueryIntent requests were only used to verify whether an
intent was successfully evaluated and replicated. This patch extends
QueryIntent request to also be able to verify whether a pipelined
shared or exclusive lock was successfully replicated or not.

Informs #117978

Release note: None

120787: ui: use max aggregator for commit latency on changefeed dashboard r=rharding6373 a=rharding6373

Previously, the commit latency in the changefeed dashboard in the db console would be aggregated by sum across all nodes. This was confusing for users who might see unexpectedly high commit latency.

In this change, we use max aggregation for the commit latency so that users see the max commit latency from all the nodes instead of the sum. This provides more useful observability into changefeed behavior.

Fixes: #119246
Fixes: #112947
Epic: None

Release note (ui change): The "Commit Latency" chart in the changefeed dashboard now aggregates by max instead of by sum for multi-node changefeeds. This more accurately reflects the amount of time for events to be acknowledged by the downstream sink.

121217: backupccl: fix data race with admission pacer r=msbutler a=aadityasondhi

We now use one pacer per fileSSTSink.

Fixes #121199.
Fixes #121202.
Fixes #121201.
Fixes #121200.
Fixes #121198.
Fixes #121197.
Fixes #121196.
Fixes #121195.
Fixes #121194.
Fixes #121193.
Fixes #121192.
Fixes #121191.
Fixes #121190.
Fixes #121189.
Fixes #121188.
Fixes #121187.

Release note: None

121222: optbuilder: fix recently introduced nil pointer in error case r=yuzefovich a=yuzefovich

This commit fixes a recently introduced nil pointer internal error when attempting to CALL something that is not a procedure and that is referenced by something other than its name (e.g. by OID). `ResolvableFunctionReference` might not have `ReferenceByName` set, so this commit switches to using `FunctionReference`, which is always set.

Fixes: #121095.

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
Co-authored-by: rharding6373 <[email protected]>
Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
5 people committed Mar 27, 2024
5 parents 40cc14f + 5cb2264 + 072fd4e + 1a26361 + 7103eb9 commit 3e488c5
Show file tree
Hide file tree
Showing 17 changed files with 558 additions and 100 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,4 @@ trace.snapshot.rate duration 0s if non-zero, interval at which background trace
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez application
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000023.2-upgrading-to-1000024.1-step-022 set the active cluster version in the format '<major>.<minor>' application
version version 1000023.2-upgrading-to-1000024.1-step-024 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,6 @@
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-022</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.2-upgrading-to-1000024.1-step-024</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
40 changes: 20 additions & 20 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,29 +462,29 @@ func runBackupProcessor(
if len(chunk) > 0 {
todo <- chunk
}
return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error {
readTime := spec.BackupEndTime.GoTime()

// Passing a nil pacer is effectively a noop if CPU control is disabled.
var pacer *admission.Pacer = nil
if fileSSTSinkElasticCPUControlEnabled.Get(&clusterSettings.SV) {
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
// Passing a nil pacer is effectively a noop if CPU control is disabled.
var pacer *admission.Pacer = nil
if fileSSTSinkElasticCPUControlEnabled.Get(&clusterSettings.SV) {
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
}
pacer = flowCtx.Cfg.AdmissionPacerFactory.NewPacer(
100*time.Millisecond,
admission.WorkInfo{
TenantID: tenantID,
Priority: admissionpb.BulkNormalPri,
CreateTime: timeutil.Now().UnixNano(),
BypassAdmission: false,
},
)
}
pacer = flowCtx.Cfg.AdmissionPacerFactory.NewPacer(
100*time.Millisecond,
admission.WorkInfo{
TenantID: tenantID,
Priority: admissionpb.BulkNormalPri,
CreateTime: timeutil.Now().UnixNano(),
BypassAdmission: false,
},
)
}
// It is safe to close a nil pacer.
defer pacer.Close()
// It is safe to close a nil pacer.
defer pacer.Close()

return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error {
readTime := spec.BackupEndTime.GoTime()
sink := makeFileSSTSink(sinkConf, storage, pacer)
defer func() {
if err := sink.flush(ctx); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ const (
// splits.
V24_1_EstimatedMVCCStatsInSplit

// V24_1_ReplicatedLockPipelining allows exclusive and shared replicated locks
// to be pipelined.
V24_1_ReplicatedLockPipelining

numKeys
)

Expand Down Expand Up @@ -372,6 +376,7 @@ var versionTable = [numKeys]roachpb.Version{
V24_1_SystemDatabaseSurvivability: {Major: 23, Minor: 2, Internal: 18},
V24_1_GossipMaximumIOOverload: {Major: 23, Minor: 2, Internal: 20},
V24_1_EstimatedMVCCStatsInSplit: {Major: 23, Minor: 2, Internal: 22},
V24_1_ReplicatedLockPipelining: {Major: 23, Minor: 2, Internal: 24},
}

// Latest is always the highest version key. This is the maximum logical cluster
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1999,6 +1999,21 @@ func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange {
return sl
}

// StrengthOrDefault reports the lock strength that this QueryIntentRequest is
// querying for, substituting lock.Intent when the Strength field is unset
// (lock.None).
//
// TODO(arul): the Strength field was only added to QueryIntentRequest in
// v24.1; before that, a QueryIntentRequest could only ever query replicated
// locks held with strength lock.Intent. Treating an unset field as lock.Intent
// keeps mixed v23.2 <-> v24.1 clusters compatible. Remove this fallback once
// compatibility with v23.2 is no longer a concern.
func (qir *QueryIntentRequest) StrengthOrDefault() lock.Strength {
	if str := qir.Strength; str != lock.None {
		// An explicit strength was supplied by the sender; honor it as-is.
		return str
	}
	return lock.Intent
}

// AsLockUpdate creates a lock update message corresponding to the given resolve
// intent request.
func (rir *ResolveIntentRequest) AsLockUpdate() roachpb.LockUpdate {
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,19 @@ message QueryIntentRequest {
// If true, return an IntentMissingError if no matching intent (neither a
// "partial match" nor a "full match") is found.
bool error_if_missing = 3;

// The strength with which the lock being queried was acquired. To ensure
// the supplied protection was provided, we check whether the lock was held
// with the supplied lock strength or something stronger at the sequence
// number.
kv.kvserver.concurrency.lock.Strength strength = 4;

// The list of sequence numbers that have been ignored by the transaction that
// acquired the lock. Any locks found at sequence numbers which are considered
// ignored will be treated as "not found"; that's because they can be removed
// at any time.
repeated storage.enginepb.IgnoredSeqNumRange ignored_seqnums = 5
[(gogoproto.nullable) = false, (gogoproto.customname) = "IgnoredSeqNums"];
}

// A QueryIntentResponse is the return value from the QueryIntent() method.
Expand Down
123 changes: 81 additions & 42 deletions pkg/kv/kvserver/batcheval/cmd_query_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand All @@ -28,17 +32,27 @@ func init() {
}

func declareKeysQueryIntent(
_ ImmutableRangeState,
rs ImmutableRangeState,
_ *kvpb.Header,
req kvpb.Request,
latchSpans *spanset.SpanSet,
_ *lockspanset.LockSpanSet,
_ time.Duration,
) error {
// QueryIntent requests read the specified keys at the maximum timestamp in
// order to read any intent present, if one exists, regardless of the
// timestamp it was written at.
// QueryIntent requests acquire a non-MVCC latch in order to read the queried
// lock, if one exists, regardless of the time it was written at. This
// isolates them from in-flight intent writes and exclusive lock acquisitions
// they're trying to query.
latchSpans.AddNonMVCC(spanset.SpanReadOnly, req.Header().Span())
// They also acquire a read latch on the per-transaction local key that all
// replicated shared lock acquisitions acquire latches on. This isolates them
// from the in-flight shared lock acquisition they're trying to query.
//
// TODO(arul): add a test.
txnID := req.(*kvpb.QueryIntentRequest).Txn.ID
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{
Key: keys.ReplicatedSharedLocksTransactionLatchingKey(rs.GetRangeID(), txnID),
})
return nil
}

Expand Down Expand Up @@ -74,53 +88,78 @@ func QueryIntent(
h.Timestamp, args.Txn.WriteTimestamp)
}

// Read from the lock table to see if an intent exists.
intent, err := storage.GetIntent(ctx, reader, args.Key, storage.BatchEvalReadCategory)
if err != nil {
return result.Result{}, err
if enginepb.TxnSeqIsIgnored(args.Txn.Sequence, args.IgnoredSeqNums) {
return result.Result{}, errors.AssertionFailedf(
"QueryIntent request for lock at sequence number %d but sequence number is ignored %v",
args.Txn.Sequence, args.IgnoredSeqNums,
)
}

reply.FoundIntent = false
reply.FoundUnpushedIntent = false
if intent != nil {
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
// TODO(nvanbenschoten): Now that we have a full intent history,
// we can look at the exact sequence! That won't serve as much more
// than an assertion that QueryIntent is being used correctly.
reply.FoundIntent = (args.Txn.ID == intent.Txn.ID) &&
(args.Txn.Epoch == intent.Txn.Epoch) &&
(args.Txn.Sequence <= intent.Txn.Sequence)
var intent *roachpb.Intent

if !reply.FoundIntent {
log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
// If we found a matching intent, check whether the intent was pushed past
// its expected timestamp.
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS)
// Intents have special handling because there's an associated timestamp
// component with them.
if args.StrengthOrDefault() == lock.Intent {
// Read from the lock table to see if an intent exists.
var err error
intent, err = storage.GetIntent(ctx, reader, args.Key, storage.BatchEvalReadCategory)
if err != nil {
return result.Result{}, err
}

if !reply.FoundUnpushedIntent {
log.VEventf(ctx, 2, "found pushed intent")
// If the request was querying an intent in its own transaction, update
// the response transaction.
// TODO(nvanbenschoten): if this is necessary for correctness, say so.
// And then add a test to demonstrate that.
reply.FoundIntent = false
reply.FoundUnpushedIntent = false
if intent != nil {
// See comment on QueryIntentRequest.Txn for an explanation of this
// comparison.
// TODO(nvanbenschoten): Now that we have a full intent history,
// we can look at the exact sequence! That won't serve as much more
// than an assertion that QueryIntent is being used correctly.
reply.FoundIntent = (args.Txn.ID == intent.Txn.ID) &&
(args.Txn.Epoch == intent.Txn.Epoch) &&
(args.Txn.Sequence <= intent.Txn.Sequence)

if !reply.FoundIntent {
log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
args.Txn.ID, intent.Txn.ID, args.Txn.Epoch, intent.Txn.Epoch, args.Txn.Sequence, intent.Txn.Sequence)
} else {
// If we found a matching intent, check whether the intent was pushed past
// its expected timestamp.
cmpTS := args.Txn.WriteTimestamp
if ownTxn {
reply.Txn = h.Txn.Clone()
reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
// If the request is querying an intent for its own transaction, forward
// the timestamp we compare against to the provisional commit timestamp
// in the batch header.
cmpTS.Forward(h.Txn.WriteTimestamp)
}
reply.FoundUnpushedIntent = intent.Txn.WriteTimestamp.LessEq(cmpTS)

if !reply.FoundUnpushedIntent {
log.VEventf(ctx, 2, "found pushed intent")
// If the request was querying an intent in its own transaction, update
// the response transaction.
// TODO(nvanbenschoten): if this is necessary for correctness, say so.
// And then add a test to demonstrate that.
if ownTxn {
reply.Txn = h.Txn.Clone()
reply.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
}
}
}
} else {
log.VEventf(ctx, 2, "found no intent")
}
} else {
log.VEventf(ctx, 2, "found no intent")
found, err := storage.VerifyLock(
ctx, reader, &args.Txn, args.Strength, args.Key, args.IgnoredSeqNums,
)
if err != nil {
return result.Result{}, err
}
if found {
reply.FoundIntent = true
reply.FoundUnpushedIntent = true
}
}

if !reply.FoundIntent && args.ErrorIfMissing {
Expand Down
45 changes: 30 additions & 15 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,21 +296,36 @@ func (r *Replica) updateTimestampCache(
}
addToTSCache(start, end, ts, txnID)
case *kvpb.QueryIntentRequest:
missing := false
if pErr != nil {
_, missing = pErr.GetDetail().(*kvpb.IntentMissingError)
} else {
missing = !resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent
}
if missing {
// If the QueryIntent determined that the intent is missing
// then we update the timestamp cache at the intent's key to
// the intent's transactional timestamp. This will prevent
// the intent from ever being written in the future. We use
// an empty transaction ID so that we block the intent
// regardless of whether it is part of the current batch's
// transaction or not.
addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{})
// NB: We only need to bump the timestamp cache if the QueryIntentRequest
// was querying a write intent and if it wasn't found. This prevents the
// intent from ever being written in the future. This is done for the
// benefit of txn recovery, where we don't want an intent to land after a
// QueryIntent request has already evaluated and determined the fate of
// the transaction being recovered. Letting the intent land would cause us
// to commit a transaction that we've determined was aborted.
//
// However, for other replicated locks (shared, exclusive), we know that
// they'll never be pipelined if they belong to a batch that's being
// committed in parallel. This means that any QueryIntent request for a
// replicated shared or exclusive lock is doing so with the knowledge that
// the request evaluated successfully (so it can't land later) -- it's
// only checking whether the replication succeeded or not.
if t.StrengthOrDefault() == lock.Intent {
missing := false
if pErr != nil {
_, missing = pErr.GetDetail().(*kvpb.IntentMissingError)
} else {
missing = !resp.(*kvpb.QueryIntentResponse).FoundUnpushedIntent
}
if missing {
// If the QueryIntent determined that the intent is missing then we
// update the timestamp cache at the intent's key to the intent's
// transactional timestamp. This will prevent the intent from ever
// being written in the future. We use an empty transaction ID so that
// we block the intent regardless of whether it is part of the current
// batch's transaction or not.
addToTSCache(start, end, t.Txn.WriteTimestamp, uuid.UUID{})
}
}
case *kvpb.ResolveIntentRequest:
// Update the timestamp cache on the key the request resolved if there
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/txnrecovery/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/txnrecovery/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -206,6 +207,9 @@ func (m *manager) resolveIndeterminateCommitForTxnProbe(
Key: w.Key,
},
Txn: meta,
// TODO(nvanbenschoten): pass in the correct lock strength here.
Strength: lock.Intent,
IgnoredSeqNums: txn.IgnoredSeqNums,
})
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/procedure
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ CALL first_value(1);
statement error pgcode 42809 addgeometrycolumn is not a procedure
CALL addgeometrycolumn(null, null, null, null, null);

let $funcOID
SELECT oid FROM pg_proc WHERE proname = 'count_rows';

statement error pgcode 42809 count_rows is not a procedure
CALL [ FUNCTION $funcOID ] ();

statement ok
CREATE PROCEDURE p_inner(OUT param INTEGER) AS $$ SELECT 1; $$ LANGUAGE SQL;

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (b *Builder) buildProcedure(c *tree.Call, inScope *scope) *scope {
f, ok := typedExpr.(*tree.FuncExpr)
if !ok {
panic(pgerror.Newf(pgcode.WrongObjectType,
"%s is not a procedure", c.Proc.Func.ReferenceByName.String(),
"%s is not a procedure", c.Proc.Func.String(),
))
}

Expand Down
Loading

0 comments on commit 3e488c5

Please sign in to comment.