Skip to content

Commit

Permalink
[dbnode] Remove invariant possibility when reading from enqueue chann…
Browse files Browse the repository at this point in the history
…el (#1980)
  • Loading branch information
robskillington authored and Richard Artoul committed Oct 7, 2019
1 parent 3dd55f3 commit a6c0d4e
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 38 deletions.
15 changes: 7 additions & 8 deletions src/dbnode/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 6 additions & 21 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2444,18 +2444,7 @@ func (s *session) streamBlocksFromPeers(
enqueueCh.trackProcessed(1)
}
)
enqueueChInputs, err := enqueueCh.get()
if err != nil {
instrument.EmitAndLogInvariantViolation(
s.opts.InstrumentOptions(), func(l *zap.Logger) {
l.Error(
"failed to get enqueueCh input channel",
zap.Error(err))
})
return err
}

for perPeerBlocksMetadata := range enqueueChInputs {
for perPeerBlocksMetadata := range enqueueCh.read() {
// Filter and select which blocks to retrieve from which peers
selected, pooled = s.selectPeersFromPerPeerBlockMetadatas(
perPeerBlocksMetadata, peerQueues, enqueueCh, consistencyLevel, peers,
Expand Down Expand Up @@ -3614,15 +3603,11 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD
return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil
}

func (c *enqueueCh) get() (<-chan []receivedBlockMetadata, error) {
c.Lock()
if c.closed {
c.Unlock()
return nil, errEnqueueChIsClosed
}
c.Unlock()
metadataCh := c.peersMetadataCh
return metadataCh, nil
// read is always safe to call since you can safely range
// over a closed channel, and/or do a checked read in case
// it is closed (unlike when publishing to a channel).
func (c *enqueueCh) read() <-chan []receivedBlockMetadata {
return c.peersMetadataCh
}

func (c *enqueueCh) trackPending(amount int) {
Expand Down
12 changes: 4 additions & 8 deletions src/dbnode/client/session_fetch_bulk_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,8 +1900,7 @@ func TestEnqueueChannelEnqueueDelayed(t *testing.T) {
require.NoError(t, err)

require.Equal(t, numBlocks, enqueueCh.unprocessedLen())
enqueueChInputs, err := enqueueCh.get()
require.NoError(t, err)
enqueueChInputs := enqueueCh.read()
require.Equal(t, 0, len(enqueueChInputs))

// Actually enqueue the blocks
Expand All @@ -1911,8 +1910,7 @@ func TestEnqueueChannelEnqueueDelayed(t *testing.T) {
enqueueDelayedDone()

require.Equal(t, numBlocks, enqueueCh.unprocessedLen())
enqueueChInputs, err = enqueueCh.get()
require.NoError(t, err)
enqueueChInputs = enqueueCh.read()
require.Equal(t, numBlocks, len(enqueueChInputs))

// Process the blocks
Expand All @@ -1923,8 +1921,7 @@ func TestEnqueueChannelEnqueueDelayed(t *testing.T) {
}

require.Equal(t, 0, enqueueCh.unprocessedLen())
enqueueChInputs, err = enqueueCh.get()
require.NoError(t, err)
enqueueChInputs = enqueueCh.read()
require.Equal(t, 0, len(enqueueChInputs))
}

Expand Down Expand Up @@ -2493,8 +2490,7 @@ func assertEnqueueChannel(
var distinct []receivedBlockMetadata
for {
var perPeerBlocksMetadata []receivedBlockMetadata
enqueueChInputs, err := enqueueCh.get()
require.NoError(t, err)
enqueueChInputs := enqueueCh.read()

select {
case perPeerBlocksMetadata = <-enqueueChInputs:
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,10 @@ type enqueueDelayedDoneFn func()
type enqueueChannel interface {
enqueue(peersMetadata []receivedBlockMetadata) error
enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueDelayedDoneFn, error)
get() (<-chan []receivedBlockMetadata, error)
// read is always safe to call since you can safely range
// over a closed channel, and/or do a checked read in case
// it is closed (unlike when publishing to a channel).
read() <-chan []receivedBlockMetadata
trackPending(amount int)
trackProcessed(amount int)
unprocessedLen() int
Expand Down

0 comments on commit a6c0d4e

Please sign in to comment.