Skip to content

Commit

Permalink
Add GetPartitions function to emulator server (googleapis#5390)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Schmidt <[email protected]>
Co-authored-by: Baha Aiman <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 32cbf56 commit c10a4bd
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
60 changes: 60 additions & 0 deletions bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,15 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
return nil
}

func (s *server) GetPartitionsByTableName(name string) []*btpb.RowRange {
table, ok := s.tables[name]
if !ok {
return nil
}
return table.rowRanges()

}

// streamRow filters the given row and sends it via the given stream.
// Returns true if at least one cell matched the filter and was streamed, false otherwise.
func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter, s *btpb.ReadIterationStats, ff *btpb.FeatureFlags) error {
Expand Down Expand Up @@ -1461,6 +1470,7 @@ type table struct {
counter uint64 // increment by 1 when a new family is created
families map[string]*columnFamily // keyed by plain family name
rows *btree.BTree // indexed by row key
partitions []*btpb.RowRange // partitions used in change stream
isProtected bool // whether this table has deletion protection
}

Expand All @@ -1475,10 +1485,56 @@ func newTable(ctr *btapb.CreateTableRequest) *table {
c++
}
}

// Hard code the partitions for testing purpose.
rowRanges := []*btpb.RowRange{
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("c")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("d")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("e")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("f")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("g")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("h")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("i")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("j")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("k")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("l")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("m")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("n")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("o")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("p")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("q")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("r")},
},
{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("s")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("z")},
},
}

return &table{
families: fams,
counter: c,
rows: btree.New(btreeDegree),
partitions: rowRanges,
isProtected: ctr.GetTable().GetDeletionProtection(),
}
}
Expand Down Expand Up @@ -1577,6 +1633,10 @@ func (t *table) gcReadOnly() (toDelete []btree.Item) {
return toDelete
}

func (t *table) rowRanges() []*btpb.RowRange {
return t.partitions
}

type byRowKey []*row

func (b byRowKey) Len() int { return len(b) }
Expand Down
46 changes: 46 additions & 0 deletions bigtable/bttest/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,52 @@ func TestCreateTableWithFamily(t *testing.T) {
}
}

func TestGetPartitionsByTableName(t *testing.T) {
s := &server{
tables: make(map[string]*table),
}
ctx := context.Background()
newTbl := btapb.Table{
ColumnFamilies: map[string]*btapb.ColumnFamily{
"cf1": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 123}}},
"cf2": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 456}}},
},
}
_, err1 := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t1", Table: &newTbl})
if err1 != nil {
t.Fatalf("Creating table: %v", err1)
}

newTbl = btapb.Table{
ColumnFamilies: map[string]*btapb.ColumnFamily{
"cf3": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 567}}},
"cf4": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 890}}},
},
}
_, err2 := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t2", Table: &newTbl})
if err2 != nil {
t.Fatalf("Creating table: %v", err2)
}

tblNamePrefix := "cluster" + "/tables/"

// A random table name doesn't return partitions.
partitions := s.GetPartitionsByTableName(tblNamePrefix + "random")
if partitions != nil {
t.Fatalf("Getting partitions for table random")
}

partitions = s.GetPartitionsByTableName(tblNamePrefix + "t1")
if len(partitions) != 10 {
t.Fatalf("Getting partitions for table t1")
}

partitions = s.GetPartitionsByTableName(tblNamePrefix + "t2")
if len(partitions) != 10 {
t.Fatalf("Getting partitions for table t2")
}
}

type MockSampleRowKeysServer struct {
responses []*btpb.SampleRowKeysResponse
grpc.ServerStream
Expand Down
1 change: 1 addition & 0 deletions internal/generated/snippets/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ require (
cloud.google.com/go/edgenetwork v0.0.0-00010101000000-000000000000
cloud.google.com/go/identitytoolkit v0.0.0-00010101000000-000000000000
cloud.google.com/go/managedkafka v0.0.0-00010101000000-000000000000
cloud.google.com/go/memorystore v0.0.0-00010101000000-000000000000
cloud.google.com/go/migrationcenter v0.0.0-00010101000000-000000000000
cloud.google.com/go/netapp v0.0.0-00010101000000-000000000000
cloud.google.com/go/networkservices v0.0.0-00010101000000-000000000000
Expand Down

0 comments on commit c10a4bd

Please sign in to comment.