Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
99275: sql: enabling forward indexes and ORDERBY on JSONB columns r=celiala a=Shivs11

Currently, #97928 outlines the scheme for JSONB encoding
and decoding for forward indexes. However, the PR doesn't
enable this feature to our users. This current PR aims
to allow forward indexes on JSONB columns. The presence
of a lexicographical ordering, as described in #97928,
shall now allow primary and secondary indexes on JSONB
columns along with the ability to use `ORDER BY` filter
in their queries.

Additionally, JSON values consist of decimal numbers
and containers, such as Arrays and Objects, which can
contain these decimal numbers. In order to preserve
the values after the decimal, JSONB columns are now
required to be composite in nature. This shall enable
such values to be stored in both the key and the value
side of a K/V pair in hopes of receiving the exact value.

Fixes: #35706

Release note (sql change): This PR adds support for enabling
forward indexes and ordering on JSON values.

Epic: [CRDB-24501](https://cockroachlabs.atlassian.net/browse/CRDB-24501)

100942: kvserver: add metrics to track snapshot queue size r=kvoli a=miraradeva

Previously, we had metrics to track the number of snapshots waiting in
the snapshot queue; however, snapshots may be of different sizes, so it
is also helpful to track the size of all snapshots in the queue. This
change adds the following metrics to track the total size of all
snapshots waiting in the queue:

    range.snapshots.send-queue-bytes
    range.snapshots.recv-queue-bytes

Informs: #85528
Release note (ops change): Added two new metrics,
range.snapshots.(send|recv)-queue-bytes, to track the total size of all
snapshots waiting in the snapshot queue.

101220: roachtest: prevent shared mutable state across c2c roachtest runs r=benbardin a=msbutler

Previously, all `c2c/*` roachtests run with `--count` would provide incomprehensible results because multiple roachtest runs of the same test would override each other's state. Specifically, the latest call of `test_spec.Run()`, would override the `test.Test` harness, and `syncedCluster.Cluster` used by all other tests with the same registration.

This patch fixes this problem by moving all fields in `replicationSpec` that are set during test execution (i.e. a `test_spec.Run` call), to a new `replicationDriver` struct. Now, `replicationSpec` gets defined during test registration and is shared across test runs, while `replicationDriver` gets set within a test run.

Epic: None
Release note: None

Co-authored-by: Shivam Saraf <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
4 people committed Apr 11, 2023
4 parents 5fd45da + 5cdb625 + 9fc0ef6 + c8ddf08 commit 1abff58
Show file tree
Hide file tree
Showing 42 changed files with 1,788 additions and 327 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

467 changes: 243 additions & 224 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,18 @@ var (
Measurement: "Snapshots",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotSendQueueSize = metric.Metadata{
Name: "range.snapshots.send-queue-bytes",
Help: "Total size of all snapshots in the snapshot send queue",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapshotRecvQueueSize = metric.Metadata{
Name: "range.snapshots.recv-queue-bytes",
Help: "Total size of all snapshots in the snapshot receive queue",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}

metaRangeRaftLeaderTransfers = metric.Metadata{
Name: "range.raftleadertransfers",
Expand Down Expand Up @@ -2046,6 +2058,8 @@ type StoreMetrics struct {
RangeSnapshotRecvInProgress *metric.Gauge
RangeSnapshotSendTotalInProgress *metric.Gauge
RangeSnapshotRecvTotalInProgress *metric.Gauge
RangeSnapshotSendQueueSize *metric.Gauge
RangeSnapshotRecvQueueSize *metric.Gauge

// Delegate snapshot metrics. These don't count self-delegated snapshots.
DelegateSnapshotSendBytes *metric.Counter
Expand Down Expand Up @@ -2645,6 +2659,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotRecvInProgress: metric.NewGauge(metaRangeSnapshotRecvInProgress),
RangeSnapshotSendTotalInProgress: metric.NewGauge(metaRangeSnapshotSendTotalInProgress),
RangeSnapshotRecvTotalInProgress: metric.NewGauge(metaRangeSnapshotRecvTotalInProgress),
RangeSnapshotSendQueueSize: metric.NewGauge(metaRangeSnapshotSendQueueSize),
RangeSnapshotRecvQueueSize: metric.NewGauge(metaRangeSnapshotRecvQueueSize),
RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),
RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries),
DelegateSnapshotSendBytes: metric.NewCounter(metaDelegateSnapshotSendBytes),
Expand Down
53 changes: 39 additions & 14 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ var snapshotPrioritizationEnabled = settings.RegisterBoolSetting(
true,
)

// snapshotMetrics contains metrics on the number and size of snapshots in
// progress or in the snapshot queue.
type snapshotMetrics struct {
QueueLen *metric.Gauge
QueueSize *metric.Gauge
InProgress *metric.Gauge
TotalInProgress *metric.Gauge
}

// incomingSnapshotStream is the minimal interface on a GRPC stream required
// to receive a snapshot over the network.
type incomingSnapshotStream interface {
Expand Down Expand Up @@ -678,13 +687,19 @@ func (s *Store) reserveReceiveSnapshot(
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveReceiveSnapshot")
defer sp.Finish()

return s.throttleSnapshot(ctx, s.snapshotApplyQueue,
int(header.SenderQueueName), header.SenderQueuePriority,
return s.throttleSnapshot(ctx,
s.snapshotApplyQueue,
int(header.SenderQueueName),
header.SenderQueuePriority,
-1,
header.RangeSize,
header.RaftMessageRequest.RangeID,
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress,
snapshotMetrics{
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvQueueSize,
s.metrics.RangeSnapshotRecvInProgress,
s.metrics.RangeSnapshotRecvTotalInProgress,
},
)
}

Expand All @@ -698,14 +713,19 @@ func (s *Store) reserveSendSnapshot(
fn()
}

return s.throttleSnapshot(ctx, s.snapshotSendQueue,
return s.throttleSnapshot(ctx,
s.snapshotSendQueue,
int(req.SenderQueueName),
req.SenderQueuePriority,
req.QueueOnDelegateLen,
rangeSize,
req.RangeID,
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress,
snapshotMetrics{
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendQueueSize,
s.metrics.RangeSnapshotSendInProgress,
s.metrics.RangeSnapshotSendTotalInProgress,
},
)
}

Expand All @@ -720,7 +740,7 @@ func (s *Store) throttleSnapshot(
maxQueueLength int64,
rangeSize int64,
rangeID roachpb.RangeID,
waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge,
snapshotMetrics snapshotMetrics,
) (cleanup func(), funcErr error) {

tBegin := timeutil.Now()
Expand All @@ -742,8 +762,13 @@ func (s *Store) throttleSnapshot(
}
}()

waitingSnapshotMetric.Inc(1)
defer waitingSnapshotMetric.Dec(1)
// Total bytes of snapshots waiting in the snapshot queue
snapshotMetrics.QueueSize.Inc(rangeSize)
defer snapshotMetrics.QueueSize.Dec(rangeSize)
// Total number of snapshots waiting in the snapshot queue
snapshotMetrics.QueueLen.Inc(1)
defer snapshotMetrics.QueueLen.Dec(1)

queueCtx := ctx
if deadline, ok := queueCtx.Deadline(); ok {
// Enforce a more strict timeout for acquiring the snapshot reservation to
Expand Down Expand Up @@ -778,10 +803,10 @@ func (s *Store) throttleSnapshot(
}

// Counts non-empty in-progress snapshots.
inProgressSnapshotMetric.Inc(1)
snapshotMetrics.InProgress.Inc(1)
}
// Counts all in-progress snapshots.
totalInProgressSnapshotMetric.Inc(1)
snapshotMetrics.TotalInProgress.Inc(1)

// The choice here is essentially arbitrary, but with a default range size of 128mb-512mb and the
// Raft snapshot rate limiting of 32mb/s, we expect to spend less than 16s per snapshot.
Expand All @@ -804,10 +829,10 @@ func (s *Store) throttleSnapshot(
return func() {
s.metrics.ReservedReplicaCount.Dec(1)
s.metrics.Reserved.Dec(rangeSize)
totalInProgressSnapshotMetric.Dec(1)
snapshotMetrics.TotalInProgress.Dec(1)

if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
inProgressSnapshotMetric.Dec(1)
snapshotMetrics.InProgress.Dec(1)
snapshotQueue.Release(permit)
}
}, nil
Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3157,7 +3157,7 @@ func TestReserveSnapshotThrottling(t *testing.T) {
s := tc.store

cleanupNonEmpty1, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
RangeSize: 1,
RangeSize: 10,
})
if err != nil {
t.Fatal(err)
Expand All @@ -3167,6 +3167,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand All @@ -3184,6 +3186,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(2), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand All @@ -3205,6 +3209,10 @@ func TestReserveSnapshotThrottling(t *testing.T) {
t.Errorf("unexpected snapshot queue length; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvQueueLength.Value())
}
if s.Metrics().RangeSnapshotRecvQueueSize.Value() != int64(10) {
t.Errorf("unexplected snapshot queue size; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvQueueSize.Value())
}
if s.Metrics().RangeSnapshotRecvInProgress.Value() != int64(1) {
t.Errorf("unexpected snapshots in progress; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvInProgress.Value())
Expand All @@ -3216,14 +3224,16 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}()

cleanupNonEmpty3, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
RangeSize: 1,
RangeSize: 10,
})
if err != nil {
t.Fatal(err)
}
atomic.StoreInt32(&boom, 1)
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/catalog/colinfo/col_type_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func ColumnTypeIsIndexable(t *types.T) bool {
// using an inverted index.
func ColumnTypeIsInvertedIndexable(t *types.T) bool {
switch t.Family() {
case types.ArrayFamily, types.StringFamily:
case types.JsonFamily, types.ArrayFamily, types.StringFamily:
return true
}
return ColumnTypeIsOnlyInvertedIndexable(t)
Expand All @@ -162,7 +162,6 @@ func ColumnTypeIsOnlyInvertedIndexable(t *types.T) bool {
t = t.ArrayContents()
}
switch t.Family() {
case types.JsonFamily:
case types.GeographyFamily:
case types.GeometryFamily:
case types.TSVectorFamily:
Expand All @@ -183,7 +182,7 @@ func MustBeValueEncoded(semanticType *types.T) bool {
default:
return MustBeValueEncoded(semanticType.ArrayContents())
}
case types.JsonFamily, types.TupleFamily, types.GeographyFamily, types.GeometryFamily:
case types.TupleFamily, types.GeographyFamily, types.GeometryFamily:
return true
case types.TSVectorFamily, types.TSQueryFamily:
return true
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/colinfo/column_type_properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func CanHaveCompositeKeyEncoding(typ *types.T) bool {
switch typ.Family() {
case types.FloatFamily,
types.DecimalFamily,
types.JsonFamily,
types.CollatedStringFamily:
return true
case types.ArrayFamily:
Expand All @@ -75,7 +76,6 @@ func CanHaveCompositeKeyEncoding(typ *types.T) bool {
types.UuidFamily,
types.INetFamily,
types.TimeFamily,
types.JsonFamily,
types.TimeTZFamily,
types.BitFamily,
types.GeometryFamily,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/colinfo/column_type_properties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestCanHaveCompositeKeyEncoding(t *testing.T) {
{types.IntArray, false},
{types.Interval, false},
{types.IntervalArray, false},
{types.Jsonb, false},
{types.Jsonb, true},
{types.Name, false},
{types.Oid, false},
{types.String, false},
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/table_col_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (s TableColSet) ForEach(f func(col descpb.ColumnID)) {
s.set.ForEach(func(i int) { f(descpb.ColumnID(i)) })
}

// Copy returns a copy of s which can be modified independently.
func (s TableColSet) Copy() TableColSet { return TableColSet{set: s.set.Copy()} }

// SubsetOf returns true if s is a subset of other.
func (s TableColSet) SubsetOf(other TableColSet) bool {
return s.set.SubsetOf(other.set)
Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,14 @@ func (desc *Mutable) allocateIndexIDs(columnNames map[string]descpb.ColumnID) er
colIDs = idx.CollectKeyColumnIDs()
}

// Inverted indexes don't store composite values in the individual
// paths present. The composite values will be encoded in
// the primary index itself.
compositeColIDsLocal := compositeColIDs.Copy()
if isInverted {
compositeColIDsLocal.Remove(invID)
}

// StoreColumnIDs are derived from StoreColumnNames just like KeyColumnIDs
// derives from KeyColumnNames.
// For primary indexes this set of columns is typically defined as the set
Expand Down Expand Up @@ -755,12 +763,12 @@ func (desc *Mutable) allocateIndexIDs(columnNames map[string]descpb.ColumnID) er
// or in the primary key whose type has a composite encoding, like DECIMAL
// for instance.
for _, colID := range idx.IndexDesc().KeyColumnIDs {
if compositeColIDs.Contains(colID) {
if compositeColIDsLocal.Contains(colID) {
idx.IndexDesc().CompositeColumnIDs = append(idx.IndexDesc().CompositeColumnIDs, colID)
}
}
for _, colID := range idx.IndexDesc().KeySuffixColumnIDs {
if compositeColIDs.Contains(colID) {
if compositeColIDsLocal.Contains(colID) {
idx.IndexDesc().CompositeColumnIDs = append(idx.IndexDesc().CompositeColumnIDs, colID)
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colenc/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,9 @@ func isComposite(vec coldata.Vec, row int) bool {
case types.DecimalFamily:
d := tree.DDecimal{Decimal: vec.Decimal()[row]}
return d.IsComposite()
case types.JsonFamily:
j := tree.DJSON{JSON: vec.JSON().Get(row)}
return j.IsComposite()
default:
d := vec.Datum().Get(row)
if cdatum, ok := d.(tree.CompositeDatum); ok {
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/colenc/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ func encodeKeys[T []byte | roachpb.Key](
}
kys[r] = b
}
case types.JsonFamily:
jsonVector := vec.JSON()
for r := 0; r < count; r++ {
b := kys[r]
if partialIndexAndNullCheck(kys, r, start, nulls, dir) {
continue
}
var err error
jsonObj := jsonVector.Get(r + start)
b, err = jsonObj.EncodeForwardIndex(b, dir)
if err != nil {
return err
}
kys[r] = b
}
default:
if buildutil.CrdbTestBuild {
if typeconv.TypeFamilyToCanonicalTypeFamily(typ.Family()) != typeconv.DatumVecCanonicalTypeFamily {
Expand Down
21 changes: 15 additions & 6 deletions pkg/sql/colencoding/key_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,21 @@ func decodeTableKeyToCol(
}
vecs.IntervalCols[colIdx][rowIdx] = d
case types.JsonFamily:
// Don't attempt to decode the JSON value. Instead, just return the
// remaining bytes of the key.
var jsonLen int
jsonLen, err = encoding.PeekLength(key)
vecs.JSONCols[colIdx].Bytes.Set(rowIdx, key[:jsonLen])
rkey = key[jsonLen:]
// Decode the JSON, and then store the bytes in the
// vector in the value-encoded format.
// TODO (shivam): Make it possible for the vector to store
// key-encoded JSONs instead of value-encoded JSONs.
var d tree.Datum
encDir := encoding.Ascending
if dir == catenumpb.IndexColumn_DESC {
encDir = encoding.Descending
}
d, rkey, err = keyside.Decode(da, valType, key, encDir)
json, ok := d.(*tree.DJSON)
if !ok {
return nil, false, scratch, errors.AssertionFailedf("Could not type assert into DJSON")
}
vecs.JSONCols[colIdx].Set(rowIdx, json.JSON)
case types.EncodedKeyFamily:
// Don't attempt to decode the inverted key.
keyLen, err := encoding.PeekLength(key)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/distsql_stats
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ ORDER BY
statistics_name column_names row_count null_count has_histogram
s {a} 3 0 true
s {b} 3 0 true
s {j} 3 0 false
s {j} 3 0 true
s {rowid} 3 0 true

# Test that non-index columns have histograms collected for them, with
Expand Down Expand Up @@ -2348,7 +2348,7 @@ SHOW STATISTICS USING JSON FOR TABLE j;
statement ok
ALTER TABLE j INJECT STATISTICS '$j_stats'

statement error pq: cannot create partial statistics on an inverted index column
statement error pq: table j does not contain a non-partial forward index with j as a prefix column
CREATE STATISTICS j_partial ON j FROM j USING EXTREMES;

statement ok
Expand Down
Loading

0 comments on commit 1abff58

Please sign in to comment.