Skip to content

Commit

Permalink
feat: Add aggregate support to the bigtable emulator (#9530)
Browse files Browse the repository at this point in the history
Change-Id: I3348aea3101eb87bab43c219a5af59b654491486
  • Loading branch information
steveniemitz authored Mar 8, 2024
1 parent 5b6b8be commit c250928
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 32 deletions.
62 changes: 59 additions & 3 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,23 @@ const (
Unprotected
)

// TableConf contains all of the information necessary to create a table with column families.
// Family represents a column family with its optional GC policy and value type.
type Family struct {
GCPolicy GCPolicy
ValueType Type
}

// TableConf contains all the information necessary to create a table with column families.
type TableConf struct {
TableID string
SplitKeys []string
// Families is a map from family name to GCPolicy
// DEPRECATED: Use ColumnFamilies instead.
// Families is a map from family name to GCPolicy.
// Only one of Families or ColumnFamilies may be set.
Families map[string]GCPolicy
// ColumnFamilies is a map from family name to family configuration.
// Only one of Families or ColumnFamilies may be set.
ColumnFamilies map[string]Family
// DeletionProtection can be none, protected or unprotected
// set to protected to make the table protected against data loss
DeletionProtection DeletionProtection
Expand Down Expand Up @@ -283,7 +294,28 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf)
tbl.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
tbl.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.ChangeStreamRetention.(time.Duration))
}
if conf.Families != nil {
if conf.Families != nil && conf.ColumnFamilies != nil {
return errors.New("only one of Families or ColumnFamilies may be set, not both")
}

if conf.ColumnFamilies != nil {
tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily)
for fam, config := range conf.ColumnFamilies {
var gcPolicy *btapb.GcRule
if config.GCPolicy != nil {
gcPolicy = config.GCPolicy.proto()
} else {
gcPolicy = &btapb.GcRule{}
}

var typeProto *btapb.Type = nil
if config.ValueType != nil {
typeProto = config.ValueType.proto()
}

tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: gcPolicy, ValueType: typeProto}
}
} else if conf.Families != nil {
tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily)
for fam, policy := range conf.Families {
tbl.ColumnFamilies[fam] = &btapb.ColumnFamily{GcRule: policy.proto()}
Expand Down Expand Up @@ -316,6 +348,30 @@ func (ac *AdminClient) CreateColumnFamily(ctx context.Context, table, family str
return err
}

// CreateColumnFamilyWithConfig creates a new column family in a table with an optional GC policy and value type.
func (ac *AdminClient) CreateColumnFamilyWithConfig(ctx context.Context, table, family string, config Family) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()

cf := &btapb.ColumnFamily{}
if config.GCPolicy != nil {
cf.GcRule = config.GCPolicy.proto()
}
if config.ValueType != nil {
cf.ValueType = config.ValueType.proto()
}

req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + table,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: family,
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: cf},
}},
}
_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
return err
}

// UpdateTableConf contains all of the information necessary to update a table with column families.
type UpdateTableConf struct {
tableID string
Expand Down
15 changes: 15 additions & 0 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,21 @@ func (m *Mutation) DeleteRow() {
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}})
}

// AddIntToCell adds an int64 value to a cell in an aggregate column family. The column family must
// have an input type of Int64 or this mutation will fail.
func (m *Mutation) AddIntToCell(family, column string, ts Timestamp, value int64) {
m.addToCell(family, column, ts, &btpb.Value{Kind: &btpb.Value_IntValue{IntValue: value}})
}

func (m *Mutation) addToCell(family, column string, ts Timestamp, value *btpb.Value) {
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_AddToCell_{AddToCell: &btpb.Mutation_AddToCell{
FamilyName: family,
ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte(column)}},
Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: int64(ts.TruncateToMilliseconds())}},
Input: value,
}}})
}

// entryErr is a container that combines an entry with the error that was returned for it.
// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
type entryErr struct {
Expand Down
63 changes: 63 additions & 0 deletions bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package bigtable

import (
"bytes"
"context"
"encoding/binary"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -748,3 +750,64 @@ func TestHeaderPopulatedWithAppProfile(t *testing.T) {
t.Errorf("Incorrect value in resourcePrefixHeader. Got %s, want %s", got, want)
}
}

func TestMutateRowsWithAggregates(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()

tableConf := &TableConf{
TableID: testEnv.config.Table,
ColumnFamilies: map[string]Family{
"f": {
ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
},
},
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}

client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)

m := NewMutation()
m.AddIntToCell("f", "q", 0, 1000)
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}

m = NewMutation()
m.AddIntToCell("f", "q", 0, 2000)
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}

