Skip to content

Commit

Permalink
Merge branch 'chroma-core:main' into feature/close-persistent-client
Browse files Browse the repository at this point in the history
  • Loading branch information
tazarov authored Mar 21, 2024
2 parents f2b1354 + e6f3aec commit 2173199
Show file tree
Hide file tree
Showing 38 changed files with 811 additions and 265 deletions.
1 change: 1 addition & 0 deletions chromadb/logservice/logservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def pull_logs(
collection_id=str(collection_id),
start_from_id=start_id,
batch_size=batch_size,
end_timestamp=-1,
)
response = self._log_service_stub.PullLogs(request)
return response.records # type: ignore
51 changes: 29 additions & 22 deletions chromadb/proto/logservice_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 47 additions & 9 deletions chromadb/proto/logservice_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions go/pkg/logservice/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logservice

import (
"context"

"github.com/chroma-core/chroma/go/pkg/common"
"github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel"
"github.com/chroma-core/chroma/go/pkg/types"
Expand All @@ -11,17 +12,19 @@ type (
IRecordLog interface {
common.Component
PushLogs(ctx context.Context, collectionID types.UniqueID, recordContent [][]byte) (int, error)
PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error)
PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*dbmodel.RecordLog, error)
GetAllCollectionIDsToCompact() ([]*dbmodel.RecordLog, error)
}
)

// Compile-time assertion that *RecordLog implements IRecordLog.
var _ IRecordLog = (*RecordLog)(nil)

// PushLogs hands recordsContent for collectionID to the underlying
// recordLogDb and returns that call's count and error unchanged.
//
// ctx is part of the IRecordLog signature but is not forwarded to the
// database layer.
func (s *RecordLog) PushLogs(ctx context.Context, collectionID types.UniqueID, recordsContent [][]byte) (int, error) {
	pushed, err := s.recordLogDb.PushLogs(collectionID, recordsContent)
	return pushed, err
}

