From a6c0d4e04edfa7a321bab3b52e2b01aebd9028b4 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 7 Oct 2019 10:15:25 -0400 Subject: [PATCH] [dbnode] Remove invariant possibility when reading from enqueue channel (#1980) --- src/dbnode/client/client_mock.go | 15 +++++------ src/dbnode/client/session.go | 27 +++++-------------- .../client/session_fetch_bulk_blocks_test.go | 12 +++------ src/dbnode/client/types.go | 5 +++- 4 files changed, 21 insertions(+), 38 deletions(-) diff --git a/src/dbnode/client/client_mock.go b/src/dbnode/client/client_mock.go index 152a97e85a..936a5af48e 100644 --- a/src/dbnode/client/client_mock.go +++ b/src/dbnode/client/client_mock.go @@ -4672,19 +4672,18 @@ func (mr *MockenqueueChannelMockRecorder) enqueueDelayed(numToEnqueue interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "enqueueDelayed", reflect.TypeOf((*MockenqueueChannel)(nil).enqueueDelayed), numToEnqueue) } -// get mocks base method -func (m *MockenqueueChannel) get() (<-chan []receivedBlockMetadata, error) { +// read mocks base method +func (m *MockenqueueChannel) read() <-chan []receivedBlockMetadata { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "get") + ret := m.ctrl.Call(m, "read") ret0, _ := ret[0].(<-chan []receivedBlockMetadata) - ret1, _ := ret[1].(error) - return ret0, ret1 + return ret0 } -// get indicates an expected call of get -func (mr *MockenqueueChannelMockRecorder) get() *gomock.Call { +// read indicates an expected call of read +func (mr *MockenqueueChannelMockRecorder) read() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "get", reflect.TypeOf((*MockenqueueChannel)(nil).get)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockenqueueChannel)(nil).read)) } // trackPending mocks base method diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index f9128f1a3b..3d0c15d17f 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -2444,18 +2444,7 @@ func (s *session) streamBlocksFromPeers( enqueueCh.trackProcessed(1) } ) - enqueueChInputs, err := enqueueCh.get() - if err != nil { - instrument.EmitAndLogInvariantViolation( - s.opts.InstrumentOptions(), func(l *zap.Logger) { - l.Error( - "failed to get enqueueCh input channel", - zap.Error(err)) - }) - return err - } - - for perPeerBlocksMetadata := range enqueueChInputs { + for perPeerBlocksMetadata := range enqueueCh.read() { // Filter and select which blocks to retrieve from which peers selected, pooled = s.selectPeersFromPerPeerBlockMetadatas( perPeerBlocksMetadata, peerQueues, enqueueCh, consistencyLevel, peers, @@ -3614,15 +3603,11 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil } -func (c *enqueueCh) get() (<-chan []receivedBlockMetadata, error) { - c.Lock() - if c.closed { - c.Unlock() - return nil, errEnqueueChIsClosed - } - c.Unlock() - metadataCh := c.peersMetadataCh - return metadataCh, nil +// read is always safe to call since you can safely range +// over a closed channel, and/or do a checked read in case +// it is closed (unlike when publishing to a channel). +func (c *enqueueCh) read() <-chan []receivedBlockMetadata { + return c.peersMetadataCh } func (c *enqueueCh) trackPending(amount int) { diff --git a/src/dbnode/client/session_fetch_bulk_blocks_test.go b/src/dbnode/client/session_fetch_bulk_blocks_test.go index 674905c4d0..6cf75193f8 100644 --- a/src/dbnode/client/session_fetch_bulk_blocks_test.go +++ b/src/dbnode/client/session_fetch_bulk_blocks_test.go @@ -1900,8 +1900,7 @@ func TestEnqueueChannelEnqueueDelayed(t *testing.T) { require.NoError(t, err) require.Equal(t, numBlocks, enqueueCh.unprocessedLen()) - enqueueChInputs, err := enqueueCh.get() - require.NoError(t, err) + enqueueChInputs := enqueueCh.read() require.Equal(t, 0, len(enqueueChInputs)) // Actually enqueue the blocks @@ -1911,8 +1910,7 @@ func TestEnqueueChannelEnqueueDelayed(t *testing.T) { enqueueDelayedDone() require.Equal(t, numBlocks, enqueueCh.unprocessedLen()) - enqueueChInputs, err = enqueueCh.get() - require.NoError(t, err) + enqueueChInputs = enqueueCh.read() require.Equal(t, numBlocks, len(enqueueChInputs)) // Process the blocks @@ -1923,8 +1921,7 @@ func TestEnqueueChannelEnqueueDelayed(t *testing.T) { } require.Equal(t, 0, enqueueCh.unprocessedLen()) - enqueueChInputs, err = enqueueCh.get() - require.NoError(t, err) + enqueueChInputs = enqueueCh.read() require.Equal(t, 0, len(enqueueChInputs)) } @@ -2493,8 +2490,7 @@ func assertEnqueueChannel( var distinct []receivedBlockMetadata for { var perPeerBlocksMetadata []receivedBlockMetadata - enqueueChInputs, err := enqueueCh.get() - require.NoError(t, err) + enqueueChInputs := enqueueCh.read() select { case perPeerBlocksMetadata = <-enqueueChInputs: diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go index 1c3fa90c7b..84ea5c90af 100644 --- a/src/dbnode/client/types.go +++ b/src/dbnode/client/types.go @@ -677,7 +677,10 @@ type enqueueDelayedDoneFn func() type enqueueChannel interface { enqueue(peersMetadata []receivedBlockMetadata) error enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueDelayedDoneFn, error) - get() (<-chan []receivedBlockMetadata, error) + // read is always safe to call since you can safely range + // over a closed channel, and/or do a checked read in case + // it is closed (unlike when publishing to a channel). + read() <-chan []receivedBlockMetadata trackPending(amount int) trackProcessed(amount int) unprocessedLen() int