diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index e2f353f2d20e..1722662e630d 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -1653,37 +1653,6 @@ func mvccPutUsingIter( return err } -// maybeGetValue returns either value (if valueFn is nil) or else the -// result of calling valueFn on the data read at readTimestamp. The -// function uses a non-transactional read, so uncertainty does not apply -// and any intents (even the caller's own if the caller is operating on -// behalf of a transaction), will result in a LockConflictError. Because -// of this, the function is only called from places where intents have -// already been considered. -func maybeGetValue( - ctx context.Context, - iter MVCCIterator, - key roachpb.Key, - value roachpb.Value, - exists bool, - readTimestamp hlc.Timestamp, - valueFn func(optionalValue) (roachpb.Value, error), -) (roachpb.Value, error) { - // If a valueFn is specified, read existing value using the iter. - if valueFn == nil { - return value, nil - } - var exVal optionalValue - if exists { - var err error - exVal, _, err = mvccGet(ctx, iter, key, readTimestamp, MVCCGetOptions{Tombstones: true}) - if err != nil { - return roachpb.Value{}, err - } - } - return valueFn(exVal) -} - // MVCCScanDecodeKeyValue decodes a key/value pair returned in an MVCCScan // "batch" (this is not the RocksDB batch repr format), returning both the // key/value and the suffix of data remaining in the batch. @@ -1900,10 +1869,16 @@ func mvccPutInternal( if opts.Txn != nil { return false, errors.Errorf("%q: inline writes not allowed within transactions", metaKey) } - var metaKeySize, metaValSize int64 - if value, err = maybeGetValue(ctx, iter, key, value, ok, timestamp, valueFn); err != nil { - return false, err + if valueFn != nil { + var inlineVal optionalValue + if ok { + inlineVal = makeOptionalValue(roachpb.Value{RawBytes: buf.meta.RawBytes}) + } + if value, err = valueFn(inlineVal); err != nil { + return false, err + } } + var metaKeySize, metaValSize int64 if !value.IsPresent() { metaKeySize, metaValSize, err = 0, 0, writer.ClearUnversioned(metaKey.Key, ClearOptions{ // NB: origMetaValSize is only populated by mvccGetMetadata if @@ -1912,8 +1887,8 @@ func mvccPutInternal( ValueSize: uint32(origMetaValSize), }) } else { - buf.meta = enginepb.MVCCMetadata{RawBytes: value.RawBytes} - metaKeySize, metaValSize, err = buf.putInlineMeta(writer, metaKey, &buf.meta) + buf.newMeta = enginepb.MVCCMetadata{RawBytes: value.RawBytes} + metaKeySize, metaValSize, err = buf.putInlineMeta(writer, metaKey, &buf.newMeta) } if opts.Stats != nil { updateStatsForInline(opts.Stats, key, origMetaKeySize, origMetaValSize, metaKeySize, metaValSize) @@ -2187,9 +2162,21 @@ func mvccPutInternal( writeTimestamp.Forward(metaTimestamp.Next()) writeTooOldErr := kvpb.NewWriteTooOldError(readTimestamp, writeTimestamp, key) return false, writeTooOldErr - } else { - if value, err = maybeGetValue(ctx, iter, key, value, ok, readTimestamp, valueFn); err != nil { - return false, err + } else /* meta.Txn == nil && metaTimestamp.Less(readTimestamp) */ { + // If a valueFn is specified, read the existing value using iter. + if valueFn != nil { + exVal, _, err := mvccGet(ctx, iter, key, readTimestamp, MVCCGetOptions{ + Tombstones: true, + // Unlike above, we know that there are no intents on this + // key, so we don't need to perform an inconsistent read. + }) + if err != nil { + return false, err + } + value, err = valueFn(exVal) + if err != nil { + return false, err + } } } } else { diff --git a/pkg/storage/testdata/mvcc_histories/inline b/pkg/storage/testdata/mvcc_histories/inline new file mode 100644 index 000000000000..d93168b233f4 --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/inline @@ -0,0 +1,80 @@ +## A simple test of inline operations. + +run ok +put k=i1 v=inline1 +put k=i2 v=inline2 +put k=i3 v=inline3 +---- +>> at end: +meta: "i1"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline1 mergeTs= txnDidNotUpdateMeta=false +meta: "i2"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2 mergeTs= txnDidNotUpdateMeta=false +meta: "i3"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs= txnDidNotUpdateMeta=false + +run ok +get k=i1 +get k=i2 +get k=i3 +---- +get: "i1" -> /BYTES/inline1 @0,0 +get: "i2" -> /BYTES/inline2 @0,0 +get: "i3" -> /BYTES/inline3 @0,0 + +run ok +scan k=i1 end=i4 +---- +scan: "i1" -> /BYTES/inline1 @0,0 +scan: "i2" -> /BYTES/inline2 @0,0 +scan: "i3" -> /BYTES/inline3 @0,0 + +run ok +del k=i1 +---- +del: "i1": found key true +>> at end: +meta: "i2"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2 mergeTs= txnDidNotUpdateMeta=false +meta: "i3"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs= txnDidNotUpdateMeta=false + +run error +cput k=i2 v=inline2b cond=incorrect +---- +>> at end: +meta: "i2"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2 mergeTs= txnDidNotUpdateMeta=false +meta: "i3"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs= txnDidNotUpdateMeta=false +error: (*kvpb.ConditionFailedError:) unexpected value: raw_bytes:"\000\000\000\000\003inline2" timestamp:<> + +run ok +cput k=i2 v=inline2b cond=inline2 +---- +>> at end: +meta: "i2"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs= txnDidNotUpdateMeta=false +meta: "i3"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs= txnDidNotUpdateMeta=false + +run error +initput k=i3 v=inline3b +---- +>> at end: +meta: "i2"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs= txnDidNotUpdateMeta=false +meta: "i3"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs= txnDidNotUpdateMeta=false +error: (*kvpb.ConditionFailedError:) unexpected value: raw_bytes:"\000\000\000\000\003inline3" timestamp:<> + +run ok +initput k=i3 v=inline3 +---- +>> at end: +meta: "i2"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs= txnDidNotUpdateMeta=false +meta: "i3"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs= txnDidNotUpdateMeta=false + +run error +increment k=i3 +---- +>> at end: +meta: "i2"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline2b mergeTs= txnDidNotUpdateMeta=false +meta: "i3"/0,0 -> txn={} ts=0,0 del=false klen=0 vlen=0 raw=/BYTES/inline3 mergeTs= txnDidNotUpdateMeta=false +error: (*withstack.withStack:) key "i3" does not contain an integer value + +run ok +del_range k=i1 end=i4 +---- +del_range: "i1"-"i4" -> deleted 2 key(s) +>> at end: + diff --git a/pkg/ts/query.go b/pkg/ts/query.go index 9fe0fd1308d6..bd3f10fdb2b3 100644 --- a/pkg/ts/query.go +++ b/pkg/ts/query.go @@ -845,10 +845,14 @@ func (db *DB) readFromDatabase( kd := diskResolution.SlabDuration() for currentTimestamp := startTimestamp; currentTimestamp <= timespan.EndNanos; currentTimestamp += kd { for _, source := range sources { - // If a TenantID is specified and is not the system tenant, only query - // data for that tenant source. - if tenantID.IsSet() && !tenantID.IsSystem() { - source = tsutil.MakeTenantSource(source, tenantID.String()) + // If a TenantID is specified we may need to format the source in order to retrieve the correct data. + // e.g. if not system tenant we need the source to be of format nodeID-tenantID but if it is the + // system tenant we need the source to be of format nodeID. Otherwise we get all the data via the + // format nodeID- + if tenantID.IsSet() { + if !tenantID.IsSystem() { + source = tsutil.MakeTenantSource(source, tenantID.String()) + } key := MakeDataKey(seriesName, source, diskResolution, currentTimestamp) b.Get(key) } else { @@ -905,7 +909,7 @@ func (db *DB) readAllSourcesFromDatabase( return nil, err } - if !tenantID.IsSet() || tenantID.IsSystem() { + if !tenantID.IsSet() { return b.Results[0].Rows, nil } @@ -917,7 +921,7 @@ func (db *DB) readAllSourcesFromDatabase( return nil, err } _, tenantSource := tsutil.DecodeSource(source) - if tenantSource == tenantID.String() { + if tenantID.IsSystem() && tenantSource == "" || tenantSource == tenantID.String() { rows = append(rows, row) } } diff --git a/pkg/ts/server_test.go b/pkg/ts/server_test.go index 80858c46ea8a..1c8bbe02018d 100644 --- a/pkg/ts/server_test.go +++ b/pkg/ts/server_test.go @@ -368,8 +368,8 @@ func TestServerQueryTenant(t *testing.T) { t.Fatal(err) } - // System tenant should aggregate across all tenants. - expectedSystemResult := &tspb.TimeSeriesQueryResponse{ + // Undefined tenant ID should aggregate across all tenants. + expectedAggregatedResult := &tspb.TimeSeriesQueryResponse{ Results: []tspb.TimeSeriesQueryResponse_Result{ { Query: tspb.Query{ @@ -408,7 +408,7 @@ func TestServerQueryTenant(t *testing.T) { conn := s.RPCClientConn(t, username.RootUserName()) client := tspb.NewTimeSeriesClient(conn) - systemResponse, err := client.Query(context.Background(), &tspb.TimeSeriesQueryRequest{ + aggregatedResponse, err := client.Query(context.Background(), &tspb.TimeSeriesQueryRequest{ StartNanos: 400 * 1e9, EndNanos: 500 * 1e9, Queries: []tspb.Query{ @@ -417,6 +417,7 @@ func TestServerQueryTenant(t *testing.T) { Sources: []string{"1"}, }, { + // Not providing a source (nodeID or storeID) will aggregate across all sources. Name: "test.metric", }, }, @@ -424,6 +425,70 @@ func TestServerQueryTenant(t *testing.T) { if err != nil { t.Fatal(err) } + for _, r := range aggregatedResponse.Results { + sort.Strings(r.Sources) + } + require.Equal(t, expectedAggregatedResult, aggregatedResponse) + + // System tenant ID should provide system tenant ts data. + systemID := roachpb.MustMakeTenantID(1) + expectedSystemResult := &tspb.TimeSeriesQueryResponse{ + Results: []tspb.TimeSeriesQueryResponse_Result{ + { + Query: tspb.Query{ + Name: "test.metric", + Sources: []string{"1"}, + TenantID: systemID, + }, + Datapoints: []tspb.TimeSeriesDatapoint{ + { + TimestampNanos: 400 * 1e9, + Value: 100.0, + }, + { + TimestampNanos: 500 * 1e9, + Value: 200.0, + }, + }, + }, + { + Query: tspb.Query{ + Name: "test.metric", + Sources: []string{"1", "10"}, + TenantID: systemID, + }, + Datapoints: []tspb.TimeSeriesDatapoint{ + { + TimestampNanos: 400 * 1e9, + Value: 300.0, + }, + { + TimestampNanos: 500 * 1e9, + Value: 600.0, + }, + }, + }, + }, + } + + systemResponse, err := client.Query(context.Background(), &tspb.TimeSeriesQueryRequest{ + StartNanos: 400 * 1e9, + EndNanos: 500 * 1e9, + Queries: []tspb.Query{ + { + Name: "test.metric", + Sources: []string{"1"}, + TenantID: systemID, + }, + { + Name: "test.metric", + TenantID: systemID, + }, + }, + }) + if err != nil { + t.Fatal(err) + } for _, r := range systemResponse.Results { sort.Strings(r.Sources) } @@ -489,6 +554,7 @@ func TestServerQueryTenant(t *testing.T) { Sources: []string{"1"}, }, { + // Not providing a source (nodeID or storeID) will aggregate across all sources. Name: "test.metric", }, },