48024: roachpb: rework replicas.CanMakeProgress() r=andreimatei a=andreimatei

CanMakeProgress was delegating to Raft, but also had a local fast-path.
The fast-path was broken: it was considering a configuration with two
voter-fulls and a voter-incoming, and one of the fulls dead, to be able
to make progress. That was false; the configuration doesn't have quorum
for the outgoing config.

This patch fixes the bug, and also removes the dependency on the Raft
code. The way in which we were using the Raft library was quite
inefficient, and I want to start using this function in the replication
report, which will call it a lot. For purposes of deciding
under-replication, that report already needs to understand the different
replica states instead of delegating everything to Raft. So let's
embrace this responsibility more fully.

Release note: None
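
The failing case described above can be sketched as a standalone quorum check. This is only an illustration under simplifying assumptions: `ReplicaType`, `Replica`, `hasQuorum`, and this `CanMakeProgress` are hypothetical names, not the roachpb types or the patch's code. Full voters count in both configs, incoming voters only in the incoming config, and outgoing voters only in the outgoing config.

```go
package main

import "fmt"

// ReplicaType is a simplified, hypothetical stand-in for the replica states
// mentioned in the commit message; it is not the roachpb type.
type ReplicaType int

const (
	VoterFull ReplicaType = iota
	VoterIncoming
	VoterOutgoing
	Learner
)

// Replica pairs a replica type with a liveness bit.
type Replica struct {
	Type ReplicaType
	Live bool
}

// hasQuorum reports whether the live voters form a strict majority.
// An empty voter set is treated as trivially satisfied (an assumption).
func hasQuorum(live, total int) bool {
	return total == 0 || live > total/2
}

// CanMakeProgress requires a quorum in both the incoming and the outgoing
// config. Learners never count towards either quorum.
func CanMakeProgress(replicas []Replica) bool {
	var inLive, inTotal, outLive, outTotal int
	for _, r := range replicas {
		inIncoming := r.Type == VoterFull || r.Type == VoterIncoming
		inOutgoing := r.Type == VoterFull || r.Type == VoterOutgoing
		if inIncoming {
			inTotal++
			if r.Live {
				inLive++
			}
		}
		if inOutgoing {
			outTotal++
			if r.Live {
				outLive++
			}
		}
	}
	return hasQuorum(inLive, inTotal) && hasQuorum(outLive, outTotal)
}

func main() {
	// Two voter-fulls (one dead) plus a live voter-incoming: the incoming
	// config has 2 of 3 live, but the outgoing config has only 1 of 2.
	cfg := []Replica{{VoterFull, true}, {VoterFull, false}, {VoterIncoming, true}}
	fmt.Println(CanMakeProgress(cfg)) // false
}
```

Counting all three voters together would report 2 of 3 live, which is the mistake the fast-path made.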

48159: sql: use rangefeeds for notifications about changes to table descriptors r=lucyzhang a=ajwerner

This PR comes in several commits:

1) Enable rangefeeds unconditionally on system ranges
  * this is going to need some modification to work with tenant system spans
2) Add a cluster version for the above functionality
3) Add the basic (messy) implementation of rangefeed based lease manager notifications
4) Add some testing knobs
5) Clean up 3 in a number of ways, add some more testing, fix a synchronization issue.
6) Update a test so that it does not break a new assertion that values in the descriptor table are actually descriptors

Relates to but does not fully resolve #47150. Another PR will follow up to disable the gossip of the descriptors table and lift the enforcement of the system config transaction anchor.


48668: colexec: short-circuit hash join when build side is empty r=yuzefovich a=yuzefovich

Previously, we would always fully consume both sides of the join, even
when the build (right) side is empty. However, for INNER, RIGHT OUTER,
and LEFT SEMI joins we can short-circuit in that case and skip the
probing phase altogether. For example, this helps query 93 of the TPC-DS
benchmark with scale factor 1.

Release note: None
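
A minimal sketch of the short-circuit decision, assuming hypothetical names (`JoinType`, `canShortCircuit`, `countingSource`, `probeRowsConsumed`) that are not taken from colexec. It does not compute join results; it only models whether the probing phase reads any rows from the probe (left) side.

```go
package main

import "fmt"

// JoinType enumerates a few join types; the names are illustrative only.
type JoinType int

const (
	InnerJoin JoinType = iota
	LeftOuterJoin
	RightOuterJoin
	LeftSemiJoin
)

// canShortCircuit reports whether the probing phase can be skipped. With an
// empty build (right) side, INNER, RIGHT OUTER, and LEFT SEMI joins cannot
// produce any output rows.
func canShortCircuit(t JoinType, buildEmpty bool) bool {
	if !buildEmpty {
		return false
	}
	switch t {
	case InnerJoin, RightOuterJoin, LeftSemiJoin:
		return true
	}
	return false
}

// countingSource hands out probe rows and records how many were read.
type countingSource struct {
	rows []int
	read int
}

func (s *countingSource) next() (int, bool) {
	if s.read >= len(s.rows) {
		return 0, false
	}
	v := s.rows[s.read]
	s.read++
	return v, true
}

// probeRowsConsumed models the probing phase and returns how many probe rows
// were read. buildRows is the number of rows found during the build phase.
func probeRowsConsumed(t JoinType, probe *countingSource, buildRows int) int {
	if canShortCircuit(t, buildRows == 0) {
		return probe.read // nothing read: the probing phase is skipped
	}
	for {
		if _, ok := probe.next(); !ok {
			break
		}
	}
	return probe.read
}

func main() {
	rows := []int{1, 2, 3}
	fmt.Println(probeRowsConsumed(InnerJoin, &countingSource{rows: rows}, 0))     // 0
	fmt.Println(probeRowsConsumed(LeftOuterJoin, &countingSource{rows: rows}, 0)) // 3
}
```

LEFT OUTER is excluded because every probe row must still be emitted with NULLs on the right side, so the probe input has to be read.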

48669: colexec: fix performance inefficiency in materializer r=yuzefovich a=yuzefovich

**colexec: fix performance inefficiency in materializer**

We mistakenly were passing `sqlbase.DatumAlloc` by value, and not by
pointer, and as a result we would always be allocating 16 datums but
using only 1 - i.e. we were not only not pooling the allocations, but
actually making a bunch of useless allocations as well.

This inefficiency becomes noticeable when the vectorized query returns
many rows and when we have wrapped processors and those processors get
a lot of input rows - in all cases when we need to materialize a lot.
For example, TPC-H query 16 sees about 10% improvement (it returns 18k
rows) and TPC-DS query 6 sees 2x improvement (it has wrapped hash
aggregator with a decimal column) with this fix.
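
The effect of passing a chunked allocator by value can be reproduced in a few lines. The `DatumAlloc` below is a hypothetical, simplified allocator written for this illustration (it is not the sqlbase type); the chunk size of 16 is taken from the description above.

```go
package main

import "fmt"

const chunkSize = 16

// Datum is a placeholder value type for this sketch.
type Datum struct{ V int64 }

// chunksAllocated counts how many chunks the allocator has created.
var chunksAllocated int

// DatumAlloc hands out Datums from a pre-allocated chunk and only allocates
// a new chunk once the current one is used up.
type DatumAlloc struct {
	buf []Datum
}

// NewDatum returns a pointer into the current chunk, refilling it if empty.
func (a *DatumAlloc) NewDatum(v int64) *Datum {
	if len(a.buf) == 0 {
		a.buf = make([]Datum, chunkSize)
		chunksAllocated++
	}
	d := &a.buf[0]
	a.buf = a.buf[1:]
	d.V = v
	return d
}

// materializeByValue receives a copy of the allocator, so the refilled chunk
// is lost when the function returns.
func materializeByValue(a DatumAlloc, v int64) *Datum { return a.NewDatum(v) }

// materializeByPointer shares the allocator with the caller.
func materializeByPointer(a *DatumAlloc, v int64) *Datum { return a.NewDatum(v) }

// countChunks allocates n datums and reports how many chunks were needed.
func countChunks(n int, byValue bool) int {
	chunksAllocated = 0
	var a DatumAlloc
	for i := 0; i < n; i++ {
		if byValue {
			materializeByValue(a, int64(i))
		} else {
			materializeByPointer(&a, int64(i))
		}
	}
	return chunksAllocated
}

func main() {
	fmt.Println(countChunks(32, true))  // 32: one 16-datum chunk per datum
	fmt.Println(countChunks(32, false)) // 2: chunks are actually reused
}
```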

Release note (performance improvement): A performance inefficiency has
been fixed in the vectorized execution engine which results in speed ups
on all queries when run via the vectorized engine, with most noticeable
gains on the queries that output many rows.

**sqlbase: prohibit copying DatumAlloc by value**

This commit adds `_ util.NoCopy` to the `DatumAlloc` struct to prevent us
from misusing it. A few places failed the linter, and those have been
addressed, but there were no performance problems AFAICT due to the
removed copies by value.

Release note: None

Co-authored-by: Andrei Matei
Co-authored-by: Andrew Werner
Co-authored-by: Yahor Yuzefovich
4 people committed May 12, 2020
5 parents fe17ac9 + b48d26f + 9bf0ceb + 4c860fd + 8d67e14 commit 91b2880
Showing 43 changed files with 1,081 additions and 303 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Expand Up @@ -70,6 +70,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>20.1-3</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>20.1-4</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
12 changes: 12 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Expand Up @@ -63,6 +63,7 @@ const (
VersionStart20_2
VersionGeospatialType
VersionEnums
VersionRangefeedLeases

// Add new versions here (step one of two).
)
Expand Down Expand Up @@ -471,6 +472,7 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 1},
},
{

// VersionGeospatialType enables the use of Geospatial features.
Key: VersionGeospatialType,
Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 2},
Expand All @@ -480,6 +482,16 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionEnums,
Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 3},
},
{

// VersionRangefeedLeases enables the use of rangefeeds for leases.
// All nodes with this version will have rangefeeds enabled on all system
// ranges. Once this version is finalized, gossip is not needed in the
// schema lease subsystem. Nodes which start with this version finalized
// will not pass gossip to the SQL layer.
Key: VersionRangefeedLeases,
Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 4},
},

// Add new versions here (step two of two).

Expand Down
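
For readers unfamiliar with how an unstable version such as `20.1-4` gates a feature, here is a rough sketch. The `Version` struct and the `isActive` helper are assumptions made for this illustration; they only mirror the `Major`, `Minor`, and `Unstable` fields used in the diff above and are not the roachpb or clusterversion APIs.

```go
package main

import "fmt"

// Version is a simplified, hypothetical version triple.
type Version struct {
	Major, Minor, Unstable int32
}

// Less orders versions by Major, then Minor, then Unstable.
func (v Version) Less(o Version) bool {
	if v.Major != o.Major {
		return v.Major < o.Major
	}
	if v.Minor != o.Minor {
		return v.Minor < o.Minor
	}
	return v.Unstable < o.Unstable
}

// isActive reports whether a feature introduced at version feature may be
// used when the cluster's active version is active.
func isActive(active, feature Version) bool {
	return !active.Less(feature)
}

func main() {
	rangefeedLeases := Version{Major: 20, Minor: 1, Unstable: 4}
	fmt.Println(isActive(Version{Major: 20, Minor: 1, Unstable: 3}, rangefeedLeases)) // false
	fmt.Println(isActive(Version{Major: 20, Minor: 1, Unstable: 4}, rangefeedLeases)) // true
}
```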
5 changes: 3 additions & 2 deletions pkg/clusterversion/versionkey_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

104 changes: 104 additions & 0 deletions pkg/kv/kvserver/client_rangefeed_test.go
@@ -0,0 +1,104 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestRangefeedWorksOnSystemRangesUnconditionally ensures that a rangefeed will
// not return an error when operating on a system span even if the setting is
// disabled. The test also ensures that an error is received if a rangefeed is
// run on a user table.
func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

// Make sure the rangefeed setting really is disabled.
_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = false")
require.NoError(t, err)

db := tc.Server(0).DB()
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)

t.Run("works on system ranges", func(t *testing.T) {
startTS := db.Clock().Now()
descTableKey := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)
descTableSpan := roachpb.Span{
Key: descTableKey,
EndKey: descTableKey.PrefixEnd(),
}

evChan := make(chan *roachpb.RangeFeedEvent)
rangefeedErrChan := make(chan error, 1)
ctxToCancel, cancel := context.WithCancel(ctx)
go func() {
rangefeedErrChan <- ds.RangeFeed(ctxToCancel, descTableSpan, startTS, false /* withDiff */, evChan)
}()

// Note: 42 is a system descriptor.
const junkDescriptorID = 42
require.GreaterOrEqual(t, keys.MaxReservedDescID, junkDescriptorID)
junkDescriptorKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, junkDescriptorID)
junkDescriptor := sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{
Name: "junk",
ID: junkDescriptorID,
})
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetSystemConfigTrigger(); err != nil {
return err
}
return txn.Put(ctx, junkDescriptorKey, junkDescriptor)
}))
after := db.Clock().Now()
for {
ev := <-evChan
if ev.Checkpoint != nil && after.Less(ev.Checkpoint.ResolvedTS) {
t.Fatal("expected to see write which occurred before the checkpoint")
}

if ev.Val != nil && ev.Val.Key.Equal(junkDescriptorKey) {
var gotProto sqlbase.Descriptor
require.NoError(t, ev.Val.Value.GetProto(&gotProto))
require.EqualValues(t, junkDescriptor, &gotProto)
break
}
}
cancel()
// There are several cases that seem like they can happen due
// to closed connections. Instead we just expect an error.
// The main point is we get an error in a timely manner.
require.Error(t, <-rangefeedErrChan)
})
t.Run("does not work on user ranges", func(t *testing.T) {
k := tc.ScratchRange(t)
require.NoError(t, tc.WaitForSplitAndInitialization(k))
startTS := db.Clock().Now()
scratchSpan := roachpb.Span{Key: k, EndKey: k.PrefixEnd()}
evChan := make(chan *roachpb.RangeFeedEvent)
require.Regexp(t, `rangefeeds require the kv\.rangefeed\.enabled setting`,
ds.RangeFeed(ctx, scratchSpan, startTS, false /* withDiff */, evChan))
})
}
69 changes: 69 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/caller"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/raftpb"
Expand Down Expand Up @@ -3138,6 +3140,73 @@ func TestStrictGCEnforcement(t *testing.T) {
})
}

// TestProposalOverhead ensures that the command overhead for put operations
// is as expected. It exists to prevent changes which might increase the
// byte overhead of replicating commands.
//
// Note that it intentionally avoids using a system range which incurs the
// overhead due to the logical op log.
func TestProposalOverhead(t *testing.T) {
defer leaktest.AfterTest(t)()

var overhead uint32
var key atomic.Value
key.Store(roachpb.Key{})
filter := func(args storagebase.ProposalFilterArgs) *roachpb.Error {
if len(args.Req.Requests) != 1 {
return nil
}
req, ok := args.Req.GetArg(roachpb.Put)
if !ok {
return nil
}
put := req.(*roachpb.PutRequest)
if !bytes.Equal(put.Key, key.Load().(roachpb.Key)) {
return nil
}
// Sometimes the logical portion of the timestamp can be non-zero which makes
// the overhead non-deterministic.
args.Cmd.ReplicatedEvalResult.Timestamp.Logical = 0
atomic.StoreUint32(&overhead, uint32(args.Cmd.Size()-args.Cmd.WriteBatch.Size()))
// We don't want to print the WriteBatch because it's explicitly
// excluded from the size computation. Nil'ing it out does not
// affect the memory held by the caller because neither `args` nor
// `args.Cmd` are pointers.
args.Cmd.WriteBatch = nil
t.Log(pretty.Sprint(args.Cmd))
return nil
}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{TestingProposalFilter: filter},
},
},
})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

db := tc.Server(0).DB()
// NB: the expected overhead reflects the space overhead currently
// present in Raft commands. This test will fail if that overhead
// changes. Try to make this number go down and not up. It slightly
// undercounts because our proposal filter is called before
// maxLeaseIndex is filled in. The difference between the user and system
// overhead is that user ranges do not have rangefeeds on by default whereas
// system ranges do.
const (
expectedUserOverhead uint32 = 42
)
t.Run("user-key overhead", func(t *testing.T) {
userKey := tc.ScratchRange(t)
k := roachpb.Key(encoding.EncodeStringAscending(userKey, "foo"))
key.Store(k)
require.NoError(t, db.Put(ctx, k, "v"))
require.Equal(t, expectedUserOverhead, atomic.LoadUint32(&overhead))
})

}

// getRangeInfo retrieves range info by performing a get against the provided
// key and setting the ReturnRangeInfo flag to true.
func getRangeInfo(
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/gossip_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -211,10 +212,10 @@ func TestGossipAfterAbortOfSystemConfigTransactionAfterFailureDueToIntents(t *te
txB := db.NewTxn(ctx, "b")

require.NoError(t, txA.SetSystemConfigTrigger())
require.NoError(t, txA.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(1000), "foo"))
require.NoError(t, txA.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(1000), &sqlbase.Descriptor{}))

require.NoError(t, txB.SetSystemConfigTrigger())
require.NoError(t, txB.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(2000), "bar"))
require.NoError(t, txB.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(2000), &sqlbase.Descriptor{}))

const someTime = 10 * time.Millisecond
clearNotifictions := func(ch <-chan struct{}) {
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/replica.go
Expand Up @@ -778,7 +778,7 @@ func (r *Replica) getImpliedGCThresholdRLocked(

// The GC threshold is the oldest value we can return here.
if isAdmin || !StrictGCEnforcement.Get(&r.store.ClusterSettings().SV) ||
r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.UserTableDataMin)) {
r.isSystemRangeRLocked() {
return threshold
}

Expand Down Expand Up @@ -806,6 +806,17 @@ func (r *Replica) getImpliedGCThresholdRLocked(
return threshold
}

// isSystemRange returns true if r's key range precedes keys.UserTableDataMin.
func (r *Replica) isSystemRange() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.isSystemRangeRLocked()
}

func (r *Replica) isSystemRangeRLocked() bool {
return r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.UserTableDataMin))
}

// maxReplicaIDOfAny returns the maximum ReplicaID of any replica, including
// voters and learners.
func maxReplicaIDOfAny(desc *roachpb.RangeDescriptor) roachpb.ReplicaID {
Expand Down
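
The `isSystemRange` / `isSystemRangeRLocked` pair follows the usual convention that an `RLocked` suffix means the caller already holds the read lock. A minimal sketch of that convention, assuming a made-up `replica` type and a made-up boundary key (`userTableDataMin` here is just the byte string `"m"`, not the real key):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// userTableDataMin is a hypothetical boundary: keys below it are treated as
// system keys in this sketch.
var userTableDataMin = []byte("m")

// replica guards its start key with a read-write mutex.
type replica struct {
	mu struct {
		sync.RWMutex
		startKey []byte
	}
}

func newReplica(startKey string) *replica {
	r := &replica{}
	r.mu.startKey = []byte(startKey)
	return r
}

// isSystemRange acquires the read lock itself; callers must not hold r.mu.
func (r *replica) isSystemRange() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.isSystemRangeRLocked()
}

// isSystemRangeRLocked requires that the caller already holds r.mu for
// reading (or writing).
func (r *replica) isSystemRangeRLocked() bool {
	return bytes.Compare(r.mu.startKey, userTableDataMin) < 0
}

func main() {
	fmt.Println(newReplica("a").isSystemRange()) // true
	fmt.Println(newReplica("z").isSystemRange()) // false
}
```

Callers that do not hold the lock should use the unsuffixed method; calling the `RLocked` variant without the lock would read the guarded field unprotected.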
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Expand Up @@ -130,7 +130,7 @@ func (i iteratorWithCloser) Close() {
func (r *Replica) RangeFeed(
args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
) *roachpb.Error {
if !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
}
Expand Down
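
The resulting admission rule is small enough to state as a truth table: a rangefeed is rejected only when the range is a user range and the setting is off. A sketch with hypothetical names (`checkRangefeedAllowed`, `errRangefeedDisabled`) that are not part of kvserver:

```go
package main

import (
	"errors"
	"fmt"
)

// errRangefeedDisabled mirrors the wording of the error in the diff above.
var errRangefeedDisabled = errors.New("rangefeeds require the kv.rangefeed.enabled setting")

// checkRangefeedAllowed returns an error only for user ranges while the
// setting is disabled; system ranges are always allowed.
func checkRangefeedAllowed(isSystemRange, settingEnabled bool) error {
	if !isSystemRange && !settingEnabled {
		return errRangefeedDisabled
	}
	return nil
}

func main() {
	for _, isSystem := range []bool{true, false} {
		for _, enabled := range []bool{true, false} {
			err := checkRangefeedAllowed(isSystem, enabled)
			fmt.Printf("system=%t enabled=%t allowed=%t\n", isSystem, enabled, err == nil)
		}
	}
}
```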
22 changes: 18 additions & 4 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Expand Up @@ -16,13 +16,15 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -661,17 +663,28 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
assertRangefeedRetryErr(t, pErr, roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT)
})
t.Run(roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING.String(), func(t *testing.T) {
mtc, rangeID := setup(t)
mtc, _ := setup(t)
defer mtc.Stop()

// Split the range so that the RHS is not a system range and thus will
// respect the kv.rangefeed.enabled cluster setting.
startKey := keys.UserTableDataMin
splitArgs := adminSplitArgs(startKey)
if _, pErr := kv.SendWrapped(ctx, mtc.distSenders[0], splitArgs); pErr != nil {
t.Fatalf("split saw unexpected error: %v", pErr)
}
rightRangeID := mtc.Store(0).LookupReplica(roachpb.RKey(startKey)).RangeID

// Establish a rangefeed.
stream := newTestStream()
streamErrC := make(chan *roachpb.Error, 1)
rangefeedSpan := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}

endKey := keys.TableDataMax
rangefeedSpan := roachpb.Span{Key: startKey, EndKey: endKey}
go func() {
req := roachpb.RangeFeedRequest{
Header: roachpb.Header{
RangeID: rangeID,
RangeID: rightRangeID,
},
Span: rangefeedSpan,
}
Expand All @@ -688,7 +701,8 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
kvserver.RangefeedEnabled.Override(&mtc.storeConfig.Settings.SV, false)

// Perform a write on the range.
pArgs := putArgs(roachpb.Key("c"), []byte("val2"))
writeKey := encoding.EncodeStringAscending(keys.SystemSQLCodec.TablePrefix(55), "c")
pArgs := putArgs(writeKey, []byte("val2"))
if _, pErr := kv.SendWrapped(ctx, mtc.distSenders[0], pArgs); pErr != nil {
t.Fatal(pErr)
}
Expand Down