diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index dd921d97c1f29..40fe73e46f4ab 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -577,18 +577,21 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com case datapb.CompactionType_MixCompaction: task = &mixCompactionTask{ CompactionTask: t, + allocator: c.allocator, meta: c.meta, sessions: c.sessions, } case datapb.CompactionType_Level0DeleteCompaction: task = &l0CompactionTask{ CompactionTask: t, + allocator: c.allocator, meta: c.meta, sessions: c.sessions, } case datapb.CompactionType_ClusteringCompaction: task = &clusteringCompactionTask{ CompactionTask: t, + allocator: c.allocator, meta: c.meta, sessions: c.sessions, handler: c.handler, diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 8a3e39b41196f..2866f6dc2404f 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -48,8 +48,9 @@ type clusteringCompactionTask struct { *datapb.CompactionTask plan *datapb.CompactionPlan result *datapb.CompactionPlanResult - span trace.Span + span trace.Span + allocator allocator meta CompactionMeta sessions SessionManager handler Handler @@ -133,6 +134,10 @@ func (t *clusteringCompactionTask) retryableProcess() error { } func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { + beginLogID, _, err := t.allocator.allocN(1) + if err != nil { + return nil, err + } plan := &datapb.CompactionPlan{ PlanID: t.GetPlanID(), StartTime: t.GetStartTime(), @@ -147,6 +152,11 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP PreferSegmentRows: t.GetPreferSegmentRows(), AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)), AnalyzeSegmentIds: t.GetInputSegments(), // todo: if need + BeginLogID: beginLogID, + PreAllocatedSegments: &datapb.IDRange{ + Begin: t.GetResultSegments()[0], + End: t.GetResultSegments()[1], + }, } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index d12a0648b6f6c..39e375e5bc764 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" @@ -50,6 +52,8 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() { }, }) session := NewSessionManagerImpl() + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true) pk := &schemapb.FieldSchema{ @@ -77,9 +81,11 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() { Schema: schema, ClusteringKeyField: pk, InputSegments: []int64{101, 102}, + ResultSegments: []int64{1000, 1100}, }, - meta: meta, - sessions: session, + meta: meta, + sessions: session, + allocator: alloc, } task.processPipelining() diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 84260267f87c9..dd10181da9cf9 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -37,11 +37,13 @@ var _ CompactionTask = (*l0CompactionTask)(nil) type l0CompactionTask struct { *datapb.CompactionTask - plan *datapb.CompactionPlan - result *datapb.CompactionPlanResult - span trace.Span - sessions SessionManager - meta CompactionMeta + plan *datapb.CompactionPlan + result *datapb.CompactionPlanResult + + span trace.Span + allocator allocator + sessions SessionManager + meta CompactionMeta } // Note: return True means exit this state machine. @@ -230,6 +232,10 @@ func (t *l0CompactionTask) CleanLogPath() { } func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { + beginLogID, _, err := t.allocator.allocN(1) + if err != nil { + return nil, err + } plan := &datapb.CompactionPlan{ PlanID: t.GetPlanID(), StartTime: t.GetStartTime(), @@ -239,6 +245,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err CollectionTtl: t.GetCollectionTtl(), TotalRows: t.GetTotalRows(), Schema: t.GetSchema(), + BeginLogID: beginLogID, } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index f337017efd99e..6f0853b446d54 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package datacoord import ( @@ -58,6 +74,9 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() { }, meta: s.mockMeta, } + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + task.allocator = alloc plan, err := task.BuildCompactionRequest() s.Require().NoError(err) @@ -88,6 +107,9 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() { }, meta: s.mockMeta, } + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + task.allocator = alloc _, err := task.BuildCompactionRequest() s.Error(err) @@ -121,10 +143,41 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() { }, meta: s.mockMeta, } + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + task.allocator = alloc _, err := task.BuildCompactionRequest() s.Error(err) } +func (s *CompactionTaskSuite) TestBuildCompactionRequestFailed_AllocFailed() { + var task CompactionTask + + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, errors.New("mock alloc err")) + + task = &l0CompactionTask{ + allocator: alloc, + } + _, err := task.BuildCompactionRequest() + s.T().Logf("err=%v", err) + s.Error(err) + + task = &mixCompactionTask{ + allocator: alloc, + } + _, err = task.BuildCompactionRequest() + s.T().Logf("err=%v", err) + s.Error(err) + + task = &clusteringCompactionTask{ + allocator: alloc, + } + _, err = task.BuildCompactionRequest() + s.T().Logf("err=%v", err) + s.Error(err) +} + func generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask { return &l0CompactionTask{ CompactionTask: &datapb.CompactionTask{ @@ -145,9 +198,13 @@ func (s *CompactionTaskSuite) SetupSubTest() { } func (s *CompactionTaskSuite) TestProcessStateTrans() { + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + s.Run("test pipelining needReassignNodeID", func() { t := generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = NullNodeID + t.allocator = alloc got := t.Process() s.False(got) s.Equal(datapb.CompactionTaskState_pipelining, t.State) @@ -157,6 +214,7 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { s.Run("test pipelining BuildCompactionRequest failed", func() { t := generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 + t.allocator = alloc channel := "ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} @@ -194,6 +252,7 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { s.Run("test pipelining Compaction failed", func() { t := generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 + t.allocator = alloc channel := "ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} @@ -234,6 +293,7 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { s.Run("test pipelining success", func() { t := generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 + t.allocator = alloc channel := "ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 1957b71bbeee1..c27cd2e35ab93 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -18,9 +18,11 @@ var _ CompactionTask = (*mixCompactionTask)(nil) type mixCompactionTask struct { *datapb.CompactionTask - plan *datapb.CompactionPlan - result *datapb.CompactionPlanResult + plan *datapb.CompactionPlan + result *datapb.CompactionPlanResult + span trace.Span + allocator allocator sessions SessionManager meta CompactionMeta newSegment *SegmentInfo @@ -328,6 +330,10 @@ func (t *mixCompactionTask) CleanLogPath() { } func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { + beginLogID, _, err := t.allocator.allocN(1) + if err != nil { + return nil, err + } plan := &datapb.CompactionPlan{ PlanID: t.GetPlanID(), StartTime: t.GetStartTime(), @@ -337,6 +343,10 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er CollectionTtl: t.GetCollectionTtl(), TotalRows: t.GetTotalRows(), Schema: t.GetSchema(), + BeginLogID: beginLogID, + PreAllocatedSegments: &datapb.IDRange{ + Begin: t.GetResultSegments()[0], + }, } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_task_mix_test.go b/internal/datacoord/compaction_task_mix_test.go index 2d9c4e146ace4..50eb094f8ae7a 100644 --- a/internal/datacoord/compaction_task_mix_test.go +++ b/internal/datacoord/compaction_task_mix_test.go @@ -23,18 +23,22 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalMix() { }).Times(2) task := &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ - PlanID: 1, - TriggerID: 19530, - CollectionID: 1, - PartitionID: 10, - Type: datapb.CompactionType_MixCompaction, - NodeID: 1, - State: datapb.CompactionTaskState_executing, - InputSegments: []int64{200, 201}, + PlanID: 1, + TriggerID: 19530, + CollectionID: 1, + PartitionID: 10, + Type: datapb.CompactionType_MixCompaction, + NodeID: 1, + State: datapb.CompactionTaskState_executing, + InputSegments: []int64{200, 201}, + ResultSegments: []int64{100}, }, // plan: plan, meta: s.mockMeta, } + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + task.allocator = alloc plan, err := task.BuildCompactionRequest() s.Require().NoError(err) @@ -53,18 +57,22 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() { }).Once() task := &mixCompactionTask{ CompactionTask: &datapb.CompactionTask{ - PlanID: 1, - TriggerID: 19530, - CollectionID: 1, - PartitionID: 10, - Channel: channel, - Type: datapb.CompactionType_MixCompaction, - State: datapb.CompactionTaskState_executing, - NodeID: 1, - InputSegments: []int64{200, 201}, + PlanID: 1, + TriggerID: 19530, + CollectionID: 1, + PartitionID: 10, + Channel: channel, + Type: datapb.CompactionType_MixCompaction, + State: datapb.CompactionTaskState_executing, + NodeID: 1, + InputSegments: []int64{200, 201}, + ResultSegments: []int64{100}, }, meta: s.mockMeta, } + alloc := NewNMockAllocator(s.T()) + alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + task.allocator = alloc _, err := task.BuildCompactionRequest() s.Error(err) s.ErrorIs(err, merr.ErrSegmentNotFound) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index c3f1eb198bd7e..69016d647c3fb 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -399,7 +399,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { } plans := t.generatePlans(group.segments, signal, ct) - currentID, _, err := t.allocator.allocN(int64(len(plans))) + currentID, _, err := t.allocator.allocN(int64(len(plans) * 2)) if err != nil { return err } @@ -413,6 +413,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { start := time.Now() planID := currentID currentID++ + targetSegmentID := currentID + currentID++ pts, _ := tsoutil.ParseTS(ct.startTime) task := &datapb.CompactionTask{ PlanID: planID, @@ -426,6 +428,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { PartitionID: group.partitionID, Channel: group.channelName, InputSegments: segIDs, + ResultSegments: []int64{targetSegmentID}, // pre-allocated target segment TotalRows: totalRows, Schema: coll.Schema, } @@ -503,7 +506,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { } plans := t.generatePlans(segments, signal, ct) - currentID, _, err := t.allocator.allocN(int64(len(plans))) + currentID, _, err := t.allocator.allocN(int64(len(plans) * 2)) if err != nil { log.Warn("fail to allocate id", zap.Error(err)) return @@ -518,6 +521,8 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { start := time.Now() planID := currentID currentID++ + targetSegmentID := currentID + currentID++ pts, _ := tsoutil.ParseTS(ct.startTime) if err := t.compactionHandler.enqueueCompaction(&datapb.CompactionTask{ PlanID: planID, @@ -531,6 +536,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { PartitionID: partitionID, Channel: channel, InputSegments: segmentIDS, + ResultSegments: []int64{targetSegmentID}, // pre-allocated target segment TotalRows: totalRows, Schema: coll.Schema, }); err != nil { diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index e05fa1b1b8cc5..cf0ce1eab9c4a 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -66,6 +66,9 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er CompactionTask: task, meta: h.meta, } + alloc := &MockAllocator0{} + t.allocator = alloc + t.ResultSegments = []int64{100} plan, err := t.BuildCompactionRequest() h.spyChan <- plan return err @@ -474,11 +477,12 @@ func Test_compactionTrigger_force(t *testing.T) { }, }, // StartTime: 0, - TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(), - Type: datapb.CompactionType_MixCompaction, - Channel: "ch1", - TotalRows: 200, - Schema: schema, + TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(), + Type: datapb.CompactionType_MixCompaction, + Channel: "ch1", + TotalRows: 200, + Schema: schema, + PreAllocatedSegments: &datapb.IDRange{Begin: 100}, }, }, }, diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 56df7d4fdbe9c..708f129e36401 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -250,6 +250,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, zap.Int64("planID", task.GetPlanID()), zap.Int64s("segmentIDs", task.GetInputSegments()), zap.Error(err)) + return } log.Info("Finish to submit a LevelZeroCompaction plan", zap.Int64("taskID", taskID), @@ -272,6 +273,12 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C return } _, totalRows, maxSegmentRows, preferSegmentRows := calculateClusteringCompactionConfig(view) + resultSegmentNum := totalRows / preferSegmentRows * 2 + start, end, err := m.allocator.allocN(resultSegmentNum) + if err != nil { + log.Warn("pre-allocate result segments failed", zap.String("view", view.String())) + return + } task := &datapb.CompactionTask{ PlanID: taskID, TriggerID: view.(*ClusteringSegmentsView).triggerID, @@ -286,6 +293,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C Schema: collection.Schema, ClusteringKeyField: view.(*ClusteringSegmentsView).clusteringKeyField, InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }), + ResultSegments: []int64{start, end}, // pre-allocated result segments range MaxSegmentRows: maxSegmentRows, PreferSegmentRows: preferSegmentRows, TotalRows: totalRows, @@ -299,6 +307,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C zap.Int64("planID", task.GetPlanID()), zap.Int64s("segmentIDs", task.GetInputSegments()), zap.Error(err)) + return } log.Info("Finish to submit a clustering compaction task", zap.Int64("taskID", taskID), diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 227f5c50c6063..bfc30392f5296 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -221,7 +221,7 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all Files: importFiles, Options: job.GetOptions(), Ts: ts, - AutoIDRange: &datapb.AutoIDRange{Begin: idBegin, End: idEnd}, + IDRange: &datapb.IDRange{Begin: idBegin, End: idEnd}, RequestSegments: requestSegments, }, nil } diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 1a20260dec301..f348f7ba21398 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -36,7 +36,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datanode/allocator" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/proto/clusteringpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -59,8 +59,9 @@ import ( var _ Compactor = (*clusteringCompactionTask)(nil) type clusteringCompactionTask struct { - binlogIO io.BinlogIO - allocator allocator.Allocator + binlogIO io.BinlogIO + logIDAlloc allocator.Interface + segIDAlloc allocator.Interface ctx context.Context cancel context.CancelFunc @@ -127,15 +128,17 @@ type FlushSignal struct { func NewClusteringCompactionTask( ctx context.Context, binlogIO io.BinlogIO, - alloc allocator.Allocator, plan *datapb.CompactionPlan, ) *clusteringCompactionTask { ctx, cancel := context.WithCancel(ctx) + logIDAlloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64) + segIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedSegments().GetBegin(), plan.GetPreAllocatedSegments().GetEnd()) return &clusteringCompactionTask{ ctx: ctx, cancel: cancel, binlogIO: binlogIO, - allocator: alloc, + logIDAlloc: logIDAlloc, + segIDAlloc: segIDAlloc, plan: plan, tr: timerecord.NewTimeRecorder("clustering_compaction"), done: make(chan struct{}, 1), @@ -295,7 +298,9 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), clusteringKeyFieldStats: fieldStats, } - t.refreshBufferWriter(buffer) + if _, err = t.refreshBufferWriter(buffer); err != nil { + return err + } t.clusterBuffers = append(t.clusterBuffers, buffer) for _, key := range bucket { scalarToClusterBufferMap[key] = buffer @@ -346,7 +351,9 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0), clusteringKeyFieldStats: fieldStats, } - t.refreshBufferWriter(clusterBuffer) + if _, err = t.refreshBufferWriter(clusterBuffer); err != nil { + return err + } t.clusterBuffers = append(t.clusterBuffers, clusterBuffer) } t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer { @@ -805,7 +812,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff for _, fieldBinlog := range buffer.flushedBinlogs { insertLogs = append(insertLogs, fieldBinlog) } - statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer, buffer.flushedRowNum.Load()) + statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, buffer.flushedRowNum.Load()) if err != nil { return err } @@ -870,7 +877,7 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus } start := time.Now() - kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, writer) + kvs, partialBinlogs, err := serializeWrite(ctx, t.logIDAlloc, writer) if err != nil { log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) return err @@ -1136,7 +1143,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b } if buffer.writer == nil || buffer.currentSegmentRowNum.Load()+buffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() { pack = true - segmentID, err = t.allocator.AllocOne() + segmentID, err = t.segIDAlloc.AllocOne() if err != nil { return pack, err } diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go index 63260956203de..eb146a1d06979 100644 --- a/internal/datanode/compaction/clustering_compactor_test.go +++ b/internal/datanode/compaction/clustering_compactor_test.go @@ -25,7 +25,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" @@ -41,7 +40,6 @@ type ClusteringCompactionTaskSuite struct { suite.Suite mockBinlogIO *io.MockBinlogIO - mockAlloc *allocator.MockAllocator task *clusteringCompactionTask @@ -54,9 +52,8 @@ func (s *ClusteringCompactionTaskSuite) SetupSuite() { func (s *ClusteringCompactionTaskSuite) SetupTest() { s.mockBinlogIO = io.NewMockBinlogIO(s.T()) - s.mockAlloc = allocator.NewMockAllocator(s.T()) - s.task = NewClusteringCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil) + s.task = NewClusteringCompactionTask(context.Background(), s.mockBinlogIO, nil) paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0") diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go index f3ecc8c6a9994..b2cb1d7db292e 100644 --- a/internal/datanode/compaction/compactor_common.go +++ b/internal/datanode/compaction/compactor_common.go @@ -24,7 +24,7 @@ import ( "go.opentelemetry.io/otel" "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/datanode/allocator" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/io" iter "github.com/milvus-io/milvus/internal/datanode/iterators" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" @@ -130,7 +130,7 @@ func loadDeltaMap(segments []*datapb.CompactionSegmentBinlogs) (map[typeutil.Uni return deltaPaths, allPath, nil } -func serializeWrite(ctx context.Context, allocator allocator.Allocator, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) { +func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) { _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite") defer span.End() @@ -166,7 +166,7 @@ func serializeWrite(ctx context.Context, allocator allocator.Allocator, writer * return } -func statSerializeWrite(ctx context.Context, io io.BinlogIO, allocator allocator.Allocator, writer *SegmentWriter, finalRowCount int64) (*datapb.FieldBinlog, error) { +func statSerializeWrite(ctx context.Context, io io.BinlogIO, allocator allocator.Interface, writer *SegmentWriter, finalRowCount int64) (*datapb.FieldBinlog, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "statslog serializeWrite") defer span.End() sblob, err := writer.Finish(finalRowCount) diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go index f2edabbb3d4bd..8fc62ad52c57f 100644 --- a/internal/datanode/compaction/l0_compactor.go +++ b/internal/datanode/compaction/l0_compactor.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/otel" "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/datanode/allocator" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" @@ -45,7 +45,7 @@ import ( type LevelZeroCompactionTask struct { io.BinlogIO - allocator allocator.Allocator + allocator allocator.Interface cm storage.ChunkManager plan *datapb.CompactionPlan @@ -63,11 +63,11 @@ var _ Compactor = (*LevelZeroCompactionTask)(nil) func NewLevelZeroCompactionTask( ctx context.Context, binlogIO io.BinlogIO, - alloc allocator.Allocator, cm storage.ChunkManager, plan *datapb.CompactionPlan, ) *LevelZeroCompactionTask { ctx, cancel := context.WithCancel(ctx) + alloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64) return &LevelZeroCompactionTask{ ctx: ctx, cancel: cancel, diff --git a/internal/datanode/compaction/l0_compactor_test.go b/internal/datanode/compaction/l0_compactor_test.go index c8cd986fdcbe4..28b5d4e55a0c9 100644 --- a/internal/datanode/compaction/l0_compactor_test.go +++ b/internal/datanode/compaction/l0_compactor_test.go @@ -48,7 +48,6 @@ type LevelZeroCompactionTaskSuite struct { suite.Suite mockBinlogIO *io.MockBinlogIO - mockAlloc *allocator.MockAllocator task *LevelZeroCompactionTask dData *storage.DeleteData @@ -57,10 +56,9 @@ type LevelZeroCompactionTaskSuite struct { func (s *LevelZeroCompactionTaskSuite) SetupTest() { paramtable.Init() - s.mockAlloc = allocator.NewMockAllocator(s.T()) s.mockBinlogIO = io.NewMockBinlogIO(s.T()) // plan of the task is unset - s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil, nil) + s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, nil, nil) pk2ts := map[int64]uint64{ 1: 20000, @@ -272,6 +270,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { }, }, }, + BeginLogID: 11111, } s.task.plan = plan @@ -289,7 +288,6 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(1) s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) s.Require().Equal(plan.GetPlanID(), s.task.GetPlanID()) s.Require().Equal(plan.GetChannel(), s.task.GetChannelName()) @@ -381,6 +379,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { }, }, }, + BeginLogID: 11111, } s.task.plan = plan @@ -396,7 +395,6 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil) s.task.cm = cm - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once() s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() @@ -443,6 +441,7 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() { SegmentID: 100, }, }, + BeginLogID: 11111, } s.Run("serializeUpload allocator Alloc failed", func() { @@ -468,7 +467,6 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() { writer := NewSegmentDeltaWriter(100, 10, 1) writer.WriteBatch(s.dData.Pks, s.dData.Tss) writers := map[int64]*SegmentDeltaWriter{100: writer} - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) results, err := s.task.serializeUpload(ctx, writers) s.Error(err) @@ -480,7 +478,6 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() { s.task.plan = plan s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) writer := NewSegmentDeltaWriter(100, 10, 1) writer.WriteBatch(s.dData.Pks, s.dData.Tss) writers := map[int64]*SegmentDeltaWriter{100: writer} diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 8144ed8e07366..f22072c7673bd 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -20,6 +20,7 @@ import ( "context" "fmt" sio "io" + "math" "time" "github.com/cockroachdb/errors" @@ -27,7 +28,7 @@ import ( "go.opentelemetry.io/otel" "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/datanode/allocator" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" @@ -42,8 +43,8 @@ import ( // for MixCompaction only type mixCompactionTask struct { - binlogIO io.BinlogIO - allocator.Allocator + binlogIO io.BinlogIO + allocator allocator.Interface currentTs typeutil.Timestamp plan *datapb.CompactionPlan @@ -61,15 +62,15 @@ var _ Compactor = (*mixCompactionTask)(nil) func NewMixCompactionTask( ctx context.Context, binlogIO io.BinlogIO, - alloc allocator.Allocator, plan *datapb.CompactionPlan, ) *mixCompactionTask { ctx1, cancel := context.WithCancel(ctx) + alloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64) return &mixCompactionTask{ ctx: ctx1, cancel: cancel, binlogIO: binlogIO, - Allocator: alloc, + allocator: alloc, plan: plan, tr: timerecord.NewTimeRecorder("mix compaction"), currentTs: tsoutil.GetCurrentTime(), @@ -198,7 +199,7 @@ func (t *mixCompactionTask) merge( if (unflushedRowCount+1)%100 == 0 && writer.FlushAndIsFull() { serWriteStart := time.Now() - kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer) + kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, writer) if err != nil { log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) return nil, err @@ -220,7 +221,7 @@ func (t *mixCompactionTask) merge( if !writer.FlushAndIsEmpty() { serWriteStart := time.Now() - kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer) + kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, writer) if err != nil { log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) return nil, err @@ -239,7 +240,7 @@ func (t *mixCompactionTask) merge( } serWriteStart := time.Now() - sPath, err := statSerializeWrite(ctx, t.binlogIO, t.Allocator, writer, remainingRowCount) + sPath, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer, remainingRowCount) if err != nil { log.Warn("compact wrong, failed to serialize write segment stats", zap.Int64("remaining row count", remainingRowCount), zap.Error(err)) @@ -309,12 +310,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { log.Info("compact start") - targetSegID, err := t.AllocOne() - if err != nil { - log.Warn("compact wrong, unable to allocate segmentID", zap.Error(err)) - return nil, err - } - + targetSegID := t.plan.GetPreAllocatedSegments().GetBegin() previousRowCount := t.getNumRows() writer, err := NewSegmentWriter(t.plan.GetSchema(), previousRowCount, targetSegID, partitionID, collectionID) diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 7b5112a10d8b9..4bf0a8efdc6cf 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -18,6 +18,7 @@ package compaction import ( "context" + "github.com/milvus-io/milvus/internal/allocator" "math" "testing" "time" @@ -29,7 +30,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -51,7 +51,6 @@ type MixCompactionTaskSuite struct { suite.Suite mockBinlogIO *io.MockBinlogIO - mockAlloc *allocator.MockAllocator meta *etcdpb.CollectionMeta segWriter *SegmentWriter @@ -66,9 +65,6 @@ func (s *MixCompactionTaskSuite) SetupSuite() { func (s *MixCompactionTaskSuite) SetupTest() { s.mockBinlogIO = io.NewMockBinlogIO(s.T()) - s.mockAlloc = allocator.NewMockAllocator(s.T()) - - s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil) s.meta = genTestCollectionMeta() @@ -82,10 +78,14 @@ func (s *MixCompactionTaskSuite) SetupTest() { Field2StatslogPaths: nil, Deltalogs: nil, }}, - TimeoutInSeconds: 10, - Type: datapb.CompactionType_MixCompaction, - Schema: s.meta.GetSchema(), + TimeoutInSeconds: 10, + Type: datapb.CompactionType_MixCompaction, + Schema: s.meta.GetSchema(), + BeginLogID: 19530, + PreAllocatedSegments: &datapb.IDRange{Begin: 19530}, } + + s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.plan) s.task.plan = s.plan } @@ -104,7 +104,6 @@ func getMilvusBirthday() time.Time { func (s *MixCompactionTaskSuite) TestCompactDupPK() { // Test merge compactions, two segments with the same pk, one deletion pk=1 // The merged segment 19530 should remain 3 pk without pk=100 - s.mockAlloc.EXPECT().AllocOne().Return(int64(19530), nil).Twice() segments := []int64{7, 8, 9} dblobs, err := getInt64DeltaBlobs( 1, @@ -115,8 +114,8 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() { s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"1"}). Return([][]byte{dblobs.GetValue()}, nil).Times(3) - s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(7777777, 8888888, nil) s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) + alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64) // clear origial segments s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0) @@ -139,7 +138,7 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() { //} //bfs := metacache.NewBloomFilterSet(statistic) - kvs, fBinlogs, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter) + kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter) s.Require().NoError(err) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool { left, right := lo.Difference(keys, lo.Keys(kvs)) @@ -177,10 +176,8 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() { } func (s *MixCompactionTaskSuite) TestCompactTwoToOne() { - s.mockAlloc.EXPECT().AllocOne().Return(int64(19530), nil).Twice() - segments := []int64{5, 6, 7} - s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(7777777, 8888888, nil) + alloc := allocator.NewLocalAllocator(7777777, math.MaxInt64) s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0) for _, segID := range segments { @@ -191,7 +188,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() { // MaxPK: s.segWriter.pkstats.MaxPk, //} //bfs := metacache.NewBloomFilterSet(statistic) - kvs, fBinlogs, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter) + kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter) s.Require().NoError(err) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool { left, right := lo.Difference(keys, lo.Keys(kvs)) @@ -251,11 +248,10 @@ func (s *MixCompactionTaskSuite) TestMergeBufferFull() { err := s.segWriter.Write(&v) s.Require().NoError(err) - s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil).Times(2) - kvs, _, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter) + alloc := allocator.NewLocalAllocator(888888, math.MaxInt64) + kvs, _, err := serializeWrite(context.TODO(), alloc, s.segWriter) s.Require().NoError(err) - s.mockAlloc.EXPECT().AllocOne().Return(888888, nil) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, paths []string) ([][]byte, error) { s.Require().Equal(len(paths), len(kvs)) @@ -279,11 +275,10 @@ func (s *MixCompactionTaskSuite) TestMergeEntityExpired() { currTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second*(time.Duration(collTTL)+1)), 0) s.task.currentTs = currTs s.task.plan.CollectionTtl = int64(collTTL) - s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil) + alloc := allocator.NewLocalAllocator(888888, math.MaxInt64) - kvs, _, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter) + kvs, _, err := serializeWrite(context.TODO(), alloc, s.segWriter) s.Require().NoError(err) - s.mockAlloc.EXPECT().AllocOne().Return(888888, nil) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, paths []string) ([][]byte, error) { s.Require().Equal(len(paths), len(kvs)) @@ -313,15 +308,11 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() { {"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 0}, } - s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil) - kvs, _, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter) + alloc := allocator.NewLocalAllocator(888888, math.MaxInt64) + kvs, _, err := serializeWrite(context.TODO(), alloc, s.segWriter) s.Require().NoError(err) for _, test := range tests { s.Run(test.description, func() { - if test.expectedRowCount > 0 { - s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(77777, 99999, nil).Once() - } - s.mockAlloc.EXPECT().AllocOne().Return(888888, nil) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, paths []string) ([][]byte, error) { s.Require().Equal(len(paths), len(kvs)) @@ -489,12 +480,6 @@ func (s *MixCompactionTaskSuite) TestCompactFail() { _, err := s.task.Compact() s.Error(err) }) - - s.Run("Test compact AllocOnce failed", func() { - s.mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc one error")).Once() - _, err := s.task.Compact() - s.Error(err) - }) } func (s *MixCompactionTaskSuite) TestIsExpiredEntity() { diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index 99e34f6e205cd..98c28d89c920e 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -265,7 +265,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() { }, }, Ts: 1000, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: int64(s.numRows), }, @@ -326,7 +326,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { }, }, Ts: 1000, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: int64(s.numRows), }, @@ -417,7 +417,7 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() { }, }, Ts: 1000, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: int64(s.numRows), }, diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index 36f2ee6bbdd3e..c7190bf78e13c 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -66,7 +66,7 @@ func NewImportTask(req *datapb.ImportRequest, UnsetAutoID(req.GetSchema()) } // Setting end as math.MaxInt64 to incrementally allocate logID. - alloc := allocator.NewLocalAllocator(req.GetAutoIDRange().GetBegin(), math.MaxInt64) + alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), math.MaxInt64) task := &ImportTask{ ImportTaskV2: &datapb.ImportTaskV2{ JobID: req.GetJobID(), diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index 9172475f54a18..9bad9bfa6f3e9 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -62,7 +62,7 @@ func NewL0ImportTask(req *datapb.ImportRequest, ) Task { ctx, cancel := context.WithCancel(context.Background()) // Setting end as math.MaxInt64 to incrementally allocate logID. - alloc := allocator.NewLocalAllocator(req.GetAutoIDRange().GetBegin(), math.MaxInt64) + alloc := allocator.NewLocalAllocator(req.GetIDRange().GetBegin(), math.MaxInt64) task := &L0ImportTask{ ImportTaskV2: &datapb.ImportTaskV2{ JobID: req.GetJobID(), diff --git a/internal/datanode/importv2/task_l0_import_test.go b/internal/datanode/importv2/task_l0_import_test.go index dbedaae47df2c..8a2c991fa5c46 100644 --- a/internal/datanode/importv2/task_l0_import_test.go +++ b/internal/datanode/importv2/task_l0_import_test.go @@ -165,7 +165,7 @@ func (s *L0ImportSuite) TestL0Import() { Vchannel: s.channel, }, }, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: int64(s.delCnt), }, diff --git a/internal/datanode/services.go b/internal/datanode/services.go index ee235f2de2909..5263a37acf01d 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -213,6 +213,10 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl return merr.Success(), nil } + if req.GetBeginLogID() == 0 { + return merr.Status(merr.WrapErrParameterInvalidMsg("invalid beginLogID")), nil + } + /* spanCtx := trace.SpanContextFromContext(ctx) @@ -226,22 +230,25 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl task = compaction.NewLevelZeroCompactionTask( taskCtx, binlogIO, - node.allocator, node.chunkManager, req, ) case datapb.CompactionType_MixCompaction: + if req.GetPreAllocatedSegments() == nil || req.GetPreAllocatedSegments().GetBegin() == 0 { + return merr.Status(merr.WrapErrParameterInvalidMsg("invalid pre-allocated segmentID range")), nil + } task = compaction.NewMixCompactionTask( taskCtx, binlogIO, - node.allocator, req, ) case datapb.CompactionType_ClusteringCompaction: + if req.GetPreAllocatedSegments() == nil || req.GetPreAllocatedSegments().GetBegin() == 0 { + return merr.Status(merr.WrapErrParameterInvalidMsg("invalid pre-allocated segmentID range")), nil + } task = compaction.NewClusteringCompactionTask( taskCtx, binlogIO, - node.allocator, req, ) default: diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 05ee57b474db4..59d823cfcdf3b 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -231,6 +231,7 @@ func (s *DataNodeServicesSuite) TestCompaction() { resp, err := node.CompactionV2(ctx, req) s.NoError(err) s.False(merr.Ok(resp)) + s.T().Logf("status=%v", resp) }) s.Run("unknown CompactionType", func() { @@ -245,11 +246,13 @@ func (s *DataNodeServicesSuite) TestCompaction() { {SegmentID: 102, Level: datapb.SegmentLevel_L0}, {SegmentID: 103, Level: datapb.SegmentLevel_L1}, }, + BeginLogID: 100, } resp, err := node.CompactionV2(ctx, req) s.NoError(err) s.False(merr.Ok(resp)) + s.T().Logf("status=%v", resp) }) s.Run("compact_clustering", func() { @@ -264,11 +267,60 @@ func (s *DataNodeServicesSuite) TestCompaction() { {SegmentID: 102, Level: datapb.SegmentLevel_L0}, {SegmentID: 103, Level: datapb.SegmentLevel_L1}, }, - Type: datapb.CompactionType_ClusteringCompaction, + Type: datapb.CompactionType_ClusteringCompaction, + BeginLogID: 100, + PreAllocatedSegments: &datapb.IDRange{Begin: 100, End: 200}, } - _, err := node.CompactionV2(ctx, req) + resp, err := node.CompactionV2(ctx, req) + s.NoError(err) + s.True(merr.Ok(resp)) + s.T().Logf("status=%v", resp) + }) + + s.Run("beginLogID is invalid", func() { + node := s.node + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &datapb.CompactionPlan{ + PlanID: 1000, + Channel: dmChannelName, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 102, Level: datapb.SegmentLevel_L0}, + {SegmentID: 103, Level: datapb.SegmentLevel_L1}, + }, + Type: datapb.CompactionType_ClusteringCompaction, + BeginLogID: 0, + } + + resp, err := node.CompactionV2(ctx, req) s.NoError(err) + s.False(merr.Ok(resp)) + s.T().Logf("status=%v", resp) + }) + + s.Run("pre-allocated segmentID range is invalid", func() { + node := s.node + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := &datapb.CompactionPlan{ + PlanID: 1000, + Channel: dmChannelName, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 102, Level: datapb.SegmentLevel_L0}, + {SegmentID: 103, Level: datapb.SegmentLevel_L1}, + }, + Type: datapb.CompactionType_ClusteringCompaction, + BeginLogID: 100, + PreAllocatedSegments: &datapb.IDRange{Begin: 0, End: 0}, + } + + resp, err := node.CompactionV2(ctx, req) + s.NoError(err) + s.False(merr.Ok(resp)) + s.T().Logf("status=%v", resp) }) } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index c2b5a8e5e237d..97573f05a7262 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -560,6 +560,8 @@ message CompactionPlan { string analyze_result_path = 14; repeated int64 analyze_segment_ids = 15; int32 state = 16; + int64 begin_logID = 17; + IDRange pre_allocated_segments = 18; // only for clustering compaction } message CompactionSegment { @@ -723,7 +725,7 @@ message PreImportRequest { repeated common.KeyValuePair options = 9; } -message autoIDRange { +message IDRange { int64 begin = 1; int64 end = 2; } @@ -745,7 +747,7 @@ message ImportRequest { repeated internal.ImportFile files = 8; repeated common.KeyValuePair options = 9; uint64 ts = 10; - autoIDRange autoID_range = 11; + IDRange ID_range = 11; repeated ImportRequestSegment request_segments = 12; }