64253: builtins: implement ST_Envelope(box2d) r=sumeerbhola a=otan

Release note (sql change): Implement ST_Envelope for Box2D.

64265: kvserver: speed up intent resolution for aborted txns r=sumeerbhola a=erikgrinaker

### kvserver: speed up intent resolution for aborted txns

Cleaning up intents for aborted txns during `EndTxn` could be very slow
for intents that had already been removed by a concurrent process, due
to suboptimal iterator reuse.

This patch changes this intent resolution path to create a new iterator
with `Prefix: true` for each intent rather than re-seeking a reusable
iterator. This reduces intent resolution time by two orders of
magnitude (~200s → ~1s in tests). A simple performance regression test
has been added for this.
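
A minimal sketch of the idea, using hypothetical stand-in types rather
than the actual storage API:

```go
package sketch

// Iterator and Reader are illustrative stand-ins for the storage-layer
// interfaces; only the parts needed for the sketch are shown.
type Iterator interface {
	SeekGE(key []byte) bool // positions at key; reports whether it exists
	Close()
}

type Reader interface {
	// NewPrefixIterator returns an iterator constrained to a single key
	// prefix, letting the engine consult SSTable bloom filters instead
	// of performing a full seek.
	NewPrefixIterator(key []byte) Iterator
}

// resolveIntents opens a short-lived prefix iterator per intent instead
// of re-seeking one shared ranged iterator. Intents that were already
// removed by a concurrent resolver (the slow case before this patch)
// cost little more than a bloom filter probe.
func resolveIntents(r Reader, keys [][]byte, resolve func(Iterator)) {
	for _, key := range keys {
		iter := r.NewPrefixIterator(key)
		if iter.SeekGE(key) {
			resolve(iter) // intent still present: resolve it
		}
		iter.Close()
	}
}
```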

Resolves #64092.

Release note (performance improvement): improved intent cleanup
performance for aborted transactions.

### intentresolver: reduce ranged resolution batch size

This patch adds the constants `intentResolverRangeBatchSize` and
`intentResolverRangeRequestSize` to control the number of requests per
batch and the number of intents per request for ranged intent
resolution. It also reduces the number of ranged requests per batch
from 100 to 10, since ranged requests can fan out to hit 200 intents
each (via range scans), making them significantly more expensive than
single-intent requests.
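
Sketched as constants below; the two range-specific names come from the
patch, while `intentResolverBatchSize` is an assumed name for the
pre-existing single-intent batch size implied by "from 100 to 10":

```go
// Illustrative batching knobs for intent resolution.
const (
	// intentResolverBatchSize (assumed name) is the number of
	// single-intent resolution requests packed into one batch.
	intentResolverBatchSize = 100

	// intentResolverRangeBatchSize is the number of ranged resolution
	// requests packed into one batch; smaller, because each ranged
	// request can itself fan out to many intents.
	intentResolverRangeBatchSize = 10

	// intentResolverRangeRequestSize is the maximum number of intents
	// one ranged request may resolve before returning a resume span.
	intentResolverRangeRequestSize = 200
)
```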

Release note: None

/cc @cockroachdb/kv 

64324: kvserver: synchronize replica removal with read-only requests r=tbg,nvanbenschoten a=erikgrinaker

Replica removal did not synchronize with in-flight read-only requests,
which could cause them to be evaluated on a removed (empty) replica,
returning an empty result.

This patch fixes the problem by locking `Replica.readOnlyCmdMu` during
replica removal, which either waits for in-flight read-only requests to
complete or prevents them from being evaluated on the removed replica.
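
A minimal sketch of the synchronization, with stand-in types rather
than the actual `Replica` implementation:

```go
package sketch

import (
	"errors"
	"sync"
)

var errReplicaRemoved = errors.New("replica removed")

// replica is an illustrative stand-in for kvserver.Replica, modeling
// only the readOnlyCmdMu interaction described above.
type replica struct {
	readOnlyCmdMu sync.RWMutex
	destroyed     bool
}

// evalReadOnly holds readOnlyCmdMu in read mode for the duration of
// evaluation and re-checks the destroy status under the lock, so a
// removed replica yields a retryable error instead of an empty result.
func (r *replica) evalReadOnly(eval func()) error {
	r.readOnlyCmdMu.RLock()
	defer r.readOnlyCmdMu.RUnlock()
	if r.destroyed {
		return errReplicaRemoved
	}
	eval()
	return nil
}

// remove takes readOnlyCmdMu exclusively, waiting for in-flight reads
// to drain before marking the replica destroyed and clearing its data.
func (r *replica) remove(clearData func()) {
	r.readOnlyCmdMu.Lock()
	defer r.readOnlyCmdMu.Unlock()
	r.destroyed = true
	clearData()
}
```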

Resolves #64325.

Release note (bug fix): Fixed a race condition where read-only requests
during replica removal (e.g. during range merges or rebalancing) could
be evaluated on the removed replica, returning an empty result.

/cc @cockroachdb/kv 

Following review comments, we also lock `readOnlyCmdMu` during `removeUninitializedReplicaRaftMuLocked` and `tryGetOrCreateReplica`, even though I don't believe reads can be routed to these replicas.

64342: opt: remove non-equivalent group columns in OrderingChoice.Simplify r=mgartner a=mgartner

Previously, `OrderingChoice.Simplify` would add but never remove columns
from ordering column groups based on equivalency in an FD. In rare
cases, this could cause the optimizer to generate expressions which
violated an invariant that all columns in an ordering column group are
equivalent according to the expression's FD.

Violation of this invariant only causes a panic in test builds, and in
the test cases found to trigger this panic there are likely no
correctness issues with the expression. Therefore, there was probably
no impact on any release builds.

This commit updates `OrderingChoice.Simplify` so that non-equivalent
columns in an ordering column group are removed from the group,
satisfying the invariant.
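
A minimal sketch of the fix, assuming a simplified FD API (`FuncDepSet`
and `AreColsEquiv` here are illustrative stand-ins; the real optimizer
types differ):

```go
package sketch

// FuncDepSet is a stand-in for the optimizer's functional dependency
// set; only a column-equivalence query is modeled.
type FuncDepSet struct {
	equiv map[[2]int]bool // symmetric equivalence pairs
}

// AreColsEquiv reports whether the FD set proves cols a and b equivalent.
func (fd *FuncDepSet) AreColsEquiv(a, b int) bool {
	return a == b || fd.equiv[[2]int{a, b}] || fd.equiv[[2]int{b, a}]
}

// simplifyGroup drops any column of an ordering column group that the
// FD set does not prove equivalent to the group's first column,
// restoring the invariant that all columns in a group are equivalent.
func simplifyGroup(group []int, fd *FuncDepSet) []int {
	if len(group) == 0 {
		return group
	}
	rep := group[0]
	out := group[:0]
	for _, col := range group {
		if fd.AreColsEquiv(rep, col) {
			out = append(out, col)
		}
	}
	return out
}
```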

Fixes #63794

Release note: None

Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
4 people committed Apr 28, 2021
5 parents c834a34 + fca11ec + a4b7a43 + 3357d2e + ecfff8c commit 9190550
Showing 21 changed files with 369 additions and 189 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
@@ -1698,6 +1698,8 @@ from the given Geometry.</p>
 </span></td></tr>
 <tr><td><a name="st_endpoint"></a><code>st_endpoint(geometry: geometry) &rarr; geometry</code></td><td><span class="funcdesc"><p>Returns the last point of a geometry which has shape LineString. Returns NULL if the geometry is not a LineString.</p>
 </span></td></tr>
+<tr><td><a name="st_envelope"></a><code>st_envelope(box2d: box2d) &rarr; geometry</code></td><td><span class="funcdesc"><p>Returns a bounding geometry for the given box.</p>
+</span></td></tr>
 <tr><td><a name="st_envelope"></a><code>st_envelope(geometry: geometry) &rarr; geometry</code></td><td><span class="funcdesc"><p>Returns a bounding envelope for the given geometry.</p>
 <p>For geometries which have a POINT or LINESTRING bounding box (i.e. is a single point
 or a horizontal or vertical line), a POINT or LINESTRING is returned. Otherwise, the
20 changes: 5 additions & 15 deletions pkg/kv/kvserver/batch_spanset_test.go
@@ -510,21 +510,18 @@ func TestSpanSetNonMVCCBatch(t *testing.T) {
 	}
 }
 
-// TestSpanSetMVCCResolveWriteIntentRangeUsingIter verifies that
-// MVCCResolveWriteIntentRangeUsingIter does not stray outside of the passed-in
+// TestSpanSetMVCCResolveWriteIntentRange verifies that
+// MVCCResolveWriteIntentRange does not stray outside of the passed-in
 // key range (which it only used to do in this corner case tested here).
 //
 // See #20894.
-func TestSpanSetMVCCResolveWriteIntentRangeUsingIter(t *testing.T) {
+func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	eng := storage.NewDefaultInMemForTesting()
 	defer eng.Close()
-
 	ctx := context.Background()
-
 	value := roachpb.MakeValueFromString("irrelevant")
-
 	if err := storage.MVCCPut(
 		ctx,
 		eng,
@@ -536,24 +533,17 @@ func TestSpanSetMVCCResolveWriteIntentRangeUsingIter(t *testing.T) {
 	); err != nil {
 		t.Fatal(err)
 	}
-
 	var ss spanset.SpanSet
 	ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b\x00")})
-
 	batch := spanset.NewBatch(eng.NewBatch(), &ss)
 	defer batch.Close()
-
 	intent := roachpb.LockUpdate{
 		Span:   roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b\x00")},
 		Txn:    enginepb.TxnMeta{}, // unused
 		Status: roachpb.PENDING,
 	}
-
-	iterAndBuf := storage.GetIterAndBuf(batch, storage.IterOptions{UpperBound: intent.Span.EndKey})
-	defer iterAndBuf.Cleanup()
-
-	if _, _, err := storage.MVCCResolveWriteIntentRangeUsingIter(
-		ctx, batch, iterAndBuf, nil /* ms */, intent, 0,
+	if _, _, err := storage.MVCCResolveWriteIntentRange(
+		ctx, batch, nil /* ms */, intent, 0,
 	); err != nil {
 		t.Fatal(err)
 	}
73 changes: 11 additions & 62 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -424,56 +424,6 @@ func IsEndTxnTriggeringRetryError(
 
 const lockResolutionBatchSize = 500
 
-// iterManager provides a storage.IterAndBuf appropriate for working with a
-// span of keys that are either all local or all global keys, identified by
-// the start key of the span, that is passed to getIterAndBuf. This is to deal
-// with the constraint that a single MVCCIterator using
-// MVCCKeyAndIntentsIterKind can either iterate over local keys or global
-// keys, but not both. We don't wish to create a new iterator for each span,
-// so iterManager lazily creates a new one when needed.
-type iterManager struct {
-	reader              storage.Reader
-	globalKeyUpperBound roachpb.Key
-	iterAndBuf          storage.IterAndBuf
-
-	iter        storage.MVCCIterator
-	isLocalIter bool
-}
-
-func (im *iterManager) getIterAndBuf(key roachpb.Key) storage.IterAndBuf {
-	isLocal := keys.IsLocal(key)
-	if im.iter != nil {
-		if im.isLocalIter == isLocal {
-			return im.iterAndBuf
-		}
-		im.iterAndBuf.SwitchIter(nil /* iter */)
-		im.iter.Close()
-		im.iter = nil
-	}
-	if isLocal {
-		im.iter = im.reader.NewMVCCIterator(
-			storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
-				UpperBound: keys.LocalMax,
-			})
-		im.isLocalIter = true
-		im.iterAndBuf.SwitchIter(im.iter)
-	} else {
-		im.iter = im.reader.NewMVCCIterator(
-			storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
-				UpperBound: im.globalKeyUpperBound,
-			})
-		im.isLocalIter = false
-		im.iterAndBuf.SwitchIter(im.iter)
-	}
-	return im.iterAndBuf
-}
-
-func (im *iterManager) Close() {
-	im.iterAndBuf.Cleanup()
-	im.iterAndBuf = storage.IterAndBuf{}
-	im.iter = nil
-}
-
 // resolveLocalLocks synchronously resolves any locks that are local to this
 // range in the same batch and returns those lock spans. The remainder are
 // collected and returned so that they can be handed off to asynchronous
@@ -497,13 +447,6 @@ func resolveLocalLocks(
 		desc = &mergeTrigger.LeftDesc
 	}
 
-	iterManager := &iterManager{
-		reader:              readWriter,
-		globalKeyUpperBound: desc.EndKey.AsRawKey(),
-		iterAndBuf:          storage.GetBufUsingIter(nil),
-	}
-	defer iterManager.Close()
-
 	var resolveAllowance int64 = lockResolutionBatchSize
 	if args.InternalCommitTrigger != nil {
 		// If this is a system transaction (such as a split or merge), don't enforce the resolve allowance.
@@ -524,9 +467,15 @@
 				externalLocks = append(externalLocks, span)
 				return nil
 			}
-			resolveMS := ms
-			ok, err := storage.MVCCResolveWriteIntentUsingIter(
-				ctx, readWriter, iterManager.getIterAndBuf(span.Key), resolveMS, update)
+			// It may be tempting to reuse an iterator here, but this call
+			// can create the iterator with Prefix:true which is much faster
+			// than seeking -- especially for intents that are missing, e.g.
+			// due to async intent resolution. See:
+			// https://github.com/cockroachdb/cockroach/issues/64092
+			//
+			// Note that the underlying pebbleIterator will still be reused
+			// since readWriter is a pebbleBatch in the typical case.
+			ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
 			if err != nil {
 				return err
 			}
@@ -543,8 +492,8 @@
 			externalLocks = append(externalLocks, outSpans...)
 			if inSpan != nil {
 				update.Span = *inSpan
-				num, resumeSpan, err := storage.MVCCResolveWriteIntentRangeUsingIter(
-					ctx, readWriter, iterManager.getIterAndBuf(update.Span.Key), ms, update, resolveAllowance)
+				num, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
+					ctx, readWriter, ms, update, resolveAllowance)
 				if err != nil {
 					return err
 				}
8 changes: 2 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
@@ -44,12 +44,8 @@ func ResolveIntentRange(
 
 	update := args.AsLockUpdate()
 
-	iterAndBuf := storage.GetIterAndBuf(readWriter, storage.IterOptions{UpperBound: args.EndKey})
-	defer iterAndBuf.Cleanup()
-
-	numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRangeUsingIter(
-		ctx, readWriter, iterAndBuf, ms, update, h.MaxSpanRequestKeys,
-	)
+	numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
+		ctx, readWriter, ms, update, h.MaxSpanRequestKeys)
 	if err != nil {
 		return result.Result{}, err
 	}
110 changes: 110 additions & 0 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -19,14 +19,19 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -364,3 +369,108 @@ func TestAdminRelocateRangeRandom(t *testing.T) {
 		relocateAndCheck(t, tc, k, tc.Targets(voters...), tc.Targets(nonVoters...))
 	}
 }
+
+// Regression test for https://github.com/cockroachdb/cockroach/issues/64325
+// which makes sure an in-flight read operation during replica removal won't
+// return empty results.
+func TestReplicaRemovalDuringRequestEvaluation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	type magicKey struct{}
+
+	// delayReadC is used to synchronize the in-flight read request with the main
+	// test goroutine. It is read from twice:
+	//
+	// 1. The first read allows the test to block until the request eval filter
+	//    is called, i.e. when the read request is ready.
+	// 2. The second read allows the test to close the channel to unblock
+	//    the eval filter, causing the read request to be evaluated.
+	delayReadC := make(chan struct{})
+	evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
+		if args.Ctx.Value(magicKey{}) != nil {
+			<-delayReadC
+			<-delayReadC
+		}
+		return nil
+	}
+
+	ctx := context.Background()
+	manual := hlc.NewHybridManualClock()
+	args := base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
+						TestingEvalFilter: evalFilter,
+					},
+					// Required by TestCluster.MoveRangeLeaseNonCooperatively.
+					AllowLeaseRequestProposalsWhenNotLeader: true,
+				},
+				Server: &server.TestingKnobs{
+					ClockSource: manual.UnixNano,
+				},
+			},
+		},
+	}
+	tc := testcluster.StartTestCluster(t, 2, args)
+	defer tc.Stopper().Stop(ctx)
+
+	// Create range and upreplicate.
+	key := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, key, tc.Target(1))
+
+	// Perform write.
+	pArgs := putArgs(key, []byte("foo"))
+	_, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), pArgs)
+	require.Nil(t, pErr)
+
+	// Perform read on write and wait for read to block.
+	type reply struct {
+		resp roachpb.Response
+		err  *roachpb.Error
+	}
+	readResC := make(chan reply)
+	err := tc.Stopper().RunAsyncTask(ctx, "get", func(ctx context.Context) {
+		readCtx := context.WithValue(ctx, magicKey{}, struct{}{})
+		gArgs := getArgs(key)
+		resp, pErr := kv.SendWrapped(readCtx, tc.Servers[0].DistSender(), gArgs)
+		readResC <- reply{resp, pErr}
+	})
+	require.NoError(t, err)
+	delayReadC <- struct{}{}
+
+	// Transfer leaseholder to other store.
+	rangeDesc, err := tc.LookupRange(key)
+	require.NoError(t, err)
+	repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID)
+	require.NoError(t, err)
+	err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
+	require.NoError(t, err)
+
+	// Remove first store from raft group.
+	tc.RemoveVotersOrFatal(t, key, tc.Target(0))
+
+	// This is a bit iffy. We want to make sure that, in the buggy case, we
+	// will typically fail (i.e. the read returns empty because the replica was
+	// removed). However, in the non-buggy case the in-flight read request will
+	// be holding readOnlyCmdMu until evaluated, blocking the replica removal,
+	// so waiting for replica removal would deadlock. We therefore take the
+	// easy way out by starting an async replica GC and sleeping for a bit.
+	err = tc.Stopper().RunAsyncTask(ctx, "replicaGC", func(ctx context.Context) {
+		assert.NoError(t, tc.GetFirstStoreFromServer(t, 0).ManualReplicaGC(repl))
+	})
+	require.NoError(t, err)
+	time.Sleep(500 * time.Millisecond)
+
+	// Allow read to resume. Should return "foo".
+	close(delayReadC)
+	r := <-readResC
+	require.Nil(t, r.err)
+	require.NotNil(t, r.resp)
+	require.NotNil(t, r.resp.(*roachpb.GetResponse).Value)
+	val, err := r.resp.(*roachpb.GetResponse).Value.GetBytes()
+	require.NoError(t, err)
+	require.Equal(t, []byte("foo"), val)
+}
38 changes: 38 additions & 0 deletions pkg/kv/kvserver/intent_resolver_integration_test.go
@@ -12,15 +12,21 @@ package kvserver
 
 import (
 	"context"
+	"fmt"
 	"testing"
+	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
 )
 
 func beginTransaction(
@@ -164,3 +170,35 @@ func TestContendedIntentWithDependencyCycle(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// Regression test for https://github.com/cockroachdb/cockroach/issues/64092
+// which makes sure that synchronous ranged intent resolution during rollback
+// completes in a reasonable time.
+func TestRollbackSyncRangedIntentResolution(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderRace(t)
+	skip.UnderStress(t)
+
+	ctx := context.Background()
+	srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Store: &StoreTestingKnobs{
+				DisableLoadBasedSplitting: true,
+				IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{
+					ForceSyncIntentResolution: true,
+				},
+			},
+		},
+	})
+	defer srv.Stopper().Stop(ctx)
+
+	txn := srv.DB().NewTxn(ctx, "test")
+	for i := 0; i < 100000; i++ {
+		require.NoError(t, txn.Put(ctx, []byte(fmt.Sprintf("key%v", i)), []byte("value")))
+	}
+	ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
+	defer cancel()
+	require.NoError(t, txn.Rollback(ctx))
+	require.NoError(t, ctx.Err())
+}