Skip to content

Commit

Permalink
enhance: Pre-allocate ids for compaction (milvus-io#34187)
Browse files Browse the repository at this point in the history
This PR removes the dependency of compaction on the ID allocator by
pre-allocating the logID and segmentID.

issue: milvus-io#33957

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Jul 17, 2024
1 parent 90e765d commit ca758c3
Show file tree
Hide file tree
Showing 25 changed files with 288 additions and 122 deletions.
3 changes: 3 additions & 0 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,18 +577,21 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
case datapb.CompactionType_MixCompaction:
task = &mixCompactionTask{
CompactionTask: t,
allocator: c.allocator,
meta: c.meta,
sessions: c.sessions,
}
case datapb.CompactionType_Level0DeleteCompaction:
task = &l0CompactionTask{
CompactionTask: t,
allocator: c.allocator,
meta: c.meta,
sessions: c.sessions,
}
case datapb.CompactionType_ClusteringCompaction:
task = &clusteringCompactionTask{
CompactionTask: t,
allocator: c.allocator,
meta: c.meta,
sessions: c.sessions,
handler: c.handler,
Expand Down
12 changes: 11 additions & 1 deletion internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ type clusteringCompactionTask struct {
*datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span

span trace.Span
allocator allocator
meta CompactionMeta
sessions SessionManager
handler Handler
Expand Down Expand Up @@ -133,6 +134,10 @@ func (t *clusteringCompactionTask) retryableProcess() error {
}

func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.allocN(1)
if err != nil {
return nil, err
}
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
Expand All @@ -147,6 +152,11 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
PreferSegmentRows: t.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
AnalyzeSegmentIds: t.GetInputSegments(), // todo: if need
BeginLogID: beginLogID,
PreAllocatedSegments: &datapb.IDRange{
Begin: t.GetResultSegments()[0],
End: t.GetResultSegments()[1],
},
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down
10 changes: 8 additions & 2 deletions internal/datacoord/compaction_task_clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"

"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
Expand Down Expand Up @@ -50,6 +52,8 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
},
})
session := NewSessionManagerImpl()
alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)

schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
pk := &schemapb.FieldSchema{
Expand Down Expand Up @@ -77,9 +81,11 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
Schema: schema,
ClusteringKeyField: pk,
InputSegments: []int64{101, 102},
ResultSegments: []int64{1000, 1100},
},
meta: meta,
sessions: session,
meta: meta,
sessions: session,
allocator: alloc,
}

task.processPipelining()
Expand Down
17 changes: 12 additions & 5 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ var _ CompactionTask = (*l0CompactionTask)(nil)

type l0CompactionTask struct {
*datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
span trace.Span
sessions SessionManager
meta CompactionMeta
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult

span trace.Span
allocator allocator
sessions SessionManager
meta CompactionMeta
}

// Note: return True means exit this state machine.
Expand Down Expand Up @@ -230,6 +232,10 @@ func (t *l0CompactionTask) CleanLogPath() {
}

func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.allocN(1)
if err != nil {
return nil, err
}
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
Expand All @@ -239,6 +245,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
BeginLogID: beginLogID,
}

log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
Expand Down
60 changes: 60 additions & 0 deletions internal/datacoord/compaction_task_l0_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
Expand Down Expand Up @@ -58,6 +74,9 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() {
},
meta: s.mockMeta,
}
alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
plan, err := task.BuildCompactionRequest()
s.Require().NoError(err)

Expand Down Expand Up @@ -88,6 +107,9 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() {
},
meta: s.mockMeta,
}
alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc

_, err := task.BuildCompactionRequest()
s.Error(err)
Expand Down Expand Up @@ -121,10 +143,41 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() {
},
meta: s.mockMeta,
}
alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
_, err := task.BuildCompactionRequest()
s.Error(err)
}

func (s *CompactionTaskSuite) TestBuildCompactionRequestFailed_AllocFailed() {
var task CompactionTask

alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, errors.New("mock alloc err"))

task = &l0CompactionTask{
allocator: alloc,
}
_, err := task.BuildCompactionRequest()
s.T().Logf("err=%v", err)
s.Error(err)

task = &mixCompactionTask{
allocator: alloc,
}
_, err = task.BuildCompactionRequest()
s.T().Logf("err=%v", err)
s.Error(err)

task = &clusteringCompactionTask{
allocator: alloc,
}
_, err = task.BuildCompactionRequest()
s.T().Logf("err=%v", err)
s.Error(err)
}

func generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask {
return &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
Expand All @@ -145,9 +198,13 @@ func (s *CompactionTaskSuite) SetupSubTest() {
}

func (s *CompactionTaskSuite) TestProcessStateTrans() {
alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)

s.Run("test pipelining needReassignNodeID", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = NullNodeID
t.allocator = alloc
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_pipelining, t.State)
Expand All @@ -157,6 +214,7 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
s.Run("test pipelining BuildCompactionRequest failed", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
t.allocator = alloc
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}

Expand Down Expand Up @@ -194,6 +252,7 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
s.Run("test pipelining Compaction failed", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
t.allocator = alloc
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}

Expand Down Expand Up @@ -234,6 +293,7 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
s.Run("test pipelining success", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
t.allocator = alloc
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}

Expand Down
14 changes: 12 additions & 2 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ var _ CompactionTask = (*mixCompactionTask)(nil)

type mixCompactionTask struct {
*datapb.CompactionTask
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult
plan *datapb.CompactionPlan
result *datapb.CompactionPlanResult

span trace.Span
allocator allocator
sessions SessionManager
meta CompactionMeta
newSegment *SegmentInfo
Expand Down Expand Up @@ -328,6 +330,10 @@ func (t *mixCompactionTask) CleanLogPath() {
}

func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.allocN(1)
if err != nil {
return nil, err
}
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
Expand All @@ -337,6 +343,10 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
BeginLogID: beginLogID,
PreAllocatedSegments: &datapb.IDRange{
Begin: t.GetResultSegments()[0],
},
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down
42 changes: 25 additions & 17 deletions internal/datacoord/compaction_task_mix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalMix() {
}).Times(2)
task := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_MixCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{200, 201},
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Type: datapb.CompactionType_MixCompaction,
NodeID: 1,
State: datapb.CompactionTaskState_executing,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100},
},
// plan: plan,
meta: s.mockMeta,
}
alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
plan, err := task.BuildCompactionRequest()
s.Require().NoError(err)

Expand All @@ -53,18 +57,22 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() {
}).Once()
task := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Channel: channel,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_executing,
NodeID: 1,
InputSegments: []int64{200, 201},
PlanID: 1,
TriggerID: 19530,
CollectionID: 1,
PartitionID: 10,
Channel: channel,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_executing,
NodeID: 1,
InputSegments: []int64{200, 201},
ResultSegments: []int64{100},
},
meta: s.mockMeta,
}
alloc := NewNMockAllocator(s.T())
alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)
task.allocator = alloc
_, err := task.BuildCompactionRequest()
s.Error(err)
s.ErrorIs(err, merr.ErrSegmentNotFound)
Expand Down
10 changes: 8 additions & 2 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
}

plans := t.generatePlans(group.segments, signal, ct)
currentID, _, err := t.allocator.allocN(int64(len(plans)))
currentID, _, err := t.allocator.allocN(int64(len(plans) * 2))
if err != nil {
return err
}
Expand All @@ -413,6 +413,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
start := time.Now()
planID := currentID
currentID++
targetSegmentID := currentID
currentID++
pts, _ := tsoutil.ParseTS(ct.startTime)
task := &datapb.CompactionTask{
PlanID: planID,
Expand All @@ -426,6 +428,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
PartitionID: group.partitionID,
Channel: group.channelName,
InputSegments: segIDs,
ResultSegments: []int64{targetSegmentID}, // pre-allocated target segment
TotalRows: totalRows,
Schema: coll.Schema,
}
Expand Down Expand Up @@ -503,7 +506,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
}

plans := t.generatePlans(segments, signal, ct)
currentID, _, err := t.allocator.allocN(int64(len(plans)))
currentID, _, err := t.allocator.allocN(int64(len(plans) * 2))
if err != nil {
log.Warn("fail to allocate id", zap.Error(err))
return
Expand All @@ -518,6 +521,8 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
start := time.Now()
planID := currentID
currentID++
targetSegmentID := currentID
currentID++
pts, _ := tsoutil.ParseTS(ct.startTime)
if err := t.compactionHandler.enqueueCompaction(&datapb.CompactionTask{
PlanID: planID,
Expand All @@ -531,6 +536,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
PartitionID: partitionID,
Channel: channel,
InputSegments: segmentIDS,
ResultSegments: []int64{targetSegmentID}, // pre-allocated target segment
TotalRows: totalRows,
Schema: coll.Schema,
}); err != nil {
Expand Down
Loading

0 comments on commit ca758c3

Please sign in to comment.