fix: Replace outer lock with concurrent map (#37817)
See also: #37493

---------

Signed-off-by: yangxuan <[email protected]>
XuanYang-cn authored Nov 21, 2024
1 parent 7817ea6 commit 70e6a00
Showing 3 changed files with 50 additions and 115 deletions.
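The commit swaps the manager's `sync.RWMutex` + plain `map[string]WriteBuffer` pair for a `typeutil.ConcurrentMap[string, WriteBuffer]`, so accessors no longer take a registry-wide lock. The sketch below illustrates the before/after pattern; it uses the standard library's `sync.Map` as a stand-in for Milvus's generic `ConcurrentMap`, and the `lockedRegistry`/`concurrentRegistry` names are illustrative, not from the repository.

```go
package main

import (
	"fmt"
	"sync"
)

// Before: every accessor went through a registry-wide RWMutex.
type lockedRegistry struct {
	mut     sync.RWMutex
	buffers map[string]string // WriteBuffer simplified to a string for the sketch
}

func (r *lockedRegistry) get(channel string) (string, bool) {
	r.mut.RLock()
	defer r.mut.RUnlock()
	buf, ok := r.buffers[channel]
	return buf, ok
}

// After: the concurrent map synchronizes internally, so callers hold no
// outer lock. sync.Map stands in for typeutil.ConcurrentMap[string, WriteBuffer].
type concurrentRegistry struct {
	buffers sync.Map
}

func (r *concurrentRegistry) get(channel string) (string, bool) {
	v, ok := r.buffers.Load(channel)
	if !ok {
		return "", false
	}
	return v.(string), true
}

func main() {
	locked := &lockedRegistry{buffers: map[string]string{"ch-1": "buffer A"}}
	concurrent := &concurrentRegistry{}
	concurrent.buffers.Store("ch-1", "buffer A")

	a, _ := locked.get("ch-1")
	b, _ := concurrent.get("ch-1")
	fmt.Println(a, b)
}
```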
90 changes: 30 additions & 60 deletions internal/flushcommon/writebuffer/manager.go
@@ -16,6 +16,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// BufferManager is the interface for WriteBuffer management.
@@ -53,16 +54,15 @@ type BufferManager interface {
func NewManager(syncMgr syncmgr.SyncManager) BufferManager {
return &bufferManager{
syncMgr: syncMgr,
buffers: make(map[string]WriteBuffer),
buffers: typeutil.NewConcurrentMap[string, WriteBuffer](),

ch: lifetime.NewSafeChan(),
}
}

type bufferManager struct {
syncMgr syncmgr.SyncManager
buffers map[string]WriteBuffer
mut sync.RWMutex
buffers *typeutil.ConcurrentMap[string, WriteBuffer]

wg sync.WaitGroup
ch lifetime.SafeChan
@@ -97,13 +97,11 @@ func (m *bufferManager) memoryCheck() {
return
}
startTime := time.Now()
m.mut.RLock()
defer func() {
dur := time.Since(startTime)
if dur > 30*time.Second {
log.Warn("memory check takes too long", zap.Duration("time", dur))
}
m.mut.RUnlock()
}()

for {
@@ -116,15 +114,16 @@
return mem / 1024 / 1024
}

for chanName, buf := range m.buffers {
m.buffers.Range(func(chanName string, buf WriteBuffer) bool {
size := buf.MemorySize()
total += size
if size > candiSize {
candiSize = size
candidate = buf
candiChan = chanName
}
}
return true
})

totalMemory := hardware.GetMemoryCount()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()
@@ -150,27 +149,23 @@ func (m *bufferManager) Stop() {

// Register a new WriteBuffer for channel.
func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, opts ...WriteBufferOption) error {
m.mut.Lock()
defer m.mut.Unlock()

_, ok := m.buffers[channel]
if ok {
return merr.WrapErrChannelReduplicate(channel)
}
buf, err := NewWriteBuffer(channel, metacache, m.syncMgr, opts...)
if err != nil {
return err
}
m.buffers[channel] = buf

_, loaded := m.buffers.GetOrInsert(channel, buf)
if loaded {
buf.Close(context.Background(), false)
return merr.WrapErrChannelReduplicate(channel)
}
return nil
}

// CreateNewGrowingSegment notifies writeBuffer to create a new growing segment.
func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel string, partitionID int64, segmentID int64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()
if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Ctx(ctx).Warn("write buffer not found when create new growing segment",
zap.String("channel", channel),
zap.Int64("partitionID", partitionID),
@@ -183,11 +178,8 @@ func (m *bufferManager) CreateNewGrowingSegment(ctx context.Context, channel str

// SealSegments call sync segment and change segments state to Flushed.
func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Ctx(ctx).Warn("write buffer not found when flush segments",
zap.String("channel", channel),
zap.Int64s("segmentIDs", segmentIDs))
@@ -198,11 +190,8 @@ }
}

func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Ctx(ctx).Warn("write buffer not found when flush channel",
zap.String("channel", channel),
zap.Uint64("flushTs", flushTs))
@@ -214,11 +203,8 @@

// BufferData put data into channel write buffer.
func (m *bufferManager) BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Ctx(context.Background()).Warn("write buffer not found when buffer data",
zap.String("channel", channel))
return merr.WrapErrChannelNotFound(channel)
@@ -229,11 +215,8 @@ }

// GetCheckpoint returns checkpoint for provided channel.
func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return nil, false, merr.WrapErrChannelNotFound(channel)
}
cp := buf.GetCheckpoint()
@@ -243,10 +226,8 @@ }
}

func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
m.mut.Lock()
defer m.mut.Unlock()
buf, ok := m.buffers[channel]
if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return
}
flushTs := buf.GetFlushTimestamp()
@@ -259,12 +240,8 @@ func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
// RemoveChannel remove channel WriteBuffer from manager.
// this method discards all buffered data since datanode no longer has the ownership
func (m *bufferManager) RemoveChannel(channel string) {
m.mut.Lock()
buf, ok := m.buffers[channel]
delete(m.buffers, channel)
m.mut.Unlock()

if !ok {
buf, loaded := m.buffers.GetAndRemove(channel)
if !loaded {
log.Warn("failed to remove channel, channel not maintained in manager", zap.String("channel", channel))
return
}
@@ -275,12 +252,8 @@ func (m *bufferManager) RemoveChannel(channel string) {
// DropChannel removes channel WriteBuffer and process `DropChannel`
// this method will save all buffered data
func (m *bufferManager) DropChannel(channel string) {
m.mut.Lock()
buf, ok := m.buffers[channel]
delete(m.buffers, channel)
m.mut.Unlock()

if !ok {
buf, loaded := m.buffers.GetAndRemove(channel)
if !loaded {
log.Warn("failed to drop channel, channel not maintained in manager", zap.String("channel", channel))
return
}
@@ -289,11 +262,8 @@ func (m *bufferManager) DropChannel(channel string) {
}

func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs))
return
}
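The most delicate call site above is `Register`: instead of check-then-insert under a write lock, the buffer is now built first and inserted atomically with `GetOrInsert`; if another goroutine registered the channel in the meantime, the loser closes its freshly built buffer and returns the duplicate-channel error. A minimal sketch of that idiom, with `sync.Map.LoadOrStore` standing in for `ConcurrentMap.GetOrInsert` and all other names being illustrative:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// buffer is a stand-in for WriteBuffer; Close mirrors the cleanup path
// taken when a registration loses the race.
type buffer struct{ channel string }

func (b *buffer) Close() { fmt.Println("closing duplicate buffer for", b.channel) }

var errDuplicateChannel = errors.New("channel already registered")

type manager struct {
	buffers sync.Map // channel name -> *buffer
}

// register builds the buffer eagerly, then inserts it atomically.
// If another goroutine registered the channel first, the loser closes its
// own buffer and reports a duplicate, mirroring the diff's
// GetOrInsert + Close + WrapErrChannelReduplicate sequence.
func (m *manager) register(channel string) error {
	buf := &buffer{channel: channel}
	_, loaded := m.buffers.LoadOrStore(channel, buf)
	if loaded {
		buf.Close()
		return errDuplicateChannel
	}
	return nil
}

func main() {
	m := &manager{}
	fmt.Println(m.register("ch-1")) // <nil>
	fmt.Println(m.register("ch-1")) // channel already registered
}
```

A related effect shows up in `memoryCheck`: the scan now iterates via `Range` without holding a registry-wide read lock, so a slow pass no longer blocks `Register` or `RemoveChannel`.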
35 changes: 8 additions & 27 deletions internal/flushcommon/writebuffer/manager_test.go
@@ -95,10 +95,7 @@ func (s *ManagerSuite) TestFlushSegments() {
defer cancel()

wb := NewMockWriteBuffer(s.T())

s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()
s.manager.buffers.Insert(s.channelName, wb)

wb.EXPECT().SealSegments(mock.Anything, mock.Anything).Return(nil)

@@ -119,9 +116,8 @@ func (s *ManagerSuite) TestCreateNewGrowingSegment() {
idAllocator: s.allocator,
})
s.NoError(err)
s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()

s.manager.buffers.Insert(s.channelName, wb)
err = manager.CreateNewGrowingSegment(context.Background(), s.channelName, 1, 1)
s.NoError(err)
}
@@ -136,10 +132,7 @@ func (s *ManagerSuite) TestBufferData() {
s.Run("normal_buffer_data", func() {
wb := NewMockWriteBuffer(s.T())

s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()

s.manager.buffers.Insert(s.channelName, wb)
wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

err := manager.BufferData(s.channelName, nil, nil, nil, nil)
@@ -157,10 +150,7 @@ func (s *ManagerSuite) TestGetCheckpoint() {
s.Run("normal_checkpoint", func() {
wb := NewMockWriteBuffer(s.T())

manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0)}
wb.EXPECT().GetCheckpoint().Return(pos)
wb.EXPECT().GetFlushTimestamp().Return(nonFlushTS)
@@ -173,10 +163,7 @@ func (s *ManagerSuite) TestGetCheckpoint() {
s.Run("checkpoint_need_update", func() {
wb := NewMockWriteBuffer(s.T())

manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
cpTimestamp := tsoutil.ComposeTSByTime(time.Now(), 0)

pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: cpTimestamp}
@@ -221,10 +208,7 @@ func (s *ManagerSuite) TestDropPartitions() {
wb := NewMockWriteBuffer(s.T())
wb.EXPECT().DropPartitions(mock.Anything).Return()

manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
manager.DropPartitions(s.channelName, []int64{1})
})
}
@@ -261,10 +245,7 @@ func (s *ManagerSuite) TestMemoryCheck() {
}
flag.Store(true)
}).Return()
manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
manager.Start()
defer manager.Stop()

40 changes: 12 additions & 28 deletions internal/flushcommon/writebuffer/write_buffer.go
@@ -68,50 +68,35 @@ type checkpointCandidate struct {
}

type checkpointCandidates struct {
candidates map[string]*checkpointCandidate
mu sync.RWMutex
candidates *typeutil.ConcurrentMap[string, *checkpointCandidate]
}

func getCandidatesKey(segmentID int64, timestamp uint64) string {
return fmt.Sprintf("%d-%d", segmentID, timestamp)
}

func newCheckpointCandiates() *checkpointCandidates {
return &checkpointCandidates{
candidates: make(map[string]*checkpointCandidate),
candidates: typeutil.NewConcurrentMap[string, *checkpointCandidate](), // segmentID-ts
}
}

func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.candidates, fmt.Sprintf("%d-%d", segmentID, timestamp))
}

func (c *checkpointCandidates) RemoveChannel(channel string, timestamp uint64) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.candidates, fmt.Sprintf("%s-%d", channel, timestamp))
c.candidates.Remove(getCandidatesKey(segmentID, timestamp))
}

func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) {
c.mu.Lock()
defer c.mu.Unlock()
c.candidates[fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())] = &checkpointCandidate{segmentID, position, source}
}

func (c *checkpointCandidates) AddChannel(channel string, position *msgpb.MsgPosition, source string) {
c.mu.Lock()
defer c.mu.Unlock()
c.candidates[fmt.Sprintf("%s-%d", channel, position.GetTimestamp())] = &checkpointCandidate{-1, position, source}
c.candidates.Insert(getCandidatesKey(segmentID, position.GetTimestamp()), &checkpointCandidate{segmentID, position, source})
}

func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate {
c.mu.RLock()
defer c.mu.RUnlock()

var result *checkpointCandidate = def
for _, candidate := range c.candidates {
c.candidates.Range(func(_ string, candidate *checkpointCandidate) bool {
if result == nil || candidate.position.GetTimestamp() < result.position.GetTimestamp() {
result = candidate
}
}
return true
})
return result
}

@@ -126,8 +111,6 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncm

// writeBufferBase is the common component for buffering data
type writeBufferBase struct {
mut sync.RWMutex

collectionID int64
channelName string

@@ -136,6 +119,7 @@ type writeBufferBase struct {
estSizePerRecord int
metaCache metacache.MetaCache

mut sync.RWMutex
buffers map[int64]*segmentBuffer // segmentID => segmentBuffer

syncPolicies []SyncPolicy
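In `GetEarliestWithDefault`, the locked `for ... range` loop over `candidates` becomes a `Range` callback on the concurrent map, which keeps iterating as long as the callback returns `true`. A rough equivalent using `sync.Map`, with the candidate struct trimmed to the fields the comparison actually needs (the key format mirrors the diff's `segmentID-timestamp` convention):

```go
package main

import (
	"fmt"
	"sync"
)

// candidate is a trimmed stand-in for checkpointCandidate: only the
// timestamp matters when picking the earliest entry.
type candidate struct {
	segmentID int64
	timestamp uint64
}

// earliestWithDefault mirrors the diff's Range-based scan: start from the
// provided default and keep whichever candidate has the smallest timestamp.
// Returning true from the callback tells Range to keep iterating.
func earliestWithDefault(m *sync.Map, def *candidate) *candidate {
	result := def
	m.Range(func(_, value any) bool {
		c := value.(*candidate)
		if result == nil || c.timestamp < result.timestamp {
			result = c
		}
		return true
	})
	return result
}

func main() {
	m := &sync.Map{}
	m.Store("1-30", &candidate{segmentID: 1, timestamp: 30})
	m.Store("2-10", &candidate{segmentID: 2, timestamp: 10})

	earliest := earliestWithDefault(m, nil)
	fmt.Println(earliest.segmentID, earliest.timestamp) // 2 10
}
```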
