diff --git a/internal/flushcommon/writebuffer/manager.go b/internal/flushcommon/writebuffer/manager.go index a1311814c9d76..22e25e4567b82 100644 --- a/internal/flushcommon/writebuffer/manager.go +++ b/internal/flushcommon/writebuffer/manager.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // BufferManager is the interface for WriteBuffer management. @@ -53,7 +54,7 @@ type BufferManager interface { func NewManager(syncMgr syncmgr.SyncManager) BufferManager { return &bufferManager{ syncMgr: syncMgr, - buffers: make(map[string]WriteBuffer), + buffers: typeutil.NewConcurrentMap[string, WriteBuffer](), ch: lifetime.NewSafeChan(), } @@ -61,8 +62,7 @@ func NewManager(syncMgr syncmgr.SyncManager) BufferManager { type bufferManager struct { syncMgr syncmgr.SyncManager - buffers map[string]WriteBuffer - mut sync.RWMutex + buffers *typeutil.ConcurrentMap[string, WriteBuffer] wg sync.WaitGroup ch lifetime.SafeChan @@ -97,13 +97,11 @@ func (m *bufferManager) memoryCheck() { return } startTime := time.Now() - m.mut.RLock() defer func() { dur := time.Since(startTime) if dur > 30*time.Second { log.Warn("memory check takes too long", zap.Duration("time", dur)) } - m.mut.RUnlock() }() for { @@ -116,7 +114,7 @@ func (m *bufferManager) memoryCheck() { return mem / 1024 / 1024 } - for chanName, buf := range m.buffers { + m.buffers.Range(func(chanName string, buf WriteBuffer) bool { size := buf.MemorySize() total += size if size > candiSize { @@ -124,7 +122,8 @@ func (m *bufferManager) memoryCheck() { candidate = buf candiChan = chanName } - } + return true + }) totalMemory := hardware.GetMemoryCount() memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat() @@ -150,27 +149,23 @@ func (m *bufferManager) Stop() { // Register a new WriteBuffer for channel. func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error { - m.mut.Lock() - defer m.mut.Unlock() - - _, ok := m.buffers[channel] - if ok { - return merr.WrapErrChannelReduplicate(channel) - } buf, err := NewWriteBuffer(channel, metacache, m.syncMgr, opts...) if err != nil { return err } - m.buffers[channel] = buf + + _, loaded := m.buffers.GetOrInsert(channel, buf) + if loaded { + buf.Close(context.Background(), false) + return merr.WrapErrChannelReduplicate(channel) + } return nil } // CreateNewGrowingSegment notifies writeBuffer to create a new growing segment. func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, partitionID int64, segmentID int64) error { - m.mut.RLock() - buf, ok := m.buffers[channel] - m.mut.RUnlock() - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { log.Ctx(ctx).Warn("write buffer not found when create new growing segment", zap.String("channel", channel), zap.Int64("partitionID", partitionID), @@ -183,11 +178,8 @@ func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel str // SealSegments call sync segment and change segments state to Flushed. func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error { - m.mut.RLock() - buf, ok := m.buffers[channel] - m.mut.RUnlock() - - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { log.Ctx(ctx).Warn("write buffer not found when flush segments", zap.String("channel", channel), zap.Int64s("segmentIDs", segmentIDs)) @@ -198,11 +190,8 @@ func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmen } func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { - m.mut.RLock() - buf, ok := m.buffers[channel] - m.mut.RUnlock() - - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { log.Ctx(ctx).Warn("write buffer not found when flush channel", zap.String("channel", channel), zap.Uint64("flushTs", flushTs)) @@ -214,11 +203,8 @@ func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushT // BufferData put data into channel write buffer. func (m *bufferManager) BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { - m.mut.RLock() - buf, ok := m.buffers[channel] - m.mut.RUnlock() - - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { log.Ctx(context.Background()).Warn("write buffer not found when buffer data", zap.String("channel", channel)) return merr.WrapErrChannelNotFound(channel) @@ -229,11 +215,8 @@ func (m *bufferManager) BufferData(channel string, insertData []*InsertData, del // GetCheckpoint returns checkpoint for provided channel. func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { - m.mut.RLock() - buf, ok := m.buffers[channel] - m.mut.RUnlock() - - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { return nil, false, merr.WrapErrChannelNotFound(channel) } cp := buf.GetCheckpoint() @@ -243,10 +226,8 @@ func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, } func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) { - m.mut.Lock() - defer m.mut.Unlock() - buf, ok := m.buffers[channel] - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { return } flushTs := buf.GetFlushTimestamp() @@ -259,12 +240,8 @@ func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) { // RemoveChannel remove channel WriteBuffer from manager. // this method discards all buffered data since datanode no longer has the ownership func (m *bufferManager) RemoveChannel(channel string) { - m.mut.Lock() - buf, ok := m.buffers[channel] - delete(m.buffers, channel) - m.mut.Unlock() - - if !ok { + buf, loaded := m.buffers.GetAndRemove(channel) + if !loaded { log.Warn("failed to remove channel, channel not maintained in manager", zap.String("channel", channel)) return } @@ -275,12 +252,8 @@ func (m *bufferManager) RemoveChannel(channel string) { // DropChannel removes channel WriteBuffer and process `DropChannel` // this method will save all buffered data func (m *bufferManager) DropChannel(channel string) { - m.mut.Lock() - buf, ok := m.buffers[channel] - delete(m.buffers, channel) - m.mut.Unlock() - - if !ok { + buf, loaded := m.buffers.GetAndRemove(channel) + if !loaded { log.Warn("failed to drop channel, channel not maintained in manager", zap.String("channel", channel)) return } @@ -289,11 +262,8 @@ func (m *bufferManager) DropChannel(channel string) { } func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) { - m.mut.RLock() - buf, ok := m.buffers[channel] - m.mut.RUnlock() - - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs)) return } diff --git a/internal/flushcommon/writebuffer/manager_test.go b/internal/flushcommon/writebuffer/manager_test.go index d344c48d83d7f..3107c12fb7115 100644 --- a/internal/flushcommon/writebuffer/manager_test.go +++ b/internal/flushcommon/writebuffer/manager_test.go @@ -95,10 +95,7 @@ func (s *ManagerSuite) TestFlushSegments() { defer cancel() wb := NewMockWriteBuffer(s.T()) - - s.manager.mut.Lock() - s.manager.buffers[s.channelName] = wb - s.manager.mut.Unlock() + s.manager.buffers.Insert(s.channelName, wb) wb.EXPECT().SealSegments(mock.Anything, mock.Anything).Return(nil) @@ -119,9 +116,8 @@ func (s *ManagerSuite) TestCreateNewGrowingSegment() { idAllocator: s.allocator, }) s.NoError(err) - s.manager.mut.Lock() - s.manager.buffers[s.channelName] = wb - s.manager.mut.Unlock() + + s.manager.buffers.Insert(s.channelName, wb) err = manager.CreateNewGrowingSegment(context.Background(), s.channelName, 1, 1) s.NoError(err) } @@ -136,10 +132,7 @@ func (s *ManagerSuite) TestBufferData() { s.Run("normal_buffer_data", func() { wb := NewMockWriteBuffer(s.T()) - s.manager.mut.Lock() - s.manager.buffers[s.channelName] = wb - s.manager.mut.Unlock() - + s.manager.buffers.Insert(s.channelName, wb) wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := manager.BufferData(s.channelName, nil, nil, nil, nil) @@ -157,10 +150,7 @@ func (s *ManagerSuite) TestGetCheckpoint() { s.Run("normal_checkpoint", func() { wb := NewMockWriteBuffer(s.T()) - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0)} wb.EXPECT().GetCheckpoint().Return(pos) wb.EXPECT().GetFlushTimestamp().Return(nonFlushTS) @@ -173,10 +163,7 @@ func (s *ManagerSuite) TestGetCheckpoint() { s.Run("checkpoint_need_update", func() { wb := NewMockWriteBuffer(s.T()) - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) cpTimestamp := tsoutil.ComposeTSByTime(time.Now(), 0) pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: cpTimestamp} @@ -221,10 +208,7 @@ func (s *ManagerSuite) TestDropPartitions() { wb := NewMockWriteBuffer(s.T()) wb.EXPECT().DropPartitions(mock.Anything).Return() - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) manager.DropPartitions(s.channelName, []int64{1}) }) } @@ -261,10 +245,7 @@ func (s *ManagerSuite) TestMemoryCheck() { } flag.Store(true) }).Return() - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) manager.Start() defer manager.Stop() diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 65d33619848ee..ec46d23589e90 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -68,50 +68,35 @@ type checkpointCandidate struct { } type checkpointCandidates struct { - candidates map[string]*checkpointCandidate - mu sync.RWMutex + candidates *typeutil.ConcurrentMap[string, *checkpointCandidate] +} + +func getCandidatesKey(segmentID int64, timestamp uint64) string { + return fmt.Sprintf("%d-%d", segmentID, timestamp) } func newCheckpointCandiates() *checkpointCandidates { return &checkpointCandidates{ - candidates: make(map[string]*checkpointCandidate), + candidates: typeutil.NewConcurrentMap[string, *checkpointCandidate](), // segmentID-ts } } func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) { - c.mu.Lock() - defer c.mu.Unlock() - delete(c.candidates, fmt.Sprintf("%d-%d", segmentID, timestamp)) -} - -func (c *checkpointCandidates) RemoveChannel(channel string, timestamp uint64) { - c.mu.Lock() - defer c.mu.Unlock() - delete(c.candidates, fmt.Sprintf("%s-%d", channel, timestamp)) + c.candidates.Remove(getCandidatesKey(segmentID, timestamp)) } func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) { - c.mu.Lock() - defer c.mu.Unlock() - c.candidates[fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())] = &checkpointCandidate{segmentID, position, source} -} - -func (c *checkpointCandidates) AddChannel(channel string, position *msgpb.MsgPosition, source string) { - c.mu.Lock() - defer c.mu.Unlock() - c.candidates[fmt.Sprintf("%s-%d", channel, position.GetTimestamp())] = &checkpointCandidate{-1, position, source} + c.candidates.Insert(getCandidatesKey(segmentID, position.GetTimestamp()), &checkpointCandidate{segmentID, position, source}) } func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate { - c.mu.RLock() - defer c.mu.RUnlock() - var result *checkpointCandidate = def - for _, candidate := range c.candidates { + c.candidates.Range(func(_ string, candidate *checkpointCandidate) bool { if result == nil || candidate.position.GetTimestamp() < result.position.GetTimestamp() { result = candidate } - } + return true + }) return result } @@ -126,8 +111,6 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncm // writeBufferBase is the common component for buffering data type writeBufferBase struct { - mut sync.RWMutex - collectionID int64 channelName string @@ -136,6 +119,7 @@ type writeBufferBase struct { estSizePerRecord int metaCache metacache.MetaCache + mut sync.RWMutex buffers map[int64]*segmentBuffer // segmentID => segmentBuffer syncPolicies []SyncPolicy