From 4ea767249d1ad73e247f1ada2ffa992329bcb58c Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 10:50:11 -0400 Subject: [PATCH 01/17] Fix LRU deadlock and improve logging --- src/dbnode/integration/setup.go | 12 +- src/dbnode/server/server.go | 6 +- src/dbnode/storage/block/block.go | 5 +- src/dbnode/storage/block/wired_list.go | 73 +++++++----- src/dbnode/storage/block/wired_list_test.go | 53 ++++++++- src/dbnode/storage/series/series.go | 14 ++- .../series_wired_list_interaction_test.go | 105 ++++++++++++++++++ 7 files changed, 227 insertions(+), 41 deletions(-) create mode 100644 src/dbnode/storage/series_wired_list_interaction_test.go diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index c6296838ee..67ff39f038 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -298,11 +298,13 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup // Set up wired list if required if storageOpts.SeriesCachePolicy() == series.CacheLRU { - wiredList := block.NewWiredList( - runtimeOptsMgr, - storageOpts.InstrumentOptions(), - storageOpts.ClockOptions(), - ) + wiredList := block.NewWiredList(block.WiredListOptions{ + RuntimeOptionsManager: runtimeOptsMgr, + InstrumentOptions: storageOpts.InstrumentOptions(), + ClockOptions: storageOpts.ClockOptions(), + // Use a small event channel size to stress-test the implementation + EventsChannelSize: 1, + }) blockOpts := storageOpts.DatabaseBlockOptions().SetWiredList(wiredList) blockPool := block.NewDatabaseBlockPool(nil) // Have to manually set the blockpool because the default one uses a constructor diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index efb83ce84d..80a9b7e8c9 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1004,7 +1004,11 @@ func withEncodingAndPoolingOptions( if opts.SeriesCachePolicy() == series.CacheLRU { runtimeOpts := opts.RuntimeOptionsManager() - wiredList := block.NewWiredList(runtimeOpts, iopts, opts.ClockOptions()) + wiredList := block.NewWiredList(block.WiredListOptions{ + RuntimeOptionsManager: runtimeOpts, + InstrumentOptions: iopts, + ClockOptions: opts.ClockOptions(), + }) blockOpts = blockOpts.SetWiredList(wiredList) } blockPool := block.NewDatabaseBlockPool(poolOptions(policy.BlockPool, diff --git a/src/dbnode/storage/block/block.go b/src/dbnode/storage/block/block.go index dd6797f768..83e7292e1e 100644 --- a/src/dbnode/storage/block/block.go +++ b/src/dbnode/storage/block/block.go @@ -196,8 +196,8 @@ func (b *dbBlock) OnRetrieveBlock( defer b.Unlock() if b.closed || - !id.Equal(b.retrieveID) || - !startTime.Equal(b.startWithRLock()) { + !id.Equal(b.retrieveID) { + // !startTime.Equal(b.startWithRLock()) { return } @@ -425,6 +425,7 @@ func (b *dbBlock) Close() { func (b *dbBlock) closeAndDiscard() ts.Segment { b.Lock() if b.closed { + panic("DOUBLE CLOSE") b.Unlock() return ts.Segment{} } diff --git a/src/dbnode/storage/block/wired_list.go b/src/dbnode/storage/block/wired_list.go index 32fbc9eac6..12bbb90617 100644 --- a/src/dbnode/storage/block/wired_list.go +++ b/src/dbnode/storage/block/wired_list.go @@ -64,8 +64,8 @@ import ( ) const ( - wiredListEventsChannelLength = 65536 - wiredListSampleGaugesEvery = 100 + defaultWiredListEventsChannelSize = 65536 + wiredListSampleGaugesEvery = 100 ) var ( @@ -82,13 +82,14 @@ type WiredList struct { // Max wired blocks, must use atomic store and load to access. maxWired int64 - root dbBlock - length int - updatesCh chan DatabaseBlock - doneCh chan struct{} + root dbBlock + length int + updatesChSize int + updatesCh chan DatabaseBlock + doneCh chan struct{} metrics wiredListMetrics - logger xlog.Logger + iOpts instrument.Options } type wiredListMetrics struct { @@ -118,22 +119,31 @@ func newWiredListMetrics(scope tally.Scope) wiredListMetrics { } } +// WiredListOptions is the options struct for the WiredList constructor. +type WiredListOptions struct { + RuntimeOptionsManager runtime.OptionsManager + InstrumentOptions instrument.Options + ClockOptions clock.Options + EventsChannelSize int +} + // NewWiredList returns a new database block wired list. -func NewWiredList( - runtimeOptsMgr runtime.OptionsManager, - iopts instrument.Options, - copts clock.Options, -) *WiredList { - scope := iopts.MetricsScope(). +func NewWiredList(opts WiredListOptions) *WiredList { + scope := opts.InstrumentOptions.MetricsScope(). SubScope("wired-list") l := &WiredList{ - nowFn: copts.NowFn(), + nowFn: opts.ClockOptions.NowFn(), metrics: newWiredListMetrics(scope), - logger: iopts.Logger(), + iOpts: opts.InstrumentOptions, + } + if opts.EventsChannelSize > 0 { + l.updatesChSize = opts.EventsChannelSize + } else { + l.updatesChSize = defaultWiredListEventsChannelSize } l.root.setNext(&l.root) l.root.setPrev(&l.root) - runtimeOptsMgr.RegisterListener(l) + opts.RuntimeOptionsManager.RegisterListener(l) return l } @@ -151,7 +161,7 @@ func (l *WiredList) Start() error { return errAlreadyStarted } - l.updatesCh = make(chan DatabaseBlock, wiredListEventsChannelLength) + l.updatesCh = make(chan DatabaseBlock, l.updatesChSize) l.doneCh = make(chan struct{}, 1) go func() { i := 0 @@ -202,20 +212,13 @@ func (l *WiredList) Update(v DatabaseBlock) { func (l *WiredList) processUpdateBlock(v DatabaseBlock) { entry := v.wiredListEntry() - if !entry.wasRetrievedFromDisk { - // The WiredList should should never receive blocks that were not retrieved from disk, - // but we check for posterity. - l.logger.WithFields( - xlog.NewField("closed", entry.closed), - xlog.NewField("wasRetrievedFromDisk", entry.wasRetrievedFromDisk), - ).Errorf("wired list tried to process a block that was not unwireable") - } - // In some cases the WiredList can receive blocks that are closed. This can happen if a block is // in the updatesCh (because it was read) but also already in the WiredList, and while its still // in the updatesCh, it is evicted from the wired list to make room for some other block that is // being processed. The eviction of the block will close it, but the enqueued update is still in - // the updateCh even though its an update for a closed block. + // the updateCh even though its an update for a closed block. For the same reason, the wired list + // can receive blocks that were not retrieved from disk because the closed block was returned to + // a pool and then re-used. unwireable := !entry.closed && entry.wasRetrievedFromDisk // If a block is still unwireable then its worth keeping track of in the wired list @@ -252,11 +255,23 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { // Try to unwire all blocks possible bl := l.root.next() for l.length > maxWired && bl != &l.root { + entry := bl.wiredListEntry() + if !entry.wasRetrievedFromDisk { + // This should never happen because processUpdateBlock performs the same + // check, and a block should never be pooled in-between those steps because + // the wired list is supposed to have sole ownership over that lifecycle and + // is single-threaded. + invariantLogger := instrument.EmitInvariantViolationAndGetLogger(l.iOpts) + invariantLogger.WithFields( + xlog.NewField("closed", entry.closed), + xlog.NewField("wasRetrievedFromDisk", entry.wasRetrievedFromDisk), + ).Errorf("wired list tried to process a block that was not unwireable") + } + // Evict the block before closing it so that callers of series.ReadEncoded() // don't get errors about trying to read from a closed block. if onEvict := bl.OnEvictedFromWiredList(); onEvict != nil { - wlEntry := bl.wiredListEntry() - onEvict.OnEvictedFromWiredList(wlEntry.retrieveID, wlEntry.startTime) + onEvict.OnEvictedFromWiredList(entry.retrieveID, entry.startTime) } // bl.Close() will return the block to the pool. In order to avoid races diff --git a/src/dbnode/storage/block/wired_list_test.go b/src/dbnode/storage/block/wired_list_test.go index 22d46a39e0..4dfd861403 100644 --- a/src/dbnode/storage/block/wired_list_test.go +++ b/src/dbnode/storage/block/wired_list_test.go @@ -54,7 +54,13 @@ func newTestWiredList( iopts = iopts.SetMetricsScope(overrideMetricsScope) } copts := clock.NewOptions() - return NewWiredList(runtimeOptsMgr, iopts, copts), runtimeOptsMgr + return NewWiredList(WiredListOptions{ + RuntimeOptionsManager: runtimeOptsMgr, + InstrumentOptions: iopts, + ClockOptions: copts, + // Use a small channel to stress-test the implementation + EventsChannelSize: 1, + }), runtimeOptsMgr } func newTestUnwireableBlock( @@ -160,6 +166,51 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { require.Equal(t, &l.root, l.root.prev()) } +func TestDeadlock(t *testing.T) { + wiredListEventsChannelLength = 1 + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + l, _ := newTestWiredList(runtime.NewOptions().SetMaxWiredBlocks(1), nil) + + opts := testOptions.SetWiredList(l) + + // l.Start() + bl := newTestUnwireableBlock(ctrl, fmt.Sprintf("foo.%d", 0), opts) + + l.Start() + l.Update(bl) + l.Update(bl) + l.Stop() + + // Order due to LRU should be: 1, 0 + require.Equal(t, bl, l.root.next()) + // require.Equal(t, blocks[0], l.root.next().next()) + + // Unwire block and assert removed + // blocks[1].closed = true + + // l.Start() + // l.Update(blocks[1]) + // l.Stop() + + // require.Equal(t, 1, l.length) + // require.Equal(t, blocks[0], l.root.next()) + // require.Equal(t, &l.root, l.root.next().next()) + + // // Unwire final block and assert removed + // blocks[0].closed = true + + // l.Start() + // l.Update(blocks[0]) + // l.Stop() + + // require.Equal(t, 0, l.length) + // require.Equal(t, &l.root, l.root.next()) + // require.Equal(t, &l.root, l.root.prev()) +} + // wiredListTestWiredBlocksString is used to debug the order of the wired list func wiredListTestWiredBlocksString(l *WiredList) string { // nolint: unused b := bytes.NewBuffer(nil) diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index a16f209802..e6fc97f577 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3x/context" xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" xlog "github.com/m3db/m3x/log" xtime "github.com/m3db/m3x/time" ) @@ -519,9 +520,9 @@ func (s *dbSeries) OnRetrieveBlock( segment ts.Segment, ) { s.Lock() - defer s.Unlock() if !id.Equal(s.id) { + s.Unlock() return } @@ -549,11 +550,17 @@ func (s *dbSeries) OnRetrieveBlock( // If we retrieved this from disk then we directly emplace it s.addBlockWithLock(b) - if list := s.opts.DatabaseBlockOptions().WiredList(); list != nil { + list := s.opts.DatabaseBlockOptions().WiredList() + s.Unlock() + + if list != nil { // Need to update the WiredList so blocks that were read from disk // can enter the list (OnReadBlock is only called for blocks that // were read from memory, regardless of whether the data originated // from disk or a buffer rotation.) + // Also, doing this outside of the lock is safe because updating the + // wired list is asynchronous anyways (Update just puts the block in + // a channel to be processed later.) list.Update(b) } } @@ -585,7 +592,8 @@ func (s *dbSeries) OnEvictedFromWiredList(id ident.ID, blockStart time.Time) { if ok { if !block.WasRetrievedFromDisk() { // Should never happen - invalid application state could cause data loss - s.opts.InstrumentOptions().Logger().WithFields( + instrument.EmitInvariantViolationAndGetLogger( + s.opts.InstrumentOptions()).WithFields( xlog.NewField("id", id.String()), xlog.NewField("blockStart", blockStart), ).Errorf("tried to evict block that was not retrieved from disk") diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go new file mode 100644 index 0000000000..0f6f1503c7 --- /dev/null +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -0,0 +1,105 @@ +package storage + +import ( + "sync" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/runtime" + "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/storage/series/lookup" + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" +) + +// TestSeriesWiredListConcurrentInteractions was added as a regression test +// after discovering that interactions between a single series and the wired +// list could trigger a mutual dead lock. Specifically, if the wired list event +// channel was full, then the series could get blocked on a call to list.Update() +// in the OnRetrieveBlockMethod while the only goroutine pulling items off of that +// channel was stuck on the same series OnEvictedFromWiredList method. In that case, +// the OnRetrieveBlockMethod was stuck on a channel send while holding a lock that was +// required for the OnEvictedFromWiredList method that the wired list worker routine +// was calling. +func TestSeriesWiredListConcurrentInteractions(t *testing.T) { + var ( + runtimeOptsMgr = runtime.NewOptionsManager() + runtimeOpts = runtime.NewOptions().SetMaxWiredBlocks(1) + ) + runtimeOptsMgr.Update(runtimeOpts) + + runtime.NewOptions().SetMaxWiredBlocks(1) + wl := block.NewWiredList(block.WiredListOptions{ + RuntimeOptionsManager: runtimeOptsMgr, + InstrumentOptions: instrument.NewOptions(), + ClockOptions: clock.NewOptions(), + // Use a small channel to stress-test the implementation + EventsChannelSize: 1, + }) + wl.Start() + defer wl.Stop() + + var ( + blOpts = testDatabaseOptions().DatabaseBlockOptions() + blPool = block.NewDatabaseBlockPool( + // Small pool size to make any pooling issues more + // likely to manifest. + pool.NewObjectPoolOptions().SetSize(5), + ) + ) + blPool.Init(func() block.DatabaseBlock { + return block.NewDatabaseBlock(time.Time{}, 0, ts.Segment{}, blOpts) + }) + + var ( + opts = testDatabaseOptions().SetDatabaseBlockOptions( + blOpts. + SetWiredList(wl). + SetDatabaseBlockPool(blPool), + ) + shard = testDatabaseShard(t, opts) + id = ident.StringID("foo") + series = series.NewDatabaseSeries(id, ident.Tags{}, shard.seriesOpts) + ) + + series.Reset(id, ident.Tags{}, nil, shard.seriesOnRetrieveBlock, shard, shard.seriesOpts) + series.Bootstrap(nil) + shard.Lock() + shard.insertNewShardEntryWithLock(lookup.NewEntry(series, 0)) + shard.Unlock() + + var ( + wg = sync.WaitGroup{} + doneCh = make(chan struct{}) + ) + go func() { + // Try and trigger any pooling issues + for { + select { + case <-doneCh: + return + default: + bl := blPool.Get() + bl.ResetRetrievable(time.Time{}, 2*time.Hour, nil, block.RetrievableBlockMetadata{}) + bl.Close() + } + } + }() + + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + blTime := time.Time{}.Add(2 * time.Hour) + shard.OnRetrieveBlock(id, nil, blTime, ts.Segment{}) + shard.OnEvictedFromWiredList(id, blTime) + wg.Done() + }() + } + + wg.Wait() + close(doneCh) +} From bbdf6d62440c18ad69e482636621c8770fbd3fd0 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 10:52:08 -0400 Subject: [PATCH 02/17] Remove explicit panic --- src/dbnode/storage/block/block.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dbnode/storage/block/block.go b/src/dbnode/storage/block/block.go index 83e7292e1e..b4dc87ef87 100644 --- a/src/dbnode/storage/block/block.go +++ b/src/dbnode/storage/block/block.go @@ -425,7 +425,6 @@ func (b *dbBlock) Close() { func (b *dbBlock) closeAndDiscard() ts.Segment { b.Lock() if b.closed { - panic("DOUBLE CLOSE") b.Unlock() return ts.Segment{} } From d988413d752434488b3f5d2efd1d09e756414232 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 10:53:50 -0400 Subject: [PATCH 03/17] Remove unused test --- src/dbnode/storage/block/wired_list_test.go | 45 --------------------- 1 file changed, 45 deletions(-) diff --git a/src/dbnode/storage/block/wired_list_test.go b/src/dbnode/storage/block/wired_list_test.go index 4dfd861403..40700a0467 100644 --- a/src/dbnode/storage/block/wired_list_test.go +++ b/src/dbnode/storage/block/wired_list_test.go @@ -166,51 +166,6 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { require.Equal(t, &l.root, l.root.prev()) } -func TestDeadlock(t *testing.T) { - wiredListEventsChannelLength = 1 - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - l, _ := newTestWiredList(runtime.NewOptions().SetMaxWiredBlocks(1), nil) - - opts := testOptions.SetWiredList(l) - - // l.Start() - bl := newTestUnwireableBlock(ctrl, fmt.Sprintf("foo.%d", 0), opts) - - l.Start() - l.Update(bl) - l.Update(bl) - l.Stop() - - // Order due to LRU should be: 1, 0 - require.Equal(t, bl, l.root.next()) - // require.Equal(t, blocks[0], l.root.next().next()) - - // Unwire block and assert removed - // blocks[1].closed = true - - // l.Start() - // l.Update(blocks[1]) - // l.Stop() - - // require.Equal(t, 1, l.length) - // require.Equal(t, blocks[0], l.root.next()) - // require.Equal(t, &l.root, l.root.next().next()) - - // // Unwire final block and assert removed - // blocks[0].closed = true - - // l.Start() - // l.Update(blocks[0]) - // l.Stop() - - // require.Equal(t, 0, l.length) - // require.Equal(t, &l.root, l.root.next()) - // require.Equal(t, &l.root, l.root.prev()) -} - // wiredListTestWiredBlocksString is used to debug the order of the wired list func wiredListTestWiredBlocksString(l *WiredList) string { // nolint: unused b := bytes.NewBuffer(nil) From 3467a28cf13a7b3f22ba0870acd796c7d26d75cc Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 13:15:09 -0400 Subject: [PATCH 04/17] Make wired list events channel size configurable; --- src/cmd/services/m3dbnode/config/cache.go | 3 ++- src/dbnode/server/server.go | 20 ++++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/cmd/services/m3dbnode/config/cache.go b/src/cmd/services/m3dbnode/config/cache.go index b9e2207015..7e29c6e040 100644 --- a/src/cmd/services/m3dbnode/config/cache.go +++ b/src/cmd/services/m3dbnode/config/cache.go @@ -47,5 +47,6 @@ type SeriesCacheConfiguration struct { // LRUSeriesCachePolicyConfiguration contains configuration for the LRU // series caching policy. type LRUSeriesCachePolicyConfiguration struct { - MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"` + MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"` + EventsChannelSize uint `yaml:"eventsChannelSize" validate:"nonzero"` } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 80a9b7e8c9..63c04ba915 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1003,12 +1003,20 @@ func withEncodingAndPoolingOptions( SetBytesPool(bytesPool) if opts.SeriesCachePolicy() == series.CacheLRU { - runtimeOpts := opts.RuntimeOptionsManager() - wiredList := block.NewWiredList(block.WiredListOptions{ - RuntimeOptionsManager: runtimeOpts, - InstrumentOptions: iopts, - ClockOptions: opts.ClockOptions(), - }) + var ( + runtimeOpts = opts.RuntimeOptionsManager() + wiredListOpts = block.WiredListOptions{ + RuntimeOptionsManager: runtimeOpts, + InstrumentOptions: iopts, + ClockOptions: opts.ClockOptions(), + } + lruCfg = cfg.Cache.SeriesConfiguration().LRU + ) + + if lruCfg != nil && lruCfg.EventsChannelSize > 0 { + wiredListOpts.EventsChannelSize = lruCfg.EventsChannelSize + } + wiredList := block.NewWiredList(wiredListOpts) blockOpts = blockOpts.SetWiredList(wiredList) } blockPool := block.NewDatabaseBlockPool(poolOptions(policy.BlockPool, From e4dbcf4546efa90e8c9924b79b75dd879b04c031 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 13:39:21 -0400 Subject: [PATCH 05/17] Improve unlock defer --- src/dbnode/storage/block/block.go | 4 ++-- src/dbnode/storage/series/series.go | 8 +++++++- .../storage/series_wired_list_interaction_test.go | 13 ++++++++----- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/dbnode/storage/block/block.go b/src/dbnode/storage/block/block.go index b4dc87ef87..dd6797f768 100644 --- a/src/dbnode/storage/block/block.go +++ b/src/dbnode/storage/block/block.go @@ -196,8 +196,8 @@ func (b *dbBlock) OnRetrieveBlock( defer b.Unlock() if b.closed || - !id.Equal(b.retrieveID) { - // !startTime.Equal(b.startWithRLock()) { + !id.Equal(b.retrieveID) || + !startTime.Equal(b.startWithRLock()) { return } diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index e6fc97f577..dcf734431d 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -520,9 +520,14 @@ func (s *dbSeries) OnRetrieveBlock( segment ts.Segment, ) { s.Lock() + shouldUnlock := true + defer func() { + if shouldUnlock { + s.Unlock() + } + }() if !id.Equal(s.id) { - s.Unlock() return } @@ -551,6 +556,7 @@ func (s *dbSeries) OnRetrieveBlock( s.addBlockWithLock(b) list := s.opts.DatabaseBlockOptions().WiredList() + shouldUnlock = false s.Unlock() if list != nil { diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go index 0f6f1503c7..bef3865c21 100644 --- a/src/dbnode/storage/series_wired_list_interaction_test.go +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -73,8 +73,9 @@ func TestSeriesWiredListConcurrentInteractions(t *testing.T) { shard.Unlock() var ( - wg = sync.WaitGroup{} - doneCh = make(chan struct{}) + wg = sync.WaitGroup{} + doneCh = make(chan struct{}) + blockSize = 2 * time.Hour ) go func() { // Try and trigger any pooling issues @@ -84,18 +85,20 @@ func TestSeriesWiredListConcurrentInteractions(t *testing.T) { return default: bl := blPool.Get() - bl.ResetRetrievable(time.Time{}, 2*time.Hour, nil, block.RetrievableBlockMetadata{}) + bl.ResetRetrievable(time.Time{}, blockSize, nil, block.RetrievableBlockMetadata{}) bl.Close() } } }() + start := time.Now().Truncate(blockSize) for i := 0; i < 1000; i++ { wg.Add(1) go func() { - blTime := time.Time{}.Add(2 * time.Hour) + blTime := start + start = start.Add(blockSize) shard.OnRetrieveBlock(id, nil, blTime, ts.Segment{}) - shard.OnEvictedFromWiredList(id, blTime) + // shard.OnEvictedFromWiredList(id, blTime) wg.Done() }() } From 354fc35053f1be67451c505048cb45628e2fa783 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 13:43:16 -0400 Subject: [PATCH 06/17] Update comment --- src/dbnode/storage/series/series.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index dcf734431d..adb05f5820 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -564,8 +564,8 @@ func (s *dbSeries) OnRetrieveBlock( // can enter the list (OnReadBlock is only called for blocks that // were read from memory, regardless of whether the data originated // from disk or a buffer rotation.) - // Also, doing this outside of the lock is safe because updating the - // wired list is asynchronous anyways (Update just puts the block in + // Doing this outside of the lock is safe because updating the + // wired list is asynchronous already (Update just puts the block in // a channel to be processed later.) list.Update(b) } From f2f9d15a86913ddac85a2e2829b82ed6093985ca Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 14:17:12 -0400 Subject: [PATCH 07/17] Add CloseIfFromDisk and use in wired list --- src/dbnode/storage/block/block.go | 28 +++++++++++++++---- src/dbnode/storage/block/block_test.go | 12 ++++++++ src/dbnode/storage/block/types.go | 5 ++++ src/dbnode/storage/block/wired_list.go | 13 +++++++-- .../series_wired_list_interaction_test.go | 1 - 5 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/dbnode/storage/block/block.go b/src/dbnode/storage/block/block.go index dd6797f768..a3a6133f61 100644 --- a/src/dbnode/storage/block/block.go +++ b/src/dbnode/storage/block/block.go @@ -414,19 +414,37 @@ func (b *dbBlock) resetRetrievableWithLock( } func (b *dbBlock) Discard() ts.Segment { - return b.closeAndDiscard() + seg, _ := b.closeAndDiscardConditionally(nil) + return seg } func (b *dbBlock) Close() { - segment := b.closeAndDiscard() + segment, _ := b.closeAndDiscardConditionally(nil) segment.Finalize() } -func (b *dbBlock) closeAndDiscard() ts.Segment { +func (b *dbBlock) CloseIfFromDisk() bool { + segment, ok := b.closeAndDiscardConditionally(func(b *dbBlock) bool { + return b.wasRetrievedFromDisk + }) + if !ok { + return false + } + segment.Finalize() + return true +} + +func (b *dbBlock) closeAndDiscardConditionally(condition func(b *dbBlock) bool) (ts.Segment, bool) { b.Lock() + + if condition != nil && !condition(b) { + b.Unlock() + return ts.Segment{}, false + } + if b.closed { b.Unlock() - return ts.Segment{} + return ts.Segment{}, true } segment := b.segment @@ -439,7 +457,7 @@ func (b *dbBlock) closeAndDiscard() ts.Segment { pool.Put(b) } - return segment + return segment, true } func (b *dbBlock) resetMergeTargetWithLock() { diff --git a/src/dbnode/storage/block/block_test.go b/src/dbnode/storage/block/block_test.go index c27718217e..84e74ab6ad 100644 --- a/src/dbnode/storage/block/block_test.go +++ b/src/dbnode/storage/block/block_test.go @@ -551,6 +551,18 @@ func TestDatabaseBlockStreamMergePerformsCopy(t *testing.T) { require.NoError(t, iter.Err()) } +func TestDatabaseBlockCloseIfFromDisk(t *testing.T) { + var ( + blockOpts = NewOptions() + blockNotFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock) + blockFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock) + ) + blockFromDisk.wasRetrievedFromDisk = true + + require.False(t, blockNotFromDisk.CloseIfFromDisk()) + require.True(t, blockFromDisk.CloseIfFromDisk()) +} + func TestDatabaseSeriesBlocksAddBlock(t *testing.T) { now := time.Now() blockTimes := []time.Time{now, now.Add(time.Second), now.Add(time.Minute), now.Add(-time.Second), now.Add(-time.Hour)} diff --git a/src/dbnode/storage/block/types.go b/src/dbnode/storage/block/types.go index ba62328bc3..54cb6f2bdb 100644 --- a/src/dbnode/storage/block/types.go +++ b/src/dbnode/storage/block/types.go @@ -197,6 +197,11 @@ type DatabaseBlock interface { // Close closes the block. Close() + // CloseIfFromDisk atomically checks if the disk was retrieved from disk, and + // if so, closes it. It is meant as a layered protection for the WiredList + // which should only close blocks that were retrieved from disk. + CloseIfFromDisk() bool + // SetOnEvictedFromWiredList sets the owner of the block SetOnEvictedFromWiredList(OnEvictedFromWiredList) diff --git a/src/dbnode/storage/block/wired_list.go b/src/dbnode/storage/block/wired_list.go index 12bbb90617..aa60008884 100644 --- a/src/dbnode/storage/block/wired_list.go +++ b/src/dbnode/storage/block/wired_list.go @@ -263,9 +263,10 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { // is single-threaded. invariantLogger := instrument.EmitInvariantViolationAndGetLogger(l.iOpts) invariantLogger.WithFields( + xlog.NewField("blockStart", entry.startTime), xlog.NewField("closed", entry.closed), xlog.NewField("wasRetrievedFromDisk", entry.wasRetrievedFromDisk), - ).Errorf("wired list tried to process a block that was not unwireable") + ).Errorf("wired list tried to process a block that was not retrieved from disk") } // Evict the block before closing it so that callers of series.ReadEncoded() @@ -279,7 +280,15 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { // the block from the wired list before we close it. nextBl := bl.next() l.remove(bl) - bl.Close() + if wasFromDisk := bl.CloseIfFromDisk(); !wasFromDisk { + // Should never happen + invariantLogger := instrument.EmitInvariantViolationAndGetLogger(l.iOpts) + invariantLogger.WithFields( + xlog.NewField("blockStart", entry.startTime), + xlog.NewField("closed", entry.closed), + xlog.NewField("wasRetrievedFromDisk", entry.wasRetrievedFromDisk), + ).Errorf("wired list tried to close a block that was not from disk") + } l.metrics.evicted.Inc(1) diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go index bef3865c21..70737a0399 100644 --- a/src/dbnode/storage/series_wired_list_interaction_test.go +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -98,7 +98,6 @@ func TestSeriesWiredListConcurrentInteractions(t *testing.T) { blTime := start start = start.Add(blockSize) shard.OnRetrieveBlock(id, nil, blTime, ts.Segment{}) - // shard.OnEvictedFromWiredList(id, blTime) wg.Done() }() } From 852d3436e1c2f11275e1cfb24ef192086c6108ef Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 14:24:26 -0400 Subject: [PATCH 08/17] Fix compilation error in server setup code --- src/dbnode/server/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 63c04ba915..113be529d6 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -326,7 +326,7 @@ func Run(runOpts RunOptions) { opts = opts.SetSeriesCachePolicy(seriesCachePolicy) // Apply pooling options - opts = withEncodingAndPoolingOptions(logger, opts, cfg.PoolingPolicy) + opts = withEncodingAndPoolingOptions(cfg, logger, opts, cfg.PoolingPolicy) // Setup the block retriever switch seriesCachePolicy { @@ -882,6 +882,7 @@ func kvWatchBootstrappers( } func withEncodingAndPoolingOptions( + cfg config.DBConfiguration, logger xlog.Logger, opts storage.Options, policy config.PoolingPolicy, From 86fb55affdcb8ffb5213d7c3ba6944c2e009cb8e Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 14:29:03 -0400 Subject: [PATCH 09/17] Fix compilation error in server setup code --- src/dbnode/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 113be529d6..7d6415c552 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1015,7 +1015,7 @@ func withEncodingAndPoolingOptions( ) if lruCfg != nil && lruCfg.EventsChannelSize > 0 { - wiredListOpts.EventsChannelSize = lruCfg.EventsChannelSize + wiredListOpts.EventsChannelSize = int(lruCfg.EventsChannelSize) } wiredList := block.NewWiredList(wiredListOpts) blockOpts = blockOpts.SetWiredList(wiredList) From 91434760fc8e6661cf1ee48a5618c99a7e3f2017 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 14:42:40 -0400 Subject: [PATCH 10/17] Update mocks --- src/dbnode/storage/block/block_mock.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/dbnode/storage/block/block_mock.go b/src/dbnode/storage/block/block_mock.go index 99cc893367..9418eda897 100644 --- a/src/dbnode/storage/block/block_mock.go +++ b/src/dbnode/storage/block/block_mock.go @@ -461,6 +461,18 @@ func (mr *MockDatabaseBlockMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDatabaseBlock)(nil).Close)) } +// CloseIfFromDisk mocks base method +func (m *MockDatabaseBlock) CloseIfFromDisk() bool { + ret := m.ctrl.Call(m, "CloseIfFromDisk") + ret0, _ := ret[0].(bool) + return ret0 +} + +// CloseIfFromDisk indicates an expected call of CloseIfFromDisk +func (mr *MockDatabaseBlockMockRecorder) CloseIfFromDisk() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseIfFromDisk", reflect.TypeOf((*MockDatabaseBlock)(nil).CloseIfFromDisk)) +} + // SetOnEvictedFromWiredList mocks base method func (m *MockDatabaseBlock) SetOnEvictedFromWiredList(arg0 OnEvictedFromWiredList) { m.ctrl.Call(m, "SetOnEvictedFromWiredList", arg0) From 60965982a7bc30a4a276c6f79a073c86de04631a Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 14:53:58 -0400 Subject: [PATCH 11/17] Fix race in test --- .../series_wired_list_interaction_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go index 70737a0399..5c7fb6ccca 100644 --- a/src/dbnode/storage/series_wired_list_interaction_test.go +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -91,12 +91,22 @@ func TestSeriesWiredListConcurrentInteractions(t *testing.T) { } }() - start := time.Now().Truncate(blockSize) + var ( + start = time.Now().Truncate(blockSize) + startLock = sync.Mutex{} + getAndIncStart = func() time.Time { + startLock.Lock() + t := start + start = start.Add(blockSize) + startLock.Unlock() + return t + } + ) + for i := 0; i < 1000; i++ { wg.Add(1) go func() { - blTime := start - start = start.Add(blockSize) + blTime := getAndIncStart() shard.OnRetrieveBlock(id, nil, blTime, ts.Segment{}) wg.Done() }() From 81ae4a87cffefc2c6e62fd120f5ecde2a4522b89 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 15:26:57 -0400 Subject: [PATCH 12/17] Properly measure time spent in LRU --- src/dbnode/storage/block/block.go | 14 ++++---- src/dbnode/storage/block/block_mock.go | 48 +++++++++++++------------- src/dbnode/storage/block/types.go | 4 +-- src/dbnode/storage/block/wired_list.go | 6 ++-- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/dbnode/storage/block/block.go b/src/dbnode/storage/block/block.go index a3a6133f61..5d77905699 100644 --- a/src/dbnode/storage/block/block.go +++ b/src/dbnode/storage/block/block.go @@ -73,9 +73,9 @@ type dbBlock struct { } type listState struct { - next DatabaseBlock - prev DatabaseBlock - nextPrevUpdatedAtUnixNano int64 + next DatabaseBlock + prev DatabaseBlock + enteredListAtUnixNano int64 } // NewDatabaseBlock creates a new DatabaseBlock instance. @@ -488,13 +488,13 @@ func (b *dbBlock) setPrev(value DatabaseBlock) { } // Should only be used by the WiredList. -func (b *dbBlock) nextPrevUpdatedAtUnixNano() int64 { - return b.listState.nextPrevUpdatedAtUnixNano +func (b *dbBlock) enteredListAtUnixNano() int64 { + return b.listState.enteredListAtUnixNano } // Should only be used by the WiredList. -func (b *dbBlock) setNextPrevUpdatedAtUnixNano(value int64) { - b.listState.nextPrevUpdatedAtUnixNano = value +func (b *dbBlock) setEnteredListAtUnixNano(value int64) { + b.listState.enteredListAtUnixNano = value } // wiredListEntry is a snapshot of a subset of the block's state that the WiredList diff --git a/src/dbnode/storage/block/block_mock.go b/src/dbnode/storage/block/block_mock.go index 9418eda897..e6e757a660 100644 --- a/src/dbnode/storage/block/block_mock.go +++ b/src/dbnode/storage/block/block_mock.go @@ -539,26 +539,26 @@ func (mr *MockDatabaseBlockMockRecorder) setPrev(block interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setPrev", reflect.TypeOf((*MockDatabaseBlock)(nil).setPrev), block) } -// nextPrevUpdatedAtUnixNano mocks base method -func (m *MockDatabaseBlock) nextPrevUpdatedAtUnixNano() int64 { - ret := m.ctrl.Call(m, "nextPrevUpdatedAtUnixNano") +// enteredListAtUnixNano mocks base method +func (m *MockDatabaseBlock) enteredListAtUnixNano() int64 { + ret := m.ctrl.Call(m, "enteredListAtUnixNano") ret0, _ := ret[0].(int64) return ret0 } -// nextPrevUpdatedAtUnixNano indicates an expected call of nextPrevUpdatedAtUnixNano -func (mr *MockDatabaseBlockMockRecorder) nextPrevUpdatedAtUnixNano() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "nextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).nextPrevUpdatedAtUnixNano)) +// enteredListAtUnixNano indicates an expected call of enteredListAtUnixNano +func (mr *MockDatabaseBlockMockRecorder) enteredListAtUnixNano() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "enteredListAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).enteredListAtUnixNano)) } -// setNextPrevUpdatedAtUnixNano mocks base method -func (m *MockDatabaseBlock) setNextPrevUpdatedAtUnixNano(value int64) { - m.ctrl.Call(m, "setNextPrevUpdatedAtUnixNano", value) +// setEnteredListAtUnixNano mocks base method +func (m *MockDatabaseBlock) setEnteredListAtUnixNano(value int64) { + m.ctrl.Call(m, "setEnteredListAtUnixNano", value) } -// setNextPrevUpdatedAtUnixNano indicates an expected call of setNextPrevUpdatedAtUnixNano -func (mr *MockDatabaseBlockMockRecorder) setNextPrevUpdatedAtUnixNano(value interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setNextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).setNextPrevUpdatedAtUnixNano), value) +// setEnteredListAtUnixNano indicates an expected call of setEnteredListAtUnixNano +func (mr *MockDatabaseBlockMockRecorder) setEnteredListAtUnixNano(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setEnteredListAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).setEnteredListAtUnixNano), value) } // wiredListEntry mocks base method @@ -640,26 +640,26 @@ func (mr *MockdatabaseBlockMockRecorder) setPrev(block interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setPrev", reflect.TypeOf((*MockdatabaseBlock)(nil).setPrev), block) } -// nextPrevUpdatedAtUnixNano mocks base method -func (m *MockdatabaseBlock) nextPrevUpdatedAtUnixNano() int64 { - ret := m.ctrl.Call(m, "nextPrevUpdatedAtUnixNano") +// enteredListAtUnixNano mocks base method +func (m *MockdatabaseBlock) enteredListAtUnixNano() int64 { + ret := m.ctrl.Call(m, "enteredListAtUnixNano") ret0, _ := ret[0].(int64) return ret0 } -// nextPrevUpdatedAtUnixNano indicates an expected call of nextPrevUpdatedAtUnixNano -func (mr *MockdatabaseBlockMockRecorder) nextPrevUpdatedAtUnixNano() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "nextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).nextPrevUpdatedAtUnixNano)) +// enteredListAtUnixNano indicates an expected call of enteredListAtUnixNano +func (mr *MockdatabaseBlockMockRecorder) enteredListAtUnixNano() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "enteredListAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).enteredListAtUnixNano)) } -// setNextPrevUpdatedAtUnixNano mocks base method -func (m *MockdatabaseBlock) setNextPrevUpdatedAtUnixNano(value int64) { - m.ctrl.Call(m, "setNextPrevUpdatedAtUnixNano", value) +// setEnteredListAtUnixNano mocks base method +func (m *MockdatabaseBlock) setEnteredListAtUnixNano(value int64) { + m.ctrl.Call(m, "setEnteredListAtUnixNano", value) } -// setNextPrevUpdatedAtUnixNano indicates an expected call of setNextPrevUpdatedAtUnixNano -func (mr *MockdatabaseBlockMockRecorder) setNextPrevUpdatedAtUnixNano(value interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setNextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).setNextPrevUpdatedAtUnixNano), value) +// setEnteredListAtUnixNano indicates an expected call of setEnteredListAtUnixNano +func (mr *MockdatabaseBlockMockRecorder) setEnteredListAtUnixNano(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setEnteredListAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).setEnteredListAtUnixNano), value) } // wiredListEntry mocks base method diff --git a/src/dbnode/storage/block/types.go b/src/dbnode/storage/block/types.go index 54cb6f2bdb..97fa6eea7d 100644 --- a/src/dbnode/storage/block/types.go +++ b/src/dbnode/storage/block/types.go @@ -218,8 +218,8 @@ type databaseBlock interface { setNext(block DatabaseBlock) prev() DatabaseBlock setPrev(block DatabaseBlock) - nextPrevUpdatedAtUnixNano() int64 - setNextPrevUpdatedAtUnixNano(value int64) + enteredListAtUnixNano() int64 + setEnteredListAtUnixNano(value int64) wiredListEntry() wiredListEntry } diff --git a/src/dbnode/storage/block/wired_list.go b/src/dbnode/storage/block/wired_list.go index aa60008884..0218e177cd 100644 --- a/src/dbnode/storage/block/wired_list.go +++ b/src/dbnode/storage/block/wired_list.go @@ -242,7 +242,6 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { at.setNext(v) v.setPrev(at) v.setNext(n) - v.setNextPrevUpdatedAtUnixNano(now.UnixNano()) n.setPrev(v) l.length++ @@ -292,8 +291,8 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { l.metrics.evicted.Inc(1) - lastUpdatedAt := time.Unix(0, bl.nextPrevUpdatedAtUnixNano()) - l.metrics.evictedAfterDuration.Record(now.Sub(lastUpdatedAt)) + enteredListAt := time.Unix(0, bl.enteredListAtUnixNano()) + l.metrics.evictedAfterDuration.Record(now.Sub(enteredListAt)) bl = nextBl } @@ -320,6 +319,7 @@ func (l *WiredList) pushBack(v DatabaseBlock) { l.metrics.inserted.Inc(1) l.insertAfter(v, l.root.prev()) + v.setEnteredListAtUnixNano(l.nowFn().UnixNano()) } func (l *WiredList) moveToBack(v DatabaseBlock) { From a4c91ebb3c20e0d7aee75338115902f4542a1d28 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 16:25:25 -0400 Subject: [PATCH 13/17] Fix deadlock with readencoded --- src/dbnode/storage/block/wired_list.go | 16 +++++++++++-- src/dbnode/storage/block/wired_list_test.go | 18 +++++++-------- src/dbnode/storage/series/series.go | 23 +++++++++++++++---- .../series_wired_list_interaction_test.go | 5 ++++ 4 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/dbnode/storage/block/wired_list.go b/src/dbnode/storage/block/wired_list.go index 0218e177cd..a2f540eb43 100644 --- a/src/dbnode/storage/block/wired_list.go +++ b/src/dbnode/storage/block/wired_list.go @@ -198,15 +198,27 @@ func (l *WiredList) Stop() error { return nil } -// Update places the block into the channel of blocks which are waiting to notify the +// BlockingUpdate places the block into the channel of blocks which are waiting to notify the // wired list that they were accessed. All updates must be processed through this channel // to force synchronization. // // We use a channel and a background processing goroutine to reduce blocking / lock contention. -func (l *WiredList) Update(v DatabaseBlock) { +func (l *WiredList) BlockingUpdate(v DatabaseBlock) { l.updatesCh <- v } +// NonBlockingUpdate will attempt to put the block in the events channel, but will not block +// if the channel is full. Used in cases where a blocking update could trigger deadlock with +// the WiredList itself. +func (l *WiredList) NonBlockingUpdate(v DatabaseBlock) bool { + select { + case l.updatesCh <- v: + return true + default: + return false + } +} + // processUpdateBlock inspects a block that has been modified or read recently // and determines what outcome its state should have on the wired list. func (l *WiredList) processUpdateBlock(v DatabaseBlock) { diff --git a/src/dbnode/storage/block/wired_list_test.go b/src/dbnode/storage/block/wired_list_test.go index 40700a0467..baf65c0c30 100644 --- a/src/dbnode/storage/block/wired_list_test.go +++ b/src/dbnode/storage/block/wired_list_test.go @@ -98,10 +98,10 @@ func TestWiredListInsertsAndUpdatesWiredBlocks(t *testing.T) { blocks = append(blocks, bl) } - l.Update(blocks[0]) - l.Update(blocks[1]) - l.Update(blocks[2]) - l.Update(blocks[1]) + l.BlockingUpdate(blocks[0]) + l.BlockingUpdate(blocks[1]) + l.BlockingUpdate(blocks[2]) + l.BlockingUpdate(blocks[1]) l.Stop() @@ -133,9 +133,9 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { blocks = append(blocks, bl) } - l.Update(blocks[0]) - l.Update(blocks[1]) - l.Update(blocks[0]) + l.BlockingUpdate(blocks[0]) + l.BlockingUpdate(blocks[1]) + l.BlockingUpdate(blocks[0]) l.Stop() @@ -147,7 +147,7 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { blocks[1].closed = true l.Start() - l.Update(blocks[1]) + l.BlockingUpdate(blocks[1]) l.Stop() require.Equal(t, 1, l.length) @@ -158,7 +158,7 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { blocks[0].closed = true l.Start() - l.Update(blocks[0]) + l.BlockingUpdate(blocks[0]) l.Stop() require.Equal(t, 0, l.length) diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index adb05f5820..2c37822317 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -560,14 +560,20 @@ func (s *dbSeries) OnRetrieveBlock( s.Unlock() if list != nil { - // Need to update the WiredList so blocks that were read from disk + // 1) We need to update the WiredList so that blocks that were read from disk // can enter the list (OnReadBlock is only called for blocks that // were read from memory, regardless of whether the data originated // from disk or a buffer rotation.) - // Doing this outside of the lock is safe because updating the + // 2) We must perform this action outside of the lock to prevent deadlock + // with the WiredList itself when it tries to call OnEvictedFromWiredList + // on the same series that is trying to perform a blocking update. + // 3) Doing this outside of the lock is safe because updating the // wired list is asynchronous already (Update just puts the block in // a channel to be processed later.) - list.Update(b) + // 4) We have to perform a blocking update because in this flow, the block + // is not already in the wired list so we need to make sure that the WiredList + // takes control of its lifecycle. + list.BlockingUpdate(b) } } @@ -578,9 +584,16 @@ func (s *dbSeries) OnReadBlock(b block.DatabaseBlock) { // The WiredList is only responsible for managing the lifecycle of blocks // retrieved from disk. if b.WasRetrievedFromDisk() { - // Need to update the WiredList so it knows which blocks have been + // 1) Need to update the WiredList so it knows which blocks have been // most recently read. - list.Update(b) + // 2) We do a non-blocking update here to prevent deadlock with the + // WiredList calling OnEvictedFromWiredList on the same series since + // OnReadBlock is usually called within the context of a read lock + // on this series. + // 3) Its safe to do a non-blocking update because the wired list has + // already been exposed to this block, so even if the wired list drops + // this update, it will still manage this blocks lifecycle. + list.NonBlockingUpdate(b) } } } diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go index 5c7fb6ccca..05f0ed576b 100644 --- a/src/dbnode/storage/series_wired_list_interaction_test.go +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -11,9 +11,11 @@ import ( "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/pool" + "github.com/stretchr/testify/require" ) // TestSeriesWiredListConcurrentInteractions was added as a regression test @@ -108,6 +110,9 @@ func TestSeriesWiredListConcurrentInteractions(t *testing.T) { go func() { blTime := getAndIncStart() shard.OnRetrieveBlock(id, nil, blTime, ts.Segment{}) + // Simulate concurrent reads + _, err := shard.ReadEncoded(context.NewContext(), id, blTime, blTime.Add(blockSize)) + require.NoError(t, err) wg.Done() }() } From cfbbb5c670585db2bc75b7f6795524c2dd5a5f61 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 16:31:22 -0400 Subject: [PATCH 14/17] Fix import order --- src/dbnode/storage/series_wired_list_interaction_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go index 05f0ed576b..3875260b02 100644 --- a/src/dbnode/storage/series_wired_list_interaction_test.go +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" @@ -15,7 +17,6 @@ import ( "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/pool" - "github.com/stretchr/testify/require" ) // TestSeriesWiredListConcurrentInteractions was added as a regression test From 3d4860bf46ca82928589cd7d80c3e95f10798d27 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 28 Aug 2018 16:42:03 -0400 Subject: [PATCH 15/17] Fix import order --- src/dbnode/storage/series_wired_list_interaction_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go index 3875260b02..46cc29cea2 100644 --- a/src/dbnode/storage/series_wired_list_interaction_test.go +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -5,8 +5,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" @@ -17,6 +15,8 @@ import ( "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/pool" + + "github.com/stretchr/testify/require" ) // TestSeriesWiredListConcurrentInteractions was added as a regression test From 6599338143499e0b79c54610496072fd816901ab Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 29 Aug 2018 10:23:04 -0400 Subject: [PATCH 16/17] Refactor defer in OnRetrieveBlock --- src/dbnode/storage/series/series.go | 46 ++++++++++++++--------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index 2c37822317..275f0bfbde 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -519,11 +519,28 @@ func (s *dbSeries) OnRetrieveBlock( startTime time.Time, segment ts.Segment, ) { + var ( + b block.DatabaseBlock + list *block.WiredList + ) s.Lock() - shouldUnlock := true defer func() { - if shouldUnlock { - s.Unlock() + s.Unlock() + if b != nil && list != nil { + // 1) We need to update the WiredList so that blocks that were read from disk + // can enter the list (OnReadBlock is only called for blocks that + // were read from memory, regardless of whether the data originated + // from disk or a buffer rotation.) + // 2) We must perform this action outside of the lock to prevent deadlock + // with the WiredList itself when it tries to call OnEvictedFromWiredList + // on the same series that is trying to perform a blocking update. + // 3) Doing this outside of the lock is safe because updating the + // wired list is asynchronous already (Update just puts the block in + // a channel to be processed later.) + // 4) We have to perform a blocking update because in this flow, the block + // is not already in the wired list so we need to make sure that the WiredList + // takes control of its lifecycle. + list.BlockingUpdate(b) } }() @@ -531,7 +548,7 @@ func (s *dbSeries) OnRetrieveBlock( return } - b := s.opts.DatabaseBlockOptions().DatabaseBlockPool().Get() + b = s.opts.DatabaseBlockOptions().DatabaseBlockPool().Get() metadata := block.RetrievableBlockMetadata{ ID: s.id, Length: segment.Len(), @@ -555,26 +572,7 @@ func (s *dbSeries) OnRetrieveBlock( // If we retrieved this from disk then we directly emplace it s.addBlockWithLock(b) - list := s.opts.DatabaseBlockOptions().WiredList() - shouldUnlock = false - s.Unlock() - - if list != nil { - // 1) We need to update the WiredList so that blocks that were read from disk - // can enter the list (OnReadBlock is only called for blocks that - // were read from memory, regardless of whether the data originated - // from disk or a buffer rotation.) - // 2) We must perform this action outside of the lock to prevent deadlock - // with the WiredList itself when it tries to call OnEvictedFromWiredList - // on the same series that is trying to perform a blocking update. - // 3) Doing this outside of the lock is safe because updating the - // wired list is asynchronous already (Update just puts the block in - // a channel to be processed later.) - // 4) We have to perform a blocking update because in this flow, the block - // is not already in the wired list so we need to make sure that the WiredList - // takes control of its lifecycle. - list.BlockingUpdate(b) - } + list = s.opts.DatabaseBlockOptions().WiredList() } // OnReadBlock is only called for blocks that were read from memory, regardless of From 45cbf94a027c1170deac5a0b7dff16f6fb546d75 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 29 Aug 2018 10:42:56 -0400 Subject: [PATCH 17/17] update comment --- src/dbnode/storage/block/wired_list.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/block/wired_list.go b/src/dbnode/storage/block/wired_list.go index a2f540eb43..b9c41efa30 100644 --- a/src/dbnode/storage/block/wired_list.go +++ b/src/dbnode/storage/block/wired_list.go @@ -286,9 +286,9 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { onEvict.OnEvictedFromWiredList(entry.retrieveID, entry.startTime) } - // bl.Close() will return the block to the pool. In order to avoid races - // with the pool itself, we capture the value of the next block and remove - // the block from the wired list before we close it. + // bl.CloseIfFromDisk() will return the block to the pool. In order to avoid + // races with the pool itself, we capture the value of the next block and + // remove the block from the wired list before we close it. nextBl := bl.next() l.remove(bl) if wasFromDisk := bl.CloseIfFromDisk(); !wasFromDisk {