Skip to content

Commit

Permalink
make collection_id primary key for segment, fix system tests (chroma-…
Browse files Browse the repository at this point in the history
…core#1731)

## Description of changes

- collection_id should be part of the primary key of the segment table, to improve
getSegments performance (a follow-up will fix getSegments, since we
should push down collection_id)
-
https://linear.app/trychroma/issue/CHR-324/segment-table-should-have-collection-id-as-primary-key
- fixing tests broken by
chroma-core@93194c8

## Test plan
*How are these changes tested?*

- [x] passing existing tests
  • Loading branch information
weiligu authored Feb 21, 2024
1 parent 8a0f67e commit 05fdd46
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 50 deletions.
4 changes: 2 additions & 2 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ k8s_resource('migration', resource_deps=['postgres'], labels=["chroma"])
k8s_yaml(['k8s/dev/server.yaml'])
k8s_resource('server', resource_deps=['k8s_setup'],labels=["chroma"], port_forwards=8000 )
k8s_yaml(['k8s/dev/coordinator.yaml'])
k8s_resource('coordinator', resource_deps=['pulsar', 'server', 'migration'], labels=["chroma"])
k8s_resource('coordinator', resource_deps=['pulsar', 'server', 'migration'], labels=["chroma"], port_forwards=50051 )
k8s_yaml(['k8s/dev/logservice.yaml'])
k8s_resource('logservice', resource_deps=['migration'], labels=["chroma"])
k8s_resource('logservice', resource_deps=['migration'], labels=["chroma"], port_forwards='50052:50051')
k8s_yaml(['k8s/dev/worker.yaml'])
k8s_resource('worker', resource_deps=['coordinator'],labels=["chroma"])
11 changes: 10 additions & 1 deletion chromadb/test/db/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ def test_update_segment(sysdb: SysDB) -> None:
scope=SegmentScope.VECTOR,
topic="test_topic_a",
collection=sample_collections[0]["id"],
metadata=metadata
metadata=metadata,
)

sysdb.reset_state()
Expand All @@ -732,52 +732,61 @@ def test_update_segment(sysdb: SysDB) -> None:

sysdb.create_segment(segment)

