From 93194c8a6a2dde33031cb812af65acd4fada4662 Mon Sep 17 00:00:00 2001 From: Weili Gu <3451471+weiligu@users.noreply.github.com> Date: Fri, 16 Feb 2024 10:46:33 -0800 Subject: [PATCH] Log Service Setup (#1721) ## Description of changes https://linear.app/trychroma/issue/CHR-241/stand-up-log-service - Stand up Log Service in Dev - stand up postgres DB - stand up migration: atlas - depend on postgres - stand up logservice - depend on migration - stand up coordinator - depend on migration - database migration - change env name - change database name - add definition for reccord log (we can test perf for this later, not hard to change) - log service: go - entry point: main with Cmd - grpc service: with proto change - coordinator - connect to docker postgres - reorganize packages to accommodate with logservice - rename bin to coordinator instead of chroma - tests connect to local postgres instead of sqlite - fix a bug from segment delete - system_test fix will be in a separate PR --- .../workflows/chroma-coordinator-test.yaml | 17 +++ Tiltfile | 13 +- bin/cluster-test.sh | 3 + chromadb/proto/chroma_pb2.py | 38 +++--- chromadb/proto/coordinator_pb2.py | 8 +- chromadb/proto/coordinator_pb2.pyi | 12 ++ chromadb/proto/logservice_pb2.py | 31 +++++ chromadb/proto/logservice_pb2.pyi | 4 + chromadb/proto/logservice_pb2_grpc.py | 31 +++++ go/coordinator/Dockerfile | 5 +- go/coordinator/Dockerfile.migration | 4 + go/coordinator/Makefile | 3 +- go/coordinator/atlas.hcl | 4 +- .../{grpccoordinator => coordinator}/cmd.go | 27 ++-- go/coordinator/cmd/{ => coordinator}/main.go | 3 +- go/coordinator/cmd/logservice/cmd.go | 46 +++++++ go/coordinator/cmd/logservice/main.go | 36 ++++++ go/coordinator/go.sum | 3 + .../grpc}/collection_service.go | 2 +- .../grpc}/collection_service_test.go | 4 +- .../grpc}/proto_model_convert.go | 2 +- .../grpc}/proto_model_convert_test.go | 2 +- .../grpc}/segment_service.go | 2 +- .../grpc}/server.go | 26 +--- .../grpc}/tenant_database_service.go | 2 +- .../{grpccoordinator => }/grpcutils/config.go | 0 .../grpcutils/config_test.go | 0 .../grpcutils/service.go | 0 go/coordinator/internal/logservice/apis.go | 11 ++ .../internal/logservice/grpc/server.go | 104 ++++++++++++++++ .../internal/logservice/recordlog.go | 33 +++++ .../internal/metastore/db/dao/common.go | 4 + .../internal/metastore/db/dao/record_log.go | 9 ++ .../metastore/db/dao/segment_metadata.go | 2 +- .../internal/metastore/db/dbcore/core.go | 39 ++++-- .../internal/metastore/db/dbmodel/common.go | 1 + .../metastore/db/dbmodel/mocks/IMetaDomain.go | 15 +++ .../metastore/db/dbmodel/record_log.go | 16 +++ .../internal/proto/coordinatorpb/chroma.pb.go | 10 +- .../proto/coordinatorpb/chroma_grpc.pb.go | 17 +-- .../proto/coordinatorpb/coordinator.pb.go | 4 +- .../coordinatorpb/coordinator_grpc.pb.go | 72 +++++------ .../proto/logservicepb/logservice.pb.go | 67 ++++++++++ .../proto/logservicepb/logservice_grpc.pb.go | 65 ++++++++++ go/coordinator/migrations/20231129183041.sql | 8 -- ...{20231116210409.sql => 20240215010425.sql} | 16 +++ go/coordinator/migrations/atlas.sum | 5 +- idl/chromadb/proto/logservice.proto | 8 ++ idl/makefile | 1 + k8s/deployment/kubernetes.yaml | 116 +++++++++++++++++- k8s/dev/coordinator.yaml | 4 +- k8s/dev/logservice.yaml | 39 ++++++ k8s/dev/migration.yaml | 22 ++++ k8s/dev/postgres.yaml | 41 +++++++ 54 files changed, 894 insertions(+), 163 deletions(-) create mode 100644 chromadb/proto/logservice_pb2.py create mode 100644 chromadb/proto/logservice_pb2.pyi create mode 100644 chromadb/proto/logservice_pb2_grpc.py create mode 100644 go/coordinator/Dockerfile.migration rename go/coordinator/cmd/{grpccoordinator => coordinator}/cmd.go (64%) rename go/coordinator/cmd/{ => coordinator}/main.go (85%) create mode 100644 go/coordinator/cmd/logservice/cmd.go create mode 100644 go/coordinator/cmd/logservice/main.go rename go/coordinator/internal/{grpccoordinator => coordinator/grpc}/collection_service.go (99%) rename go/coordinator/internal/{grpccoordinator => coordinator/grpc}/collection_service_test.go (97%) rename go/coordinator/internal/{grpccoordinator => coordinator/grpc}/proto_model_convert.go (99%) rename go/coordinator/internal/{grpccoordinator => coordinator/grpc}/proto_model_convert_test.go (99%) rename go/coordinator/internal/{grpccoordinator => coordinator/grpc}/segment_service.go (99%) rename go/coordinator/internal/{grpccoordinator => coordinator/grpc}/server.go (90%) rename go/coordinator/internal/{grpccoordinator => coordinator/grpc}/tenant_database_service.go (99%) rename go/coordinator/internal/{grpccoordinator => }/grpcutils/config.go (100%) rename go/coordinator/internal/{grpccoordinator => }/grpcutils/config_test.go (100%) rename go/coordinator/internal/{grpccoordinator => }/grpcutils/service.go (100%) create mode 100644 go/coordinator/internal/logservice/apis.go create mode 100644 go/coordinator/internal/logservice/grpc/server.go create mode 100644 go/coordinator/internal/logservice/recordlog.go create mode 100644 go/coordinator/internal/metastore/db/dao/record_log.go create mode 100644 go/coordinator/internal/metastore/db/dbmodel/record_log.go create mode 100644 go/coordinator/internal/proto/logservicepb/logservice.pb.go create mode 100644 go/coordinator/internal/proto/logservicepb/logservice_grpc.pb.go delete mode 100644 go/coordinator/migrations/20231129183041.sql rename go/coordinator/migrations/{20231116210409.sql => 20240215010425.sql} (86%) create mode 100644 idl/chromadb/proto/logservice.proto create mode 100644 k8s/dev/logservice.yaml create mode 100644 k8s/dev/migration.yaml create mode 100644 k8s/dev/postgres.yaml diff --git a/.github/workflows/chroma-coordinator-test.yaml b/.github/workflows/chroma-coordinator-test.yaml index 629a9dfb146..e62ab2a5d0d 100644 --- a/.github/workflows/chroma-coordinator-test.yaml +++ b/.github/workflows/chroma-coordinator-test.yaml @@ -16,8 +16,25 @@ jobs: matrix: platform: [ubuntu-latest] runs-on: ${{ matrix.platform }} + services: + postgres: + image: postgres + env: + POSTGRES_USER: chroma + POSTGRES_PASSWORD: chroma + POSTGRES_DB: chroma + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 steps: - name: Checkout uses: actions/checkout@v3 - name: Build and test run: cd go/coordinator && make test + env: + POSTGRES_HOST: localhost + POSTGRES_PORT: 5432 diff --git a/Tiltfile b/Tiltfile index 7be3d4ca594..f1fa96af2ec 100644 --- a/Tiltfile +++ b/Tiltfile @@ -1,3 +1,8 @@ +docker_build('migration', + context='.', + dockerfile='./go/coordinator/Dockerfile.migration' +) + docker_build('coordinator', context='.', dockerfile='./go/coordinator/Dockerfile' @@ -22,9 +27,15 @@ k8s_resource( ) k8s_yaml(['k8s/dev/pulsar.yaml']) k8s_resource('pulsar', resource_deps=['k8s_setup'], labels=["infrastructure"]) +k8s_yaml(['k8s/dev/postgres.yaml']) +k8s_resource('postgres', resource_deps=['k8s_setup'], labels=["infrastructure"]) +k8s_yaml(['k8s/dev/migration.yaml']) +k8s_resource('migration', resource_deps=['postgres'], labels=["chroma"]) k8s_yaml(['k8s/dev/server.yaml']) k8s_resource('server', resource_deps=['k8s_setup'],labels=["chroma"], port_forwards=8000 ) k8s_yaml(['k8s/dev/coordinator.yaml']) -k8s_resource('coordinator', resource_deps=['pulsar', 'server'], labels=["chroma"]) +k8s_resource('coordinator', resource_deps=['pulsar', 'server', 'migration'], labels=["chroma"]) +k8s_yaml(['k8s/dev/logservice.yaml']) +k8s_resource('logservice', resource_deps=['migration'], labels=["chroma"]) k8s_yaml(['k8s/dev/worker.yaml']) k8s_resource('worker', resource_deps=['coordinator'],labels=["chroma"]) diff --git a/bin/cluster-test.sh b/bin/cluster-test.sh index 10c48781c07..d18185b8c02 100755 --- a/bin/cluster-test.sh +++ b/bin/cluster-test.sh @@ -25,6 +25,7 @@ minikube addons enable ingress-dns -p chroma-test # Setup docker to build inside the minikube cluster and build the image eval $(minikube -p chroma-test docker-env) docker build -t server:latest -f Dockerfile . +docker build -t migration -f go/coordinator/Dockerfile.migration . docker build -t chroma-coordinator:latest -f go/coordinator/Dockerfile . docker build -t worker -f rust/worker/Dockerfile . --build-arg CHROMA_KUBERNETES_INTEGRATION=1 @@ -35,6 +36,8 @@ kubectl apply -f k8s/cr kubectl apply -f k8s/test # Wait for the pods in the chroma namespace to be ready +kubectl wait --for=condition=complete --timeout=100s job/migration -n chroma +kubectl delete job migration -n chroma kubectl wait --namespace chroma --for=condition=Ready pods --all --timeout=400s # Run mini kube tunnel in the background to expose the service diff --git a/chromadb/proto/chroma_pb2.py b/chromadb/proto/chroma_pb2.py index 84a3ba9b13d..bc8d43e57ec 100644 --- a/chromadb/proto/chroma_pb2.py +++ b/chromadb/proto/chroma_pb2.py @@ -13,7 +13,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x63hromadb/proto/chroma.proto\x12\x06\x63hroma\"&\n\x06Status\x12\x0e\n\x06reason\x18\x01 \x01(\t\x12\x0c\n\x04\x63ode\x18\x02 \x01(\x05\"0\n\x0e\x43hromaResponse\x12\x1e\n\x06status\x18\x01 \x01(\x0b\x32\x0e.chroma.Status\"U\n\x06Vector\x12\x11\n\tdimension\x18\x01 \x01(\x05\x12\x0e\n\x06vector\x18\x02 \x01(\x0c\x12(\n\x08\x65ncoding\x18\x03 \x01(\x0e\x32\x16.chroma.ScalarEncoding\"\xca\x01\n\x07Segment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\x12#\n\x05scope\x18\x03 \x01(\x0e\x32\x14.chroma.SegmentScope\x12\x12\n\x05topic\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x17\n\ncollection\x18\x05 \x01(\tH\x01\x88\x01\x01\x12-\n\x08metadata\x18\x06 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x02\x88\x01\x01\x42\x08\n\x06_topicB\r\n\x0b_collectionB\x0b\n\t_metadata\"\xb9\x01\n\nCollection\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05topic\x18\x03 \x01(\t\x12-\n\x08metadata\x18\x04 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x00\x88\x01\x01\x12\x16\n\tdimension\x18\x05 \x01(\x05H\x01\x88\x01\x01\x12\x0e\n\x06tenant\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x0b\n\t_metadataB\x0c\n\n_dimension\"4\n\x08\x44\x61tabase\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06tenant\x18\x03 \x01(\t\"\x16\n\x06Tenant\x12\x0c\n\x04name\x18\x01 \x01(\t\"b\n\x13UpdateMetadataValue\x12\x16\n\x0cstring_value\x18\x01 \x01(\tH\x00\x12\x13\n\tint_value\x18\x02 \x01(\x03H\x00\x12\x15\n\x0b\x66loat_value\x18\x03 \x01(\x01H\x00\x42\x07\n\x05value\"\x96\x01\n\x0eUpdateMetadata\x12\x36\n\x08metadata\x18\x01 \x03(\x0b\x32$.chroma.UpdateMetadata.MetadataEntry\x1aL\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0b\x32\x1b.chroma.UpdateMetadataValue:\x02\x38\x01\"\xcc\x01\n\x15SubmitEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12#\n\x06vector\x18\x02 \x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x12-\n\x08metadata\x18\x03 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x01\x88\x01\x01\x12$\n\toperation\x18\x04 \x01(\x0e\x32\x11.chroma.Operation\x12\x15\n\rcollection_id\x18\x05 \x01(\tB\t\n\x07_vectorB\x0b\n\t_metadata\"S\n\x15VectorEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x1e\n\x06vector\x18\x03 \x01(\x0b\x32\x0e.chroma.Vector\"q\n\x11VectorQueryResult\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x10\n\x08\x64istance\x18\x03 \x01(\x01\x12#\n\x06vector\x18\x04 \x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x42\t\n\x07_vector\"@\n\x12VectorQueryResults\x12*\n\x07results\x18\x01 \x03(\x0b\x32\x19.chroma.VectorQueryResult\"(\n\x15SegmentServerResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\"4\n\x11GetVectorsRequest\x12\x0b\n\x03ids\x18\x01 \x03(\t\x12\x12\n\nsegment_id\x18\x02 \x01(\t\"D\n\x12GetVectorsResponse\x12.\n\x07records\x18\x01 \x03(\x0b\x32\x1d.chroma.VectorEmbeddingRecord\"\x86\x01\n\x13QueryVectorsRequest\x12\x1f\n\x07vectors\x18\x01 \x03(\x0b\x32\x0e.chroma.Vector\x12\t\n\x01k\x18\x02 \x01(\x05\x12\x13\n\x0b\x61llowed_ids\x18\x03 \x03(\t\x12\x1a\n\x12include_embeddings\x18\x04 \x01(\x08\x12\x12\n\nsegment_id\x18\x05 \x01(\t\"C\n\x14QueryVectorsResponse\x12+\n\x07results\x18\x01 \x03(\x0b\x32\x1a.chroma.VectorQueryResults*8\n\tOperation\x12\x07\n\x03\x41\x44\x44\x10\x00\x12\n\n\x06UPDATE\x10\x01\x12\n\n\x06UPSERT\x10\x02\x12\n\n\x06\x44\x45LETE\x10\x03*(\n\x0eScalarEncoding\x12\x0b\n\x07\x46LOAT32\x10\x00\x12\t\n\x05INT32\x10\x01*(\n\x0cSegmentScope\x12\n\n\x06VECTOR\x10\x00\x12\x0c\n\x08METADATA\x10\x01\x32\x94\x01\n\rSegmentServer\x12?\n\x0bLoadSegment\x12\x0f.chroma.Segment\x1a\x1d.chroma.SegmentServerResponse\"\x00\x12\x42\n\x0eReleaseSegment\x12\x0f.chroma.Segment\x1a\x1d.chroma.SegmentServerResponse\"\x00\x32\xa2\x01\n\x0cVectorReader\x12\x45\n\nGetVectors\x12\x19.chroma.GetVectorsRequest\x1a\x1a.chroma.GetVectorsResponse\"\x00\x12K\n\x0cQueryVectors\x12\x1b.chroma.QueryVectorsRequest\x1a\x1c.chroma.QueryVectorsResponse\"\x00\x42\x43ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpbb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x63hromadb/proto/chroma.proto\x12\x06\x63hroma\"&\n\x06Status\x12\x0e\n\x06reason\x18\x01 \x01(\t\x12\x0c\n\x04\x63ode\x18\x02 \x01(\x05\"0\n\x0e\x43hromaResponse\x12\x1e\n\x06status\x18\x01 \x01(\x0b\x32\x0e.chroma.Status\"U\n\x06Vector\x12\x11\n\tdimension\x18\x01 \x01(\x05\x12\x0e\n\x06vector\x18\x02 \x01(\x0c\x12(\n\x08\x65ncoding\x18\x03 \x01(\x0e\x32\x16.chroma.ScalarEncoding\"\xca\x01\n\x07Segment\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\x12#\n\x05scope\x18\x03 \x01(\x0e\x32\x14.chroma.SegmentScope\x12\x12\n\x05topic\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x17\n\ncollection\x18\x05 \x01(\tH\x01\x88\x01\x01\x12-\n\x08metadata\x18\x06 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x02\x88\x01\x01\x42\x08\n\x06_topicB\r\n\x0b_collectionB\x0b\n\t_metadata\"\xb9\x01\n\nCollection\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05topic\x18\x03 \x01(\t\x12-\n\x08metadata\x18\x04 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x00\x88\x01\x01\x12\x16\n\tdimension\x18\x05 \x01(\x05H\x01\x88\x01\x01\x12\x0e\n\x06tenant\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x0b\n\t_metadataB\x0c\n\n_dimension\"4\n\x08\x44\x61tabase\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06tenant\x18\x03 \x01(\t\"\x16\n\x06Tenant\x12\x0c\n\x04name\x18\x01 \x01(\t\"b\n\x13UpdateMetadataValue\x12\x16\n\x0cstring_value\x18\x01 \x01(\tH\x00\x12\x13\n\tint_value\x18\x02 \x01(\x03H\x00\x12\x15\n\x0b\x66loat_value\x18\x03 \x01(\x01H\x00\x42\x07\n\x05value\"\x96\x01\n\x0eUpdateMetadata\x12\x36\n\x08metadata\x18\x01 \x03(\x0b\x32$.chroma.UpdateMetadata.MetadataEntry\x1aL\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0b\x32\x1b.chroma.UpdateMetadataValue:\x02\x38\x01\"\xcc\x01\n\x15SubmitEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12#\n\x06vector\x18\x02 \x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x12-\n\x08metadata\x18\x03 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x01\x88\x01\x01\x12$\n\toperation\x18\x04 \x01(\x0e\x32\x11.chroma.Operation\x12\x15\n\rcollection_id\x18\x05 \x01(\tB\t\n\x07_vectorB\x0b\n\t_metadata\"S\n\x15VectorEmbeddingRecord\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x1e\n\x06vector\x18\x03 \x01(\x0b\x32\x0e.chroma.Vector\"q\n\x11VectorQueryResult\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06seq_id\x18\x02 \x01(\x0c\x12\x10\n\x08\x64istance\x18\x03 \x01(\x02\x12#\n\x06vector\x18\x04 \x01(\x0b\x32\x0e.chroma.VectorH\x00\x88\x01\x01\x42\t\n\x07_vector\"@\n\x12VectorQueryResults\x12*\n\x07results\x18\x01 \x03(\x0b\x32\x19.chroma.VectorQueryResult\"4\n\x11GetVectorsRequest\x12\x0b\n\x03ids\x18\x01 \x03(\t\x12\x12\n\nsegment_id\x18\x02 \x01(\t\"D\n\x12GetVectorsResponse\x12.\n\x07records\x18\x01 \x03(\x0b\x32\x1d.chroma.VectorEmbeddingRecord\"\x86\x01\n\x13QueryVectorsRequest\x12\x1f\n\x07vectors\x18\x01 \x03(\x0b\x32\x0e.chroma.Vector\x12\t\n\x01k\x18\x02 \x01(\x05\x12\x13\n\x0b\x61llowed_ids\x18\x03 \x03(\t\x12\x1a\n\x12include_embeddings\x18\x04 \x01(\x08\x12\x12\n\nsegment_id\x18\x05 \x01(\t\"C\n\x14QueryVectorsResponse\x12+\n\x07results\x18\x01 \x03(\x0b\x32\x1a.chroma.VectorQueryResults*8\n\tOperation\x12\x07\n\x03\x41\x44\x44\x10\x00\x12\n\n\x06UPDATE\x10\x01\x12\n\n\x06UPSERT\x10\x02\x12\n\n\x06\x44\x45LETE\x10\x03*(\n\x0eScalarEncoding\x12\x0b\n\x07\x46LOAT32\x10\x00\x12\t\n\x05INT32\x10\x01*(\n\x0cSegmentScope\x12\n\n\x06VECTOR\x10\x00\x12\x0c\n\x08METADATA\x10\x01\x32\xa2\x01\n\x0cVectorReader\x12\x45\n\nGetVectors\x12\x19.chroma.GetVectorsRequest\x1a\x1a.chroma.GetVectorsResponse\"\x00\x12K\n\x0cQueryVectors\x12\x1b.chroma.QueryVectorsRequest\x1a\x1c.chroma.QueryVectorsResponse\"\x00\x42\x43ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpbb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -23,12 +23,12 @@ DESCRIPTOR._serialized_options = b'ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpb' _UPDATEMETADATA_METADATAENTRY._options = None _UPDATEMETADATA_METADATAENTRY._serialized_options = b'8\001' - _globals['_OPERATION']._serialized_start=1785 - _globals['_OPERATION']._serialized_end=1841 - _globals['_SCALARENCODING']._serialized_start=1843 - _globals['_SCALARENCODING']._serialized_end=1883 - _globals['_SEGMENTSCOPE']._serialized_start=1885 - _globals['_SEGMENTSCOPE']._serialized_end=1925 + _globals['_OPERATION']._serialized_start=1743 + _globals['_OPERATION']._serialized_end=1799 + _globals['_SCALARENCODING']._serialized_start=1801 + _globals['_SCALARENCODING']._serialized_end=1841 + _globals['_SEGMENTSCOPE']._serialized_start=1843 + _globals['_SEGMENTSCOPE']._serialized_end=1883 _globals['_STATUS']._serialized_start=39 _globals['_STATUS']._serialized_end=77 _globals['_CHROMARESPONSE']._serialized_start=79 @@ -57,18 +57,14 @@ _globals['_VECTORQUERYRESULT']._serialized_end=1345 _globals['_VECTORQUERYRESULTS']._serialized_start=1347 _globals['_VECTORQUERYRESULTS']._serialized_end=1411 - _globals['_SEGMENTSERVERRESPONSE']._serialized_start=1413 - _globals['_SEGMENTSERVERRESPONSE']._serialized_end=1453 - _globals['_GETVECTORSREQUEST']._serialized_start=1455 - _globals['_GETVECTORSREQUEST']._serialized_end=1507 - _globals['_GETVECTORSRESPONSE']._serialized_start=1509 - _globals['_GETVECTORSRESPONSE']._serialized_end=1577 - _globals['_QUERYVECTORSREQUEST']._serialized_start=1580 - _globals['_QUERYVECTORSREQUEST']._serialized_end=1714 - _globals['_QUERYVECTORSRESPONSE']._serialized_start=1716 - _globals['_QUERYVECTORSRESPONSE']._serialized_end=1783 - _globals['_SEGMENTSERVER']._serialized_start=1928 - _globals['_SEGMENTSERVER']._serialized_end=2076 - _globals['_VECTORREADER']._serialized_start=2079 - _globals['_VECTORREADER']._serialized_end=2241 + _globals['_GETVECTORSREQUEST']._serialized_start=1413 + _globals['_GETVECTORSREQUEST']._serialized_end=1465 + _globals['_GETVECTORSRESPONSE']._serialized_start=1467 + _globals['_GETVECTORSRESPONSE']._serialized_end=1535 + _globals['_QUERYVECTORSREQUEST']._serialized_start=1538 + _globals['_QUERYVECTORSREQUEST']._serialized_end=1672 + _globals['_QUERYVECTORSRESPONSE']._serialized_start=1674 + _globals['_QUERYVECTORSRESPONSE']._serialized_end=1741 + _globals['_VECTORREADER']._serialized_start=1886 + _globals['_VECTORREADER']._serialized_end=2048 # @@protoc_insertion_point(module_scope) diff --git a/chromadb/proto/coordinator_pb2.py b/chromadb/proto/coordinator_pb2.py index fda6a099867..888aece9285 100644 --- a/chromadb/proto/coordinator_pb2.py +++ b/chromadb/proto/coordinator_pb2.py @@ -15,7 +15,7 @@ from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n chromadb/proto/coordinator.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto\x1a\x1bgoogle/protobuf/empty.proto\"A\n\x15\x43reateDatabaseRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06tenant\x18\x03 \x01(\t\"2\n\x12GetDatabaseRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06tenant\x18\x02 \x01(\t\"Y\n\x13GetDatabaseResponse\x12\"\n\x08\x64\x61tabase\x18\x01 \x01(\x0b\x32\x10.chroma.Database\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"#\n\x13\x43reateTenantRequest\x12\x0c\n\x04name\x18\x02 \x01(\t\" \n\x10GetTenantRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"S\n\x11GetTenantResponse\x12\x1e\n\x06tenant\x18\x01 \x01(\x0b\x32\x0e.chroma.Tenant\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"8\n\x14\x43reateSegmentRequest\x12 \n\x07segment\x18\x01 \x01(\x0b\x32\x0f.chroma.Segment\"\"\n\x14\x44\x65leteSegmentRequest\x12\n\n\x02id\x18\x01 \x01(\t\"\xc2\x01\n\x12GetSegmentsRequest\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x11\n\x04type\x18\x02 \x01(\tH\x01\x88\x01\x01\x12(\n\x05scope\x18\x03 \x01(\x0e\x32\x14.chroma.SegmentScopeH\x02\x88\x01\x01\x12\x12\n\x05topic\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x17\n\ncollection\x18\x05 \x01(\tH\x04\x88\x01\x01\x42\x05\n\x03_idB\x07\n\x05_typeB\x08\n\x06_scopeB\x08\n\x06_topicB\r\n\x0b_collection\"X\n\x13GetSegmentsResponse\x12!\n\x08segments\x18\x01 \x03(\x0b\x32\x0f.chroma.Segment\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"\xfa\x01\n\x14UpdateSegmentRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x05topic\x18\x02 \x01(\tH\x00\x12\x15\n\x0breset_topic\x18\x03 \x01(\x08H\x00\x12\x14\n\ncollection\x18\x04 \x01(\tH\x01\x12\x1a\n\x10reset_collection\x18\x05 \x01(\x08H\x01\x12*\n\x08metadata\x18\x06 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x02\x12\x18\n\x0ereset_metadata\x18\x07 \x01(\x08H\x02\x42\x0e\n\x0ctopic_updateB\x13\n\x11\x63ollection_updateB\x11\n\x0fmetadata_update\"\xe5\x01\n\x17\x43reateCollectionRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x08metadata\x18\x03 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x00\x88\x01\x01\x12\x16\n\tdimension\x18\x04 \x01(\x05H\x01\x88\x01\x01\x12\x1a\n\rget_or_create\x18\x05 \x01(\x08H\x02\x88\x01\x01\x12\x0e\n\x06tenant\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x0b\n\t_metadataB\x0c\n\n_dimensionB\x10\n\x0e_get_or_create\"s\n\x18\x43reateCollectionResponse\x12&\n\ncollection\x18\x01 \x01(\x0b\x32\x12.chroma.Collection\x12\x0f\n\x07\x63reated\x18\x02 \x01(\x08\x12\x1e\n\x06status\x18\x03 \x01(\x0b\x32\x0e.chroma.Status\"G\n\x17\x44\x65leteCollectionRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06tenant\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\"\x8b\x01\n\x15GetCollectionsRequest\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x12\n\x05topic\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x0e\n\x06tenant\x18\x04 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x05 \x01(\tB\x05\n\x03_idB\x07\n\x05_nameB\x08\n\x06_topic\"a\n\x16GetCollectionsResponse\x12\'\n\x0b\x63ollections\x18\x01 \x03(\x0b\x32\x12.chroma.Collection\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"\xde\x01\n\x17UpdateCollectionRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\x05topic\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\tdimension\x18\x04 \x01(\x05H\x03\x88\x01\x01\x12*\n\x08metadata\x18\x05 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x00\x12\x18\n\x0ereset_metadata\x18\x06 \x01(\x08H\x00\x42\x11\n\x0fmetadata_updateB\x08\n\x06_topicB\x07\n\x05_nameB\x0c\n\n_dimension2\xd6\x07\n\x05SysDB\x12I\n\x0e\x43reateDatabase\x12\x1d.chroma.CreateDatabaseRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12H\n\x0bGetDatabase\x12\x1a.chroma.GetDatabaseRequest\x1a\x1b.chroma.GetDatabaseResponse\"\x00\x12\x45\n\x0c\x43reateTenant\x12\x1b.chroma.CreateTenantRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12\x42\n\tGetTenant\x12\x18.chroma.GetTenantRequest\x1a\x19.chroma.GetTenantResponse\"\x00\x12G\n\rCreateSegment\x12\x1c.chroma.CreateSegmentRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12G\n\rDeleteSegment\x12\x1c.chroma.DeleteSegmentRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12H\n\x0bGetSegments\x12\x1a.chroma.GetSegmentsRequest\x1a\x1b.chroma.GetSegmentsResponse\"\x00\x12G\n\rUpdateSegment\x12\x1c.chroma.UpdateSegmentRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12W\n\x10\x43reateCollection\x12\x1f.chroma.CreateCollectionRequest\x1a .chroma.CreateCollectionResponse\"\x00\x12M\n\x10\x44\x65leteCollection\x12\x1f.chroma.DeleteCollectionRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12Q\n\x0eGetCollections\x12\x1d.chroma.GetCollectionsRequest\x1a\x1e.chroma.GetCollectionsResponse\"\x00\x12M\n\x10UpdateCollection\x12\x1f.chroma.UpdateCollectionRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12>\n\nResetState\x12\x16.google.protobuf.Empty\x1a\x16.chroma.ChromaResponse\"\x00\x42\x43ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpbb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n chromadb/proto/coordinator.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto\x1a\x1bgoogle/protobuf/empty.proto\"A\n\x15\x43reateDatabaseRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06tenant\x18\x03 \x01(\t\"2\n\x12GetDatabaseRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06tenant\x18\x02 \x01(\t\"Y\n\x13GetDatabaseResponse\x12\"\n\x08\x64\x61tabase\x18\x01 \x01(\x0b\x32\x10.chroma.Database\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"#\n\x13\x43reateTenantRequest\x12\x0c\n\x04name\x18\x02 \x01(\t\" \n\x10GetTenantRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"S\n\x11GetTenantResponse\x12\x1e\n\x06tenant\x18\x01 \x01(\x0b\x32\x0e.chroma.Tenant\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"8\n\x14\x43reateSegmentRequest\x12 \n\x07segment\x18\x01 \x01(\x0b\x32\x0f.chroma.Segment\"\"\n\x14\x44\x65leteSegmentRequest\x12\n\n\x02id\x18\x01 \x01(\t\"\xc2\x01\n\x12GetSegmentsRequest\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x11\n\x04type\x18\x02 \x01(\tH\x01\x88\x01\x01\x12(\n\x05scope\x18\x03 \x01(\x0e\x32\x14.chroma.SegmentScopeH\x02\x88\x01\x01\x12\x12\n\x05topic\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x17\n\ncollection\x18\x05 \x01(\tH\x04\x88\x01\x01\x42\x05\n\x03_idB\x07\n\x05_typeB\x08\n\x06_scopeB\x08\n\x06_topicB\r\n\x0b_collection\"X\n\x13GetSegmentsResponse\x12!\n\x08segments\x18\x01 \x03(\x0b\x32\x0f.chroma.Segment\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"\xfa\x01\n\x14UpdateSegmentRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x05topic\x18\x02 \x01(\tH\x00\x12\x15\n\x0breset_topic\x18\x03 \x01(\x08H\x00\x12\x14\n\ncollection\x18\x04 \x01(\tH\x01\x12\x1a\n\x10reset_collection\x18\x05 \x01(\x08H\x01\x12*\n\x08metadata\x18\x06 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x02\x12\x18\n\x0ereset_metadata\x18\x07 \x01(\x08H\x02\x42\x0e\n\x0ctopic_updateB\x13\n\x11\x63ollection_updateB\x11\n\x0fmetadata_update\"\xe5\x01\n\x17\x43reateCollectionRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12-\n\x08metadata\x18\x03 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x00\x88\x01\x01\x12\x16\n\tdimension\x18\x04 \x01(\x05H\x01\x88\x01\x01\x12\x1a\n\rget_or_create\x18\x05 \x01(\x08H\x02\x88\x01\x01\x12\x0e\n\x06tenant\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x0b\n\t_metadataB\x0c\n\n_dimensionB\x10\n\x0e_get_or_create\"s\n\x18\x43reateCollectionResponse\x12&\n\ncollection\x18\x01 \x01(\x0b\x32\x12.chroma.Collection\x12\x0f\n\x07\x63reated\x18\x02 \x01(\x08\x12\x1e\n\x06status\x18\x03 \x01(\x0b\x32\x0e.chroma.Status\"G\n\x17\x44\x65leteCollectionRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0e\n\x06tenant\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\"\x8b\x01\n\x15GetCollectionsRequest\x12\x0f\n\x02id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x11\n\x04name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x12\n\x05topic\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x0e\n\x06tenant\x18\x04 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x05 \x01(\tB\x05\n\x03_idB\x07\n\x05_nameB\x08\n\x06_topic\"a\n\x16GetCollectionsResponse\x12\'\n\x0b\x63ollections\x18\x01 \x03(\x0b\x32\x12.chroma.Collection\x12\x1e\n\x06status\x18\x02 \x01(\x0b\x32\x0e.chroma.Status\"\xde\x01\n\x17UpdateCollectionRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x12\n\x05topic\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x11\n\x04name\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x16\n\tdimension\x18\x04 \x01(\x05H\x03\x88\x01\x01\x12*\n\x08metadata\x18\x05 \x01(\x0b\x32\x16.chroma.UpdateMetadataH\x00\x12\x18\n\x0ereset_metadata\x18\x06 \x01(\x08H\x00\x42\x11\n\x0fmetadata_updateB\x08\n\x06_topicB\x07\n\x05_nameB\x0c\n\n_dimension\"O\n\x0cNotification\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x15\n\rcollection_id\x18\x02 \x01(\t\x12\x0c\n\x04type\x18\x03 \x01(\t\x12\x0e\n\x06status\x18\x04 \x01(\t2\xd6\x07\n\x05SysDB\x12I\n\x0e\x43reateDatabase\x12\x1d.chroma.CreateDatabaseRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12H\n\x0bGetDatabase\x12\x1a.chroma.GetDatabaseRequest\x1a\x1b.chroma.GetDatabaseResponse\"\x00\x12\x45\n\x0c\x43reateTenant\x12\x1b.chroma.CreateTenantRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12\x42\n\tGetTenant\x12\x18.chroma.GetTenantRequest\x1a\x19.chroma.GetTenantResponse\"\x00\x12G\n\rCreateSegment\x12\x1c.chroma.CreateSegmentRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12G\n\rDeleteSegment\x12\x1c.chroma.DeleteSegmentRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12H\n\x0bGetSegments\x12\x1a.chroma.GetSegmentsRequest\x1a\x1b.chroma.GetSegmentsResponse\"\x00\x12G\n\rUpdateSegment\x12\x1c.chroma.UpdateSegmentRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12W\n\x10\x43reateCollection\x12\x1f.chroma.CreateCollectionRequest\x1a .chroma.CreateCollectionResponse\"\x00\x12M\n\x10\x44\x65leteCollection\x12\x1f.chroma.DeleteCollectionRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12Q\n\x0eGetCollections\x12\x1d.chroma.GetCollectionsRequest\x1a\x1e.chroma.GetCollectionsResponse\"\x00\x12M\n\x10UpdateCollection\x12\x1f.chroma.UpdateCollectionRequest\x1a\x16.chroma.ChromaResponse\"\x00\x12>\n\nResetState\x12\x16.google.protobuf.Empty\x1a\x16.chroma.ChromaResponse\"\x00\x42\x43ZAgithub.com/chroma/chroma-coordinator/internal/proto/coordinatorpbb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -57,6 +57,8 @@ _globals['_GETCOLLECTIONSRESPONSE']._serialized_end=1763 _globals['_UPDATECOLLECTIONREQUEST']._serialized_start=1766 _globals['_UPDATECOLLECTIONREQUEST']._serialized_end=1988 - _globals['_SYSDB']._serialized_start=1991 - _globals['_SYSDB']._serialized_end=2973 + _globals['_NOTIFICATION']._serialized_start=1990 + _globals['_NOTIFICATION']._serialized_end=2069 + _globals['_SYSDB']._serialized_start=2072 + _globals['_SYSDB']._serialized_end=3054 # @@protoc_insertion_point(module_scope) diff --git a/chromadb/proto/coordinator_pb2.pyi b/chromadb/proto/coordinator_pb2.pyi index 81545e4e283..ec926340cdf 100644 --- a/chromadb/proto/coordinator_pb2.pyi +++ b/chromadb/proto/coordinator_pb2.pyi @@ -180,3 +180,15 @@ class UpdateCollectionRequest(_message.Message): metadata: _chroma_pb2.UpdateMetadata reset_metadata: bool def __init__(self, id: _Optional[str] = ..., topic: _Optional[str] = ..., name: _Optional[str] = ..., dimension: _Optional[int] = ..., metadata: _Optional[_Union[_chroma_pb2.UpdateMetadata, _Mapping]] = ..., reset_metadata: bool = ...) -> None: ... + +class Notification(_message.Message): + __slots__ = ["id", "collection_id", "type", "status"] + ID_FIELD_NUMBER: _ClassVar[int] + COLLECTION_ID_FIELD_NUMBER: _ClassVar[int] + TYPE_FIELD_NUMBER: _ClassVar[int] + STATUS_FIELD_NUMBER: _ClassVar[int] + id: int + collection_id: str + type: str + status: str + def __init__(self, id: _Optional[int] = ..., collection_id: _Optional[str] = ..., type: _Optional[str] = ..., status: _Optional[str] = ...) -> None: ... diff --git a/chromadb/proto/logservice_pb2.py b/chromadb/proto/logservice_pb2.py new file mode 100644 index 00000000000..f7dd81efc1b --- /dev/null +++ b/chromadb/proto/logservice_pb2.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: chromadb/proto/logservice.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b"\n\x1f\x63hromadb/proto/logservice.proto\x12\x06\x63hroma2\x0c\n\nLogServiceBBZ@github.com/chroma/chroma-coordinator/internal/proto/logservicepbb\x06proto3" +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "chromadb.proto.logservice_pb2", _globals +) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = ( + b"Z@github.com/chroma/chroma-coordinator/internal/proto/logservicepb" + ) + _globals["_LOGSERVICE"]._serialized_start = 43 + _globals["_LOGSERVICE"]._serialized_end = 55 +# @@protoc_insertion_point(module_scope) diff --git a/chromadb/proto/logservice_pb2.pyi b/chromadb/proto/logservice_pb2.pyi new file mode 100644 index 00000000000..869ab9d2d1e --- /dev/null +++ b/chromadb/proto/logservice_pb2.pyi @@ -0,0 +1,4 @@ +from google.protobuf import descriptor as _descriptor +from typing import ClassVar as _ClassVar + +DESCRIPTOR: _descriptor.FileDescriptor diff --git a/chromadb/proto/logservice_pb2_grpc.py b/chromadb/proto/logservice_pb2_grpc.py new file mode 100644 index 00000000000..d98303113da --- /dev/null +++ b/chromadb/proto/logservice_pb2_grpc.py @@ -0,0 +1,31 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + + +class LogServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + + +class LogServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + +def add_LogServiceServicer_to_server(servicer, server): + rpc_method_handlers = {} + generic_handler = grpc.method_handlers_generic_handler( + "chroma.LogService", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class LogService(object): + """Missing associated documentation comment in .proto file.""" diff --git a/go/coordinator/Dockerfile b/go/coordinator/Dockerfile index a86f5cc258f..554da75f93a 100644 --- a/go/coordinator/Dockerfile +++ b/go/coordinator/Dockerfile @@ -23,9 +23,8 @@ RUN apk add \ RUN mkdir /chroma-coordinator WORKDIR /chroma-coordinator -COPY --from=build /src/chroma-coordinator/bin/chroma /chroma-coordinator/bin/chroma +COPY --from=build /src/chroma-coordinator/bin/coordinator /chroma-coordinator/bin/coordinator +COPY --from=build /src/chroma-coordinator/bin/logservice /chroma-coordinator/bin/logservice ENV PATH=$PATH:/chroma-coordinator/bin -COPY --from=build /src/chroma-coordinator/migrations /chroma-coordinator/migrations - CMD /bin/bash diff --git a/go/coordinator/Dockerfile.migration b/go/coordinator/Dockerfile.migration new file mode 100644 index 00000000000..092f2629540 --- /dev/null +++ b/go/coordinator/Dockerfile.migration @@ -0,0 +1,4 @@ +FROM arigaio/atlas:latest +workdir /app +COPY ./go/coordinator/migrations migrations +COPY ./go/coordinator/atlas.hcl atlas.hcl diff --git a/go/coordinator/Makefile b/go/coordinator/Makefile index 8fb52e4bb74..f1a440e4744 100644 --- a/go/coordinator/Makefile +++ b/go/coordinator/Makefile @@ -1,6 +1,7 @@ .PHONY: build build: - go build -v -o bin/chroma ./cmd + go build -v -o bin/coordinator ./cmd/coordinator/ + go build -v -o bin/logservice ./cmd/logservice/ test: build go test -cover -race ./... diff --git a/go/coordinator/atlas.hcl b/go/coordinator/atlas.hcl index 2883c58d65e..f2c17f57c19 100644 --- a/go/coordinator/atlas.hcl +++ b/go/coordinator/atlas.hcl @@ -10,9 +10,9 @@ data "external_schema" "gorm" { ] } -env "gorm" { +env "dev" { src = data.external_schema.gorm.url - dev = "postgres://localhost:5432/dev?sslmode=disable" + dev = "postgres://localhost:5432/chroma?sslmode=disable" migration { dir = "file://migrations" } diff --git a/go/coordinator/cmd/grpccoordinator/cmd.go b/go/coordinator/cmd/coordinator/cmd.go similarity index 64% rename from go/coordinator/cmd/grpccoordinator/cmd.go rename to go/coordinator/cmd/coordinator/cmd.go index 8859790b56c..a1dadfc5cdc 100644 --- a/go/coordinator/cmd/grpccoordinator/cmd.go +++ b/go/coordinator/cmd/coordinator/cmd.go @@ -1,18 +1,18 @@ -package grpccoordinator +package main import ( + "github.com/chroma/chroma-coordinator/internal/coordinator/grpc" + "github.com/chroma/chroma-coordinator/internal/grpcutils" "io" "time" "github.com/chroma/chroma-coordinator/cmd/flag" - "github.com/chroma/chroma-coordinator/internal/grpccoordinator" - "github.com/chroma/chroma-coordinator/internal/grpccoordinator/grpcutils" "github.com/chroma/chroma-coordinator/internal/utils" "github.com/spf13/cobra" ) var ( - conf = grpccoordinator.Config{ + conf = grpc.Config{ GrpcConfig: &grpcutils.GrpcConfig{}, } @@ -30,14 +30,15 @@ func init() { flag.GRPCAddr(Cmd, &conf.GrpcConfig.BindAddress) // System Catalog - Cmd.Flags().StringVar(&conf.SystemCatalogProvider, "system-catalog-provider", "memory", "System catalog provider") - Cmd.Flags().StringVar(&conf.Username, "username", "root", "MetaTable username") - Cmd.Flags().StringVar(&conf.Password, "password", "", "MetaTable password") - Cmd.Flags().StringVar(&conf.Address, "db-address", "127.0.0.1", "MetaTable db address") - Cmd.Flags().IntVar(&conf.Port, "db-port", 5432, "MetaTable db port") - Cmd.Flags().StringVar(&conf.DBName, "db-name", "", "MetaTable db name") - Cmd.Flags().IntVar(&conf.MaxIdleConns, "max-idle-conns", 10, "MetaTable max idle connections") - Cmd.Flags().IntVar(&conf.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections") + Cmd.Flags().StringVar(&conf.SystemCatalogProvider, "system-catalog-provider", "database", "System catalog provider") + Cmd.Flags().StringVar(&conf.DBConfig.Username, "username", "chroma", "MetaTable username") + Cmd.Flags().StringVar(&conf.DBConfig.Password, "password", "chroma", "MetaTable password") + Cmd.Flags().StringVar(&conf.DBConfig.Address, "db-address", "postgres", "MetaTable db address") + Cmd.Flags().IntVar(&conf.DBConfig.Port, "db-port", 5432, "MetaTable db port") + Cmd.Flags().StringVar(&conf.DBConfig.DBName, "db-name", "chroma", "MetaTable db name") + Cmd.Flags().IntVar(&conf.DBConfig.MaxIdleConns, "max-idle-conns", 10, "MetaTable max idle connections") + Cmd.Flags().IntVar(&conf.DBConfig.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections") + Cmd.Flags().StringVar(&conf.DBConfig.SslMode, "ssl-mode", "disable", "SSL mode for database connection") // Pulsar Cmd.Flags().StringVar(&conf.PulsarAdminURL, "pulsar-admin-url", "http://localhost:8080", "Pulsar admin url") @@ -59,6 +60,6 @@ func init() { func exec(*cobra.Command, []string) { utils.RunProcess(func() (io.Closer, error) { - return grpccoordinator.New(conf) + return grpc.New(conf) }) } diff --git a/go/coordinator/cmd/main.go b/go/coordinator/cmd/coordinator/main.go similarity index 85% rename from go/coordinator/cmd/main.go rename to go/coordinator/cmd/coordinator/main.go index 0b7cfa7b54d..bfa31c8c9be 100644 --- a/go/coordinator/cmd/main.go +++ b/go/coordinator/cmd/coordinator/main.go @@ -4,7 +4,6 @@ import ( "fmt" "os" - "github.com/chroma/chroma-coordinator/cmd/grpccoordinator" "github.com/chroma/chroma-coordinator/internal/utils" "github.com/rs/zerolog" "github.com/spf13/cobra" @@ -20,7 +19,7 @@ var ( ) func init() { - rootCmd.AddCommand(grpccoordinator.Cmd) + rootCmd.AddCommand(Cmd) } func main() { diff --git a/go/coordinator/cmd/logservice/cmd.go b/go/coordinator/cmd/logservice/cmd.go new file mode 100644 index 00000000000..721067bb3b2 --- /dev/null +++ b/go/coordinator/cmd/logservice/cmd.go @@ -0,0 +1,46 @@ +package main + +import ( + "github.com/chroma/chroma-coordinator/cmd/flag" + "github.com/chroma/chroma-coordinator/internal/grpcutils" + "github.com/chroma/chroma-coordinator/internal/logservice/grpc" + "github.com/chroma/chroma-coordinator/internal/utils" + "github.com/spf13/cobra" + "io" +) + +var ( + conf = grpc.Config{ + GrpcConfig: &grpcutils.GrpcConfig{}, + } + + Cmd = &cobra.Command{ + Use: "logservice", + Short: "Start a logservice service", + Long: `RecordLog root command`, + Run: exec, + } +) + +func init() { + // GRPC + flag.GRPCAddr(Cmd, &conf.GrpcConfig.BindAddress) + Cmd.Flags().BoolVar(&conf.StartGrpc, "start-grpc", true, "start grpc server or not") + + // DB provider + Cmd.Flags().StringVar(&conf.DBProvider, "db-provider", "postgres", "DB provider") + + // DB dev + Cmd.Flags().StringVar(&conf.DBConfig.Address, "db-host", "postgres", "DB host") + Cmd.Flags().IntVar(&conf.DBConfig.Port, "db-port", 5432, "DB port") + Cmd.Flags().StringVar(&conf.DBConfig.Username, "db-user", "chroma", "DB user") + Cmd.Flags().StringVar(&conf.DBConfig.Password, "db-password", "chroma", "DB password") + Cmd.Flags().StringVar(&conf.DBConfig.DBName, "db-name", "chroma", "DB name") + Cmd.Flags().StringVar(&conf.DBConfig.SslMode, "ssl-mode", "disable", "SSL mode for database connection") +} + +func exec(*cobra.Command, []string) { + utils.RunProcess(func() (io.Closer, error) { + return grpc.New(conf) + }) +} diff --git a/go/coordinator/cmd/logservice/main.go b/go/coordinator/cmd/logservice/main.go new file mode 100644 index 00000000000..d88c70ec61e --- /dev/null +++ b/go/coordinator/cmd/logservice/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "fmt" + "os" + + "github.com/chroma/chroma-coordinator/internal/utils" + "github.com/rs/zerolog" + "github.com/spf13/cobra" + "go.uber.org/automaxprocs/maxprocs" +) + +var ( + rootCmd = &cobra.Command{ + Use: "logservice", + Short: "RecordLog root command", + Long: `RecordLog root command`, + } +) + +func init() { + rootCmd.AddCommand(Cmd) +} + +func main() { + utils.LogLevel = zerolog.DebugLevel + utils.ConfigureLogger() + if _, err := maxprocs.Set(); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + if err := rootCmd.Execute(); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/go/coordinator/go.sum b/go/coordinator/go.sum index 15390626451..1977a366523 100644 --- a/go/coordinator/go.sum +++ b/go/coordinator/go.sum @@ -12,6 +12,8 @@ github.com/AthenZ/athenz v1.10.39/go.mod h1:3Tg8HLsiQZp81BJY58JBeU2BR6B/H4/0MQGf github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/alecthomas/kong v0.7.1 h1:azoTh0IOfwlAX3qN9sHWTxACE2oV8Bg2gAwBsMwDQY4= +github.com/alecthomas/kong v0.7.1/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -344,6 +346,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg= +golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/go/coordinator/internal/grpccoordinator/collection_service.go b/go/coordinator/internal/coordinator/grpc/collection_service.go similarity index 99% rename from go/coordinator/internal/grpccoordinator/collection_service.go rename to go/coordinator/internal/coordinator/grpc/collection_service.go index faaf6b4dbf9..9276f140107 100644 --- a/go/coordinator/internal/grpccoordinator/collection_service.go +++ b/go/coordinator/internal/coordinator/grpc/collection_service.go @@ -1,4 +1,4 @@ -package grpccoordinator +package grpc import ( "context" diff --git a/go/coordinator/internal/grpccoordinator/collection_service_test.go b/go/coordinator/internal/coordinator/grpc/collection_service_test.go similarity index 97% rename from go/coordinator/internal/grpccoordinator/collection_service_test.go rename to go/coordinator/internal/coordinator/grpc/collection_service_test.go index 390b08f7607..c4f02a0682c 100644 --- a/go/coordinator/internal/grpccoordinator/collection_service_test.go +++ b/go/coordinator/internal/coordinator/grpc/collection_service_test.go @@ -1,11 +1,11 @@ -package grpccoordinator +package grpc import ( "context" + "github.com/chroma/chroma-coordinator/internal/grpcutils" "testing" "github.com/chroma/chroma-coordinator/internal/common" - "github.com/chroma/chroma-coordinator/internal/grpccoordinator/grpcutils" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" "github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" "pgregory.net/rapid" diff --git a/go/coordinator/internal/grpccoordinator/proto_model_convert.go b/go/coordinator/internal/coordinator/grpc/proto_model_convert.go similarity index 99% rename from go/coordinator/internal/grpccoordinator/proto_model_convert.go rename to go/coordinator/internal/coordinator/grpc/proto_model_convert.go index 18c4fd307ab..9b47f1f33ce 100644 --- a/go/coordinator/internal/grpccoordinator/proto_model_convert.go +++ b/go/coordinator/internal/coordinator/grpc/proto_model_convert.go @@ -1,4 +1,4 @@ -package grpccoordinator +package grpc import ( "github.com/chroma/chroma-coordinator/internal/common" diff --git a/go/coordinator/internal/grpccoordinator/proto_model_convert_test.go b/go/coordinator/internal/coordinator/grpc/proto_model_convert_test.go similarity index 99% rename from go/coordinator/internal/grpccoordinator/proto_model_convert_test.go rename to go/coordinator/internal/coordinator/grpc/proto_model_convert_test.go index 9cfa2f0632f..2586151d3c7 100644 --- a/go/coordinator/internal/grpccoordinator/proto_model_convert_test.go +++ b/go/coordinator/internal/coordinator/grpc/proto_model_convert_test.go @@ -1,4 +1,4 @@ -package grpccoordinator +package grpc import ( "testing" diff --git a/go/coordinator/internal/grpccoordinator/segment_service.go b/go/coordinator/internal/coordinator/grpc/segment_service.go similarity index 99% rename from go/coordinator/internal/grpccoordinator/segment_service.go rename to go/coordinator/internal/coordinator/grpc/segment_service.go index b2d3be5e4ff..6e63e384ef1 100644 --- a/go/coordinator/internal/grpccoordinator/segment_service.go +++ b/go/coordinator/internal/coordinator/grpc/segment_service.go @@ -1,4 +1,4 @@ -package grpccoordinator +package grpc import ( "context" diff --git a/go/coordinator/internal/grpccoordinator/server.go b/go/coordinator/internal/coordinator/grpc/server.go similarity index 90% rename from go/coordinator/internal/grpccoordinator/server.go rename to go/coordinator/internal/coordinator/grpc/server.go index 4205a47153b..578298719a7 100644 --- a/go/coordinator/internal/grpccoordinator/server.go +++ b/go/coordinator/internal/coordinator/grpc/server.go @@ -1,13 +1,13 @@ -package grpccoordinator +package grpc import ( "context" "errors" + "github.com/chroma/chroma-coordinator/internal/grpcutils" "time" "github.com/apache/pulsar-client-go/pulsar" "github.com/chroma/chroma-coordinator/internal/coordinator" - "github.com/chroma/chroma-coordinator/internal/grpccoordinator/grpcutils" "github.com/chroma/chroma-coordinator/internal/memberlist_manager" "github.com/chroma/chroma-coordinator/internal/metastore/db/dao" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" @@ -29,13 +29,7 @@ type Config struct { SystemCatalogProvider string // MetaTable config - Username string - Password string - Address string - Port int - DBName string - MaxIdleConns int - MaxOpenConns int + DBConfig dbcore.DBConfig // Notification config NotificationStoreProvider string @@ -77,16 +71,8 @@ func New(config Config) (*Server, error) { if config.SystemCatalogProvider == "memory" { return NewWithGrpcProvider(config, grpcutils.Default, nil) } else if config.SystemCatalogProvider == "database" { - dBConfig := dbcore.DBConfig{ - Username: config.Username, - Password: config.Password, - Address: config.Address, - Port: config.Port, - DBName: config.DBName, - MaxIdleConns: config.MaxIdleConns, - MaxOpenConns: config.MaxOpenConns, - } - db, err := dbcore.Connect(dBConfig) + dBConfig := config.DBConfig + db, err := dbcore.ConnectPostgres(dBConfig) if err != nil { return nil, err } @@ -175,7 +161,7 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor return nil, err } - s.grpcServer, err = provider.StartGrpcServer("coordinator", config.GrpcConfig, func(registrar grpc.ServiceRegistrar) { + s.grpcServer, err = provider.StartGrpcServer("coordinator", config.GrpcConfig, func(registrar grpc.ServiceRegistrar) { coordinatorpb.RegisterSysDBServer(registrar, s) }) if err != nil { diff --git a/go/coordinator/internal/grpccoordinator/tenant_database_service.go b/go/coordinator/internal/coordinator/grpc/tenant_database_service.go similarity index 99% rename from go/coordinator/internal/grpccoordinator/tenant_database_service.go rename to go/coordinator/internal/coordinator/grpc/tenant_database_service.go index eb36b3de949..5ec1045c5ec 100644 --- a/go/coordinator/internal/grpccoordinator/tenant_database_service.go +++ b/go/coordinator/internal/coordinator/grpc/tenant_database_service.go @@ -1,4 +1,4 @@ -package grpccoordinator +package grpc import ( "context" diff --git a/go/coordinator/internal/grpccoordinator/grpcutils/config.go b/go/coordinator/internal/grpcutils/config.go similarity index 100% rename from go/coordinator/internal/grpccoordinator/grpcutils/config.go rename to go/coordinator/internal/grpcutils/config.go diff --git a/go/coordinator/internal/grpccoordinator/grpcutils/config_test.go b/go/coordinator/internal/grpcutils/config_test.go similarity index 100% rename from go/coordinator/internal/grpccoordinator/grpcutils/config_test.go rename to go/coordinator/internal/grpcutils/config_test.go diff --git a/go/coordinator/internal/grpccoordinator/grpcutils/service.go b/go/coordinator/internal/grpcutils/service.go similarity index 100% rename from go/coordinator/internal/grpccoordinator/grpcutils/service.go rename to go/coordinator/internal/grpcutils/service.go diff --git a/go/coordinator/internal/logservice/apis.go b/go/coordinator/internal/logservice/apis.go new file mode 100644 index 00000000000..2eba78b20f6 --- /dev/null +++ b/go/coordinator/internal/logservice/apis.go @@ -0,0 +1,11 @@ +package logservice + +import ( + "github.com/chroma/chroma-coordinator/internal/common" +) + +type ( + IRecordLog interface { + common.Component + } +) diff --git a/go/coordinator/internal/logservice/grpc/server.go b/go/coordinator/internal/logservice/grpc/server.go new file mode 100644 index 00000000000..e3fb1980f78 --- /dev/null +++ b/go/coordinator/internal/logservice/grpc/server.go @@ -0,0 +1,104 @@ +package grpc + +import ( + "context" + "errors" + "github.com/chroma/chroma-coordinator/internal/grpcutils" + "github.com/chroma/chroma-coordinator/internal/logservice" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbcore" + "github.com/chroma/chroma-coordinator/internal/proto/logservicepb" + "github.com/pingcap/log" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/health" +) + +type Config struct { + // GrpcConfig config + GrpcConfig *grpcutils.GrpcConfig + + // System catalog provider + DBProvider string + + // Postgres config + DBConfig dbcore.DBConfig + + // whether to start grpc service + StartGrpc bool +} + +type Server struct { + logservicepb.UnimplementedLogServiceServer + logService logservice.IRecordLog + grpcServer grpcutils.GrpcServer + healthServer *health.Server +} + +func New(config Config) (*Server, error) { + log.Info("New Log Service...") + + if config.DBProvider == "postgres" { + dBConfig := config.DBConfig + _, err := dbcore.ConnectPostgres(dBConfig) + if err != nil { + log.Error("Error connecting to Postgres DB.", zap.Error(err)) + panic(err) + } + } else { + log.Error("invalid DB provider, only postgres is supported") + return nil, errors.New("invalid DB provider, only postgres is supported") + } + + s := startLogService() + if config.StartGrpc { + s.grpcServer = startGrpcService(s, config.GrpcConfig) + } + + log.Info("New Log Service Completed.") + return s, nil +} + +func startLogService() *Server { + log.Info("Staring Log Service...") + ctx := context.Background() + s := &Server{ + healthServer: health.NewServer(), + } + + logService, err := logservice.NewLogService(ctx) + if err != nil { + log.Error("Error creating Log Service.", zap.Error(err)) + panic(err) + } + s.logService = logService + err = s.logService.Start() + if err != nil { + log.Error("Error starting Log Service.", zap.Error(err)) + panic(err) + } + log.Info("Log Service Started.") + return s +} + +func startGrpcService(s *Server, grpcConfig *grpcutils.GrpcConfig) grpcutils.GrpcServer { + log.Info("Staring Grpc Service...") + server, err := grpcutils.Default.StartGrpcServer("logservice", grpcConfig, func(registrar grpc.ServiceRegistrar) { + logservicepb.RegisterLogServiceServer(registrar, s) + }) + if err != nil { + log.Error("Error starting grpc Service.", zap.Error(err)) + panic(err) + } + return server +} + +func (s *Server) Close() error { + s.healthServer.Shutdown() + err := s.logService.Stop() + if err != nil { + log.Error("Failed to stop log service", zap.Error(err)) + return err + } + log.Info("Server closed") + return nil +} diff --git a/go/coordinator/internal/logservice/recordlog.go b/go/coordinator/internal/logservice/recordlog.go new file mode 100644 index 00000000000..78729128de6 --- /dev/null +++ b/go/coordinator/internal/logservice/recordlog.go @@ -0,0 +1,33 @@ +package logservice + +import ( + "context" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dao" + "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" + "github.com/pingcap/log" +) + +var _ IRecordLog = (*RecordLog)(nil) + +type RecordLog struct { + ctx context.Context + recordLogDb dbmodel.IRecordLogDb +} + +func NewLogService(ctx context.Context) (*RecordLog, error) { + s := &RecordLog{ + ctx: ctx, + recordLogDb: dao.NewMetaDomain().RecordLogDb(ctx), + } + return s, nil +} + +func (s *RecordLog) Start() error { + log.Info("RecordLog start") + return nil +} + +func (s *RecordLog) Stop() error { + log.Info("RecordLog stop") + return nil +} diff --git a/go/coordinator/internal/metastore/db/dao/common.go b/go/coordinator/internal/metastore/db/dao/common.go index c67cea6c759..771def6f99f 100644 --- a/go/coordinator/internal/metastore/db/dao/common.go +++ b/go/coordinator/internal/metastore/db/dao/common.go @@ -40,3 +40,7 @@ func (*metaDomain) SegmentMetadataDb(ctx context.Context) dbmodel.ISegmentMetada func (*metaDomain) NotificationDb(ctx context.Context) dbmodel.INotificationDb { return ¬ificationDb{dbcore.GetDB(ctx)} } + +func (*metaDomain) RecordLogDb(ctx context.Context) dbmodel.IRecordLogDb { + return &recordLogDb{dbcore.GetDB(ctx)} +} diff --git a/go/coordinator/internal/metastore/db/dao/record_log.go b/go/coordinator/internal/metastore/db/dao/record_log.go new file mode 100644 index 00000000000..d1601e503c8 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dao/record_log.go @@ -0,0 +1,9 @@ +package dao + +import ( + "gorm.io/gorm" +) + +type recordLogDb struct { + db *gorm.DB +} diff --git a/go/coordinator/internal/metastore/db/dao/segment_metadata.go b/go/coordinator/internal/metastore/db/dao/segment_metadata.go index 14d4d2ec2d0..97800c78d8d 100644 --- a/go/coordinator/internal/metastore/db/dao/segment_metadata.go +++ b/go/coordinator/internal/metastore/db/dao/segment_metadata.go @@ -21,7 +21,7 @@ func (s *segmentMetadataDb) DeleteBySegmentID(segmentID string) error { func (s *segmentMetadataDb) DeleteBySegmentIDAndKeys(segmentID string, keys []string) error { return s.db. Where("segment_id = ?", segmentID). - Where("`key` IN ?", keys). + Where("key IN ?", keys). Delete(&dbmodel.SegmentMetadata{}).Error } diff --git a/go/coordinator/internal/metastore/db/dbcore/core.go b/go/coordinator/internal/metastore/db/dbcore/core.go index 95d2885dfc4..ce05a1b4ca1 100644 --- a/go/coordinator/internal/metastore/db/dbcore/core.go +++ b/go/coordinator/internal/metastore/db/dbcore/core.go @@ -3,7 +3,9 @@ package dbcore import ( "context" "fmt" + "os" "reflect" + "strconv" "github.com/chroma/chroma-coordinator/internal/common" "github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel" @@ -11,7 +13,6 @@ import ( "github.com/pingcap/log" "go.uber.org/zap" "gorm.io/driver/postgres" - "gorm.io/driver/sqlite" "gorm.io/gorm" "gorm.io/gorm/logger" ) @@ -28,11 +29,13 @@ type DBConfig struct { DBName string MaxIdleConns int MaxOpenConns int + SslMode string } -func Connect(cfg DBConfig) (*gorm.DB, error) { - dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=require", - cfg.Address, cfg.Username, cfg.Password, cfg.DBName, cfg.Port) +func ConnectPostgres(cfg DBConfig) (*gorm.DB, error) { + log.Info("ConnectPostgres", zap.String("host", cfg.Address), zap.String("database", cfg.DBName), zap.Int("port", cfg.Port)) + dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%d sslmode=%s", + cfg.Address, cfg.Username, cfg.Password, cfg.DBName, cfg.Port, cfg.SslMode) ormLogger := logger.Default ormLogger.LogMode(logger.Info) @@ -61,7 +64,7 @@ func Connect(cfg DBConfig) (*gorm.DB, error) { globalDB = db - log.Info("db connected success", + log.Info("Postgres connected success", zap.String("host", cfg.Address), zap.String("database", cfg.DBName), zap.Error(err)) @@ -114,14 +117,7 @@ func GetDB(ctx context.Context) *gorm.DB { return globalDB.WithContext(ctx) } -func ConfigDatabaseForTesting() *gorm.DB { - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ - Logger: logger.Default.LogMode(logger.Info), - }) - if err != nil { - panic("failed to connect database") - } - SetGlobalDB(db) +func CreateTestTables(db *gorm.DB) { // Setup tenant related tables db.Migrator().DropTable(&dbmodel.Tenant{}) db.Migrator().CreateTable(&dbmodel.Tenant{}) @@ -154,5 +150,22 @@ func ConfigDatabaseForTesting() *gorm.DB { // Setup notification related tables db.Migrator().DropTable(&dbmodel.Notification{}) db.Migrator().CreateTable(&dbmodel.Notification{}) +} + +func ConfigDatabaseForTesting() *gorm.DB { + dbAddress := os.Getenv("POSTGRES_HOST") + dbPort, err := strconv.Atoi(os.Getenv("POSTGRES_PORT")) + db, err := ConnectPostgres(DBConfig{ + Username: "chroma", + Password: "chroma", + Address: dbAddress, + Port: dbPort, + DBName: "chroma", + }) + if err != nil { + panic("failed to connect database") + } + SetGlobalDB(db) + CreateTestTables(db) return db } diff --git a/go/coordinator/internal/metastore/db/dbmodel/common.go b/go/coordinator/internal/metastore/db/dbmodel/common.go index d188193ae18..d90b7df55e6 100644 --- a/go/coordinator/internal/metastore/db/dbmodel/common.go +++ b/go/coordinator/internal/metastore/db/dbmodel/common.go @@ -15,6 +15,7 @@ type IMetaDomain interface { SegmentDb(ctx context.Context) ISegmentDb SegmentMetadataDb(ctx context.Context) ISegmentMetadataDb NotificationDb(ctx context.Context) INotificationDb + RecordLogDb(ctx context.Context) IRecordLogDb } //go:generate mockery --name=ITransaction diff --git a/go/coordinator/internal/metastore/db/dbmodel/mocks/IMetaDomain.go b/go/coordinator/internal/metastore/db/dbmodel/mocks/IMetaDomain.go index 0ee94c373e9..50c33f10e6f 100644 --- a/go/coordinator/internal/metastore/db/dbmodel/mocks/IMetaDomain.go +++ b/go/coordinator/internal/metastore/db/dbmodel/mocks/IMetaDomain.go @@ -126,6 +126,21 @@ func (_m *IMetaDomain) TenantDb(ctx context.Context) dbmodel.ITenantDb { return r0 } +func (_m *IMetaDomain) RecordLogDb(ctx context.Context) dbmodel.IRecordLogDb { + ret := _m.Called(ctx) + + var r0 dbmodel.IRecordLogDb + if rf, ok := ret.Get(0).(func(context.Context) dbmodel.IRecordLogDb); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(dbmodel.IRecordLogDb) + } + } + + return r0 +} + // NewIMetaDomain creates a new instance of IMetaDomain. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewIMetaDomain(t interface { diff --git a/go/coordinator/internal/metastore/db/dbmodel/record_log.go b/go/coordinator/internal/metastore/db/dbmodel/record_log.go new file mode 100644 index 00000000000..de8aeaa75b7 --- /dev/null +++ b/go/coordinator/internal/metastore/db/dbmodel/record_log.go @@ -0,0 +1,16 @@ +package dbmodel + +type RecordLog struct { + CollectionID *string `gorm:"collection_id;primaryKey;autoIncrement:false"` + ID int64 `gorm:"id;primaryKey;"` // auto_increment id + Timestamp int64 `gorm:"timestamp;"` + Record *[]byte `gorm:"record;type:bytea"` +} + +func (v RecordLog) TableName() string { + return "record_logs" +} + +//go:generate mockery --name=IRecordLogDb +type IRecordLogDb interface { +} diff --git a/go/coordinator/internal/proto/coordinatorpb/chroma.pb.go b/go/coordinator/internal/proto/coordinatorpb/chroma.pb.go index 3cec5eefe06..d130dd11af3 100644 --- a/go/coordinator/internal/proto/coordinatorpb/chroma.pb.go +++ b/go/coordinator/internal/proto/coordinatorpb/chroma.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.23.4 +// protoc-gen-go v1.32.0 +// protoc v3.20.3 // source: chromadb/proto/chroma.proto package coordinatorpb @@ -914,7 +914,7 @@ type VectorQueryResult struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` SeqId []byte `protobuf:"bytes,2,opt,name=seq_id,json=seqId,proto3" json:"seq_id,omitempty"` - Distance float64 `protobuf:"fixed64,3,opt,name=distance,proto3" json:"distance,omitempty"` + Distance float32 `protobuf:"fixed32,3,opt,name=distance,proto3" json:"distance,omitempty"` Vector *Vector `protobuf:"bytes,4,opt,name=vector,proto3,oneof" json:"vector,omitempty"` } @@ -964,7 +964,7 @@ func (x *VectorQueryResult) GetSeqId() []byte { return nil } -func (x *VectorQueryResult) GetDistance() float64 { +func (x *VectorQueryResult) GetDistance() float32 { if x != nil { return x.Distance } @@ -1356,7 +1356,7 @@ var file_chromadb_proto_chroma_proto_rawDesc = []byte{ 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x73, 0x65, 0x71, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x65, 0x71, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x69, 0x73, 0x74, - 0x61, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01, 0x52, 0x08, 0x64, 0x69, 0x73, 0x74, + 0x61, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x02, 0x52, 0x08, 0x64, 0x69, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x12, 0x2b, 0x0a, 0x06, 0x76, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x56, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x06, 0x76, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x88, 0x01, diff --git a/go/coordinator/internal/proto/coordinatorpb/chroma_grpc.pb.go b/go/coordinator/internal/proto/coordinatorpb/chroma_grpc.pb.go index 09283123121..b2d9a178149 100644 --- a/go/coordinator/internal/proto/coordinatorpb/chroma_grpc.pb.go +++ b/go/coordinator/internal/proto/coordinatorpb/chroma_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.4 +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.20.3 // source: chromadb/proto/chroma.proto package coordinatorpb @@ -18,11 +18,6 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 -const ( - VectorReader_GetVectors_FullMethodName = "/chroma.VectorReader/GetVectors" - VectorReader_QueryVectors_FullMethodName = "/chroma.VectorReader/QueryVectors" -) - // VectorReaderClient is the client API for VectorReader service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -41,7 +36,7 @@ func NewVectorReaderClient(cc grpc.ClientConnInterface) VectorReaderClient { func (c *vectorReaderClient) GetVectors(ctx context.Context, in *GetVectorsRequest, opts ...grpc.CallOption) (*GetVectorsResponse, error) { out := new(GetVectorsResponse) - err := c.cc.Invoke(ctx, VectorReader_GetVectors_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.VectorReader/GetVectors", in, out, opts...) if err != nil { return nil, err } @@ -50,7 +45,7 @@ func (c *vectorReaderClient) GetVectors(ctx context.Context, in *GetVectorsReque func (c *vectorReaderClient) QueryVectors(ctx context.Context, in *QueryVectorsRequest, opts ...grpc.CallOption) (*QueryVectorsResponse, error) { out := new(QueryVectorsResponse) - err := c.cc.Invoke(ctx, VectorReader_QueryVectors_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.VectorReader/QueryVectors", in, out, opts...) if err != nil { return nil, err } @@ -99,7 +94,7 @@ func _VectorReader_GetVectors_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: VectorReader_GetVectors_FullMethodName, + FullMethod: "/chroma.VectorReader/GetVectors", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(VectorReaderServer).GetVectors(ctx, req.(*GetVectorsRequest)) @@ -117,7 +112,7 @@ func _VectorReader_QueryVectors_Handler(srv interface{}, ctx context.Context, de } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: VectorReader_QueryVectors_FullMethodName, + FullMethod: "/chroma.VectorReader/QueryVectors", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(VectorReaderServer).QueryVectors(ctx, req.(*QueryVectorsRequest)) diff --git a/go/coordinator/internal/proto/coordinatorpb/coordinator.pb.go b/go/coordinator/internal/proto/coordinatorpb/coordinator.pb.go index be93392c304..1b5347462e2 100644 --- a/go/coordinator/internal/proto/coordinatorpb/coordinator.pb.go +++ b/go/coordinator/internal/proto/coordinatorpb/coordinator.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.23.4 +// protoc-gen-go v1.32.0 +// protoc v3.20.3 // source: chromadb/proto/coordinator.proto package coordinatorpb diff --git a/go/coordinator/internal/proto/coordinatorpb/coordinator_grpc.pb.go b/go/coordinator/internal/proto/coordinatorpb/coordinator_grpc.pb.go index ed123f9f3a6..74f79e0711d 100644 --- a/go/coordinator/internal/proto/coordinatorpb/coordinator_grpc.pb.go +++ b/go/coordinator/internal/proto/coordinatorpb/coordinator_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.4 +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.20.3 // source: chromadb/proto/coordinator.proto package coordinatorpb @@ -19,22 +19,6 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 -const ( - SysDB_CreateDatabase_FullMethodName = "/chroma.SysDB/CreateDatabase" - SysDB_GetDatabase_FullMethodName = "/chroma.SysDB/GetDatabase" - SysDB_CreateTenant_FullMethodName = "/chroma.SysDB/CreateTenant" - SysDB_GetTenant_FullMethodName = "/chroma.SysDB/GetTenant" - SysDB_CreateSegment_FullMethodName = "/chroma.SysDB/CreateSegment" - SysDB_DeleteSegment_FullMethodName = "/chroma.SysDB/DeleteSegment" - SysDB_GetSegments_FullMethodName = "/chroma.SysDB/GetSegments" - SysDB_UpdateSegment_FullMethodName = "/chroma.SysDB/UpdateSegment" - SysDB_CreateCollection_FullMethodName = "/chroma.SysDB/CreateCollection" - SysDB_DeleteCollection_FullMethodName = "/chroma.SysDB/DeleteCollection" - SysDB_GetCollections_FullMethodName = "/chroma.SysDB/GetCollections" - SysDB_UpdateCollection_FullMethodName = "/chroma.SysDB/UpdateCollection" - SysDB_ResetState_FullMethodName = "/chroma.SysDB/ResetState" -) - // SysDBClient is the client API for SysDB service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -64,7 +48,7 @@ func NewSysDBClient(cc grpc.ClientConnInterface) SysDBClient { func (c *sysDBClient) CreateDatabase(ctx context.Context, in *CreateDatabaseRequest, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_CreateDatabase_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateDatabase", in, out, opts...) if err != nil { return nil, err } @@ -73,7 +57,7 @@ func (c *sysDBClient) CreateDatabase(ctx context.Context, in *CreateDatabaseRequ func (c *sysDBClient) GetDatabase(ctx context.Context, in *GetDatabaseRequest, opts ...grpc.CallOption) (*GetDatabaseResponse, error) { out := new(GetDatabaseResponse) - err := c.cc.Invoke(ctx, SysDB_GetDatabase_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/GetDatabase", in, out, opts...) if err != nil { return nil, err } @@ -82,7 +66,7 @@ func (c *sysDBClient) GetDatabase(ctx context.Context, in *GetDatabaseRequest, o func (c *sysDBClient) CreateTenant(ctx context.Context, in *CreateTenantRequest, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_CreateTenant_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateTenant", in, out, opts...) if err != nil { return nil, err } @@ -91,7 +75,7 @@ func (c *sysDBClient) CreateTenant(ctx context.Context, in *CreateTenantRequest, func (c *sysDBClient) GetTenant(ctx context.Context, in *GetTenantRequest, opts ...grpc.CallOption) (*GetTenantResponse, error) { out := new(GetTenantResponse) - err := c.cc.Invoke(ctx, SysDB_GetTenant_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/GetTenant", in, out, opts...) if err != nil { return nil, err } @@ -100,7 +84,7 @@ func (c *sysDBClient) GetTenant(ctx context.Context, in *GetTenantRequest, opts func (c *sysDBClient) CreateSegment(ctx context.Context, in *CreateSegmentRequest, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_CreateSegment_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateSegment", in, out, opts...) if err != nil { return nil, err } @@ -109,7 +93,7 @@ func (c *sysDBClient) CreateSegment(ctx context.Context, in *CreateSegmentReques func (c *sysDBClient) DeleteSegment(ctx context.Context, in *DeleteSegmentRequest, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_DeleteSegment_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/DeleteSegment", in, out, opts...) if err != nil { return nil, err } @@ -118,7 +102,7 @@ func (c *sysDBClient) DeleteSegment(ctx context.Context, in *DeleteSegmentReques func (c *sysDBClient) GetSegments(ctx context.Context, in *GetSegmentsRequest, opts ...grpc.CallOption) (*GetSegmentsResponse, error) { out := new(GetSegmentsResponse) - err := c.cc.Invoke(ctx, SysDB_GetSegments_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/GetSegments", in, out, opts...) if err != nil { return nil, err } @@ -127,7 +111,7 @@ func (c *sysDBClient) GetSegments(ctx context.Context, in *GetSegmentsRequest, o func (c *sysDBClient) UpdateSegment(ctx context.Context, in *UpdateSegmentRequest, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_UpdateSegment_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/UpdateSegment", in, out, opts...) if err != nil { return nil, err } @@ -136,7 +120,7 @@ func (c *sysDBClient) UpdateSegment(ctx context.Context, in *UpdateSegmentReques func (c *sysDBClient) CreateCollection(ctx context.Context, in *CreateCollectionRequest, opts ...grpc.CallOption) (*CreateCollectionResponse, error) { out := new(CreateCollectionResponse) - err := c.cc.Invoke(ctx, SysDB_CreateCollection_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateCollection", in, out, opts...) if err != nil { return nil, err } @@ -145,7 +129,7 @@ func (c *sysDBClient) CreateCollection(ctx context.Context, in *CreateCollection func (c *sysDBClient) DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_DeleteCollection_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/DeleteCollection", in, out, opts...) if err != nil { return nil, err } @@ -154,7 +138,7 @@ func (c *sysDBClient) DeleteCollection(ctx context.Context, in *DeleteCollection func (c *sysDBClient) GetCollections(ctx context.Context, in *GetCollectionsRequest, opts ...grpc.CallOption) (*GetCollectionsResponse, error) { out := new(GetCollectionsResponse) - err := c.cc.Invoke(ctx, SysDB_GetCollections_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/GetCollections", in, out, opts...) if err != nil { return nil, err } @@ -163,7 +147,7 @@ func (c *sysDBClient) GetCollections(ctx context.Context, in *GetCollectionsRequ func (c *sysDBClient) UpdateCollection(ctx context.Context, in *UpdateCollectionRequest, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_UpdateCollection_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/UpdateCollection", in, out, opts...) if err != nil { return nil, err } @@ -172,7 +156,7 @@ func (c *sysDBClient) UpdateCollection(ctx context.Context, in *UpdateCollection func (c *sysDBClient) ResetState(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ChromaResponse, error) { out := new(ChromaResponse) - err := c.cc.Invoke(ctx, SysDB_ResetState_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, "/chroma.SysDB/ResetState", in, out, opts...) if err != nil { return nil, err } @@ -265,7 +249,7 @@ func _SysDB_CreateDatabase_Handler(srv interface{}, ctx context.Context, dec fun } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_CreateDatabase_FullMethodName, + FullMethod: "/chroma.SysDB/CreateDatabase", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateDatabase(ctx, req.(*CreateDatabaseRequest)) @@ -283,7 +267,7 @@ func _SysDB_GetDatabase_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_GetDatabase_FullMethodName, + FullMethod: "/chroma.SysDB/GetDatabase", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetDatabase(ctx, req.(*GetDatabaseRequest)) @@ -301,7 +285,7 @@ func _SysDB_CreateTenant_Handler(srv interface{}, ctx context.Context, dec func( } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_CreateTenant_FullMethodName, + FullMethod: "/chroma.SysDB/CreateTenant", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateTenant(ctx, req.(*CreateTenantRequest)) @@ -319,7 +303,7 @@ func _SysDB_GetTenant_Handler(srv interface{}, ctx context.Context, dec func(int } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_GetTenant_FullMethodName, + FullMethod: "/chroma.SysDB/GetTenant", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetTenant(ctx, req.(*GetTenantRequest)) @@ -337,7 +321,7 @@ func _SysDB_CreateSegment_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_CreateSegment_FullMethodName, + FullMethod: "/chroma.SysDB/CreateSegment", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateSegment(ctx, req.(*CreateSegmentRequest)) @@ -355,7 +339,7 @@ func _SysDB_DeleteSegment_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_DeleteSegment_FullMethodName, + FullMethod: "/chroma.SysDB/DeleteSegment", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).DeleteSegment(ctx, req.(*DeleteSegmentRequest)) @@ -373,7 +357,7 @@ func _SysDB_GetSegments_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_GetSegments_FullMethodName, + FullMethod: "/chroma.SysDB/GetSegments", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetSegments(ctx, req.(*GetSegmentsRequest)) @@ -391,7 +375,7 @@ func _SysDB_UpdateSegment_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_UpdateSegment_FullMethodName, + FullMethod: "/chroma.SysDB/UpdateSegment", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).UpdateSegment(ctx, req.(*UpdateSegmentRequest)) @@ -409,7 +393,7 @@ func _SysDB_CreateCollection_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_CreateCollection_FullMethodName, + FullMethod: "/chroma.SysDB/CreateCollection", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateCollection(ctx, req.(*CreateCollectionRequest)) @@ -427,7 +411,7 @@ func _SysDB_DeleteCollection_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_DeleteCollection_FullMethodName, + FullMethod: "/chroma.SysDB/DeleteCollection", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).DeleteCollection(ctx, req.(*DeleteCollectionRequest)) @@ -445,7 +429,7 @@ func _SysDB_GetCollections_Handler(srv interface{}, ctx context.Context, dec fun } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_GetCollections_FullMethodName, + FullMethod: "/chroma.SysDB/GetCollections", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetCollections(ctx, req.(*GetCollectionsRequest)) @@ -463,7 +447,7 @@ func _SysDB_UpdateCollection_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_UpdateCollection_FullMethodName, + FullMethod: "/chroma.SysDB/UpdateCollection", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).UpdateCollection(ctx, req.(*UpdateCollectionRequest)) @@ -481,7 +465,7 @@ func _SysDB_ResetState_Handler(srv interface{}, ctx context.Context, dec func(in } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SysDB_ResetState_FullMethodName, + FullMethod: "/chroma.SysDB/ResetState", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).ResetState(ctx, req.(*emptypb.Empty)) diff --git a/go/coordinator/internal/proto/logservicepb/logservice.pb.go b/go/coordinator/internal/proto/logservicepb/logservice.pb.go new file mode 100644 index 00000000000..6eaa51a4349 --- /dev/null +++ b/go/coordinator/internal/proto/logservicepb/logservice.pb.go @@ -0,0 +1,67 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v3.20.3 +// source: chromadb/proto/logservice.proto + +package logservicepb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var File_chromadb_proto_logservice_proto protoreflect.FileDescriptor + +var file_chromadb_proto_logservice_proto_rawDesc = []byte{ + 0x0a, 0x1f, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x64, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x6c, 0x6f, 0x67, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x06, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x32, 0x0c, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2f, 0x63, 0x68, 0x72, + 0x6f, 0x6d, 0x61, 0x2d, 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, + 0x6f, 0x67, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var file_chromadb_proto_logservice_proto_goTypes = []interface{}{} +var file_chromadb_proto_logservice_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_chromadb_proto_logservice_proto_init() } +func file_chromadb_proto_logservice_proto_init() { + if File_chromadb_proto_logservice_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_chromadb_proto_logservice_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_chromadb_proto_logservice_proto_goTypes, + DependencyIndexes: file_chromadb_proto_logservice_proto_depIdxs, + }.Build() + File_chromadb_proto_logservice_proto = out.File + file_chromadb_proto_logservice_proto_rawDesc = nil + file_chromadb_proto_logservice_proto_goTypes = nil + file_chromadb_proto_logservice_proto_depIdxs = nil +} diff --git a/go/coordinator/internal/proto/logservicepb/logservice_grpc.pb.go b/go/coordinator/internal/proto/logservicepb/logservice_grpc.pb.go new file mode 100644 index 00000000000..5a89141fa81 --- /dev/null +++ b/go/coordinator/internal/proto/logservicepb/logservice_grpc.pb.go @@ -0,0 +1,65 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.20.3 +// source: chromadb/proto/logservice.proto + +package logservicepb + +import ( + grpc "google.golang.org/grpc" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// LogServiceClient is the client API for LogService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type LogServiceClient interface { +} + +type logServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewLogServiceClient(cc grpc.ClientConnInterface) LogServiceClient { + return &logServiceClient{cc} +} + +// LogServiceServer is the server API for LogService service. +// All implementations must embed UnimplementedLogServiceServer +// for forward compatibility +type LogServiceServer interface { + mustEmbedUnimplementedLogServiceServer() +} + +// UnimplementedLogServiceServer must be embedded to have forward compatible implementations. +type UnimplementedLogServiceServer struct { +} + +func (UnimplementedLogServiceServer) mustEmbedUnimplementedLogServiceServer() {} + +// UnsafeLogServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to LogServiceServer will +// result in compilation errors. +type UnsafeLogServiceServer interface { + mustEmbedUnimplementedLogServiceServer() +} + +func RegisterLogServiceServer(s grpc.ServiceRegistrar, srv LogServiceServer) { + s.RegisterService(&LogService_ServiceDesc, srv) +} + +// LogService_ServiceDesc is the grpc.ServiceDesc for LogService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var LogService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "chroma.LogService", + HandlerType: (*LogServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + Metadata: "chromadb/proto/logservice.proto", +} diff --git a/go/coordinator/migrations/20231129183041.sql b/go/coordinator/migrations/20231129183041.sql deleted file mode 100644 index 2a31ebb4877..00000000000 --- a/go/coordinator/migrations/20231129183041.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Create "notifications" table -CREATE TABLE "public"."notifications" ( - "id" bigserial NOT NULL, - "collection_id" text NULL, - "type" text NULL, - "status" text NULL, - PRIMARY KEY ("id") -); diff --git a/go/coordinator/migrations/20231116210409.sql b/go/coordinator/migrations/20240215010425.sql similarity index 86% rename from go/coordinator/migrations/20231116210409.sql rename to go/coordinator/migrations/20240215010425.sql index bb9c8d8a00c..378c5d630e5 100644 --- a/go/coordinator/migrations/20231116210409.sql +++ b/go/coordinator/migrations/20240215010425.sql @@ -38,6 +38,22 @@ CREATE TABLE "public"."databases" ( ); -- Create index "idx_tenantid_name" to table: "databases" CREATE UNIQUE INDEX "idx_tenantid_name" ON "public"."databases" ("name", "tenant_id"); +-- Create "notifications" table +CREATE TABLE "public"."notifications" ( + "id" bigserial NOT NULL, + "collection_id" text NULL, + "type" text NULL, + "status" text NULL, + PRIMARY KEY ("id") +); +-- Create "record_logs" table +CREATE TABLE "public"."record_logs" ( + "collection_id" text NOT NULL, + "id" bigserial NOT NULL, + "timestamp" bigint NULL, + "record" bytea NULL, + PRIMARY KEY ("collection_id", "id") +); -- Create "segment_metadata" table CREATE TABLE "public"."segment_metadata" ( "segment_id" text NOT NULL, diff --git a/go/coordinator/migrations/atlas.sum b/go/coordinator/migrations/atlas.sum index d4ee513fa90..624c7eabe3a 100644 --- a/go/coordinator/migrations/atlas.sum +++ b/go/coordinator/migrations/atlas.sum @@ -1,3 +1,2 @@ -h1:j28ectYxexGfQz/LClD7yYVUHAfIcPHlboAJ1Qw0G7I= -20231116210409.sql h1:vwZRvrXrUMOuDykEaheyEzsnNCpmH73x0QEefzUtf8o= -20231129183041.sql h1:FglI5Hjf7kqvjCsSYWkK2IGS2aThQBaVhpg9WekhNEA= +h1:OoMkQddKcFi1jQ4pCp2i8IJAIEDHjQpI3mw+sHoQ1fI= +20240215010425.sql h1:U4h0i9epzZOrFesFlcMJ8250n3SoY5Uv0AejgcZCTTw= diff --git a/idl/chromadb/proto/logservice.proto b/idl/chromadb/proto/logservice.proto new file mode 100644 index 00000000000..18c32a6a0d4 --- /dev/null +++ b/idl/chromadb/proto/logservice.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package chroma; +option go_package = "github.com/chroma/chroma-coordinator/internal/proto/logservicepb"; + +service LogService { + +} diff --git a/idl/makefile b/idl/makefile index 18cbc1977ba..183fd24a198 100644 --- a/idl/makefile +++ b/idl/makefile @@ -17,6 +17,7 @@ proto_go: --go-grpc_opt paths=source_relative \ --plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" \ chromadb/proto/*.proto + @mv ../go/coordinator/internal/proto/coordinatorpb/chromadb/proto/logservice*.go ../go/coordinator/internal/proto/logservicepb/ @mv ../go/coordinator/internal/proto/coordinatorpb/chromadb/proto/*.go ../go/coordinator/internal/proto/coordinatorpb/ @rm -rf ../go/coordinator/internal/proto/coordinatorpb/chromadb @echo "Done" diff --git a/k8s/deployment/kubernetes.yaml b/k8s/deployment/kubernetes.yaml index b1f9baabdd0..5b5ec4a7a84 100644 --- a/k8s/deployment/kubernetes.yaml +++ b/k8s/deployment/kubernetes.yaml @@ -77,6 +77,76 @@ spec: --- +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: chroma +spec: + ports: + - name: postgres-port + port: 5432 + targetPort: 5432 + selector: + app: postgres + type: ClusterIP + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: postgres + namespace: chroma +spec: + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: postgres:14.1-alpine + env: + - name: POSTGRES_DB + value: chroma + - name: POSTGRES_USER + value: chroma + - name: POSTGRES_PASSWORD + value: chroma + ports: + - containerPort: 5432 + +--- + +apiVersion: batch/v1 +kind: Job +metadata: + name: migration + namespace: chroma +spec: + template: + metadata: + labels: + app: migration + spec: + restartPolicy: OnFailure + containers: + - args: + - 'migrate' + - 'apply' + - '--url' + - 'postgres://chroma:chroma@postgres:5432/chroma?sslmode=disable' + image: migration + imagePullPolicy: IfNotPresent + name: migration + +--- + apiVersion: v1 kind: Service metadata: @@ -188,7 +258,7 @@ spec: spec: containers: - command: - - "chroma" + - "coordinator" - "coordinator" - "--pulsar-admin-url=http://pulsar.chroma:8080" - "--pulsar-url=pulsar://pulsar.chroma:6650" @@ -219,3 +289,47 @@ spec: selector: app: coordinator type: ClusterIP + +--- + +apiVersion: v1 +kind: Service +metadata: + name: logservice + namespace: chroma +spec: + ports: + - name: grpc + port: 50051 + targetPort: grpc + selector: + app: logservice + type: ClusterIP + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: logservice + namespace: chroma +spec: + replicas: 1 + selector: + matchLabels: + app: logservice + template: + metadata: + labels: + app: logservice + spec: + containers: + - command: + - "logservice" + - "logservice" + image: chroma-coordinator + imagePullPolicy: IfNotPresent + name: logservice + ports: + - containerPort: 50051 + name: grpc diff --git a/k8s/dev/coordinator.yaml b/k8s/dev/coordinator.yaml index ce897d44c82..f7f8c122bd4 100644 --- a/k8s/dev/coordinator.yaml +++ b/k8s/dev/coordinator.yaml @@ -15,7 +15,7 @@ spec: spec: containers: - command: - - "chroma" + - "coordinator" - "coordinator" - "--pulsar-admin-url=http://pulsar.chroma:8080" - "--pulsar-url=pulsar://pulsar.chroma:6650" @@ -39,4 +39,4 @@ spec: targetPort: grpc selector: app: coordinator - type: ClusterIP \ No newline at end of file + type: ClusterIP diff --git a/k8s/dev/logservice.yaml b/k8s/dev/logservice.yaml new file mode 100644 index 00000000000..a4b491116ee --- /dev/null +++ b/k8s/dev/logservice.yaml @@ -0,0 +1,39 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: logservice + namespace: chroma +spec: + replicas: 1 + selector: + matchLabels: + app: logservice + template: + metadata: + labels: + app: logservice + spec: + containers: + - command: + - "logservice" + - "logservice" + image: coordinator + imagePullPolicy: IfNotPresent + name: logservice + ports: + - containerPort: 50051 + name: grpc +--- +apiVersion: v1 +kind: Service +metadata: + name: logservice + namespace: chroma +spec: + ports: + - name: grpc + port: 50051 + targetPort: grpc + selector: + app: logservice + type: ClusterIP diff --git a/k8s/dev/migration.yaml b/k8s/dev/migration.yaml new file mode 100644 index 00000000000..df4ac881740 --- /dev/null +++ b/k8s/dev/migration.yaml @@ -0,0 +1,22 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: migration + namespace: chroma +spec: + template: + metadata: + labels: + app: migration + spec: + restartPolicy: OnFailure + containers: + - args: + - 'migrate' + - 'apply' + - '--url' + - 'postgres://chroma:chroma@postgres:5432/chroma?sslmode=disable' + image: migration + imagePullPolicy: IfNotPresent + name: migration +--- diff --git a/k8s/dev/postgres.yaml b/k8s/dev/postgres.yaml new file mode 100644 index 00000000000..e2b8fad3159 --- /dev/null +++ b/k8s/dev/postgres.yaml @@ -0,0 +1,41 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: postgres + namespace: chroma +spec: + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: postgres:14.1-alpine + env: + - name: POSTGRES_DB + value: chroma + - name: POSTGRES_USER + value: chroma + - name: POSTGRES_PASSWORD + value: chroma + ports: + - containerPort: 5432 +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: chroma +spec: + ports: + - name: postgres-port + port: 5432 + targetPort: 5432 + selector: + app: postgres + type: ClusterIP