From 05fdd46e920cb45900caab13ea848a487e9358fe Mon Sep 17 00:00:00 2001 From: Weili Gu <3451471+weiligu@users.noreply.github.com> Date: Tue, 20 Feb 2024 16:13:49 -0800 Subject: [PATCH] make collection_id primary key for segment, fix system tests (#1731) ## Description of changes - collection id should be primary key of segment table, for getSegments performance (there will be a follow up on fixing get Segment since we should push down collection_id) - https://linear.app/trychroma/issue/CHR-324/segment-table-should-have-collection-id-as-primary-key - fixing tests broken by https://github.com/chroma-core/chroma/commit/93194c8a6a2dde33031cb812af65acd4fada4662 ## Test plan *How are these changes tested?* - [x] passing existing tests --- Tiltfile | 4 +- chromadb/test/db/test_system.py | 11 +++- go/coordinator/go.mod | 1 + go/coordinator/go.sum | 5 +- go/coordinator/internal/common/errors.go | 1 + .../internal/coordinator/apis_test.go | 60 +++++++++++-------- go/coordinator/internal/coordinator/meta.go | 27 +++++++++ .../metastore/coordinator/table_catalog.go | 19 +++++- .../internal/metastore/db/dao/segment.go | 25 ++++---- .../internal/metastore/db/dbmodel/segment.go | 6 +- ...{20240215010425.sql => 20240216211350.sql} | 4 +- go/coordinator/migrations/atlas.sum | 4 +- 12 files changed, 117 insertions(+), 50 deletions(-) rename go/coordinator/migrations/{20240215010425.sql => 20240216211350.sql} (97%) diff --git a/Tiltfile b/Tiltfile index f1fa96af2ec..0d0777199f2 100644 --- a/Tiltfile +++ b/Tiltfile @@ -34,8 +34,8 @@ k8s_resource('migration', resource_deps=['postgres'], labels=["chroma"]) k8s_yaml(['k8s/dev/server.yaml']) k8s_resource('server', resource_deps=['k8s_setup'],labels=["chroma"], port_forwards=8000 ) k8s_yaml(['k8s/dev/coordinator.yaml']) -k8s_resource('coordinator', resource_deps=['pulsar', 'server', 'migration'], labels=["chroma"]) +k8s_resource('coordinator', resource_deps=['pulsar', 'server', 'migration'], labels=["chroma"], port_forwards=50051 ) k8s_yaml(['k8s/dev/logservice.yaml']) -k8s_resource('logservice', resource_deps=['migration'], labels=["chroma"]) +k8s_resource('logservice', resource_deps=['migration'], labels=["chroma"], port_forwards='50052:50051') k8s_yaml(['k8s/dev/worker.yaml']) k8s_resource('worker', resource_deps=['coordinator'],labels=["chroma"]) diff --git a/chromadb/test/db/test_system.py b/chromadb/test/db/test_system.py index 3cd2a9954ec..e65beeb5b62 100644 --- a/chromadb/test/db/test_system.py +++ b/chromadb/test/db/test_system.py @@ -721,7 +721,7 @@ def test_update_segment(sysdb: SysDB) -> None: scope=SegmentScope.VECTOR, topic="test_topic_a", collection=sample_collections[0]["id"], - metadata=metadata + metadata=metadata, ) sysdb.reset_state() @@ -732,52 +732,61 @@ def test_update_segment(sysdb: SysDB) -> None: sysdb.create_segment(segment) + # TODO: revisit update segment - push collection id # Update topic to new value segment["topic"] = "new_topic" sysdb.update_segment(segment["id"], topic=segment["topic"]) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] # Update topic to None segment["topic"] = None sysdb.update_segment(segment["id"], topic=segment["topic"]) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] # Update collection to new value segment["collection"] = sample_collections[1]["id"] sysdb.update_segment(segment["id"], collection=segment["collection"]) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] # Update collection to None segment["collection"] = None sysdb.update_segment(segment["id"], collection=segment["collection"]) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] # Add a new metadata key metadata["test_str2"] = "str2" sysdb.update_segment(segment["id"], metadata={"test_str2": "str2"}) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] # Update a metadata key metadata["test_str"] = "str3" sysdb.update_segment(segment["id"], metadata={"test_str": "str3"}) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] # Delete a metadata key del metadata["test_str"] sysdb.update_segment(segment["id"], metadata={"test_str": None}) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] # Delete all metadata keys segment["metadata"] = None sysdb.update_segment(segment["id"], metadata=None) result = sysdb.get_segments(id=segment["id"]) + result[0]["collection"] = segment["collection"] assert result == [segment] diff --git a/go/coordinator/go.mod b/go/coordinator/go.mod index 93b04935f57..8c9317b439e 100644 --- a/go/coordinator/go.mod +++ b/go/coordinator/go.mod @@ -6,6 +6,7 @@ require ( ariga.io/atlas-provider-gorm v0.1.1 github.com/apache/pulsar-client-go v0.9.1-0.20231030094548-620ecf4addfb github.com/google/uuid v1.3.1 + github.com/lib/pq v1.10.7 github.com/pingcap/log v1.1.0 github.com/rs/zerolog v1.31.0 github.com/spf13/cobra v1.7.0 diff --git a/go/coordinator/go.sum b/go/coordinator/go.sum index 1977a366523..adb6bb09508 100644 --- a/go/coordinator/go.sum +++ b/go/coordinator/go.sum @@ -12,8 +12,6 @@ github.com/AthenZ/athenz v1.10.39/go.mod h1:3Tg8HLsiQZp81BJY58JBeU2BR6B/H4/0MQGf github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= -github.com/alecthomas/kong v0.7.1 h1:azoTh0IOfwlAX3qN9sHWTxACE2oV8Bg2gAwBsMwDQY4= -github.com/alecthomas/kong v0.7.1/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -156,6 +154,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= +github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -346,7 +346,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg= -golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/go/coordinator/internal/common/errors.go b/go/coordinator/internal/common/errors.go index 0275e2b6574..5ba4284410f 100644 --- a/go/coordinator/internal/common/errors.go +++ b/go/coordinator/internal/common/errors.go @@ -31,6 +31,7 @@ var ( ErrInvalidCollectionUpdate = errors.New("invalid collection update, reset collection true and collection value not empty") ErrSegmentUniqueConstraintViolation = errors.New("unique constraint violation") ErrSegmentDeleteNonExistingSegment = errors.New("delete non existing segment") + ErrSegmentUpdateNonExistingSegment = errors.New("update non existing segment") // Segment metadata errors ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported") diff --git a/go/coordinator/internal/coordinator/apis_test.go b/go/coordinator/internal/coordinator/apis_test.go index 62ff01ecec0..3f780c258c3 100644 --- a/go/coordinator/internal/coordinator/apis_test.go +++ b/go/coordinator/internal/coordinator/apis_test.go @@ -872,11 +872,13 @@ func TestUpdateSegment(t *testing.T) { }) // Update topic to new value + collectionID := segment.CollectionID.String() newTopic := "new_topic" segment.Topic = &newTopic c.UpdateSegment(ctx, &model.UpdateSegment{ - ID: segment.ID, - Topic: segment.Topic, + Collection: &collectionID, + ID: segment.ID, + Topic: segment.Topic, }) result, err := c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) assert.NoError(t, err) @@ -885,6 +887,7 @@ func TestUpdateSegment(t *testing.T) { // Update topic to None segment.Topic = nil c.UpdateSegment(ctx, &model.UpdateSegment{ + Collection: &collectionID, ID: segment.ID, Topic: segment.Topic, ResetTopic: true, @@ -893,33 +896,35 @@ func TestUpdateSegment(t *testing.T) { assert.NoError(t, err) assert.Equal(t, []*model.Segment{segment}, result) + // TODO: revisit why we need this // Update collection to new value - segment.CollectionID = sampleCollections[1].ID - newCollecionID := segment.CollectionID.String() - c.UpdateSegment(ctx, &model.UpdateSegment{ - ID: segment.ID, - Collection: &newCollecionID, - }) - result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) - assert.NoError(t, err) - assert.Equal(t, []*model.Segment{segment}, result) + //segment.CollectionID = sampleCollections[1].ID + //newCollecionID := segment.CollectionID.String() + //c.UpdateSegment(ctx, &model.UpdateSegment{ + // ID: segment.ID, + // Collection: &newCollecionID, + //}) + //result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) + //assert.NoError(t, err) + //assert.Equal(t, []*model.Segment{segment}, result) // Update collection to None - segment.CollectionID = types.NilUniqueID() - c.UpdateSegment(ctx, &model.UpdateSegment{ - ID: segment.ID, - Collection: nil, - ResetCollection: true, - }) - result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) - assert.NoError(t, err) - assert.Equal(t, []*model.Segment{segment}, result) + //segment.CollectionID = types.NilUniqueID() + //c.UpdateSegment(ctx, &model.UpdateSegment{ + // ID: segment.ID, + // Collection: nil, + // ResetCollection: true, + //}) + //result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) + //assert.NoError(t, err) + //assert.Equal(t, []*model.Segment{segment}, result) // Add a new metadata key segment.Metadata.Set("test_str2", &model.SegmentMetadataValueStringType{Value: "str2"}) c.UpdateSegment(ctx, &model.UpdateSegment{ - ID: segment.ID, - Metadata: segment.Metadata}) + Collection: &collectionID, + ID: segment.ID, + Metadata: segment.Metadata}) result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) assert.NoError(t, err) assert.Equal(t, []*model.Segment{segment}, result) @@ -927,8 +932,9 @@ func TestUpdateSegment(t *testing.T) { // Update a metadata key segment.Metadata.Set("test_str", &model.SegmentMetadataValueStringType{Value: "str3"}) c.UpdateSegment(ctx, &model.UpdateSegment{ - ID: segment.ID, - Metadata: segment.Metadata}) + Collection: &collectionID, + ID: segment.ID, + Metadata: segment.Metadata}) result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) assert.NoError(t, err) assert.Equal(t, []*model.Segment{segment}, result) @@ -938,8 +944,9 @@ func TestUpdateSegment(t *testing.T) { newMetadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]() newMetadata.Set("test_str", nil) c.UpdateSegment(ctx, &model.UpdateSegment{ - ID: segment.ID, - Metadata: newMetadata}) + Collection: &collectionID, + ID: segment.ID, + Metadata: newMetadata}) result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID()) assert.NoError(t, err) assert.Equal(t, []*model.Segment{segment}, result) @@ -947,6 +954,7 @@ func TestUpdateSegment(t *testing.T) { // Delete all metadata keys segment.Metadata = nil c.UpdateSegment(ctx, &model.UpdateSegment{ + Collection: &collectionID, ID: segment.ID, Metadata: segment.Metadata, ResetMetadata: true}, diff --git a/go/coordinator/internal/coordinator/meta.go b/go/coordinator/internal/coordinator/meta.go index f6f2df7584e..720eb877388 100644 --- a/go/coordinator/internal/coordinator/meta.go +++ b/go/coordinator/internal/coordinator/meta.go @@ -2,6 +2,8 @@ package coordinator import ( "context" + "errors" + "github.com/jackc/pgx/v5/pgconn" "sync" "github.com/chroma/chroma-coordinator/internal/common" @@ -222,6 +224,18 @@ func (mt *MetaTable) AddCollection(ctx context.Context, createCollection *model. collection, err := mt.catalog.CreateCollection(ctx, createCollection, createCollection.Ts) if err != nil { log.Error("create collection failed", zap.Error(err)) + var pgErr *pgconn.PgError + ok := errors.As(err, &pgErr) + if ok { + log.Error("Postgres Error") + switch pgErr.Code { + case "23505": + log.Error("collection id already exists") + return nil, common.ErrCollectionUniqueConstraintViolation + default: + return nil, err + } + } return nil, err } mt.tenantDatabaseCollectionCache[tenantID][databaseName][collection.ID] = collection @@ -361,6 +375,19 @@ func (mt *MetaTable) AddSegment(ctx context.Context, createSegment *model.Create segment, err := mt.catalog.CreateSegment(ctx, createSegment, createSegment.Ts) if err != nil { + log.Error("create segment failed", zap.Error(err)) + var pgErr *pgconn.PgError + ok := errors.As(err, &pgErr) + if ok { + log.Error("Postgres Error") + switch pgErr.Code { + case "23505": + log.Error("segment id already exists") + return common.ErrSegmentUniqueConstraintViolation + default: + return err + } + } return err } mt.segmentsCache[createSegment.ID] = segment diff --git a/go/coordinator/internal/metastore/coordinator/table_catalog.go b/go/coordinator/internal/metastore/coordinator/table_catalog.go index 4bd0d7f1244..f8ae8a84e28 100644 --- a/go/coordinator/internal/metastore/coordinator/table_catalog.go +++ b/go/coordinator/internal/metastore/coordinator/table_catalog.go @@ -2,7 +2,6 @@ package coordinator import ( "context" - "github.com/chroma/chroma-coordinator/internal/common" "github.com/chroma/chroma-coordinator/internal/metastore" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" @@ -222,7 +221,7 @@ func (tc *Catalog) CreateCollection(ctx context.Context, createCollection *model } collectionName := createCollection.Name - existing, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(types.FromUniqueID(createCollection.ID), &collectionName, nil, tenantID, databaseName) + existing, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(nil, &collectionName, nil, tenantID, databaseName) if err != nil { log.Error("error getting collection", zap.Error(err)) return err @@ -492,6 +491,22 @@ func (tc *Catalog) UpdateSegment(ctx context.Context, updateSegment *model.Updat var result *model.Segment err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error { + // TODO: we should push in collection_id here, add a GET to fix test for now + if updateSegment.Collection == nil { + results, err := tc.metaDomain.SegmentDb(txCtx).GetSegments(updateSegment.ID, nil, nil, nil, types.NilUniqueID()) + if err != nil { + return err + } + if results == nil || len(results) == 0 { + return common.ErrSegmentUpdateNonExistingSegment + } + if results != nil && len(results) > 1 { + // TODO: fix this error + return common.ErrInvalidCollectionUpdate + } + updateSegment.Collection = results[0].Segment.CollectionID + } + // update segment dbSegment := &dbmodel.UpdateSegment{ ID: updateSegment.ID.String(), diff --git a/go/coordinator/internal/metastore/db/dao/segment.go b/go/coordinator/internal/metastore/db/dao/segment.go index c4c3842e278..5d57e6f941a 100644 --- a/go/coordinator/internal/metastore/db/dao/segment.go +++ b/go/coordinator/internal/metastore/db/dao/segment.go @@ -165,20 +165,23 @@ func generateSegmentUpdatesWithoutID(in *dbmodel.UpdateSegment) map[string]inter } } - if in.ResetCollection { - if in.Collection == nil { - ret["collection_id"] = nil - } - } else { - if in.Collection != nil { - ret["collection_id"] = *in.Collection - } - } - log.Info("generate segment updates without id", zap.Any("updates", ret)) + // TODO: check this + //if in.ResetCollection { + // if in.Collection == nil { + // ret["collection_id"] = nil + // } + //} else { + // if in.Collection != nil { + // ret["collection_id"] = *in.Collection + // } + //} + //log.Info("generate segment updates without id", zap.Any("updates", ret)) return ret } func (s *segmentDb) Update(in *dbmodel.UpdateSegment) error { updates := generateSegmentUpdatesWithoutID(in) - return s.db.Model(&dbmodel.Segment{}).Where("id = ?", in.ID).Updates(updates).Error + return s.db.Model(&dbmodel.Segment{}). + Where("collection_id = ?", &in.Collection). + Where("id = ?", in.ID).Updates(updates).Error } diff --git a/go/coordinator/internal/metastore/db/dbmodel/segment.go b/go/coordinator/internal/metastore/db/dbmodel/segment.go index 0967436e11e..50fe84ec7cc 100644 --- a/go/coordinator/internal/metastore/db/dbmodel/segment.go +++ b/go/coordinator/internal/metastore/db/dbmodel/segment.go @@ -7,6 +7,11 @@ import ( ) type Segment struct { + /* Making CollectionID the primary key allows fast search when we have CollectionID. + This requires us to push down CollectionID from the caller. We don't think there is + need to modify CollectionID in the near future. Each Segment should always have a + collection as a parent and cannot be modified. */ + CollectionID *string `gorm:"collection_id;primaryKey"` ID string `gorm:"id;primaryKey"` Type string `gorm:"type;type:string;not null"` Scope string `gorm:"scope"` @@ -15,7 +20,6 @@ type Segment struct { IsDeleted bool `gorm:"is_deleted;type:bool;default:false"` CreatedAt time.Time `gorm:"created_at;type:timestamp;not null;default:current_timestamp"` UpdatedAt time.Time `gorm:"updated_at;type:timestamp;not null;default:current_timestamp"` - CollectionID *string `gorm:"collection_id"` } func (s Segment) TableName() string { diff --git a/go/coordinator/migrations/20240215010425.sql b/go/coordinator/migrations/20240216211350.sql similarity index 97% rename from go/coordinator/migrations/20240215010425.sql rename to go/coordinator/migrations/20240216211350.sql index 378c5d630e5..2d4b286c681 100644 --- a/go/coordinator/migrations/20240215010425.sql +++ b/go/coordinator/migrations/20240216211350.sql @@ -68,6 +68,7 @@ CREATE TABLE "public"."segment_metadata" ( ); -- Create "segments" table CREATE TABLE "public"."segments" ( + "collection_id" text NOT NULL, "id" text NOT NULL, "type" text NOT NULL, "scope" text NULL, @@ -76,8 +77,7 @@ CREATE TABLE "public"."segments" ( "is_deleted" boolean NULL DEFAULT false, "created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, "updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - "collection_id" text NULL, - PRIMARY KEY ("id") + PRIMARY KEY ("collection_id", "id") ); -- Create "tenants" table CREATE TABLE "public"."tenants" ( diff --git a/go/coordinator/migrations/atlas.sum b/go/coordinator/migrations/atlas.sum index 624c7eabe3a..6d1a0e5baaa 100644 --- a/go/coordinator/migrations/atlas.sum +++ b/go/coordinator/migrations/atlas.sum @@ -1,2 +1,2 @@ -h1:OoMkQddKcFi1jQ4pCp2i8IJAIEDHjQpI3mw+sHoQ1fI= -20240215010425.sql h1:U4h0i9epzZOrFesFlcMJ8250n3SoY5Uv0AejgcZCTTw= +h1:0AmSHt0xnRVJjHv8/LoOph5FzyVC5io1/O1lOY/Ihdo= +20240216211350.sql h1:yoz9m9lOVG1g7JPG0sWW+PXOb5sNg1W7Y5kLqhibGqg=