row, err := table.ReadRow(ctx, "row1")
if !bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) {
t.Error()
}
}
102 changes: 79 additions & 23 deletions bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,7 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu
if _, ok := tbl.families[mod.Id]; ok {
return nil, status.Errorf(codes.AlreadyExists, "family %q already exists", mod.Id)
}
newcf := &columnFamily{
name: req.Name + "/columnFamilies/" + mod.Id,
order: tbl.counter,
gcRule: create.GcRule,
}
newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, tbl.counter, create)
tbl.counter++
tbl.families[mod.Id] = newcf
} else if mod.GetDrop() {
Expand All @@ -324,10 +320,7 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu
if _, ok := tbl.families[mod.Id]; !ok {
return nil, fmt.Errorf("no such family %q", mod.Id)
}
newcf := &columnFamily{
name: req.Name + "/columnFamilies/" + mod.Id,
gcRule: modify.GcRule,
}
newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, 0, modify)
// assume that we ALWAYS want to replace by the new setting
// we may need partial update through
tbl.families[mod.Id] = newcf
Expand Down Expand Up @@ -1085,7 +1078,8 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co
return fmt.Errorf("can't handle mutation type %T", mut)
case *btpb.Mutation_SetCell_:
set := mut.SetCell
if _, ok := fs[set.FamilyName]; !ok {
var cf, ok = fs[set.FamilyName]
if !ok {
return fmt.Errorf("unknown family %q", set.FamilyName)
}
ts := set.TimestampMicros
Expand All @@ -1100,7 +1094,36 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co

newCell := cell{ts: ts, value: set.Value}
f := r.getOrCreateFamily(fam, fs[fam].order)
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell)
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf)
case *btpb.Mutation_AddToCell_:
add := mut.AddToCell
var cf, ok = fs[add.FamilyName]
if !ok {
return fmt.Errorf("unknown family %q", add.FamilyName)
}
if cf.valueType == nil || cf.valueType.GetAggregateType() == nil {
return fmt.Errorf("illegal attempt to use AddToCell on non-aggregate cell")
}
ts := add.Timestamp.GetRawTimestampMicros()
if ts < 0 {
return fmt.Errorf("AddToCell must set timestamp >= 0")
}

fam := add.FamilyName
col := string(add.GetColumnQualifier().GetRawValue())

var value []byte
switch v := add.Input.Kind.(type) {
case *btpb.Value_IntValue:
value = binary.BigEndian.AppendUint64(value, uint64(v.IntValue))
default:
return fmt.Errorf("only int64 values are supported")
}

newCell := cell{ts: ts, value: value}
f := r.getOrCreateFamily(fam, fs[fam].order)
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf)

case *btpb.Mutation_DeleteFromColumn_:
del := mut.DeleteFromColumn
if _, ok := fs[del.FamilyName]; !ok {
Expand Down Expand Up @@ -1176,16 +1199,18 @@ func newTimestamp() int64 {
return ts
}

func appendOrReplaceCell(cs []cell, newCell cell) []cell {
func appendOrReplaceCell(cs []cell, newCell cell, cf *columnFamily) []cell {
replaced := false
for i, cell := range cs {
if cell.ts == newCell.ts {
newCell.value = cf.updateFn(cs[i].value, newCell.value)
cs[i] = newCell
replaced = true
break
}
}
if !replaced {
newCell.value = cf.initFn(newCell.value)
cs = append(cs, newCell)
}
sort.Sort(byDescTS(cs))
Expand All @@ -1212,7 +1237,8 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
// Assume all mutations apply to the most recent version of the cell.
// TODO(dsymonds): Verify this assumption and document it in the proto.
for _, rule := range req.Rules {
if _, ok := fs[rule.FamilyName]; !ok {
var cf, ok = fs[rule.FamilyName]
if !ok {
return nil, fmt.Errorf("unknown family %q", rule.FamilyName)
}

Expand Down Expand Up @@ -1255,7 +1281,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
}

// Store the new cell
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell)
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf)

// Store a copy for the result row
resultFamily := resultRow.getOrCreateFamily(fam, fs[fam].order)
Expand Down Expand Up @@ -1378,11 +1404,7 @@ func newTable(ctr *btapb.CreateTableRequest) *table {
c := uint64(0)
if ctr.Table != nil {
for id, cf := range ctr.Table.ColumnFamilies {
fams[id] = &columnFamily{
name: ctr.Parent + "/columnFamilies/" + id,
order: c,
gcRule: cf.GcRule,
}
fams[id] = newColumnFamily(ctr.Parent+"/columnFamilies/"+id, c, cf)
c++
}
}
Expand Down Expand Up @@ -1681,15 +1703,49 @@ func (b byDescTS) Len() int { return len(b) }
func (b byDescTS) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byDescTS) Less(i, j int) bool { return b[i].ts > b[j].ts }

func newColumnFamily(name string, order uint64, cf *btapb.ColumnFamily) *columnFamily {
var updateFn = func(_, newVal []byte) []byte {
return newVal
}
if cf.ValueType != nil {
switch v := cf.ValueType.Kind.(type) {
case *btapb.Type_AggregateType:
switch v.AggregateType.Aggregator.(type) {
case *btapb.Type_Aggregate_Sum_:
updateFn = func(existing, newVal []byte) []byte {
existingInt := int64(binary.BigEndian.Uint64(existing))
newInt := int64(binary.BigEndian.Uint64(newVal))
return binary.BigEndian.AppendUint64([]byte{}, uint64(existingInt+newInt))
}
}
default:
}
}
return &columnFamily{
name: name,
order: order,
gcRule: cf.GcRule,
valueType: cf.ValueType,
updateFn: updateFn,
initFn: func(newVal []byte) []byte {
return newVal
},
}
}

type columnFamily struct {
name string
order uint64 // Creation order of column family
gcRule *btapb.GcRule
name string
order uint64 // Creation order of column family
gcRule *btapb.GcRule
valueType *btapb.Type
updateFn func(existing, newVal []byte) []byte
initFn func(newVal []byte) []byte
}

func (c *columnFamily) proto() *btapb.ColumnFamily {
return &btapb.ColumnFamily{
GcRule: c.gcRule,
GcRule: c.gcRule,
ValueType: c.valueType,
}
}

Expand Down
Loading

0 comments on commit c250928

Please sign in to comment.