Skip to content

Commit

Permalink
[FAB-5709] Infer LastOffsetPersisted correctly
Browse files Browse the repository at this point in the history
The `LastOffsetPersisted` of block produced by Kafka orderer
may be off by 1 in a corner case, see FAB-5709 for details.

This is fixed by having `Ordered` method returning a boolean
indicating if there's still message pending in the receiver.

Also, a hacky way of calculating offset in subtests is removed
in favor of querying sarama mock Consumer directly.

Change-Id: I7f0384cd9df498b686d0971696fdcca102c52a59
Signed-off-by: Jay Guo <[email protected]>
  • Loading branch information
guoger authored and mastersingh24 committed Aug 28, 2017
1 parent ae4e37d commit a3b40de
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 245 deletions.
45 changes: 21 additions & 24 deletions orderer/common/blockcutter/blockcutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ var logger = logging.MustGetLogger("orderer/common/blockcutter")
type Receiver interface {
// Ordered should be invoked sequentially as messages are ordered
// If the current message valid, and no batches need to be cut:
// - Ordered will return nil, nil, and true (indicating ok).
// - Ordered will return nil, nil, and true (indicating valid Tx).
// If the current message valid, and batches need to be cut:
// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok).
// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating valid Tx).
// If the current message is invalid:
// - Ordered will return nil, nil, and false (to indicate not ok).
// - Ordered will return nil, nil, and false (to indicate invalid Tx).
//
// Given a valid message, if the current message needs to be isolated (as determined during filtering).
// - Ordered will return:
Expand All @@ -45,7 +45,9 @@ type Receiver interface {
// - The current message needs to be isolated (as determined during filtering).
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool)
//
// In any case, `pending` is set to true if there are still messages pending in the receiver after cutting the block.
Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committers [][]filter.Committer, validTx bool, pending bool)

// Cut returns the current batch and starts a new one
Cut() ([]*cb.Envelope, []filter.Committer)
Expand All @@ -69,29 +71,34 @@ func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet

// Ordered should be invoked sequentially as messages are ordered
// If the current message valid, and no batches need to be cut:
// - Ordered will return nil, nil, and true (indicating ok).
// - Ordered will return nil, nil, true (indicating valid tx) and true (indicating there are pending messages).
// If the current message valid, and batches need to be cut:
// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating ok).
// - Ordered will return 1 or 2 batches of messages, 1 or 2 batches of committers, and true (indicating valid tx).
// If the current message is invalid:
// - Ordered will return nil, nil, and false (to indicate not ok).
// - Ordered will return nil, nil, and false (to indicate invalid tx).
//
// Given a valid message, if the current message needs to be isolated (as determined during filtering).
// - Ordered will return:
// * The pending batch of (if not empty), and a second batch containing only the isolated message.
// * The corresponding batches of committers.
// * true (indicating ok).
// * true (indicating valid tx).
// Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if:
// - The current message needs to be isolated (as determined during filtering).
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) {
//
// In any case, `pending` is set to true if there are still messages pending in the receiver after cutting the block.
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committerBatches [][]filter.Committer, validTx bool, pending bool) {
// The messages must be filtered a second time in case configuration has changed since the message was received
committer, err := r.filters.Apply(msg)
if err != nil {
logger.Debugf("Rejecting message: %s", err)
return nil, nil, false
return // We don't bother to determine `pending` here as it's not processed in error case
}

// message is valid
validTx = true

messageSizeBytes := messageSizeBytes(msg)

if committer.Isolated() || messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes {
Expand All @@ -102,9 +109,6 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi
logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, r.sharedConfigManager.BatchSize().PreferredMaxBytes)
}

messageBatches := [][]*cb.Envelope{}
committerBatches := [][]filter.Committer{}

// cut pending batch, if it has any messages
if len(r.pendingBatch) > 0 {
messageBatch, committerBatch := r.Cut()
Expand All @@ -116,12 +120,9 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi
messageBatches = append(messageBatches, []*cb.Envelope{msg})
committerBatches = append(committerBatches, []filter.Committer{committer})

return messageBatches, committerBatches, true
return
}

messageBatches := [][]*cb.Envelope{}
committerBatches := [][]filter.Committer{}

messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes

if messageWillOverflowBatchSizeBytes {
Expand All @@ -136,21 +137,17 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi
r.pendingBatch = append(r.pendingBatch, msg)
r.pendingBatchSizeBytes += messageSizeBytes
r.pendingCommitters = append(r.pendingCommitters, committer)
pending = true

if uint32(len(r.pendingBatch)) >= r.sharedConfigManager.BatchSize().MaxMessageCount {
logger.Debugf("Batch size met, cutting batch")
messageBatch, committerBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
committerBatches = append(committerBatches, committerBatch)
pending = false
}

// return nils instead of empty slices
if len(messageBatches) == 0 {
return nil, nil, true
}

return messageBatches, committerBatches, true

return
}

// Cut returns the current batch and starts a new one
Expand Down
Loading

0 comments on commit a3b40de

Please sign in to comment.