func (s *RecordLog) PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error) {
return s.recordLogDb.PullLogs(collectionID, id, batchSize)
// PullLogs asks the underlying recordLogDb for the records of collectionID,
// passing id, batchSize and endTimestamp through unchanged, and returns that
// call's records and error as-is.
//
// ctx is part of the IRecordLog signature but is not forwarded to the
// database layer.
func (s *RecordLog) PullLogs(ctx context.Context, collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*dbmodel.RecordLog, error) {
	records, err := s.recordLogDb.PullLogs(collectionID, id, batchSize, endTimestamp)
	return records, err
}

func (s *RecordLog) GetAllCollectionIDsToCompact() ([]*dbmodel.RecordLog, error) {
Expand Down
3 changes: 2 additions & 1 deletion go/pkg/logservice/grpc/record_log_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpc

import (
"context"

"github.com/chroma-core/chroma/go/pkg/grpcutils"
"github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel"
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (s *Server) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequest
return nil, err
}
records := make([]*logservicepb.RecordLog, 0)
recordLogs, err := s.logService.PullLogs(ctx, collectionID, req.GetStartFromId(), int(req.BatchSize))
recordLogs, err := s.logService.PullLogs(ctx, collectionID, req.GetStartFromId(), int(req.BatchSize), req.GetEndTimestamp())
if err != nil {
log.Error("error pulling logs", zap.Error(err))
return nil, grpcutils.BuildInternalGrpcError(err.Error())
Expand Down
26 changes: 19 additions & 7 deletions go/pkg/metastore/db/dao/record_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@ package dao
import (
"database/sql"
"errors"
"time"

"github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel"
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/pingcap/log"
"go.uber.org/zap"
"gorm.io/gorm"
"time"
)

// recordLogDb is the gorm-backed record log store; its methods run their
// queries through db.
type recordLogDb struct {
// db is the gorm handle used for every query and transaction.
db *gorm.DB
}

// Compile-time assertion that *recordLogDb implements dbmodel.IRecordLogDb.
var _ dbmodel.IRecordLogDb = (*recordLogDb)(nil)

func (s *recordLogDb) PushLogs(collectionID types.UniqueID, recordsContent [][]byte) (int, error) {
err := s.db.Transaction(func(tx *gorm.DB) error {
var timestamp = time.Now().UnixNano()
Expand Down Expand Up @@ -58,18 +61,27 @@ func (s *recordLogDb) PushLogs(collectionID types.UniqueID, recordsContent [][]b
return len(recordsContent), nil
}

func (s *recordLogDb) PullLogs(collectionID types.UniqueID, id int64, batchSize int) ([]*dbmodel.RecordLog, error) {
func (s *recordLogDb) PullLogs(collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*dbmodel.RecordLog, error) {
var collectionIDStr = types.FromUniqueID(collectionID)
log.Info("PullLogs",
zap.String("collectionID", *collectionIDStr),
zap.Int64("ID", id),
zap.Int("batch_size", batchSize))
zap.Int("batch_size", batchSize),
zap.Int64("endTimestamp", endTimestamp))

var recordLogs []*dbmodel.RecordLog
result := s.db.Where("collection_id = ? AND id >= ?", collectionIDStr, id).Order("id").Limit(batchSize).Find(&recordLogs)
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
log.Error("PullLogs error", zap.Error(result.Error))
return nil, result.Error
if endTimestamp > 0 {
result := s.db.Where("collection_id = ? AND id >= ? AND timestamp <= ?", collectionIDStr, id, endTimestamp).Order("id").Limit(batchSize).Find(&recordLogs)
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
log.Error("PullLogs error", zap.Error(result.Error))
return nil, result.Error
}
} else {
result := s.db.Where("collection_id = ? AND id >= ?", collectionIDStr, id).Order("id").Limit(batchSize).Find(&recordLogs)
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
log.Error("PullLogs error", zap.Error(result.Error))
return nil, result.Error
}
}
log.Info("PullLogs",
zap.String("collectionID", *collectionIDStr),
Expand Down
22 changes: 17 additions & 5 deletions go/pkg/metastore/db/dao/record_log_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package dao

import (
"testing"
"time"

"github.com/chroma-core/chroma/go/pkg/logservice/testutils"
"github.com/chroma-core/chroma/go/pkg/metastore/db/dbcore"
"github.com/chroma-core/chroma/go/pkg/metastore/db/dbmodel"
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/pingcap/log"
"github.com/stretchr/testify/suite"
"gorm.io/gorm"
"testing"
)

type RecordLogDbTestSuite struct {
Expand Down Expand Up @@ -101,7 +103,8 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PushLogs() {
func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() {
// pull empty logs
var recordLogs []*dbmodel.RecordLog
recordLogs, err := suite.Db.PullLogs(suite.collectionId1, 0, 3)
invalidEndTimestamp := int64(-1)
recordLogs, err := suite.Db.PullLogs(suite.collectionId1, 0, 3, invalidEndTimestamp)
suite.NoError(err)
suite.Len(recordLogs, 0)

Expand All @@ -114,7 +117,7 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() {
suite.Equal(2, count)

// pull logs from id 0 batch_size 3
recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 3)
recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 3, invalidEndTimestamp)
suite.NoError(err)
suite.Len(recordLogs, 3)
for index := range recordLogs {
Expand All @@ -123,7 +126,7 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() {
}

// pull logs from id 0 batch_size 6
recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 6)
recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 0, 6, invalidEndTimestamp)
suite.NoError(err)
suite.Len(recordLogs, 5)

Expand All @@ -133,7 +136,16 @@ func (suite *RecordLogDbTestSuite) TestRecordLogDb_PullLogsFromID() {
}

// pull logs from id 3 batch_size 4
recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 3, 4)
recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 3, 4, invalidEndTimestamp)
suite.NoError(err)
suite.Len(recordLogs, 3)
for index := range recordLogs {
suite.Equal(int64(index+3), recordLogs[index].ID)
suite.Equal(suite.records[index+2], *recordLogs[index].Record)
}

// pull logs from id 3 batch_size 4 endTimestamp Now
recordLogs, err = suite.Db.PullLogs(suite.collectionId1, 3, 4, time.Now().UnixNano())
suite.NoError(err)
suite.Len(recordLogs, 3)
for index := range recordLogs {
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/metastore/db/dbmodel/record_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ func (v RecordLog) TableName() string {
//go:generate mockery --name=IRecordLogDb
type IRecordLogDb interface {
PushLogs(collectionID types.UniqueID, recordsContent [][]byte) (int, error)
PullLogs(collectionID types.UniqueID, id int64, batchSize int) ([]*RecordLog, error)
PullLogs(collectionID types.UniqueID, id int64, batchSize int, endTimestamp int64) ([]*RecordLog, error)
GetAllCollectionsToCompact() ([]*RecordLog, error)
}
4 changes: 2 additions & 2 deletions go/pkg/proto/coordinatorpb/chroma.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2173199

Please sign in to comment.