Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
109727: ts: update server queries to account for system tenant id r=Santamaura a=Santamaura

Previously, ts queries would consider providing no tenant id and the system tenant id as the same and would return all the aggregated datapoints. This was likely due to the original implementation considering that the system tenant would always want to view all the aggregated data.

This is not the case anymore since the system tenant has the ability to view all the data, system tenant specific data or other tenants data. Therefore this commit adjusts the server query code so that if a system tenant id is provided, it returns data for only the system tenant.

Fixes #108929

Release note (bug fix): adjust ts server queries
to be able to return system tenant only metrics if tenant id is provided, this will fix an issue where some metrics graphs appear to double count.

Some screenshots after the change:
All
<img width="1422" alt="Screenshot 2023-08-30 at 10 59 50 AM" src="https://github.com/cockroachdb/cockroach/assets/17861665/2ddfb7b8-1980-4b88-9b92-ec2cba5e48f0">

System
<img width="1422" alt="Screenshot 2023-08-30 at 11 00 25 AM" src="https://github.com/cockroachdb/cockroach/assets/17861665/5e7b18d7-4b9d-48dd-881c-4417bab104b1">

Tenant
<img width="1422" alt="Screenshot 2023-08-30 at 11 00 13 AM" src="https://github.com/cockroachdb/cockroach/assets/17861665/b02a6683-5277-4fa7-a212-2999db935fd4">



109793: storage: don't reread value during inline conditional writes r=itsbilal a=nvanbenschoten

This commit removes the call to maybeGetValue when performing an inline conditional write. In such cases, we will have already read the value in the call to mvccGetMetadata, so we don't need to do so again.

I'm not aware of any workloads that are sensitive to the performance of conditional inline writes and I also suspect that the positioning of the Pebble iterator was avoiding some work during the second seek, so this is mainly just intended to be a code simplification.

Epic: None
Release note: None

Co-authored-by: Alex Santamaura <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed Aug 31, 2023
3 parents c86e1a1 + 738d312 + 2ce342e commit d435960
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 48 deletions.
65 changes: 26 additions & 39 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,37 +1653,6 @@ func mvccPutUsingIter(
return err
}

// maybeGetValue returns either value (if valueFn is nil) or else the
// result of calling valueFn on the data read at readTimestamp. The
// function uses a non-transactional read, so uncertainty does not apply
// and any intents (even the caller's own if the caller is operating on
// behalf of a transaction), will result in a LockConflictError. Because
// of this, the function is only called from places where intents have
// already been considered.
func maybeGetValue(
ctx context.Context,
iter MVCCIterator,
key roachpb.Key,
value roachpb.Value,
exists bool,
readTimestamp hlc.Timestamp,
valueFn func(optionalValue) (roachpb.Value, error),
) (roachpb.Value, error) {
// If a valueFn is specified, read existing value using the iter.
if valueFn == nil {
return value, nil
}
var exVal optionalValue
if exists {
var err error
exVal, _, err = mvccGet(ctx, iter, key, readTimestamp, MVCCGetOptions{Tombstones: true})
if err != nil {
return roachpb.Value{}, err
}
}
return valueFn(exVal)
}

