From 82e3db68896d8fe42091e3d1529cd93a4e3b6c10 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Wed, 20 Mar 2024 12:52:14 -0700 Subject: [PATCH 1/5] [ENH] Add end_timestamp to PullLog API (#1897) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - ... - New functionality - This PR adds end_timestamp to the PullLog API ## Test plan *How are these changes tested?* - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?* --- chromadb/logservice/logservice.py | 1 + chromadb/proto/logservice_pb2.py | 51 ++++--- chromadb/proto/logservice_pb2.pyi | 56 ++++++-- go/pkg/logservice/apis.go | 9 +- go/pkg/logservice/grpc/record_log_service.go | 3 +- go/pkg/metastore/db/dao/record_log.go | 26 +++- go/pkg/metastore/db/dao/record_log_test.go | 22 ++- go/pkg/metastore/db/dbmodel/record_log.go | 2 +- go/pkg/proto/coordinatorpb/chroma.pb.go | 4 +- go/pkg/proto/coordinatorpb/chroma_grpc.pb.go | 17 ++- go/pkg/proto/coordinatorpb/coordinator.pb.go | 4 +- .../coordinatorpb/coordinator_grpc.pb.go | 87 +++++++----- go/pkg/proto/logservicepb/logservice.pb.go | 128 ++++++++++-------- .../proto/logservicepb/logservice_grpc.pb.go | 22 +-- idl/chromadb/proto/logservice.proto | 1 + .../src/execution/operators/pull_log.rs | 1 + rust/worker/src/log/log.rs | 12 +- 17 files changed, 285 insertions(+), 161 deletions(-) diff --git a/chromadb/logservice/logservice.py b/chromadb/logservice/logservice.py index 9683d170a958..83161d519db0 100644 --- a/chromadb/logservice/logservice.py +++ b/chromadb/logservice/logservice.py @@ -166,6 +166,7 @@ def pull_logs( collection_id=str(collection_id), start_from_id=start_id, batch_size=batch_size, + end_timestamp=-1, ) response = self._log_service_stub.PullLogs(request) return response.records # type: ignore diff --git a/chromadb/proto/logservice_pb2.py b/chromadb/proto/logservice_pb2.py index f4a7b89cfff6..39f62f13aa7f 100644 --- a/chromadb/proto/logservice_pb2.py +++ b/chromadb/proto/logservice_pb2.py @@ -6,6 +6,7 @@ from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database from google.protobuf.internal import builder as _builder + # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -14,30 +15,36 @@ from chromadb.proto import chroma_pb2 as chromadb_dot_proto_dot_chroma__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1f\x63hromadb/proto/logservice.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto\"X\n\x0fPushLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12.\n\x07records\x18\x02 \x03(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord\"(\n\x10PushLogsResponse\x12\x14\n\x0crecord_count\x18\x01 \x01(\x05\"S\n\x0fPullLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x15\n\rstart_from_id\x18\x02 \x01(\x03\x12\x12\n\nbatch_size\x18\x03 \x01(\x05\"J\n\tRecordLog\x12\x0e\n\x06log_id\x18\x01 \x01(\x03\x12-\n\x06record\x18\x02 \x01(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord\"6\n\x10PullLogsResponse\x12\"\n\x07records\x18\x01 \x03(\x0b\x32\x11.chroma.RecordLog\"V\n\x0e\x43ollectionInfo\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x14\n\x0c\x66irst_log_id\x18\x02 \x01(\x03\x12\x17\n\x0f\x66irst_log_id_ts\x18\x03 
\x01(\x03\"&\n$GetAllCollectionInfoToCompactRequest\"\\\n%GetAllCollectionInfoToCompactResponse\x12\x33\n\x13\x61ll_collection_info\x18\x01 \x03(\x0b\x32\x16.chroma.CollectionInfo2\x8e\x02\n\nLogService\x12?\n\x08PushLogs\x12\x17.chroma.PushLogsRequest\x1a\x18.chroma.PushLogsResponse\"\x00\x12?\n\x08PullLogs\x12\x17.chroma.PullLogsRequest\x1a\x18.chroma.PullLogsResponse\"\x00\x12~\n\x1dGetAllCollectionInfoToCompact\x12,.chroma.GetAllCollectionInfoToCompactRequest\x1a-.chroma.GetAllCollectionInfoToCompactResponse\"\x00\x42\x39Z7github.com/chroma-core/chroma/go/pkg/proto/logservicepbb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x1f\x63hromadb/proto/logservice.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto"X\n\x0fPushLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12.\n\x07records\x18\x02 \x03(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord"(\n\x10PushLogsResponse\x12\x14\n\x0crecord_count\x18\x01 \x01(\x05"j\n\x0fPullLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x15\n\rstart_from_id\x18\x02 \x01(\x03\x12\x12\n\nbatch_size\x18\x03 \x01(\x05\x12\x15\n\rend_timestamp\x18\x04 \x01(\x03"J\n\tRecordLog\x12\x0e\n\x06log_id\x18\x01 \x01(\x03\x12-\n\x06record\x18\x02 \x01(\x0b\x32\x1d.chroma.SubmitEmbeddingRecord"6\n\x10PullLogsResponse\x12"\n\x07records\x18\x01 \x03(\x0b\x32\x11.chroma.RecordLog"V\n\x0e\x43ollectionInfo\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x14\n\x0c\x66irst_log_id\x18\x02 \x01(\x03\x12\x17\n\x0f\x66irst_log_id_ts\x18\x03 \x01(\x03"&\n$GetAllCollectionInfoToCompactRequest"\\\n%GetAllCollectionInfoToCompactResponse\x12\x33\n\x13\x61ll_collection_info\x18\x01 \x03(\x0b\x32\x16.chroma.CollectionInfo2\x8e\x02\n\nLogService\x12?\n\x08PushLogs\x12\x17.chroma.PushLogsRequest\x1a\x18.chroma.PushLogsResponse"\x00\x12?\n\x08PullLogs\x12\x17.chroma.PullLogsRequest\x1a\x18.chroma.PullLogsResponse"\x00\x12~\n\x1dGetAllCollectionInfoToCompact\x12,.chroma.GetAllCollectionInfoToCompactRequest\x1a-.chroma.GetAllCollectionInfoToCompactResponse"\x00\x42\x39Z7github.com/chroma-core/chroma/go/pkg/proto/logservicepbb\x06proto3' +) _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'chromadb.proto.logservice_pb2', _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "chromadb.proto.logservice_pb2", _globals +) if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - DESCRIPTOR._serialized_options = b'Z7github.com/chroma-core/chroma/go/pkg/proto/logservicepb' - _globals['_PUSHLOGSREQUEST']._serialized_start=72 - _globals['_PUSHLOGSREQUEST']._serialized_end=160 - _globals['_PUSHLOGSRESPONSE']._serialized_start=162 - _globals['_PUSHLOGSRESPONSE']._serialized_end=202 - _globals['_PULLLOGSREQUEST']._serialized_start=204 - _globals['_PULLLOGSREQUEST']._serialized_end=287 - _globals['_RECORDLOG']._serialized_start=289 - _globals['_RECORDLOG']._serialized_end=363 - _globals['_PULLLOGSRESPONSE']._serialized_start=365 - _globals['_PULLLOGSRESPONSE']._serialized_end=419 - _globals['_COLLECTIONINFO']._serialized_start=421 - _globals['_COLLECTIONINFO']._serialized_end=507 - _globals['_GETALLCOLLECTIONINFOTOCOMPACTREQUEST']._serialized_start=509 - _globals['_GETALLCOLLECTIONINFOTOCOMPACTREQUEST']._serialized_end=547 - _globals['_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE']._serialized_start=549 - _globals['_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE']._serialized_end=641 - _globals['_LOGSERVICE']._serialized_start=644 - 
_globals['_LOGSERVICE']._serialized_end=914 + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = ( + b"Z7github.com/chroma-core/chroma/go/pkg/proto/logservicepb" + ) + _globals["_PUSHLOGSREQUEST"]._serialized_start = 72 + _globals["_PUSHLOGSREQUEST"]._serialized_end = 160 + _globals["_PUSHLOGSRESPONSE"]._serialized_start = 162 + _globals["_PUSHLOGSRESPONSE"]._serialized_end = 202 + _globals["_PULLLOGSREQUEST"]._serialized_start = 204 + _globals["_PULLLOGSREQUEST"]._serialized_end = 310 + _globals["_RECORDLOG"]._serialized_start = 312 + _globals["_RECORDLOG"]._serialized_end = 386 + _globals["_PULLLOGSRESPONSE"]._serialized_start = 388 + _globals["_PULLLOGSRESPONSE"]._serialized_end = 442 + _globals["_COLLECTIONINFO"]._serialized_start = 444 + _globals["_COLLECTIONINFO"]._serialized_end = 530 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTREQUEST"]._serialized_start = 532 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTREQUEST"]._serialized_end = 570 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE"]._serialized_start = 572 + _globals["_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE"]._serialized_end = 664 + _globals["_LOGSERVICE"]._serialized_start = 667 + _globals["_LOGSERVICE"]._serialized_end = 937 # @@protoc_insertion_point(module_scope) diff --git a/chromadb/proto/logservice_pb2.pyi b/chromadb/proto/logservice_pb2.pyi index e7e58ebe8a8f..f4be90b553a5 100644 --- a/chromadb/proto/logservice_pb2.pyi +++ b/chromadb/proto/logservice_pb2.pyi @@ -2,7 +2,13 @@ from chromadb.proto import chroma_pb2 as _chroma_pb2 from google.protobuf.internal import containers as _containers from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message -from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union +from typing import ( + ClassVar as _ClassVar, + Iterable as _Iterable, + Mapping as _Mapping, + Optional as _Optional, + Union as _Union, +) DESCRIPTOR: _descriptor.FileDescriptor @@ -11,8 +17,16 @@ class PushLogsRequest(_message.Message): COLLECTION_ID_FIELD_NUMBER: _ClassVar[int] RECORDS_FIELD_NUMBER: _ClassVar[int] collection_id: str - records: _containers.RepeatedCompositeFieldContainer[_chroma_pb2.SubmitEmbeddingRecord] - def __init__(self, collection_id: _Optional[str] = ..., records: _Optional[_Iterable[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]]] = ...) -> None: ... + records: _containers.RepeatedCompositeFieldContainer[ + _chroma_pb2.SubmitEmbeddingRecord + ] + def __init__( + self, + collection_id: _Optional[str] = ..., + records: _Optional[ + _Iterable[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]] + ] = ..., + ) -> None: ... class PushLogsResponse(_message.Message): __slots__ = ["record_count"] @@ -21,14 +35,22 @@ class PushLogsResponse(_message.Message): def __init__(self, record_count: _Optional[int] = ...) -> None: ... class PullLogsRequest(_message.Message): - __slots__ = ["collection_id", "start_from_id", "batch_size"] + __slots__ = ["collection_id", "start_from_id", "batch_size", "end_timestamp"] COLLECTION_ID_FIELD_NUMBER: _ClassVar[int] START_FROM_ID_FIELD_NUMBER: _ClassVar[int] BATCH_SIZE_FIELD_NUMBER: _ClassVar[int] + END_TIMESTAMP_FIELD_NUMBER: _ClassVar[int] collection_id: str start_from_id: int batch_size: int - def __init__(self, collection_id: _Optional[str] = ..., start_from_id: _Optional[int] = ..., batch_size: _Optional[int] = ...) -> None: ... 
+ end_timestamp: int + def __init__( + self, + collection_id: _Optional[str] = ..., + start_from_id: _Optional[int] = ..., + batch_size: _Optional[int] = ..., + end_timestamp: _Optional[int] = ..., + ) -> None: ... class RecordLog(_message.Message): __slots__ = ["log_id", "record"] @@ -36,13 +58,19 @@ class RecordLog(_message.Message): RECORD_FIELD_NUMBER: _ClassVar[int] log_id: int record: _chroma_pb2.SubmitEmbeddingRecord - def __init__(self, log_id: _Optional[int] = ..., record: _Optional[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]] = ...) -> None: ... + def __init__( + self, + log_id: _Optional[int] = ..., + record: _Optional[_Union[_chroma_pb2.SubmitEmbeddingRecord, _Mapping]] = ..., + ) -> None: ... class PullLogsResponse(_message.Message): __slots__ = ["records"] RECORDS_FIELD_NUMBER: _ClassVar[int] records: _containers.RepeatedCompositeFieldContainer[RecordLog] - def __init__(self, records: _Optional[_Iterable[_Union[RecordLog, _Mapping]]] = ...) -> None: ... + def __init__( + self, records: _Optional[_Iterable[_Union[RecordLog, _Mapping]]] = ... + ) -> None: ... class CollectionInfo(_message.Message): __slots__ = ["collection_id", "first_log_id", "first_log_id_ts"] @@ -52,7 +80,12 @@ class CollectionInfo(_message.Message): collection_id: str first_log_id: int first_log_id_ts: int - def __init__(self, collection_id: _Optional[str] = ..., first_log_id: _Optional[int] = ..., first_log_id_ts: _Optional[int] = ...) -> None: ... + def __init__( + self, + collection_id: _Optional[str] = ..., + first_log_id: _Optional[int] = ..., + first_log_id_ts: _Optional[int] = ..., + ) -> None: ... class GetAllCollectionInfoToCompactRequest(_message.Message): __slots__ = [] @@ -62,4 +95,9 @@ class GetAllCollectionInfoToCompactResponse(_message.Message): __slots__ = ["all_collection_info"] ALL_COLLECTION_INFO_FIELD_NUMBER: _ClassVar[int] all_collection_info: _containers.RepeatedCompositeFieldContainer[CollectionInfo] - def __init__(self, all_collection_info: _Optional[_Iterable[_Union[CollectionInfo, _Mapping]]] = ...) -> None: ... + def __init__( + self, + all_collection_info: _Optional[ + _Iterable[_Union[CollectionInfo, _Mapping]] + ] = ..., + ) -> None: ... 
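
As an illustrative aside (not part of the patch itself): a minimal sketch of how a caller might populate the extended `PullLogsRequest` from the generated Python stubs above. Per the Go DAO change later in this diff, a value `<= 0` disables the timestamp filter, while a positive value is treated as a UnixNano upper bound (`timestamp <= end_timestamp`); the Rust worker likewise maps `None` to `-1`. The collection ID below is a placeholder.

```python
# Illustrative sketch only -- not part of this patch.
# Field names come from the generated stubs in this diff.
from chromadb.proto.logservice_pb2 import PullLogsRequest

request = PullLogsRequest(
    collection_id="00000000-0000-0000-0000-000000000000",  # placeholder UUID
    start_from_id=1,    # pull logs with id >= 1
    batch_size=100,     # return at most 100 records
    end_timestamp=-1,   # <= 0 means no upper timestamp bound (see record_log.go below)
)
```
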
diff --git a/go/pkg/logservice/apis.go b/go/pkg/logservice/apis.go index 778b03971524..40736ee6cfa0 100644 --- a/go/pkg/logservice/apis.go +++ b/go/pkg/logservice/apis.go @@ -2,6 +2,7 @@ package logservice import ( "context" + "github.com/chroma-core/chroma/go/pkg/common" "github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel" "github.com/chroma-core/chroma/go/pkg/types" @@ -11,17 +12,19 @@ type ( IRecordLog interface { common.Component PushLogs(ctx context.Context, collectionID types.UniqueID, recordContent [][]byte) (int, error) - PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error) + PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*dbmodel.RecordLog, error) GetAllCollectionIDsToCompact() ([]*dbmodel.RecordLog, error) } ) +var _ IRecordLog = &RecordLog{} + func (s *RecordLog) PushLogs(ctx context.Context, collectionID types.UniqueID, recordsContent [][]byte) (int, error) { return s.recordLogDb.PushLogs(collectionID, recordsContent) } -func (s *RecordLog) PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error) { - return s.recordLogDb.PullLogs(collectionID, id, batchSize) +func (s *RecordLog) PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*dbmodel.RecordLog, error) { + return s.recordLogDb.PullLogs(collectionID, id, batchSize, endTimestamp) } func (s *RecordLog) GetAllCollectionIDsToCompact() ([]*dbmodel.RecordLog, error) { diff --git a/go/pkg/logservice/grpc/record_log_service.go b/go/pkg/logservice/grpc/record_log_service.go index f68e141c0c6f..b1899a0b360c 100644 --- a/go/pkg/logservice/grpc/record_log_service.go +++ b/go/pkg/logservice/grpc/record_log_service.go @@ -2,6 +2,7 @@ package grpc import ( "context" + "github.com/chroma-core/chroma/go/pkg/grpcutils" "github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel" "github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb" @@ -57,7 +58,7 @@ func (s *Server) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequest return nil, err } records := make([]*logservicepb.RecordLog, 0) - recordLogs, err := s.logService.PullLogs(ctx, collectionID, req.GetStartFromId(), int(req.BatchSize)) + recordLogs, err := s.logService.PullLogs(ctx, collectionID, req.GetStartFromId(), int(req.BatchSize), req.GetEndTimestamp()) if err != nil { log.Error("error pulling logs", zap.Error(err)) return nil, grpcutils.BuildInternalGrpcError(err.Error()) diff --git a/go/pkg/metastore/db/dao/record_log.go b/go/pkg/metastore/db/dao/record_log.go index c7a15697df19..aa0c102929c4 100644 --- a/go/pkg/metastore/db/dao/record_log.go +++ b/go/pkg/metastore/db/dao/record_log.go @@ -3,18 +3,21 @@ package dao import ( "database/sql" "errors" + "time" + "github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel" "github.com/chroma-core/chroma/go/pkg/types" "github.com/pingcap/log" "go.uber.org/zap" "gorm.io/gorm" - "time" ) type recordLogDb struct { db *gorm.DB } +var _ dbmodel.IRecordLogDb = &recordLogDb{} + func (s *recordLogDb) PushLogs(collectionID types.UniqueID, recordsContent [][]byte) (int, error) { err := s.db.Transaction(func(tx *gorm.DB) error { var timestamp = time.Now().UnixNano() @@ -58,18 +61,27 @@ func (s *recordLogDb) PushLogs(collectionID types.UniqueID, recordsContent [][]b return len(recordsContent), nil } -func (s *recordLogDb) PullLogs(collectionID types.UniqueID, id int64, batchSize int) 
([]*dbmodel.RecordLog, error) { +func (s *recordLogDb) PullLogs(collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*dbmodel.RecordLog, error) { var collectionIDStr = types.FromUniqueID(collectionID) log.Info("PullLogs", zap.String("collectionID", *collectionIDStr), zap.Int64("ID", id), - zap.Int("batch_size", batchSize)) + zap.Int("batch_size", batchSize), + zap.Int64("endTimestamp", endTimestamp)) var recordLogs []*dbmodel.RecordLog - result := s.db.Where("collection_id = ? AND id >= ?", collectionIDStr, id).Order("id").Limit(batchSize).Find(&recordLogs) - if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { - log.Error("PullLogs error", zap.Error(result.Error)) - return nil, result.Error + if endTimestamp > 0 { + result := s.db.Where("collection_id = ? AND id >= ? AND timestamp <= ?", collectionIDStr, id, endTimestamp).Order("id").Limit(batchSize).Find(&recordLogs) + if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { + log.Error("PullLogs error", zap.Error(result.Error)) + return nil, result.Error + } + } else { + result := s.db.Where("collection_id = ? AND id >= ?", collectionIDStr, id).Order("id").Limit(batchSize).Find(&recordLogs) + if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { + log.Error("PullLogs error", zap.Error(result.Error)) + return nil, result.Error + } } log.Info("PullLogs", zap.String("collectionID", *collectionIDStr), diff --git a/go/pkg/metastore/db/dao/record_log_test.go b/go/pkg/metastore/db/dao/record_log_test.go index 9edf8c149e41..b28f904a6d8a 100644 --- a/go/pkg/metastore/db/dao/record_log_test.go +++ b/go/pkg/metastore/db/dao/record_log_test.go @@ -1,6 +1,9 @@ package dao import ( + "testing" + "time" + "github.com/chroma-core/chroma/go/pkg/logservice/testutils" "github.com/chroma-core/chroma/go/pkg/metastore/db/dbcore" "github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel" @@ -8,7 +11,6 @@ import ( "github.com/pingcap/log" "github.com/stretchr/testify/suite" "gorm.io/gorm" - "testing" ) type RecordLogDbTestSuite struct { @@ -101,7 +103,8 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PushLogs() { func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() { // pull empty logs var recordLogs []*dbmodel.RecordLog - recordLogs, err := suite.Db.PullLogs(suite.collectionId1, 0, 3) + invalidEndTimestamp := int64(-1) + recordLogs, err := suite.Db.PullLogs(suite.collectionId1, 0, 3, invalidEndTimestamp) suite.NoError(err) suite.Len(recordLogs, 0) @@ -114,7 +117,7 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() { suite.Equal(2, count) // pull logs from id 0 batch_size 3 - recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 3) + recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 3, invalidEndTimestamp) suite.NoError(err) suite.Len(recordLogs, 3) for index := range recordLogs { @@ -123,7 +126,7 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() { } // pull logs from id 0 batch_size 6 - recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 6) + recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 6, invalidEndTimestamp) suite.NoError(err) suite.Len(recordLogs, 5) @@ -133,7 +136,16 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() { } // pull logs from id 3 batch_size 4 - recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 3, 4) + recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 3, 4, invalidEndTimestamp) + suite.NoError(err) + 
suite.Len(recordLogs, 3) + for index := range recordLogs { + suite.Equal(int64(index+3), recordLogs[index].ID) + suite.Equal(suite.records[index+2], *recordLogs[index].Record) + } + + // pull logs from id 3 batch_size 4 endTimestamp Now + recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 3, 4, time.Now().UnixNano()) suite.NoError(err) suite.Len(recordLogs, 3) for index := range recordLogs { diff --git a/go/pkg/metastore/db/dbmodel/record_log.go b/go/pkg/metastore/db/dbmodel/record_log.go index 72ff7f8f2f61..5bd4da0ca0a5 100644 --- a/go/pkg/metastore/db/dbmodel/record_log.go +++ b/go/pkg/metastore/db/dbmodel/record_log.go @@ -18,6 +18,6 @@ func (v RecordLog) TableName() string { //go:generate mockery --name=IRecordLogDb type IRecordLogDb interface { PushLogs(collectionID types.UniqueID, recordsContent [][]byte) (int, error) - PullLogs(collectionID types.UniqueID, id int64, batchSize int) ([]*RecordLog, error) + PullLogs(collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*RecordLog, error) GetAllCollectionsToCompact() ([]*RecordLog, error) } diff --git a/go/pkg/proto/coordinatorpb/chroma.pb.go b/go/pkg/proto/coordinatorpb/chroma.pb.go index 208d297e1c30..62566a9a0839 100644 --- a/go/pkg/proto/coordinatorpb/chroma.pb.go +++ b/go/pkg/proto/coordinatorpb/chroma.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.31.0 +// protoc v4.23.4 // source: chromadb/proto/chroma.proto package coordinatorpb diff --git a/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go b/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go index 0b45e03517f2..09283123121b 100644 --- a/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go +++ b/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.3 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 // source: chromadb/proto/chroma.proto package coordinatorpb @@ -18,6 +18,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + VectorReader_GetVectors_FullMethodName = "/chroma.VectorReader/GetVectors" + VectorReader_QueryVectors_FullMethodName = "/chroma.VectorReader/QueryVectors" +) + // VectorReaderClient is the client API for VectorReader service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -36,7 +41,7 @@ func NewVectorReaderClient(cc grpc.ClientConnInterface) VectorReaderClient { func (c *vectorReaderClient) GetVectors(ctx context.Context, in *GetVectorsRequest, opts ...grpc.CallOption) (*GetVectorsResponse, error) { out := new(GetVectorsResponse) - err := c.cc.Invoke(ctx, "/chroma.VectorReader/GetVectors", in, out, opts...) + err := c.cc.Invoke(ctx, VectorReader_GetVectors_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -45,7 +50,7 @@ func (c *vectorReaderClient) GetVectors(ctx context.Context, in *GetVectorsReque func (c *vectorReaderClient) QueryVectors(ctx context.Context, in *QueryVectorsRequest, opts ...grpc.CallOption) (*QueryVectorsResponse, error) { out := new(QueryVectorsResponse) - err := c.cc.Invoke(ctx, "/chroma.VectorReader/QueryVectors", in, out, opts...) + err := c.cc.Invoke(ctx, VectorReader_QueryVectors_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -94,7 +99,7 @@ func _VectorReader_GetVectors_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.VectorReader/GetVectors", + FullMethod: VectorReader_GetVectors_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(VectorReaderServer).GetVectors(ctx, req.(*GetVectorsRequest)) @@ -112,7 +117,7 @@ func _VectorReader_QueryVectors_Handler(srv interface{}, ctx context.Context, de } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.VectorReader/QueryVectors", + FullMethod: VectorReader_QueryVectors_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(VectorReaderServer).QueryVectors(ctx, req.(*QueryVectorsRequest)) diff --git a/go/pkg/proto/coordinatorpb/coordinator.pb.go b/go/pkg/proto/coordinatorpb/coordinator.pb.go index 085f6988055b..6995099b9d19 100644 --- a/go/pkg/proto/coordinatorpb/coordinator.pb.go +++ b/go/pkg/proto/coordinatorpb/coordinator.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.31.0 +// protoc v4.23.4 // source: chromadb/proto/coordinator.proto package coordinatorpb diff --git a/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go b/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go index d6ae92167c39..1306dbc17930 100644 --- a/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go +++ b/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.3 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 // source: chromadb/proto/coordinator.proto package coordinatorpb @@ -19,6 +19,25 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + SysDB_CreateDatabase_FullMethodName = "/chroma.SysDB/CreateDatabase" + SysDB_GetDatabase_FullMethodName = "/chroma.SysDB/GetDatabase" + SysDB_CreateTenant_FullMethodName = "/chroma.SysDB/CreateTenant" + SysDB_GetTenant_FullMethodName = "/chroma.SysDB/GetTenant" + SysDB_CreateSegment_FullMethodName = "/chroma.SysDB/CreateSegment" + SysDB_DeleteSegment_FullMethodName = "/chroma.SysDB/DeleteSegment" + SysDB_GetSegments_FullMethodName = "/chroma.SysDB/GetSegments" + SysDB_UpdateSegment_FullMethodName = "/chroma.SysDB/UpdateSegment" + SysDB_CreateCollection_FullMethodName = "/chroma.SysDB/CreateCollection" + SysDB_DeleteCollection_FullMethodName = "/chroma.SysDB/DeleteCollection" + SysDB_GetCollections_FullMethodName = "/chroma.SysDB/GetCollections" + SysDB_UpdateCollection_FullMethodName = "/chroma.SysDB/UpdateCollection" + SysDB_ResetState_FullMethodName = "/chroma.SysDB/ResetState" + SysDB_GetLastCompactionTimeForTenant_FullMethodName = "/chroma.SysDB/GetLastCompactionTimeForTenant" + SysDB_SetLastCompactionTimeForTenant_FullMethodName = "/chroma.SysDB/SetLastCompactionTimeForTenant" + SysDB_FlushCollectionCompaction_FullMethodName = "/chroma.SysDB/FlushCollectionCompaction" +) + // SysDBClient is the client API for SysDB service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
@@ -51,7 +70,7 @@ func NewSysDBClient(cc grpc.ClientConnInterface) SysDBClient { func (c *sysDBClient) CreateDatabase(ctx context.Context, in *CreateDatabaseRequest, opts ...grpc.CallOption) (*CreateDatabaseResponse, error) { out := new(CreateDatabaseResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateDatabase", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_CreateDatabase_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -60,7 +79,7 @@ func (c *sysDBClient) CreateDatabase(ctx context.Context, in *CreateDatabaseRequ func (c *sysDBClient) GetDatabase(ctx context.Context, in *GetDatabaseRequest, opts ...grpc.CallOption) (*GetDatabaseResponse, error) { out := new(GetDatabaseResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/GetDatabase", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_GetDatabase_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -69,7 +88,7 @@ func (c *sysDBClient) GetDatabase(ctx context.Context, in *GetDatabaseRequest, o func (c *sysDBClient) CreateTenant(ctx context.Context, in *CreateTenantRequest, opts ...grpc.CallOption) (*CreateTenantResponse, error) { out := new(CreateTenantResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateTenant", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_CreateTenant_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -78,7 +97,7 @@ func (c *sysDBClient) CreateTenant(ctx context.Context, in *CreateTenantRequest, func (c *sysDBClient) GetTenant(ctx context.Context, in *GetTenantRequest, opts ...grpc.CallOption) (*GetTenantResponse, error) { out := new(GetTenantResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/GetTenant", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_GetTenant_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -87,7 +106,7 @@ func (c *sysDBClient) GetTenant(ctx context.Context, in *GetTenantRequest, opts func (c *sysDBClient) CreateSegment(ctx context.Context, in *CreateSegmentRequest, opts ...grpc.CallOption) (*CreateSegmentResponse, error) { out := new(CreateSegmentResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateSegment", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_CreateSegment_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -96,7 +115,7 @@ func (c *sysDBClient) CreateSegment(ctx context.Context, in *CreateSegmentReques func (c *sysDBClient) DeleteSegment(ctx context.Context, in *DeleteSegmentRequest, opts ...grpc.CallOption) (*DeleteSegmentResponse, error) { out := new(DeleteSegmentResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/DeleteSegment", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_DeleteSegment_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -105,7 +124,7 @@ func (c *sysDBClient) DeleteSegment(ctx context.Context, in *DeleteSegmentReques func (c *sysDBClient) GetSegments(ctx context.Context, in *GetSegmentsRequest, opts ...grpc.CallOption) (*GetSegmentsResponse, error) { out := new(GetSegmentsResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/GetSegments", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_GetSegments_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -114,7 +133,7 @@ func (c *sysDBClient) GetSegments(ctx context.Context, in *GetSegmentsRequest, o func (c *sysDBClient) UpdateSegment(ctx context.Context, in *UpdateSegmentRequest, opts ...grpc.CallOption) (*UpdateSegmentResponse, error) { out := new(UpdateSegmentResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/UpdateSegment", in, out, opts...) 
+ err := c.cc.Invoke(ctx, SysDB_UpdateSegment_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -123,7 +142,7 @@ func (c *sysDBClient) UpdateSegment(ctx context.Context, in *UpdateSegmentReques func (c *sysDBClient) CreateCollection(ctx context.Context, in *CreateCollectionRequest, opts ...grpc.CallOption) (*CreateCollectionResponse, error) { out := new(CreateCollectionResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/CreateCollection", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_CreateCollection_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -132,7 +151,7 @@ func (c *sysDBClient) CreateCollection(ctx context.Context, in *CreateCollection func (c *sysDBClient) DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error) { out := new(DeleteCollectionResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/DeleteCollection", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_DeleteCollection_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -141,7 +160,7 @@ func (c *sysDBClient) DeleteCollection(ctx context.Context, in *DeleteCollection func (c *sysDBClient) GetCollections(ctx context.Context, in *GetCollectionsRequest, opts ...grpc.CallOption) (*GetCollectionsResponse, error) { out := new(GetCollectionsResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/GetCollections", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_GetCollections_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -150,7 +169,7 @@ func (c *sysDBClient) GetCollections(ctx context.Context, in *GetCollectionsRequ func (c *sysDBClient) UpdateCollection(ctx context.Context, in *UpdateCollectionRequest, opts ...grpc.CallOption) (*UpdateCollectionResponse, error) { out := new(UpdateCollectionResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/UpdateCollection", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_UpdateCollection_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -159,7 +178,7 @@ func (c *sysDBClient) UpdateCollection(ctx context.Context, in *UpdateCollection func (c *sysDBClient) ResetState(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ResetStateResponse, error) { out := new(ResetStateResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/ResetState", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_ResetState_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -168,7 +187,7 @@ func (c *sysDBClient) ResetState(ctx context.Context, in *emptypb.Empty, opts .. func (c *sysDBClient) GetLastCompactionTimeForTenant(ctx context.Context, in *GetLastCompactionTimeForTenantRequest, opts ...grpc.CallOption) (*GetLastCompactionTimeForTenantResponse, error) { out := new(GetLastCompactionTimeForTenantResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/GetLastCompactionTimeForTenant", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_GetLastCompactionTimeForTenant_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -177,7 +196,7 @@ func (c *sysDBClient) GetLastCompactionTimeForTenant(ctx context.Context, in *Ge func (c *sysDBClient) SetLastCompactionTimeForTenant(ctx context.Context, in *SetLastCompactionTimeForTenantRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, "/chroma.SysDB/SetLastCompactionTimeForTenant", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_SetLastCompactionTimeForTenant_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -186,7 +205,7 @@ func (c *sysDBClient) SetLastCompactionTimeForTenant(ctx context.Context, in *Se func (c *sysDBClient) FlushCollectionCompaction(ctx context.Context, in *FlushCollectionCompactionRequest, opts ...grpc.CallOption) (*FlushCollectionCompactionResponse, error) { out := new(FlushCollectionCompactionResponse) - err := c.cc.Invoke(ctx, "/chroma.SysDB/FlushCollectionCompaction", in, out, opts...) + err := c.cc.Invoke(ctx, SysDB_FlushCollectionCompaction_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -291,7 +310,7 @@ func _SysDB_CreateDatabase_Handler(srv interface{}, ctx context.Context, dec fun } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/CreateDatabase", + FullMethod: SysDB_CreateDatabase_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateDatabase(ctx, req.(*CreateDatabaseRequest)) @@ -309,7 +328,7 @@ func _SysDB_GetDatabase_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/GetDatabase", + FullMethod: SysDB_GetDatabase_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetDatabase(ctx, req.(*GetDatabaseRequest)) @@ -327,7 +346,7 @@ func _SysDB_CreateTenant_Handler(srv interface{}, ctx context.Context, dec func( } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/CreateTenant", + FullMethod: SysDB_CreateTenant_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateTenant(ctx, req.(*CreateTenantRequest)) @@ -345,7 +364,7 @@ func _SysDB_GetTenant_Handler(srv interface{}, ctx context.Context, dec func(int } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/GetTenant", + FullMethod: SysDB_GetTenant_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetTenant(ctx, req.(*GetTenantRequest)) @@ -363,7 +382,7 @@ func _SysDB_CreateSegment_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/CreateSegment", + FullMethod: SysDB_CreateSegment_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateSegment(ctx, req.(*CreateSegmentRequest)) @@ -381,7 +400,7 @@ func _SysDB_DeleteSegment_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/DeleteSegment", + FullMethod: SysDB_DeleteSegment_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).DeleteSegment(ctx, req.(*DeleteSegmentRequest)) @@ -399,7 +418,7 @@ func _SysDB_GetSegments_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/GetSegments", + FullMethod: SysDB_GetSegments_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetSegments(ctx, req.(*GetSegmentsRequest)) @@ -417,7 +436,7 @@ func _SysDB_UpdateSegment_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/UpdateSegment", + FullMethod: SysDB_UpdateSegment_FullMethodName, } handler := 
func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).UpdateSegment(ctx, req.(*UpdateSegmentRequest)) @@ -435,7 +454,7 @@ func _SysDB_CreateCollection_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/CreateCollection", + FullMethod: SysDB_CreateCollection_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).CreateCollection(ctx, req.(*CreateCollectionRequest)) @@ -453,7 +472,7 @@ func _SysDB_DeleteCollection_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/DeleteCollection", + FullMethod: SysDB_DeleteCollection_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).DeleteCollection(ctx, req.(*DeleteCollectionRequest)) @@ -471,7 +490,7 @@ func _SysDB_GetCollections_Handler(srv interface{}, ctx context.Context, dec fun } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/GetCollections", + FullMethod: SysDB_GetCollections_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetCollections(ctx, req.(*GetCollectionsRequest)) @@ -489,7 +508,7 @@ func _SysDB_UpdateCollection_Handler(srv interface{}, ctx context.Context, dec f } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/UpdateCollection", + FullMethod: SysDB_UpdateCollection_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).UpdateCollection(ctx, req.(*UpdateCollectionRequest)) @@ -507,7 +526,7 @@ func _SysDB_ResetState_Handler(srv interface{}, ctx context.Context, dec func(in } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/ResetState", + FullMethod: SysDB_ResetState_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).ResetState(ctx, req.(*emptypb.Empty)) @@ -525,7 +544,7 @@ func _SysDB_GetLastCompactionTimeForTenant_Handler(srv interface{}, ctx context. } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/GetLastCompactionTimeForTenant", + FullMethod: SysDB_GetLastCompactionTimeForTenant_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).GetLastCompactionTimeForTenant(ctx, req.(*GetLastCompactionTimeForTenantRequest)) @@ -543,7 +562,7 @@ func _SysDB_SetLastCompactionTimeForTenant_Handler(srv interface{}, ctx context. 
} info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/SetLastCompactionTimeForTenant", + FullMethod: SysDB_SetLastCompactionTimeForTenant_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).SetLastCompactionTimeForTenant(ctx, req.(*SetLastCompactionTimeForTenantRequest)) @@ -561,7 +580,7 @@ func _SysDB_FlushCollectionCompaction_Handler(srv interface{}, ctx context.Conte } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.SysDB/FlushCollectionCompaction", + FullMethod: SysDB_FlushCollectionCompaction_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SysDBServer).FlushCollectionCompaction(ctx, req.(*FlushCollectionCompactionRequest)) diff --git a/go/pkg/proto/logservicepb/logservice.pb.go b/go/pkg/proto/logservicepb/logservice.pb.go index 491d57956d32..2122524d933c 100644 --- a/go/pkg/proto/logservicepb/logservice.pb.go +++ b/go/pkg/proto/logservicepb/logservice.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v4.25.3 +// protoc-gen-go v1.31.0 +// protoc v4.23.4 // source: chromadb/proto/logservice.proto package logservicepb @@ -131,6 +131,7 @@ type PullLogsRequest struct { CollectionId string `protobuf:"bytes,1,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"` StartFromId int64 `protobuf:"varint,2,opt,name=start_from_id,json=startFromId,proto3" json:"start_from_id,omitempty"` BatchSize int32 `protobuf:"varint,3,opt,name=batch_size,json=batchSize,proto3" json:"batch_size,omitempty"` + EndTimestamp int64 `protobuf:"varint,4,opt,name=end_timestamp,json=endTimestamp,proto3" json:"end_timestamp,omitempty"` } func (x *PullLogsRequest) Reset() { @@ -186,6 +187,13 @@ func (x *PullLogsRequest) GetBatchSize() int32 { return 0 } +func (x *PullLogsRequest) GetEndTimestamp() int64 { + if x != nil { + return x.EndTimestamp + } + return 0 +} + type RecordLog struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -454,64 +462,66 @@ var file_chromadb_proto_logservice_proto_rawDesc = []byte{ 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x22, 0x35, 0x0a, 0x10, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x79, - 0x0a, 0x0f, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, - 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x73, - 0x74, 0x61, 0x72, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x62, 0x61, - 0x74, 0x63, 0x68, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, - 0x62, 0x61, 0x74, 0x63, 0x68, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x59, 0x0a, 0x09, 0x52, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x4c, 0x6f, 0x67, 0x12, 0x15, 0x0a, 0x06, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x6c, 0x6f, 0x67, 0x49, 0x64, 0x12, 0x35, 0x0a, 
- 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, - 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x45, 0x6d, 0x62, - 0x65, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, - 0x63, 0x6f, 0x72, 0x64, 0x22, 0x3f, 0x0a, 0x10, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x68, 0x72, 0x6f, - 0x6d, 0x61, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x4c, 0x6f, 0x67, 0x52, 0x07, 0x72, 0x65, - 0x63, 0x6f, 0x72, 0x64, 0x73, 0x22, 0x7e, 0x0a, 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6c, 0x6c, 0x65, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, - 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0c, - 0x66, 0x69, 0x72, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x0a, 0x66, 0x69, 0x72, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x49, 0x64, 0x12, 0x25, - 0x0a, 0x0f, 0x66, 0x69, 0x72, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x64, 0x5f, 0x74, - 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x4c, 0x6f, - 0x67, 0x49, 0x64, 0x54, 0x73, 0x22, 0x26, 0x0a, 0x24, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, + 0x05, 0x52, 0x0b, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x9e, + 0x01, 0x0a, 0x0f, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x62, + 0x61, 0x74, 0x63, 0x68, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x09, 0x62, 0x61, 0x74, 0x63, 0x68, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x6e, + 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0c, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, + 0x59, 0x0a, 0x09, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x4c, 0x6f, 0x67, 0x12, 0x15, 0x0a, 0x06, + 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x6c, 0x6f, + 0x67, 0x49, 0x64, 0x12, 0x35, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x53, 0x75, 0x62, + 0x6d, 0x69, 0x74, 0x45, 0x6d, 0x62, 0x65, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x63, 0x6f, + 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x3f, 0x0a, 0x10, 0x50, 0x75, + 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2b, + 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x11, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x4c, + 0x6f, 0x67, 0x52, 0x07, 
0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x22, 0x7e, 0x0a, 0x0e, 0x43, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x23, 0x0a, + 0x0d, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, + 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x66, 0x69, 0x72, 0x73, 0x74, 0x4c, + 0x6f, 0x67, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x0f, 0x66, 0x69, 0x72, 0x73, 0x74, 0x5f, 0x6c, 0x6f, + 0x67, 0x5f, 0x69, 0x64, 0x5f, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x66, + 0x69, 0x72, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x49, 0x64, 0x54, 0x73, 0x22, 0x26, 0x0a, 0x24, 0x47, + 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, + 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x22, 0x6f, 0x0a, 0x25, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, + 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, + 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x13, + 0x61, 0x6c, 0x6c, 0x5f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, + 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x68, 0x72, 0x6f, + 0x6d, 0x61, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, + 0x6f, 0x52, 0x11, 0x61, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x49, 0x6e, 0x66, 0x6f, 0x32, 0x8e, 0x02, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x3f, 0x0a, 0x08, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x12, + 0x17, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, + 0x61, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x3f, 0x0a, 0x08, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, + 0x12, 0x17, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, + 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, 0x72, 0x6f, + 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x7e, 0x0a, 0x1d, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, - 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x6f, 0x0a, - 0x25, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x13, 0x61, 0x6c, 0x6c, 0x5f, 0x63, 0x6f, - 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x43, 0x6f, 0x6c, - 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x11, 0x61, 0x6c, 0x6c, - 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 
0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x32, 0x8e, - 0x02, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3f, 0x0a, - 0x08, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x17, 0x2e, 0x63, 0x68, 0x72, 0x6f, - 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x73, 0x68, - 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3f, - 0x0a, 0x08, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x17, 0x2e, 0x63, 0x68, 0x72, - 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x6c, 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x50, 0x75, 0x6c, - 0x6c, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x7e, 0x0a, 0x1d, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, - 0x12, 0x2c, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, - 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, - 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, - 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, - 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, - 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, - 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, - 0x72, 0x6f, 0x6d, 0x61, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, - 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x6f, - 0x67, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x12, 0x2c, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, + 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2e, 0x47, 0x65, + 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, + 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, + 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x6f, 0x67, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/pkg/proto/logservicepb/logservice_grpc.pb.go b/go/pkg/proto/logservicepb/logservice_grpc.pb.go index 62d87449a124..7b9895d172ae 100644 --- a/go/pkg/proto/logservicepb/logservice_grpc.pb.go +++ b/go/pkg/proto/logservicepb/logservice_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.3 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 // source: chromadb/proto/logservice.proto package logservicepb @@ -18,6 +18,12 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + LogService_PushLogs_FullMethodName = "/chroma.LogService/PushLogs" + LogService_PullLogs_FullMethodName = "/chroma.LogService/PullLogs" + LogService_GetAllCollectionInfoToCompact_FullMethodName = "/chroma.LogService/GetAllCollectionInfoToCompact" +) + // LogServiceClient is the client API for LogService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -37,7 +43,7 @@ func NewLogServiceClient(cc grpc.ClientConnInterface) LogServiceClient { func (c *logServiceClient) PushLogs(ctx context.Context, in *PushLogsRequest, opts ...grpc.CallOption) (*PushLogsResponse, error) { out := new(PushLogsResponse) - err := c.cc.Invoke(ctx, "/chroma.LogService/PushLogs", in, out, opts...) + err := c.cc.Invoke(ctx, LogService_PushLogs_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -46,7 +52,7 @@ func (c *logServiceClient) PushLogs(ctx context.Context, in *PushLogsRequest, op func (c *logServiceClient) PullLogs(ctx context.Context, in *PullLogsRequest, opts ...grpc.CallOption) (*PullLogsResponse, error) { out := new(PullLogsResponse) - err := c.cc.Invoke(ctx, "/chroma.LogService/PullLogs", in, out, opts...) + err := c.cc.Invoke(ctx, LogService_PullLogs_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -55,7 +61,7 @@ func (c *logServiceClient) PullLogs(ctx context.Context, in *PullLogsRequest, op func (c *logServiceClient) GetAllCollectionInfoToCompact(ctx context.Context, in *GetAllCollectionInfoToCompactRequest, opts ...grpc.CallOption) (*GetAllCollectionInfoToCompactResponse, error) { out := new(GetAllCollectionInfoToCompactResponse) - err := c.cc.Invoke(ctx, "/chroma.LogService/GetAllCollectionInfoToCompact", in, out, opts...) + err := c.cc.Invoke(ctx, LogService_GetAllCollectionInfoToCompact_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -108,7 +114,7 @@ func _LogService_PushLogs_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.LogService/PushLogs", + FullMethod: LogService_PushLogs_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(LogServiceServer).PushLogs(ctx, req.(*PushLogsRequest)) @@ -126,7 +132,7 @@ func _LogService_PullLogs_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.LogService/PullLogs", + FullMethod: LogService_PullLogs_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(LogServiceServer).PullLogs(ctx, req.(*PullLogsRequest)) @@ -144,7 +150,7 @@ func _LogService_GetAllCollectionInfoToCompact_Handler(srv interface{}, ctx cont } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/chroma.LogService/GetAllCollectionInfoToCompact", + FullMethod: LogService_GetAllCollectionInfoToCompact_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(LogServiceServer).GetAllCollectionInfoToCompact(ctx, req.(*GetAllCollectionInfoToCompactRequest)) diff --git a/idl/chromadb/proto/logservice.proto b/idl/chromadb/proto/logservice.proto index eaa5d0fad8ff..e55a8ef331aa 100644 --- a/idl/chromadb/proto/logservice.proto +++ b/idl/chromadb/proto/logservice.proto @@ -18,6 +18,7 @@ message PullLogsRequest { string collection_id = 1; int64 start_from_id = 2; int32 batch_size = 3; + int64 end_timestamp = 4; } message RecordLog { diff --git a/rust/worker/src/execution/operators/pull_log.rs b/rust/worker/src/execution/operators/pull_log.rs index ba8b9bc6e9cc..9e38f9195055 100644 --- a/rust/worker/src/execution/operators/pull_log.rs +++ b/rust/worker/src/execution/operators/pull_log.rs @@ -77,6 +77,7 @@ impl Operator for PullLogsOperator { input.collection_id.to_string(), input.offset, input.batch_size, + None, ) .await .unwrap(); diff --git a/rust/worker/src/log/log.rs b/rust/worker/src/log/log.rs index f49c596d6b34..9ece2deb205e 100644 --- a/rust/worker/src/log/log.rs +++ b/rust/worker/src/log/log.rs @@ -37,6 +37,7 @@ pub(crate) trait Log: Send + Sync + LogClone + Debug { collection_id: String, offset: i64, batch_size: i32, + end_timestamp: Option, ) -> Result>, PullLogsError>; async fn get_collections_with_new_data( @@ -119,11 +120,17 @@ impl Log for GrpcLog { collection_id: String, offset: i64, batch_size: i32, + end_timestamp: Option, ) -> Result>, PullLogsError> { + let end_timestamp = match end_timestamp { + Some(end_timestamp) => end_timestamp, + None => -1, + }; let request = self.client.pull_logs(chroma_proto::PullLogsRequest { - collection_id: collection_id, + collection_id, start_from_id: offset, - batch_size: batch_size, + batch_size, + end_timestamp, }); let response = request.await; match response { @@ -260,6 +267,7 @@ impl Log for InMemoryLog { collection_id: String, offset: i64, batch_size: i32, + end_timestamp: Option, ) -> Result>, PullLogsError> { let logs = self.logs.get(&collection_id).unwrap(); let mut result = Vec::new(); From 51883f37a447ecfe9c1aa181ab3c3135aebb3a16 Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Wed, 20 Mar 2024 15:34:30 -0700 Subject: [PATCH 2/5] [ENH] Add query-service server (#1899) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Add a new entrypoint for query-service. 
Delete the old worker entrypoint that did not have read/write decoupling. - Make dispatcher configurable - Wrapped hnsw orchestrator logic into run() so server is unaware of it - Make server struct {} have the resources it needs - New functionality - Add dynamic creation for log - Add dynamic creation for sysdb - Make server respond to query by using orchestrator ## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- rust/worker/Cargo.toml | 6 +- rust/worker/chroma_config.yaml | 4 + rust/worker/src/bin/query_service.rs | 6 + rust/worker/src/bin/worker.rs | 6 - rust/worker/src/config.rs | 17 +++ rust/worker/src/errors.rs | 2 +- rust/worker/src/execution/config.rs | 8 ++ rust/worker/src/execution/dispatcher.rs | 45 +++++--- rust/worker/src/execution/mod.rs | 7 +- rust/worker/src/execution/operator.rs | 4 +- .../src/execution/orchestration/hnsw.rs | 48 +++++++- .../worker/src/execution/orchestration/mod.rs | 2 + rust/worker/src/execution/worker_thread.rs | 13 ++- rust/worker/src/lib.rs | 40 ++++++- rust/worker/src/log/mod.rs | 15 +++ rust/worker/src/server.rs | 104 ++++++++++++++---- rust/worker/src/sysdb/mod.rs | 15 +++ rust/worker/src/sysdb/sysdb.rs | 3 +- rust/worker/src/system/scheduler.rs | 4 +- rust/worker/src/system/system.rs | 5 +- 20 files changed, 289 insertions(+), 65 deletions(-) create mode 100644 rust/worker/src/bin/query_service.rs delete mode 100644 rust/worker/src/bin/worker.rs create mode 100644 rust/worker/src/execution/config.rs diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 080e17031ff7..65ab2a68ff04 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2021" [[bin]] -name = "worker" -path = "src/bin/worker.rs" +name = "query_service" +path = "src/bin/query_service.rs" [dependencies] tonic = "0.10" @@ -46,4 +46,4 @@ proptest-state-machine = "0.1.0" [build-dependencies] tonic-build = "0.10" -cc = "1.0" \ No newline at end of file +cc = "1.0" diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index fa6d41fa0691..151d21287792 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -33,3 +33,7 @@ worker: Grpc: host: "logservice.chroma" port: 50052 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 diff --git a/rust/worker/src/bin/query_service.rs b/rust/worker/src/bin/query_service.rs new file mode 100644 index 000000000000..f3cfa4c8282b --- /dev/null +++ b/rust/worker/src/bin/query_service.rs @@ -0,0 +1,6 @@ +use worker::query_service_entrypoint; + +#[tokio::main] +async fn main() { + query_service_entrypoint().await; +} diff --git a/rust/worker/src/bin/worker.rs b/rust/worker/src/bin/worker.rs deleted file mode 100644 index 16428d244ff5..000000000000 --- a/rust/worker/src/bin/worker.rs +++ /dev/null @@ -1,6 +0,0 @@ -use worker::worker_entrypoint; - -#[tokio::main] -async fn main() { - worker_entrypoint().await; -} diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 309155bfb9d9..637c5cac98a5 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -110,6 +110,7 @@ pub(crate) struct WorkerConfig { pub(crate) segment_manager: crate::segment::config::SegmentManagerConfig, pub(crate) storage: crate::storage::config::StorageConfig, pub(crate) log: crate::log::config::LogConfig, + pub(crate) dispatcher: crate::execution::config::DispatcherConfig, } /// # 
Description @@ -165,6 +166,10 @@ mod tests { Grpc: host: "localhost" port: 50052 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 "#, ); let config = RootConfig::load(); @@ -213,6 +218,10 @@ mod tests { Grpc: host: "localhost" port: 50052 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 "#, ); @@ -277,6 +286,10 @@ mod tests { Grpc: host: "localhost" port: 50052 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 "#, ); let config = RootConfig::load(); @@ -321,6 +334,10 @@ mod tests { Grpc: host: "localhost" port: 50052 + dispatcher: + num_worker_threads: 4 + dispatcher_queue_size: 100 + worker_queue_size: 100 "#, ); let config = RootConfig::load(); diff --git a/rust/worker/src/errors.rs b/rust/worker/src/errors.rs index 18365cb789f8..968dbaeeb175 100644 --- a/rust/worker/src/errors.rs +++ b/rust/worker/src/errors.rs @@ -42,6 +42,6 @@ pub(crate) enum ErrorCodes { DataLoss = 15, } -pub(crate) trait ChromaError: Error { +pub(crate) trait ChromaError: Error + Send { fn code(&self) -> ErrorCodes; } diff --git a/rust/worker/src/execution/config.rs b/rust/worker/src/execution/config.rs new file mode 100644 index 000000000000..d8550dc41bcc --- /dev/null +++ b/rust/worker/src/execution/config.rs @@ -0,0 +1,8 @@ +use serde::Deserialize; + +#[derive(Deserialize)] +pub(crate) struct DispatcherConfig { + pub(crate) num_worker_threads: usize, + pub(crate) dispatcher_queue_size: usize, + pub(crate) worker_queue_size: usize, +} diff --git a/rust/worker/src/execution/dispatcher.rs b/rust/worker/src/execution/dispatcher.rs index 1fe94b255c18..8a25e0b26fe4 100644 --- a/rust/worker/src/execution/dispatcher.rs +++ b/rust/worker/src/execution/dispatcher.rs @@ -1,5 +1,9 @@ use super::{operator::TaskMessage, worker_thread::WorkerThread}; -use crate::system::{Component, ComponentContext, Handler, Receiver, System}; +use crate::{ + config::{Configurable, WorkerConfig}, + errors::ChromaError, + system::{Component, ComponentContext, Handler, Receiver, System}, +}; use async_trait::async_trait; use std::fmt::Debug; @@ -46,21 +50,27 @@ use std::fmt::Debug; coarser work-stealing, and other optimizations. 
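To make the dispatch flow described above concrete, here is a minimal sketch of how a component hands work to the dispatcher. It follows the `wrap()`/`send()` pattern the orchestrator uses later in this series; the function name and local bindings are illustrative, and the crate-internal `Receiver`, `TaskMessage`, `Log`, `PullLogsOperator`, and `PullLogsInput` items are assumed to be in scope.

```rust
// Minimal sketch, assuming this lives inside the worker crate. The function
// name and bindings are hypothetical; only the wrap()/send() shape is real.
async fn dispatch_example(
    dispatcher: Box<dyn Receiver<TaskMessage>>,
    log: Box<dyn Log>,
    collection_id: Uuid,
    reply_to: Box<dyn Receiver<PullLogsOutput>>,
) {
    // Build an operator and its input, then erase their concrete types into a
    // TaskMessage so the dispatcher can queue it alongside any other task.
    let operator = PullLogsOperator::new(log);
    let input = PullLogsInput::new(collection_id, 0, 100);
    let task = wrap(operator, input, reply_to);
    // Enqueue the task; an idle WorkerThread requests it from the dispatcher,
    // and the output is later delivered to whichever component owns `reply_to`.
    if let Err(e) = dispatcher.send(task).await {
        println!("failed to enqueue task: {:?}", e);
    }
}
```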
*/ #[derive(Debug)] -struct Dispatcher { +pub(crate) struct Dispatcher { task_queue: Vec, waiters: Vec, n_worker_threads: usize, + queue_size: usize, + worker_queue_size: usize, } impl Dispatcher { /// Create a new dispatcher /// # Parameters /// - n_worker_threads: The number of worker threads to use - pub fn new(n_worker_threads: usize) -> Self { + /// - queue_size: The size of the components message queue + /// - worker_queue_size: The size of the worker components queue + pub fn new(n_worker_threads: usize, queue_size: usize, worker_queue_size: usize) -> Self { Dispatcher { task_queue: Vec::new(), waiters: Vec::new(), n_worker_threads, + queue_size, + worker_queue_size, } } @@ -74,7 +84,7 @@ impl Dispatcher { self_receiver: Box>, ) { for _ in 0..self.n_worker_threads { - let worker = WorkerThread::new(self_receiver.clone()); + let worker = WorkerThread::new(self_receiver.clone(), self.worker_queue_size); system.start_component(worker); } } @@ -118,6 +128,17 @@ impl Dispatcher { } } +#[async_trait] +impl Configurable for Dispatcher { + async fn try_from_config(worker_config: &WorkerConfig) -> Result> { + Ok(Dispatcher::new( + worker_config.dispatcher.num_worker_threads, + worker_config.dispatcher.dispatcher_queue_size, + worker_config.dispatcher.worker_queue_size, + )) + } +} + /// A message that a worker thread sends to the dispatcher to request a task /// # Members /// - reply_to: The receiver to send the task to, this is the worker thread @@ -141,7 +162,7 @@ impl TaskRequestMessage { #[async_trait] impl Component for Dispatcher { fn queue_size(&self) -> usize { - 1000 // TODO: make configurable + self.queue_size } async fn on_start(&mut self, ctx: &ComponentContext) { @@ -166,19 +187,15 @@ impl Handler for Dispatcher { #[cfg(test)] mod tests { - use std::{ - env::current_dir, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - }; - use super::*; use crate::{ execution::operator::{wrap, Operator}, system::System, }; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; // Create a component that will schedule DISPATCH_COUNT invocations of the MockOperator // on an interval of DISPATCH_FREQUENCY_MS. @@ -249,7 +266,7 @@ mod tests { #[tokio::test] async fn test_dispatcher() { let mut system = System::new(); - let dispatcher = Dispatcher::new(THREAD_COUNT); + let dispatcher = Dispatcher::new(THREAD_COUNT, 1000, 1000); let dispatcher_handle = system.start_component(dispatcher); let counter = Arc::new(AtomicUsize::new(0)); let dispatch_user = MockDispatchUser { diff --git a/rust/worker/src/execution/mod.rs b/rust/worker/src/execution/mod.rs index 3bd82a311ab7..0000e23f3a3c 100644 --- a/rust/worker/src/execution/mod.rs +++ b/rust/worker/src/execution/mod.rs @@ -1,5 +1,6 @@ -mod dispatcher; -mod operator; +pub(crate) mod config; +pub(crate) mod dispatcher; +pub(crate) mod operator; mod operators; -mod orchestration; +pub(crate) mod orchestration; mod worker_thread; diff --git a/rust/worker/src/execution/operator.rs b/rust/worker/src/execution/operator.rs index 10f7321684f1..85baa7d8c7db 100644 --- a/rust/worker/src/execution/operator.rs +++ b/rust/worker/src/execution/operator.rs @@ -27,12 +27,12 @@ where } /// A message type used by the dispatcher to send tasks to worker threads. -pub(super) type TaskMessage = Box; +pub(crate) type TaskMessage = Box; /// A task wrapper is a trait that can be used to run a task. We use it to /// erase the I, O types from the Task struct so that tasks. 
#[async_trait] -pub(super) trait TaskWrapper: Send + Debug { +pub(crate) trait TaskWrapper: Send + Debug { async fn run(&self); } diff --git a/rust/worker/src/execution/orchestration/hnsw.rs b/rust/worker/src/execution/orchestration/hnsw.rs index 7dd2aebb7bb1..d25862bc9e11 100644 --- a/rust/worker/src/execution/orchestration/hnsw.rs +++ b/rust/worker/src/execution/orchestration/hnsw.rs @@ -1,12 +1,16 @@ use super::super::operator::{wrap, TaskMessage}; use super::super::operators::pull_log::{PullLogsInput, PullLogsOperator, PullLogsOutput}; +use crate::errors::ChromaError; use crate::sysdb::sysdb::SysDb; +use crate::system::System; +use crate::types::VectorQueryResult; use crate::{ log::log::Log, system::{Component, Handler, Receiver}, }; use async_trait::async_trait; -use std::fmt::{self, Debug, Formatter}; +use num_bigint::BigInt; +use std::fmt::Debug; use uuid::Uuid; /** The state of the orchestrator. @@ -35,8 +39,10 @@ enum ExecutionState { } #[derive(Debug)] -struct HnswQueryOrchestrator { +pub(crate) struct HnswQueryOrchestrator { state: ExecutionState, + // Component Execution + system: System, // Query state query_vectors: Vec>, k: i32, @@ -46,10 +52,15 @@ struct HnswQueryOrchestrator { log: Box, sysdb: Box, dispatcher: Box>, + // Result channel + result_channel: Option< + tokio::sync::oneshot::Sender>, Box>>, + >, } impl HnswQueryOrchestrator { - pub fn new( + pub(crate) fn new( + system: System, query_vectors: Vec>, k: i32, include_embeddings: bool, @@ -60,6 +71,7 @@ impl HnswQueryOrchestrator { ) -> Self { HnswQueryOrchestrator { state: ExecutionState::Pending, + system, query_vectors, k, include_embeddings, @@ -67,6 +79,7 @@ impl HnswQueryOrchestrator { log, sysdb, dispatcher, + result_channel: None, } } @@ -108,6 +121,19 @@ impl HnswQueryOrchestrator { } } } + + /// Run the orchestrator and return the result. + /// # Note + /// Use this over spawning the component directly. This method will start the component and + /// wait for it to finish before returning the result. 
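On the consuming side, the gRPC server (shown later in this patch) drives the orchestrator exclusively through `run()` rather than starting the component itself. A condensed sketch of that call site, assuming the request fields and the `system`, `log`, `sysdb`, and `dispatcher` handles are already in scope as they are in `server.rs`:

```rust
// Condensed sketch of the query path; the argument order mirrors the
// HnswQueryOrchestrator::new() call in server.rs. All bindings are assumed.
let orchestrator = HnswQueryOrchestrator::new(
    system.clone(),        // System used internally by run() to start the component
    query_vectors.clone(), // Vec<Vec<f32>> parsed from the QueryVectorsRequest
    k,                     // i32: number of results per query vector
    include_embeddings,    // bool: whether to echo vectors back in the results
    segment_uuid,          // Uuid of the segment to query
    log.clone(),
    sysdb.clone(),
    dispatcher.clone(),
);
// run() installs a oneshot result channel, starts the component, awaits the
// reply, stops the component handle, and returns the result to the caller.
let results = match orchestrator.run().await {
    Ok(results) => results, // Vec<Vec<VectorQueryResult>>
    Err(e) => {
        println!("query failed: {:?}", e);
        return;
    }
};
```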
+ pub(crate) async fn run(mut self) -> Result>, Box> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.result_channel = Some(tx); + let mut handle = self.system.clone().start_component(self); + let result = rx.await; + handle.stop(); + result.unwrap() + } } // ============== Component Implementation ============== @@ -133,6 +159,22 @@ impl Handler for HnswQueryOrchestrator { ctx: &crate::system::ComponentContext, ) { self.state = ExecutionState::Dedupe; + // TODO: implement the remaining state transitions and operators + // This is an example of the final state transition and result + + match self.result_channel.take() { + Some(tx) => { + let _ = tx.send(Ok(vec![vec![VectorQueryResult { + id: "abc".to_string(), + seq_id: BigInt::from(0), + distance: 0.0, + vector: Some(vec![0.0, 0.0, 0.0]), + }]])); + } + None => { + // Log an error + } + } } } diff --git a/rust/worker/src/execution/orchestration/mod.rs b/rust/worker/src/execution/orchestration/mod.rs index e0c45e2e87c2..902c3eaf84dd 100644 --- a/rust/worker/src/execution/orchestration/mod.rs +++ b/rust/worker/src/execution/orchestration/mod.rs @@ -1 +1,3 @@ mod hnsw; + +pub(crate) use hnsw::*; diff --git a/rust/worker/src/execution/worker_thread.rs b/rust/worker/src/execution/worker_thread.rs index 7a5c0fcbe92b..d651a725d34f 100644 --- a/rust/worker/src/execution/worker_thread.rs +++ b/rust/worker/src/execution/worker_thread.rs @@ -9,11 +9,18 @@ use std::fmt::{Debug, Formatter, Result}; /// - The actor loop will block until work is available pub(super) struct WorkerThread { dispatcher: Box>, + queue_size: usize, } impl WorkerThread { - pub(super) fn new(dispatcher: Box>) -> Self { - WorkerThread { dispatcher } + pub(super) fn new( + dispatcher: Box>, + queue_size: usize, + ) -> WorkerThread { + WorkerThread { + dispatcher, + queue_size, + } } } @@ -26,7 +33,7 @@ impl Debug for WorkerThread { #[async_trait] impl Component for WorkerThread { fn queue_size(&self) -> usize { - 1000 // TODO: make configurable + self.queue_size } fn runtime() -> ComponentRuntime { diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index 0af68c5e06cc..1cb31b3f3b2a 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -15,15 +15,50 @@ mod sysdb; mod system; mod types; +use crate::sysdb::sysdb::SysDb; use config::Configurable; use memberlist::MemberlistProvider; -use crate::sysdb::sysdb::SysDb; - mod chroma_proto { tonic::include_proto!("chroma"); } +pub async fn query_service_entrypoint() { + let config = config::RootConfig::load(); + let system: system::System = system::System::new(); + let segment_manager = match segment::SegmentManager::try_from_config(&config.worker).await { + Ok(segment_manager) => segment_manager, + Err(err) => { + println!("Failed to create segment manager component: {:?}", err); + return; + } + }; + let dispatcher = match execution::dispatcher::Dispatcher::try_from_config(&config.worker).await + { + Ok(dispatcher) => dispatcher, + Err(err) => { + println!("Failed to create dispatcher component: {:?}", err); + return; + } + }; + let mut dispatcher_handle = system.start_component(dispatcher); + let mut worker_server = match server::WorkerServer::try_from_config(&config.worker).await { + Ok(worker_server) => worker_server, + Err(err) => { + println!("Failed to create worker server component: {:?}", err); + return; + } + }; + worker_server.set_segment_manager(segment_manager.clone()); + worker_server.set_dispatcher(dispatcher_handle.receiver()); + + let server_join_handle = tokio::spawn(async move { + 
crate::server::WorkerServer::run(worker_server).await; + }); + + let _ = tokio::join!(server_join_handle, dispatcher_handle.join()); +} + pub async fn worker_entrypoint() { let config = config::RootConfig::load(); // Create all the core components and start them @@ -103,5 +138,6 @@ pub async fn worker_entrypoint() { ingest_handle.join(), memberlist_handle.join(), scheduler_handler.join(), + server_join_handle, ); } diff --git a/rust/worker/src/log/mod.rs b/rust/worker/src/log/mod.rs index c7873c00ce91..cd769734c488 100644 --- a/rust/worker/src/log/mod.rs +++ b/rust/worker/src/log/mod.rs @@ -1,2 +1,17 @@ pub(crate) mod config; pub(crate) mod log; + +use crate::{ + config::{Configurable, WorkerConfig}, + errors::ChromaError, +}; + +pub(crate) async fn from_config( + config: &WorkerConfig, +) -> Result, Box> { + match &config.log { + crate::log::config::LogConfig::Grpc(_) => { + Ok(Box::new(log::GrpcLog::try_from_config(config).await?)) + } + } +} diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index 1ecc6ba2e705..205a51b6a976 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -1,28 +1,53 @@ -use std::f32::consts::E; - use crate::chroma_proto; use crate::chroma_proto::{ GetVectorsRequest, GetVectorsResponse, QueryVectorsRequest, QueryVectorsResponse, }; use crate::config::{Configurable, WorkerConfig}; use crate::errors::ChromaError; +use crate::execution::operator::TaskMessage; +use crate::execution::orchestration::HnswQueryOrchestrator; +use crate::log::log::Log; use crate::segment::SegmentManager; +use crate::sysdb::sysdb::SysDb; +use crate::system::{Receiver, System}; use crate::types::ScalarEncoding; use async_trait::async_trait; -use kube::core::request; use tonic::{transport::Server, Request, Response, Status}; use uuid::Uuid; pub struct WorkerServer { + // System + system: Option, + // Component dependencies segment_manager: Option, + dispatcher: Option>>, + // Service dependencies + log: Box, + sysdb: Box, port: u16, } #[async_trait] impl Configurable for WorkerServer { async fn try_from_config(config: &WorkerConfig) -> Result> { + let sysdb = match crate::sysdb::from_config(&config).await { + Ok(sysdb) => sysdb, + Err(err) => { + return Err(err); + } + }; + let log = match crate::log::from_config(&config).await { + Ok(log) => log, + Err(err) => { + return Err(err); + } + }; Ok(WorkerServer { segment_manager: None, + dispatcher: None, + system: None, + sysdb, + log, port: config.my_port, }) } @@ -46,6 +71,14 @@ impl WorkerServer { pub(crate) fn set_segment_manager(&mut self, segment_manager: SegmentManager) { self.segment_manager = Some(segment_manager); } + + pub(crate) fn set_dispatcher(&mut self, dispatcher: Box>) { + self.dispatcher = Some(dispatcher); + } + + pub(crate) fn set_system(&mut self, system: System) { + self.system = Some(system); + } } #[tonic::async_trait] @@ -126,6 +159,8 @@ impl chroma_proto::vector_reader_server::VectorReader for WorkerServer { }; let mut proto_results_for_all = Vec::new(); + + let mut query_vectors = Vec::new(); for proto_query_vector in request.vectors { let (query_vector, encoding) = match proto_query_vector.try_into() { Ok((vector, encoding)) => (vector, encoding), @@ -133,31 +168,58 @@ impl chroma_proto::vector_reader_server::VectorReader for WorkerServer { return Err(Status::internal(format!("Error converting vector: {}", e))); } }; + query_vectors.push(query_vector); + } + + let dispatcher = match self.dispatcher { + Some(ref dispatcher) => dispatcher, + None => { + return Err(Status::internal("No 
dispatcher found")); + } + }; - let results = match segment_manager - .query_vector( - &segment_uuid, - &query_vector, - request.k as usize, + let result = match self.system { + Some(ref system) => { + let orchestrator = HnswQueryOrchestrator::new( + // TODO: Should not have to clone query vectors here + system.clone(), + query_vectors.clone(), + request.k, request.include_embeddings, - ) - .await - { - Ok(results) => results, - Err(e) => { - return Err(Status::internal(format!("Error querying segment: {}", e))); - } - }; + segment_uuid, + self.log.clone(), + self.sysdb.clone(), + dispatcher.clone(), + ); + orchestrator.run().await + } + None => { + return Err(Status::internal("No system found")); + } + }; + let result = match result { + Ok(result) => result, + Err(e) => { + return Err(Status::internal(format!( + "Error running orchestrator: {}", + e + ))); + } + }; + + for result_set in result { let mut proto_results = Vec::new(); - for query_result in results { + for query_result in result_set { let proto_result = chroma_proto::VectorQueryResult { id: query_result.id, seq_id: query_result.seq_id.to_bytes_le().1, distance: query_result.distance, vector: match query_result.vector { Some(vector) => { - match (vector, ScalarEncoding::FLOAT32, query_vector.len()).try_into() { + match (vector, ScalarEncoding::FLOAT32, query_vectors[0].len()) + .try_into() + { Ok(proto_vector) => Some(proto_vector), Err(e) => { return Err(Status::internal(format!( @@ -172,11 +234,9 @@ impl chroma_proto::vector_reader_server::VectorReader for WorkerServer { }; proto_results.push(proto_result); } - - let vector_query_results = chroma_proto::VectorQueryResults { + proto_results_for_all.push(chroma_proto::VectorQueryResults { results: proto_results, - }; - proto_results_for_all.push(vector_query_results); + }); } let resp = chroma_proto::QueryVectorsResponse { diff --git a/rust/worker/src/sysdb/mod.rs b/rust/worker/src/sysdb/mod.rs index 1db5510f8938..770fa5cc2084 100644 --- a/rust/worker/src/sysdb/mod.rs +++ b/rust/worker/src/sysdb/mod.rs @@ -1,2 +1,17 @@ pub(crate) mod config; pub(crate) mod sysdb; + +use crate::{ + config::{Configurable, WorkerConfig}, + errors::ChromaError, +}; + +pub(crate) async fn from_config( + config: &WorkerConfig, +) -> Result, Box> { + match &config.sysdb { + crate::sysdb::config::SysDbConfig::Grpc(_) => { + Ok(Box::new(sysdb::GrpcSysDb::try_from_config(config).await?)) + } + } +} diff --git a/rust/worker/src/sysdb/sysdb.rs b/rust/worker/src/sysdb/sysdb.rs index 450761e88963..990268e66ec8 100644 --- a/rust/worker/src/sysdb/sysdb.rs +++ b/rust/worker/src/sysdb/sysdb.rs @@ -1,3 +1,4 @@ +use super::config::SysDbConfig; use crate::chroma_proto; use crate::config::{Configurable, WorkerConfig}; use crate::types::{CollectionConversionError, SegmentConversionError}; @@ -11,8 +12,6 @@ use std::fmt::Debug; use thiserror::Error; use uuid::Uuid; -use super::config::SysDbConfig; - const DEFAULT_DATBASE: &str = "default_database"; const DEFAULT_TENANT: &str = "default_tenant"; diff --git a/rust/worker/src/system/scheduler.rs b/rust/worker/src/system/scheduler.rs index f69d8fa3450e..aab82e5d0982 100644 --- a/rust/worker/src/system/scheduler.rs +++ b/rust/worker/src/system/scheduler.rs @@ -1,6 +1,5 @@ use parking_lot::RwLock; use std::fmt::Debug; -use std::num; use std::sync::Arc; use std::time::Duration; use tokio::select; @@ -10,12 +9,13 @@ use super::{ executor::ComponentExecutor, sender::Sender, system::System, Receiver, ReceiverImpl, Wrapper, }; +#[derive(Debug)] pub(crate) struct 
SchedulerTaskHandle { join_handle: Option>, cancel: tokio_util::sync::CancellationToken, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct Scheduler { handles: Arc>>, } diff --git a/rust/worker/src/system/system.rs b/rust/worker/src/system/system.rs index 42c7c5650468..0d9f47386253 100644 --- a/rust/worker/src/system/system.rs +++ b/rust/worker/src/system/system.rs @@ -10,11 +10,12 @@ use std::sync::Arc; use tokio::runtime::Builder; use tokio::{pin, select}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct System { inner: Arc, } +#[derive(Debug)] struct Inner { scheduler: Scheduler, } @@ -28,7 +29,7 @@ impl System { } } - pub(crate) fn start_component(&mut self, component: C) -> ComponentHandle + pub(crate) fn start_component(&self, component: C) -> ComponentHandle where C: Component + Send + 'static, { From 976aee3875993b6573f5f06eb9cd7c61b38f6f4a Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Wed, 20 Mar 2024 15:36:01 -0700 Subject: [PATCH 3/5] [BLD] Auto rebuild protos if they change in rust (#1902) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Auto rebuild protos if they change in rust - New functionality - None ## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- rust/worker/build.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/rust/worker/build.rs b/rust/worker/build.rs index be6fc45b7bcb..c98a96de683a 100644 --- a/rust/worker/build.rs +++ b/rust/worker/build.rs @@ -1,13 +1,15 @@ fn main() -> Result<(), Box> { // Compile the protobuf files in the chromadb proto directory. - tonic_build::configure().compile( - &[ - "../../idl/chromadb/proto/chroma.proto", - "../../idl/chromadb/proto/coordinator.proto", - "../../idl/chromadb/proto/logservice.proto", - ], - &["../../idl/"], - )?; + tonic_build::configure() + .emit_rerun_if_changed(true) + .compile( + &[ + "../../idl/chromadb/proto/chroma.proto", + "../../idl/chromadb/proto/coordinator.proto", + "../../idl/chromadb/proto/logservice.proto", + ], + &["../../idl/"], + )?; // Compile the hnswlib bindings. cc::Build::new() From 3985032d6c7cb0b691aeff56f1367bb5e7bbfa1b Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Wed, 20 Mar 2024 15:43:33 -0700 Subject: [PATCH 4/5] [ENH] Operator/Executor error handling (#1903) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Allows errors to be propagated from operators back to the original caller of the task. - New functionality - None ## Test plan *How are these changes tested?* Compile checks show that the error propagates as expected. - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- rust/worker/src/errors.rs | 1 - rust/worker/src/execution/dispatcher.rs | 13 +++++++--- rust/worker/src/execution/operator.rs | 21 +++++++++------ .../src/execution/operators/pull_log.rs | 16 ++++++++---- .../src/execution/orchestration/hnsw.rs | 26 +++++++++++++------ 5 files changed, 51 insertions(+), 26 deletions(-) diff --git a/rust/worker/src/errors.rs b/rust/worker/src/errors.rs index 968dbaeeb175..086b938f2657 100644 --- a/rust/worker/src/errors.rs +++ b/rust/worker/src/errors.rs @@ -1,7 +1,6 @@ // Defines 17 standard error codes based on the error codes defined in the // gRPC spec. 
https://grpc.github.io/grpc/core/md_doc_statuscodes.html // Custom errors can use these codes in order to allow for generic handling - use std::error::Error; #[derive(PartialEq, Debug)] diff --git a/rust/worker/src/execution/dispatcher.rs b/rust/worker/src/execution/dispatcher.rs index 8a25e0b26fe4..b1668b1c60a2 100644 --- a/rust/worker/src/execution/dispatcher.rs +++ b/rust/worker/src/execution/dispatcher.rs @@ -210,13 +210,14 @@ mod tests { struct MockOperator {} #[async_trait] impl Operator for MockOperator { - async fn run(&self, input: &f32) -> String { + type Error = (); + async fn run(&self, input: &f32) -> Result { // sleep to simulate work tokio::time::sleep(tokio::time::Duration::from_millis( MOCK_OPERATOR_SLEEP_DURATION_MS, )) .await; - input.to_string() + Ok(input.to_string()) } } @@ -244,8 +245,12 @@ mod tests { } } #[async_trait] - impl Handler for MockDispatchUser { - async fn handle(&mut self, message: String, ctx: &ComponentContext) { + impl Handler> for MockDispatchUser { + async fn handle( + &mut self, + message: Result, + ctx: &ComponentContext, + ) { self.counter.fetch_add(1, Ordering::SeqCst); let curr_count = self.counter.load(Ordering::SeqCst); // Cancel self diff --git a/rust/worker/src/execution/operator.rs b/rust/worker/src/execution/operator.rs index 85baa7d8c7db..935c01eb16ec 100644 --- a/rust/worker/src/execution/operator.rs +++ b/rust/worker/src/execution/operator.rs @@ -10,20 +10,23 @@ where I: Send + Sync, O: Send + Sync, { - async fn run(&self, input: &I) -> O; + type Error; + // It would have been nice to do this with a default trait for result + // but that's not stable in rust yet. + async fn run(&self, input: &I) -> Result; } /// A task is a wrapper around an operator and its input. /// It is a description of a function to be run. #[derive(Debug)] -struct Task +struct Task where Input: Send + Sync + Debug, Output: Send + Sync + Debug, { - operator: Box>, + operator: Box>, input: Input, - reply_channel: Box>, + reply_channel: Box>>, } /// A message type used by the dispatcher to send tasks to worker threads. @@ -40,8 +43,9 @@ pub(crate) trait TaskWrapper: Send + Debug { /// erase the I, O types from the Task struct so that tasks can be /// stored in a homogenous queue regardless of their input and output types. #[async_trait] -impl TaskWrapper for Task +impl TaskWrapper for Task where + Error: Debug, Input: Send + Sync + Debug, Output: Send + Sync + Debug, { @@ -53,12 +57,13 @@ where } /// Wrap an operator and its input into a task message. 
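Because the `Operator` trait now carries an associated `Error` type, failures surface through the task's reply channel instead of panicking inside a worker thread. A minimal sketch of an operator under the new signature, following the same shape as the `MockOperator` in the dispatcher tests; the `DoubleOperator`/`DoubleError` names are illustrative only.

```rust
// Sketch of a fallible operator; DoubleOperator and DoubleError are
// hypothetical names, not part of this patch.
#[derive(Debug)]
struct DoubleOperator {}

#[derive(Debug)]
struct DoubleError {}

#[async_trait]
impl Operator<f32, f32> for DoubleOperator {
    type Error = DoubleError;

    async fn run(&self, input: &f32) -> Result<f32, Self::Error> {
        // A real operator would return Err(DoubleError {}) on failure; the
        // dispatcher then forwards that Err to the task's reply channel.
        Ok(input * 2.0)
    }
}
```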
-pub(super) fn wrap( - operator: Box>, +pub(super) fn wrap( + operator: Box>, input: Input, - reply_channel: Box>, + reply_channel: Box>>, ) -> TaskMessage where + Error: Debug + 'static, Input: Send + Sync + Debug + 'static, Output: Send + Sync + Debug + 'static, { diff --git a/rust/worker/src/execution/operators/pull_log.rs b/rust/worker/src/execution/operators/pull_log.rs index 9e38f9195055..7fb150fd34c4 100644 --- a/rust/worker/src/execution/operators/pull_log.rs +++ b/rust/worker/src/execution/operators/pull_log.rs @@ -1,4 +1,8 @@ -use crate::{execution::operator::Operator, log::log::Log, types::EmbeddingRecord}; +use crate::{ + execution::operator::Operator, + log::log::{Log, PullLogsError}, + types::EmbeddingRecord, +}; use async_trait::async_trait; use uuid::Uuid; @@ -66,9 +70,12 @@ impl PullLogsOutput { } } +pub type PullLogsResult = Result; + #[async_trait] impl Operator for PullLogsOperator { - async fn run(&self, input: &PullLogsInput) -> PullLogsOutput { + type Error = PullLogsError; + async fn run(&self, input: &PullLogsInput) -> PullLogsResult { // We expect the log to be cheaply cloneable, we need to clone it since we need // a mutable reference to it. Not necessarily the best, but it works for our needs. let mut client_clone = self.client.clone(); @@ -79,8 +86,7 @@ impl Operator for PullLogsOperator { input.batch_size, None, ) - .await - .unwrap(); - PullLogsOutput::new(logs) + .await?; + Ok(PullLogsOutput::new(logs)) } } diff --git a/rust/worker/src/execution/orchestration/hnsw.rs b/rust/worker/src/execution/orchestration/hnsw.rs index d25862bc9e11..35c4134c940c 100644 --- a/rust/worker/src/execution/orchestration/hnsw.rs +++ b/rust/worker/src/execution/orchestration/hnsw.rs @@ -1,6 +1,8 @@ use super::super::operator::{wrap, TaskMessage}; use super::super::operators::pull_log::{PullLogsInput, PullLogsOperator, PullLogsOutput}; use crate::errors::ChromaError; +use crate::execution::operators::pull_log::PullLogsResult; +use crate::log::log::PullLogsError; use crate::sysdb::sysdb::SysDb; use crate::system::System; use crate::types::VectorQueryResult; @@ -102,7 +104,7 @@ impl HnswQueryOrchestrator { } } - async fn pull_logs(&mut self, self_address: Box>) { + async fn pull_logs(&mut self, self_address: Box>) { self.state = ExecutionState::PullLogs; let operator = PullLogsOperator::new(self.log.clone()); let collection_id = match self.get_collection_id_for_segment_id(self.segment_id).await { @@ -152,10 +154,10 @@ impl Component for HnswQueryOrchestrator { // ============== Handlers ============== #[async_trait] -impl Handler for HnswQueryOrchestrator { +impl Handler for HnswQueryOrchestrator { async fn handle( &mut self, - message: PullLogsOutput, + message: PullLogsResult, ctx: &crate::system::ComponentContext, ) { self.state = ExecutionState::Dedupe; @@ -163,17 +165,25 @@ impl Handler for HnswQueryOrchestrator { // TODO: implement the remaining state transitions and operators // This is an example of the final state transition and result - match self.result_channel.take() { - Some(tx) => { - let _ = tx.send(Ok(vec![vec![VectorQueryResult { + let result_channel = match self.result_channel.take() { + Some(tx) => tx, + None => { + // Log an error + return; + } + }; + + match message { + Ok(logs) => { + let _ = result_channel.send(Ok(vec![vec![VectorQueryResult { id: "abc".to_string(), seq_id: BigInt::from(0), distance: 0.0, vector: Some(vec![0.0, 0.0, 0.0]), }]])); } - None => { - // Log an error + Err(e) => { + let _ = result_channel.send(Err(Box::new(e))); } } } From 
e6f3aec9d37cc34f9b820e23737b0817a96a2029 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 21 Mar 2024 01:21:18 -0700 Subject: [PATCH 5/5] [ENH] Implement the PullLog operator (#1906) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - ... - New functionality - This PR implements the PullLog operator ## Test plan *How are these changes tested?* - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?* --- .../src/execution/operators/pull_log.rs | 171 +++++++++++++++++- .../src/execution/orchestration/hnsw.rs | 16 +- rust/worker/src/log/log.rs | 12 +- 3 files changed, 185 insertions(+), 14 deletions(-) diff --git a/rust/worker/src/execution/operators/pull_log.rs b/rust/worker/src/execution/operators/pull_log.rs index 7fb150fd34c4..b172ebfe606c 100644 --- a/rust/worker/src/execution/operators/pull_log.rs +++ b/rust/worker/src/execution/operators/pull_log.rs @@ -26,11 +26,15 @@ impl PullLogsOperator { /// * `collection_id` - The collection id to read logs from. /// * `offset` - The offset to start reading logs from. /// * `batch_size` - The number of log entries to read. +/// * `num_records` - The maximum number of records to read. +/// * `end_timestamp` - The end timestamp to read logs until. #[derive(Debug)] pub struct PullLogsInput { collection_id: Uuid, offset: i64, batch_size: i32, + num_records: Option, + end_timestamp: Option, } impl PullLogsInput { @@ -39,11 +43,21 @@ impl PullLogsInput { /// * `collection_id` - The collection id to read logs from. /// * `offset` - The offset to start reading logs from. /// * `batch_size` - The number of log entries to read. - pub fn new(collection_id: Uuid, offset: i64, batch_size: i32) -> Self { + /// * `num_records` - The maximum number of records to read. + /// * `end_timestamp` - The end timestamp to read logs until. + pub fn new( + collection_id: Uuid, + offset: i64, + batch_size: i32, + num_records: Option, + end_timestamp: Option, + ) -> Self { PullLogsInput { collection_id, offset, batch_size, + num_records, + end_timestamp, } } } @@ -75,18 +89,155 @@ pub type PullLogsResult = Result; #[async_trait] impl Operator for PullLogsOperator { type Error = PullLogsError; + async fn run(&self, input: &PullLogsInput) -> PullLogsResult { // We expect the log to be cheaply cloneable, we need to clone it since we need // a mutable reference to it. Not necessarily the best, but it works for our needs. 
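Callers can now bound a pull both by record count and by wall-clock time. A small caller-side sketch of building the input, using the same `SystemTime`-derived cutoff the orchestrator adopts at the end of this patch; the batch size and record cap below are illustrative values, and `operator`/`collection_id` are assumed to be in scope.

```rust
// Sketch of the caller side; assumes std::time::{SystemTime, UNIX_EPOCH} are
// imported and an `operator: Box<PullLogsOperator>` has been constructed.
let end_timestamp = SystemTime::now()
    .duration_since(UNIX_EPOCH)
    .expect("system clock before UNIX_EPOCH")
    .as_secs() as i64; // the proto field is i64, hence the cast
let input = PullLogsInput::new(
    collection_id,       // Uuid of the collection to read
    0,                   // offset: log id to start reading from
    100,                 // batch_size: records fetched per read() round trip
    Some(10_000),        // num_records: hypothetical cap on total records returned
    Some(end_timestamp), // end_timestamp: ignore records logged after this instant
);
match operator.run(&input).await {
    Ok(output) => println!("pulled {} records", output.logs().len()),
    Err(e) => println!("pull failed: {:?}", e),
}
```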
let mut client_clone = self.client.clone(); - let logs = client_clone - .read( - input.collection_id.to_string(), - input.offset, - input.batch_size, - None, - ) - .await?; - Ok(PullLogsOutput::new(logs)) + let batch_size = input.batch_size; + let mut num_records_read = 0; + let mut offset = input.offset; + let mut result = Vec::new(); + loop { + let logs = client_clone + .read( + input.collection_id.to_string(), + offset, + batch_size, + input.end_timestamp, + ) + .await; + + let mut logs = match logs { + Ok(logs) => logs, + Err(e) => { + return Err(e); + } + }; + + if logs.is_empty() { + break; + } + + num_records_read += logs.len(); + offset += batch_size as i64; + result.append(&mut logs); + + if input.num_records.is_some() + && num_records_read >= input.num_records.unwrap() as usize + { + break; + } + } + if input.num_records.is_some() && result.len() > input.num_records.unwrap() as usize { + result.truncate(input.num_records.unwrap() as usize); + } + Ok(PullLogsOutput::new(result)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::log::log::InMemoryLog; + use crate::log::log::LogRecord; + use crate::types::EmbeddingRecord; + use crate::types::Operation; + use num_bigint::BigInt; + use std::str::FromStr; + use uuid::Uuid; + + #[tokio::test] + async fn test_pull_logs() { + let mut log = Box::new(InMemoryLog::new()); + + let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap(); + let collection_id_1 = collection_uuid_1.to_string(); + log.add_log( + collection_id_1.clone(), + Box::new(LogRecord { + collection_id: collection_id_1.clone(), + log_id: 1, + log_id_ts: 1, + record: Box::new(EmbeddingRecord { + id: "embedding_id_1".to_string(), + seq_id: BigInt::from(1), + embedding: None, + encoding: None, + metadata: None, + operation: Operation::Add, + collection_id: collection_uuid_1, + }), + }), + ); + log.add_log( + collection_id_1.clone(), + Box::new(LogRecord { + collection_id: collection_id_1.clone(), + log_id: 2, + log_id_ts: 2, + record: Box::new(EmbeddingRecord { + id: "embedding_id_2".to_string(), + seq_id: BigInt::from(2), + embedding: None, + encoding: None, + metadata: None, + operation: Operation::Add, + collection_id: collection_uuid_1, + }), + }), + ); + + let operator = PullLogsOperator::new(log); + + // Pull all logs from collection 1 + let input = PullLogsInput::new(collection_uuid_1, 0, 1, None, None); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 2); + + // Pull all logs from collection 1 with a large batch size + let input = PullLogsInput::new(collection_uuid_1, 0, 100, None, None); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 2); + + // Pull logs from collection 1 with a limit + let input = PullLogsInput::new(collection_uuid_1, 0, 1, Some(1), None); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 1); + + // Pull logs from collection 1 with an end timestamp + let input = PullLogsInput::new(collection_uuid_1, 0, 1, None, Some(1)); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 1); + + // Pull logs from collection 1 with an end timestamp + let input = PullLogsInput::new(collection_uuid_1, 0, 1, None, Some(2)); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 2); + + // Pull logs from collection 1 with an end timestamp and a limit + let input = PullLogsInput::new(collection_uuid_1, 0, 1, Some(1), Some(2)); + let output = 
operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 1); + + // Pull logs from collection 1 with a limit and a large batch size + let input = PullLogsInput::new(collection_uuid_1, 0, 100, Some(1), None); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 1); + + // Pull logs from collection 1 with an end timestamp and a large batch size + let input = PullLogsInput::new(collection_uuid_1, 0, 100, None, Some(1)); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 1); + + // Pull logs from collection 1 with an end timestamp and a large batch size + let input = PullLogsInput::new(collection_uuid_1, 0, 100, None, Some(2)); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 2); + + // Pull logs from collection 1 with an end timestamp and a limit and a large batch size + let input = PullLogsInput::new(collection_uuid_1, 0, 100, Some(1), Some(2)); + let output = operator.run(&input).await.unwrap(); + assert_eq!(output.logs().len(), 1); } } diff --git a/rust/worker/src/execution/orchestration/hnsw.rs b/rust/worker/src/execution/orchestration/hnsw.rs index 35c4134c940c..f583506ad1f2 100644 --- a/rust/worker/src/execution/orchestration/hnsw.rs +++ b/rust/worker/src/execution/orchestration/hnsw.rs @@ -2,7 +2,6 @@ use super::super::operator::{wrap, TaskMessage}; use super::super::operators::pull_log::{PullLogsInput, PullLogsOperator, PullLogsOutput}; use crate::errors::ChromaError; use crate::execution::operators::pull_log::PullLogsResult; -use crate::log::log::PullLogsError; use crate::sysdb::sysdb::SysDb; use crate::system::System; use crate::types::VectorQueryResult; @@ -13,6 +12,8 @@ use crate::{ use async_trait::async_trait; use num_bigint::BigInt; use std::fmt::Debug; +use std::fmt::Formatter; +use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; /** The state of the orchestrator. 
@@ -114,7 +115,16 @@ impl HnswQueryOrchestrator { return; } }; - let input = PullLogsInput::new(collection_id, 0, 100); + let end_timestamp = SystemTime::now().duration_since(UNIX_EPOCH); + let end_timestamp = match end_timestamp { + // TODO: change protobuf definition to use u64 instead of i64 + Ok(end_timestamp) => end_timestamp.as_secs() as i64, + Err(e) => { + // Log an error and reply + return + return; + } + }; + let input = PullLogsInput::new(collection_id, 0, 100, None, Some(end_timestamp)); let task = wrap(operator, input, self_address); match self.dispatcher.send(task).await { Ok(_) => (), @@ -175,6 +185,8 @@ impl Handler for HnswQueryOrchestrator { match message { Ok(logs) => { + // TODO: remove this after debugging + println!("Received logs: {:?}", logs); let _ = result_channel.send(Ok(vec![vec![VectorQueryResult { id: "abc".to_string(), seq_id: BigInt::from(0), diff --git a/rust/worker/src/log/log.rs b/rust/worker/src/log/log.rs index 9ece2deb205e..56a7da319e27 100644 --- a/rust/worker/src/log/log.rs +++ b/rust/worker/src/log/log.rs @@ -269,10 +269,18 @@ impl Log for InMemoryLog { batch_size: i32, end_timestamp: Option, ) -> Result>, PullLogsError> { - let logs = self.logs.get(&collection_id).unwrap(); + let end_timestamp = match end_timestamp { + Some(end_timestamp) => end_timestamp, + None => i64::MAX, + }; + + let logs = match self.logs.get(&collection_id) { + Some(logs) => logs, + None => return Ok(Vec::new()), + }; let mut result = Vec::new(); for i in offset..(offset + batch_size as i64) { - if i < logs.len() as i64 { + if i < logs.len() as i64 && logs[i as usize].log_id_ts <= end_timestamp { result.push(logs[i as usize].record.clone()); } }
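Taken together, the series leaves `Log::read` with an optional time bound: `None` means no cutoff, is encoded as `-1` on the wire by `GrpcLog`, and is widened to `i64::MAX` by `InMemoryLog`. A short sketch of exercising both paths against the in-memory implementation, reusing the fixture pattern from `test_pull_logs` above; the ids and timestamps are illustrative.

```rust
// Sketch only; assumes an InMemoryLog populated via add_log() as in the
// test_pull_logs fixture, with two records whose log_id_ts values are 1 and 2.
let mut log = InMemoryLog::new();
// ... add_log() calls elided ...
let collection_id = "00000000-0000-0000-0000-000000000001".to_string();

// No cutoff: None becomes i64::MAX inside InMemoryLog, so both records within
// the [offset, offset + batch_size) window come back.
let all = log.read(collection_id.clone(), 0, 10, None).await.unwrap();

// Time-bounded: only records with log_id_ts <= 1 are returned; an unknown
// collection id now yields Ok(vec![]) instead of panicking inside read().
let up_to_ts_1 = log.read(collection_id, 0, 10, Some(1)).await.unwrap();
assert!(up_to_ts_1.len() <= all.len());
```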