diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go index ea483d5d36ed7..40c4756877dd5 100644 --- a/internal/indexnode/task_stats.go +++ b/internal/indexnode/task_stats.go @@ -39,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/workerpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/indexcgowrapper" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" _ "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -201,7 +200,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er if (i+1)%statsBatchSize == 0 && writer.IsFullWithBinlogMaxSize(st.req.GetBinlogMaxSize()) { serWriteStart := time.Now() - binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer) + binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.req.GetStartLogID()+st.logIDOffset, writer) if err != nil { log.Ctx(ctx).Warn("stats wrong, failed to serialize writer", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) return nil, err @@ -224,7 +223,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er if !writer.FlushAndIsEmpty() { serWriteStart := time.Now() - binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer) + binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.req.GetStartLogID()+st.logIDOffset, writer) if err != nil { log.Ctx(ctx).Warn("stats wrong, failed to serialize writer", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) return nil, err @@ -244,7 +243,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er } serWriteStart := time.Now() - binlogNums, sPath, err := statSerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows) + binlogNums, sPath, err := statSerializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows) if err != nil { log.Ctx(ctx).Warn("stats wrong, failed to serialize write segment stats", zap.Int64("taskID", st.req.GetTaskID()), zap.Int64("remaining row count", numRows), zap.Error(err)) @@ -256,7 +255,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er var bm25StatsLogs []*datapb.FieldBinlog if len(bm25FieldIds) > 0 { - binlogNums, bm25StatsLogs, err = bm25SerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows) + binlogNums, bm25StatsLogs, err = bm25SerializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows) if err != nil { log.Ctx(ctx).Warn("compact wrong, failed to serialize write segment bm25 stats", zap.Error(err)) return nil, err @@ -510,7 +509,7 @@ func mergeFieldBinlogs(base, paths map[typeutil.UniqueID]*datapb.FieldBinlog) { } } -func serializeWrite(ctx context.Context, startID int64, writer *compaction.SegmentWriter) (binlogNum int64, kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) { +func serializeWrite(ctx context.Context, rootPath string, startID int64, writer *compaction.SegmentWriter) (binlogNum int64, kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) { _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite") defer span.End() @@ -525,7 +524,7 @@ func serializeWrite(ctx context.Context, startID int64, writer *compaction.Segme for i := range blobs { // Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt fID, _ := strconv.ParseInt(blobs[i].GetKey(), 10, 64) - key, _ := binlog.BuildLogPath(storage.InsertBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fID, startID+int64(i)) + key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.InsertBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fID, startID+int64(i)) kvs[key] = blobs[i].GetValue() fieldBinlogs[fID] = &datapb.FieldBinlog{ @@ -546,7 +545,7 @@ func serializeWrite(ctx context.Context, startID int64, writer *compaction.Segme return } -func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, *datapb.FieldBinlog, error) { +func statSerializeWrite(ctx context.Context, rootPath string, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, *datapb.FieldBinlog, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "statslog serializeWrite") defer span.End() sblob, err := writer.Finish() @@ -555,7 +554,7 @@ func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ } binlogNum := int64(1) - key, _ := binlog.BuildLogPath(storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), startID) + key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), startID) kvs := map[string][]byte{key: sblob.GetValue()} statFieldLog := &datapb.FieldBinlog{ FieldID: writer.GetPkID(), @@ -576,7 +575,7 @@ func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ return binlogNum, statFieldLog, nil } -func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, []*datapb.FieldBinlog, error) { +func bm25SerializeWrite(ctx context.Context, rootPath string, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, []*datapb.FieldBinlog, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "bm25log serializeWrite") defer span.End() stats, err := writer.GetBm25StatsBlob() @@ -588,7 +587,7 @@ func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ binlogs := []*datapb.FieldBinlog{} cnt := int64(0) for fieldID, blob := range stats { - key, _ := binlog.BuildLogPath(storage.BM25Binlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fieldID, startID+cnt) + key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.BM25Binlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fieldID, startID+cnt) kvs[key] = blob.GetValue() fieldLog := &datapb.FieldBinlog{ FieldID: fieldID, @@ -614,10 +613,6 @@ func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ return cnt, binlogs, nil } -func buildTextLogPrefix(rootPath string, collID, partID, segID, fieldID, version int64) string { - return fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", rootPath, common.TextIndexPath, collID, partID, segID, fieldID, version) -} - func ParseStorageConfig(s *indexpb.StorageConfig) (*indexcgopb.StorageConfig, error) { bs, err := proto.Marshal(s) if err != nil { diff --git a/internal/indexnode/task_stats_test.go b/internal/indexnode/task_stats_test.go index a609c57b22d2a..48cde12f4cbb3 100644 --- a/internal/indexnode/task_stats_test.go +++ b/internal/indexnode/task_stats_test.go @@ -86,7 +86,7 @@ func (s *TaskStatsSuite) Testbm25SerializeWriteError() { s.schema = genCollectionSchemaWithBM25() s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() s.GenSegmentWriterWithBM25(0) - cnt, binlogs, err := bm25SerializeWrite(context.Background(), s.mockBinlogIO, 0, s.segWriter, 1) + cnt, binlogs, err := bm25SerializeWrite(context.Background(), "root_path", s.mockBinlogIO, 0, s.segWriter, 1) s.Require().NoError(err) s.Equal(int64(1), cnt) s.Equal(1, len(binlogs)) @@ -96,7 +96,7 @@ func (s *TaskStatsSuite) Testbm25SerializeWriteError() { s.schema = genCollectionSchemaWithBM25() s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once() s.GenSegmentWriterWithBM25(0) - _, _, err := bm25SerializeWrite(context.Background(), s.mockBinlogIO, 0, s.segWriter, 1) + _, _, err := bm25SerializeWrite(context.Background(), "root_path", s.mockBinlogIO, 0, s.segWriter, 1) s.Error(err) }) } @@ -105,7 +105,7 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() { s.Run("normal case", func() { s.schema = genCollectionSchemaWithBM25() s.GenSegmentWriterWithBM25(0) - _, kvs, fBinlogs, err := serializeWrite(context.TODO(), 0, s.segWriter) + _, kvs, fBinlogs, err := serializeWrite(context.TODO(), "root_path", 0, s.segWriter) s.NoError(err) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) { result := make([][]byte, len(paths)) @@ -149,7 +149,7 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() { s.Run("upload bm25 binlog failed", func() { s.schema = genCollectionSchemaWithBM25() s.GenSegmentWriterWithBM25(0) - _, kvs, fBinlogs, err := serializeWrite(context.TODO(), 0, s.segWriter) + _, kvs, fBinlogs, err := serializeWrite(context.TODO(), "root_path", 0, s.segWriter) s.NoError(err) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) { result := make([][]byte, len(paths))