# TODO: revisit update segment - push collection id
# Update topic to new value
segment["topic"] = "new_topic"
sysdb.update_segment(segment["id"], topic=segment["topic"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update topic to None
segment["topic"] = None
sysdb.update_segment(segment["id"], topic=segment["topic"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update collection to new value
segment["collection"] = sample_collections[1]["id"]
sysdb.update_segment(segment["id"], collection=segment["collection"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update collection to None
segment["collection"] = None
sysdb.update_segment(segment["id"], collection=segment["collection"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Add a new metadata key
metadata["test_str2"] = "str2"
sysdb.update_segment(segment["id"], metadata={"test_str2": "str2"})
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update a metadata key
metadata["test_str"] = "str3"
sysdb.update_segment(segment["id"], metadata={"test_str": "str3"})
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Delete a metadata key
del metadata["test_str"]
sysdb.update_segment(segment["id"], metadata={"test_str": None})
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Delete all metadata keys
segment["metadata"] = None
sysdb.update_segment(segment["id"], metadata=None)
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]


Expand Down
1 change: 1 addition & 0 deletions go/coordinator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
ariga.io/atlas-provider-gorm v0.1.1
github.com/apache/pulsar-client-go v0.9.1-0.20231030094548-620ecf4addfb
github.com/google/uuid v1.3.1
github.com/lib/pq v1.10.7
github.com/pingcap/log v1.1.0
github.com/rs/zerolog v1.31.0
github.com/spf13/cobra v1.7.0
Expand Down
5 changes: 2 additions & 3 deletions go/coordinator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ github.com/AthenZ/athenz v1.10.39/go.mod h1:3Tg8HLsiQZp81BJY58JBeU2BR6B/H4/0MQGf
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo=
github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/alecthomas/kong v0.7.1 h1:azoTh0IOfwlAX3qN9sHWTxACE2oV8Bg2gAwBsMwDQY4=
github.com/alecthomas/kong v0.7.1/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -156,6 +154,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg=
github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
Expand Down Expand Up @@ -346,7 +346,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
1 change: 1 addition & 0 deletions go/coordinator/internal/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
ErrInvalidCollectionUpdate = errors.New("invalid collection update, reset collection true and collection value not empty")
ErrSegmentUniqueConstraintViolation = errors.New("unique constraint violation")
ErrSegmentDeleteNonExistingSegment = errors.New("delete non existing segment")
ErrSegmentUpdateNonExistingSegment = errors.New("update non existing segment")

// Segment metadata errors
ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported")
Expand Down
60 changes: 34 additions & 26 deletions go/coordinator/internal/coordinator/apis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,11 +872,13 @@ func TestUpdateSegment(t *testing.T) {
})

// Update topic to new value
collectionID := segment.CollectionID.String()
newTopic := "new_topic"
segment.Topic = &newTopic
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Topic: segment.Topic,
Collection: &collectionID,
ID: segment.ID,
Topic: segment.Topic,
})
result, err := c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
Expand All @@ -885,6 +887,7 @@ func TestUpdateSegment(t *testing.T) {
// Update topic to None
segment.Topic = nil
c.UpdateSegment(ctx, &model.UpdateSegment{
Collection: &collectionID,
ID: segment.ID,
Topic: segment.Topic,
ResetTopic: true,
Expand All @@ -893,42 +896,45 @@ func TestUpdateSegment(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// TODO: revisit why we need this
// Update collection to new value
segment.CollectionID = sampleCollections[1].ID
newCollecionID := segment.CollectionID.String()
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Collection: &newCollecionID,
})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)
//segment.CollectionID = sampleCollections[1].ID
//newCollecionID := segment.CollectionID.String()
//c.UpdateSegment(ctx, &model.UpdateSegment{
// ID: segment.ID,
// Collection: &newCollecionID,
//})
//result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
//assert.NoError(t, err)
//assert.Equal(t, []*model.Segment{segment}, result)

// Update collection to None
segment.CollectionID = types.NilUniqueID()
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Collection: nil,
ResetCollection: true,
})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)
//segment.CollectionID = types.NilUniqueID()
//c.UpdateSegment(ctx, &model.UpdateSegment{
// ID: segment.ID,
// Collection: nil,
// ResetCollection: true,
//})
//result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
//assert.NoError(t, err)
//assert.Equal(t, []*model.Segment{segment}, result)

// Add a new metadata key
segment.Metadata.Set("test_str2", &model.SegmentMetadataValueStringType{Value: "str2"})
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: segment.Metadata})
Collection: &collectionID,
ID: segment.ID,
Metadata: segment.Metadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// Update a metadata key
segment.Metadata.Set("test_str", &model.SegmentMetadataValueStringType{Value: "str3"})
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: segment.Metadata})
Collection: &collectionID,
ID: segment.ID,
Metadata: segment.Metadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)
Expand All @@ -938,15 +944,17 @@ func TestUpdateSegment(t *testing.T) {
newMetadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]()
newMetadata.Set("test_str", nil)
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: newMetadata})
Collection: &collectionID,
ID: segment.ID,
Metadata: newMetadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// Delete all metadata keys
segment.Metadata = nil
c.UpdateSegment(ctx, &model.UpdateSegment{
Collection: &collectionID,
ID: segment.ID,
Metadata: segment.Metadata,
ResetMetadata: true},
Expand Down
27 changes: 27 additions & 0 deletions go/coordinator/internal/coordinator/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package coordinator

import (
"context"
"errors"
"github.com/jackc/pgx/v5/pgconn"
"sync"

"github.com/chroma/chroma-coordinator/internal/common"
Expand Down Expand Up @@ -222,6 +224,18 @@ func (mt *MetaTable) AddCollection(ctx context.Context, createCollection *model.
collection, err := mt.catalog.CreateCollection(ctx, createCollection, createCollection.Ts)
if err != nil {
log.Error("create collection failed", zap.Error(err))
var pgErr *pgconn.PgError
ok := errors.As(err, &pgErr)
if ok {
log.Error("Postgres Error")
switch pgErr.Code {
case "23505":
log.Error("collection id already exists")
return nil, common.ErrCollectionUniqueConstraintViolation
default:
return nil, err
}
}
return nil, err
}
mt.tenantDatabaseCollectionCache[tenantID][databaseName][collection.ID] = collection
Expand Down Expand Up @@ -361,6 +375,19 @@ func (mt *MetaTable) AddSegment(ctx context.Context, createSegment *model.Create

segment, err := mt.catalog.CreateSegment(ctx, createSegment, createSegment.Ts)
if err != nil {
log.Error("create segment failed", zap.Error(err))
var pgErr *pgconn.PgError
ok := errors.As(err, &pgErr)
if ok {
log.Error("Postgres Error")
switch pgErr.Code {
case "23505":
log.Error("segment id already exists")
return common.ErrSegmentUniqueConstraintViolation
default:
return err
}
}
return err
}
mt.segmentsCache[createSegment.ID] = segment
Expand Down
19 changes: 17 additions & 2 deletions go/coordinator/internal/metastore/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package coordinator

import (
"context"

"github.com/chroma/chroma-coordinator/internal/common"
"github.com/chroma/chroma-coordinator/internal/metastore"
"github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel"
Expand Down Expand Up @@ -222,7 +221,7 @@ func (tc *Catalog) CreateCollection(ctx context.Context, createCollection *model
}

collectionName := createCollection.Name
existing, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(types.FromUniqueID(createCollection.ID), &collectionName, nil, tenantID, databaseName)
existing, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(nil, &collectionName, nil, tenantID, databaseName)
if err != nil {
log.Error("error getting collection", zap.Error(err))
return err
Expand Down Expand Up @@ -492,6 +491,22 @@ func (tc *Catalog) UpdateSegment(ctx context.Context, updateSegment *model.Updat
var result *model.Segment

err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// TODO: the caller should push down collection_id; until then, look it up with a GET so the tests pass
if updateSegment.Collection == nil {
results, err := tc.metaDomain.SegmentDb(txCtx).GetSegments(updateSegment.ID, nil, nil, nil, types.NilUniqueID())
if err != nil {
return err
}
if results == nil || len(results) == 0 {
return common.ErrSegmentUpdateNonExistingSegment
}
if results != nil && len(results) > 1 {
// TODO: return a dedicated error for multiple segments matching one ID instead of reusing ErrInvalidCollectionUpdate
return common.ErrInvalidCollectionUpdate
}
updateSegment.Collection = results[0].Segment.CollectionID
}

// update segment
dbSegment := &dbmodel.UpdateSegment{
ID: updateSegment.ID.String(),
Expand Down
25 changes: 14 additions & 11 deletions go/coordinator/internal/metastore/db/dao/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,23 @@ func generateSegmentUpdatesWithoutID(in *dbmodel.UpdateSegment) map[string]inter
}
}

if in.ResetCollection {
if in.Collection == nil {
ret["collection_id"] = nil
}
} else {
if in.Collection != nil {
ret["collection_id"] = *in.Collection
}
}
log.Info("generate segment updates without id", zap.Any("updates", ret))
// TODO: check this
//if in.ResetCollection {
// if in.Collection == nil {
// ret["collection_id"] = nil
// }
//} else {
// if in.Collection != nil {
// ret["collection_id"] = *in.Collection
// }
//}
//log.Info("generate segment updates without id", zap.Any("updates", ret))
return ret
}

func (s *segmentDb) Update(in *dbmodel.UpdateSegment) error {
updates := generateSegmentUpdatesWithoutID(in)
return s.db.Model(&dbmodel.Segment{}).Where("id = ?", in.ID).Updates(updates).Error
return s.db.Model(&dbmodel.Segment{}).
Where("collection_id = ?", &in.Collection).
Where("id = ?", in.ID).Updates(updates).Error
}
6 changes: 5 additions & 1 deletion go/coordinator/internal/metastore/db/dbmodel/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
)

type Segment struct {
/* Including CollectionID in the primary key allows fast lookups when the CollectionID is known.
This requires the caller to push down the CollectionID. We do not expect to need to
modify CollectionID in the near future: each Segment should always have a parent
collection, and that parent should not be changed. */
CollectionID *string `gorm:"collection_id;primaryKey"`
ID string `gorm:"id;primaryKey"`
Type string `gorm:"type;type:string;not null"`
Scope string `gorm:"scope"`
Expand All @@ -15,7 +20,6 @@ type Segment struct {
IsDeleted bool `gorm:"is_deleted;type:bool;default:false"`
CreatedAt time.Time `gorm:"created_at;type:timestamp;not null;default:current_timestamp"`
UpdatedAt time.Time `gorm:"updated_at;type:timestamp;not null;default:current_timestamp"`
CollectionID *string `gorm:"collection_id"`
}

func (s Segment) TableName() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ CREATE TABLE "public"."segment_metadata" (
);
-- Create "segments" table
CREATE TABLE "public"."segments" (
"collection_id" text NOT NULL,
"id" text NOT NULL,
"type" text NOT NULL,
"scope" text NULL,
Expand All @@ -76,8 +77,7 @@ CREATE TABLE "public"."segments" (
"is_deleted" boolean NULL DEFAULT false,
"created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"collection_id" text NULL,
PRIMARY KEY ("id")
PRIMARY KEY ("collection_id", "id")
);
-- Create "tenants" table
CREATE TABLE "public"."tenants" (
Expand Down
Loading

0 comments on commit 05fdd46

Please sign in to comment.