// MVCCScanDecodeKeyValue decodes a key/value pair returned in an MVCCScan
// "batch" (this is not the RocksDB batch repr format), returning both the
// key/value and the suffix of data remaining in the batch.
Expand Down Expand Up @@ -1900,10 +1869,16 @@ func mvccPutInternal(
if opts.Txn != nil {
return false, errors.Errorf("%q: inline writes not allowed within transactions", metaKey)
}
var metaKeySize, metaValSize int64
if value, err = maybeGetValue(ctx, iter, key, value, ok, timestamp, valueFn); err != nil {
return false, err
if valueFn != nil {
var inlineVal optionalValue
if ok {
inlineVal = makeOptionalValue(roachpb.Value{RawBytes: buf.meta.RawBytes})
}
if value, err = valueFn(inlineVal); err != nil {
return false, err
}
}
var metaKeySize, metaValSize int64
if !value.IsPresent() {
metaKeySize, metaValSize, err = 0, 0, writer.ClearUnversioned(metaKey.Key, ClearOptions{
// NB: origMetaValSize is only populated by mvccGetMetadata if
Expand All @@ -1912,8 +1887,8 @@ func mvccPutInternal(
ValueSize: uint32(origMetaValSize),
})
} else {
buf.meta = enginepb.MVCCMetadata{RawBytes: value.RawBytes}
metaKeySize, metaValSize, err = buf.putInlineMeta(writer, metaKey, &buf.meta)
buf.newMeta = enginepb.MVCCMetadata{RawBytes: value.RawBytes}
metaKeySize, metaValSize, err = buf.putInlineMeta(writer, metaKey, &buf.newMeta)
}
if opts.Stats != nil {
updateStatsForInline(opts.Stats, key, origMetaKeySize, origMetaValSize, metaKeySize, metaValSize)
Expand Down Expand Up @@ -2187,9 +2162,21 @@ func mvccPutInternal(
writeTimestamp.Forward(metaTimestamp.Next())
writeTooOldErr := kvpb.NewWriteTooOldError(readTimestamp, writeTimestamp, key)
return false, writeTooOldErr
} else {
if value, err = maybeGetValue(ctx, iter, key, value, ok, readTimestamp, valueFn); err != nil {
return false, err
} else /* meta.Txn == nil && metaTimestamp.Less(readTimestamp) */ {
// If a valueFn is specified, read the existing value using iter.
if valueFn != nil {
exVal, _, err := mvccGet(ctx, iter, key, readTimestamp, MVCCGetOptions{
Tombstones: true,
// Unlike above, we know that there are no intents on this
// key, so we don't need to perform an inconsistent read.
})
if err != nil {
return false, err
}
value, err = valueFn(exVal)
if err != nil {
return false, err
}
}
}
} else {
Expand Down
80 changes: 80 additions & 0 deletions pkg/storage/testdata/mvcc_histories/inline
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
## A simple test of inline operations.

run ok
put k=i1 v=inline1
put k=i2 v=inline2
put k=i3 v=inline3
----
>> at end:
meta: "i1"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline1 mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i2"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2 mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i3"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs=<nil> txnDidNotUpdateMeta=false

run ok
get k=i1
get k=i2
get k=i3
----
get: "i1" -> /BYTES/inline1 @0,0
get: "i2" -> /BYTES/inline2 @0,0
get: "i3" -> /BYTES/inline3 @0,0

run ok
scan k=i1 end=i4
----
scan: "i1" -> /BYTES/inline1 @0,0
scan: "i2" -> /BYTES/inline2 @0,0
scan: "i3" -> /BYTES/inline3 @0,0

run ok
del k=i1
----
del: "i1": found key true
>> at end:
meta: "i2"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2 mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i3"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs=<nil> txnDidNotUpdateMeta=false

run error
cput k=i2 v=inline2b cond=incorrect
----
>> at end:
meta: "i2"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2 mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i3"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs=<nil> txnDidNotUpdateMeta=false
error: (*kvpb.ConditionFailedError:) unexpected value: raw_bytes:"\000\000\000\000\003inline2" timestamp:<>

run ok
cput k=i2 v=inline2b cond=inline2
----
>> at end:
meta: "i2"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i3"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs=<nil> txnDidNotUpdateMeta=false

run error
initput k=i3 v=inline3b
----
>> at end:
meta: "i2"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i3"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs=<nil> txnDidNotUpdateMeta=false
error: (*kvpb.ConditionFailedError:) unexpected value: raw_bytes:"\000\000\000\000\003inline3" timestamp:<>

run ok
initput k=i3 v=inline3
----
>> at end:
meta: "i2"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i3"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs=<nil> txnDidNotUpdateMeta=false

run error
increment k=i3
----
>> at end:
meta: "i2"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs=<nil> txnDidNotUpdateMeta=false
meta: "i3"/0,0 -> txn={<nil>} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs=<nil> txnDidNotUpdateMeta=false
error: (*withstack.withStack:) key "i3" does not contain an integer value

run ok
del_range k=i1 end=i4
----
del_range: "i1"-"i4" -> deleted 2 key(s)
>> at end:
<no data>
16 changes: 10 additions & 6 deletions pkg/ts/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,10 +845,14 @@ func (db *DB) readFromDatabase(
kd := diskResolution.SlabDuration()
for currentTimestamp := startTimestamp; currentTimestamp <= timespan.EndNanos; currentTimestamp += kd {
for _, source := range sources {
// If a TenantID is specified and is not the system tenant, only query
// data for that tenant source.
if tenantID.IsSet() && !tenantID.IsSystem() {
source = tsutil.MakeTenantSource(source, tenantID.String())
// If a TenantID is specified we may need to format the source in order to retrieve the correct data.
// e.g. if not system tenant we need the source to be of format nodeID-tenantID but if it is the
// system tenant we need the source to be of format nodeID. Otherwise we get all the data via the
// format nodeID-
if tenantID.IsSet() {
if !tenantID.IsSystem() {
source = tsutil.MakeTenantSource(source, tenantID.String())
}
key := MakeDataKey(seriesName, source, diskResolution, currentTimestamp)
b.Get(key)
} else {
Expand Down Expand Up @@ -905,7 +909,7 @@ func (db *DB) readAllSourcesFromDatabase(
return nil, err
}

if !tenantID.IsSet() || tenantID.IsSystem() {
if !tenantID.IsSet() {
return b.Results[0].Rows, nil
}

Expand All @@ -917,7 +921,7 @@ func (db *DB) readAllSourcesFromDatabase(
return nil, err
}
_, tenantSource := tsutil.DecodeSource(source)
if tenantSource == tenantID.String() {
if tenantID.IsSystem() && tenantSource == "" || tenantSource == tenantID.String() {
rows = append(rows, row)
}
}
Expand Down
72 changes: 69 additions & 3 deletions pkg/ts/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ func TestServerQueryTenant(t *testing.T) {
t.Fatal(err)
}

// System tenant should aggregate across all tenants.
expectedSystemResult := &tspb.TimeSeriesQueryResponse{
// Undefined tenant ID should aggregate across all tenants.
expectedAggregatedResult := &tspb.TimeSeriesQueryResponse{
Results: []tspb.TimeSeriesQueryResponse_Result{
{
Query: tspb.Query{
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestServerQueryTenant(t *testing.T) {

conn := s.RPCClientConn(t, username.RootUserName())
client := tspb.NewTimeSeriesClient(conn)
systemResponse, err := client.Query(context.Background(), &tspb.TimeSeriesQueryRequest{
aggregatedResponse, err := client.Query(context.Background(), &tspb.TimeSeriesQueryRequest{
StartNanos: 400 * 1e9,
EndNanos: 500 * 1e9,
Queries: []tspb.Query{
Expand All @@ -417,13 +417,78 @@ func TestServerQueryTenant(t *testing.T) {
Sources: []string{"1"},
},
{
// Not providing a source (nodeID or storeID) will aggregate across all sources.
Name: "test.metric",
},
},
})
if err != nil {
t.Fatal(err)
}
for _, r := range aggregatedResponse.Results {
sort.Strings(r.Sources)
}
require.Equal(t, expectedAggregatedResult, aggregatedResponse)

// System tenant ID should provide system tenant ts data.
systemID := roachpb.MustMakeTenantID(1)
expectedSystemResult := &tspb.TimeSeriesQueryResponse{
Results: []tspb.TimeSeriesQueryResponse_Result{
{
Query: tspb.Query{
Name: "test.metric",
Sources: []string{"1"},
TenantID: systemID,
},
Datapoints: []tspb.TimeSeriesDatapoint{
{
TimestampNanos: 400 * 1e9,
Value: 100.0,
},
{
TimestampNanos: 500 * 1e9,
Value: 200.0,
},
},
},
{
Query: tspb.Query{
Name: "test.metric",
Sources: []string{"1", "10"},
TenantID: systemID,
},
Datapoints: []tspb.TimeSeriesDatapoint{
{
TimestampNanos: 400 * 1e9,
Value: 300.0,
},
{
TimestampNanos: 500 * 1e9,
Value: 600.0,
},
},
},
},
}

systemResponse, err := client.Query(context.Background(), &tspb.TimeSeriesQueryRequest{
StartNanos: 400 * 1e9,
EndNanos: 500 * 1e9,
Queries: []tspb.Query{
{
Name: "test.metric",
Sources: []string{"1"},
TenantID: systemID,
},
{
Name: "test.metric",
TenantID: systemID,
},
},
})
if err != nil {
t.Fatal(err)
}
for _, r := range systemResponse.Results {
sort.Strings(r.Sources)
}
Expand Down Expand Up @@ -489,6 +554,7 @@ func TestServerQueryTenant(t *testing.T) {
Sources: []string{"1"},
},
{
// Not providing a source (nodeID or storeID) will aggregate across all sources.
Name: "test.metric",
},
},
Expand Down

0 comments on commit d435960

Please sign in to comment.