Skip to content

Commit

Permalink
Optimize fetch pathway to avoid decoding more data than required to s…
Browse files Browse the repository at this point in the history
…atisfy consistency (#1966)
  • Loading branch information
Richard Artoul authored Oct 11, 2019
1 parent 5a830ae commit 1260780
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,7 @@ func (s *session) fetchIDsAttempt(
resultErr error
resultErrs int32
majority int32
numReplicas int32
consistencyLevel topology.ReadConsistencyLevel
fetchBatchOpsByHostIdx [][]*fetchBatchOp
success = false
Expand Down Expand Up @@ -1514,6 +1515,7 @@ func (s *session) fetchIDsAttempt(

consistencyLevel = s.state.readLevel
majority = int32(s.state.majority)
numReplicas = int32(s.state.replicas)

// NB(prateek): namespaceAccessors tracks the number of pending accessors for nsID.
// It is set to incremented by `replica` for each requested ID during fetch enqueuing,
Expand Down Expand Up @@ -1570,8 +1572,15 @@ func (s *session) fetchIDsAttempt(
resultErrLock.Unlock()
} else {
resultsLock.RLock()
successIters := results[:success]
numItersToInclude := int(success)
numDesired := topology.NumDesiredForReadConsistency(consistencyLevel, int(numReplicas), int(majority))
if numDesired < numItersToInclude {
// Avoid decoding more data than is required to satisfy the consistency guarantees.
numItersToInclude = numDesired
}
itersToInclude := results[:numItersToInclude]
resultsLock.RUnlock()

iter := s.pools.seriesIterator.Get()
// NB(prateek): we need to allocate a copy of ident.ID to allow the seriesIterator
// to have control over the lifecycle of ID. We cannot allow seriesIterator
Expand All @@ -1584,7 +1593,7 @@ func (s *session) fetchIDsAttempt(
Namespace: namespaceID,
StartInclusive: startInclusive,
EndExclusive: endExclusive,
Replicas: successIters,
Replicas: itersToInclude,
})
iters.SetAt(idx, iter)
}
Expand Down
16 changes: 16 additions & 0 deletions src/dbnode/topology/consistency_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,19 @@ func ReadConsistencyAchieved(
}
panic(fmt.Errorf("unrecognized consistency level: %s", level.String()))
}

// NumDesiredForReadConsistency returns the number of replicas that would ideally be used to
// satisfy the read consistency.
func NumDesiredForReadConsistency(level ReadConsistencyLevel, numReplicas, majority int) int {
switch level {
case ReadConsistencyLevelAll:
return numReplicas
case ReadConsistencyLevelMajority, ReadConsistencyLevelUnstrictMajority:
return majority
case ReadConsistencyLevelOne:
return 1
case ReadConsistencyLevelNone:
return 0
}
panic(fmt.Errorf("unrecognized consistency level: %s", level.String()))
}

0 comments on commit 1260780

Please sign in to comment.