Skip to content

Commit

Permalink
go/worker/executor: only remove incoming transactions on finalized round
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Sep 28, 2020
1 parent f89908f commit 4e50be2
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 75 deletions.
8 changes: 8 additions & 0 deletions .changelog/3332.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
go/worker/executor: only remove incoming transactions on finalized round

Simplifies the executor to only remove transactions from the incoming queue
once a round is successfully finalized. Before, the proposing executor also
removed transactions when proposing a batch, which was an unneeded leftover
from before the transaction scheduler committee was merged into the executor.

Also fixes an edge case where a batch was not reinserted on failed rounds.
65 changes: 31 additions & 34 deletions go/runtime/scheduling/simple/incoming_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var (
errQueueFull = errors.New("queue is full")
errCallTooLarge = errors.New("call too large")
errCallAlreadyExists = errors.New("call already exists in queue")
errNoBatchAvailable = errors.New("no batch available in incoming queue")
)

type incomingQueue struct {
Expand Down Expand Up @@ -169,68 +168,66 @@ func (q *incomingQueue) AddBatch(batch transaction.RawBatch) error {
return errs
}

// Take attempts to take a batch from the incoming queue.
func (q *incomingQueue) Take(force bool) (transaction.RawBatch, error) {
// GetBatch attempts to get a batch from the incoming queue.
func (q *incomingQueue) GetBatch(force bool) transaction.RawBatch {
q.Lock()
defer q.Unlock()

// Check if we have a batch ready.
// Check if a batch is ready.
queueSize := uint64(len(q.queue))
if queueSize == 0 {
return nil, errNoBatchAvailable
}
if queueSize < q.maxBatchSize && q.queueSizeBytes < q.maxBatchSizeBytes && !force {
return nil, errNoBatchAvailable
return nil
}

// NOTE: These "returned" variables signify what will be returned back
// to the queue, not what will be returned from the function.
var returned transaction.RawBatch
var returnedSizeBytes uint64
returnedCallHashes := make(map[hash.Hash]bool)

var batch transaction.RawBatch
var batchSizeBytes uint64

for _, call := range q.queue[:] {
shouldReturn := false
callSize := uint64(len(call))

// Check if the batch already has enough calls.
if uint64(len(batch)) >= q.maxBatchSize {
shouldReturn = true
break
}
// Check if the call does not fit into a batch.
// Check if the call does fit into the batch.
// XXX: potentially there could still be smaller calls that would
// fit, which this will miss.
if batchSizeBytes+callSize > q.maxBatchSizeBytes {
shouldReturn = true
break
}

if shouldReturn {
returned = append(returned, call)
returnedSizeBytes += callSize

callHash := hash.NewFromBytes(call)
returnedCallHashes[callHash] = true
continue
}

// Take call.
batch = append(batch, call)
batchSizeBytes += callSize
}

q.queue = returned
q.queueSizeBytes = returnedSizeBytes
q.callHashes = returnedCallHashes

return batch, nil
return batch
}

// updateConfig stores the new batch limits and then rebuilds the queue,
// dropping every queued call whose byte length is greater than the new
// maxBatchSizeBytes. The queue contents, the call hash set and the byte-size
// accounting are all recomputed from the calls that remain. The queue lock is
// held for the whole operation.
func (q *incomingQueue) updateConfig(maxBatchSize, maxBatchSizeBytes uint64) {
	q.Lock()
	defer q.Unlock()

	q.maxBatchSize, q.maxBatchSizeBytes = maxBatchSize, maxBatchSizeBytes

	// Re-scan the queue: a call that was acceptable under the previous
	// limit may be larger than the updated `maxBatchSizeBytes`.
	var (
		kept      transaction.RawBatch
		keptBytes uint64
	)
	keptHashes := make(map[hash.Hash]bool)
	for _, tx := range q.queue {
		size := uint64(len(tx))
		if size <= maxBatchSizeBytes {
			kept = append(kept, tx)
			keptBytes += size
			keptHashes[hash.NewFromBytes(tx)] = true
		}
	}

	// Swap in the filtered state.
	q.queue, q.callHashes, q.queueSizeBytes = kept, keptHashes, keptBytes
}

func newIncomingQueue(maxQueueSize, maxBatchSize, maxBatchSizeBytes uint64) *incomingQueue {
Expand Down
18 changes: 12 additions & 6 deletions go/runtime/scheduling/simple/incoming_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ func TestBasic(t *testing.T) {

require.EqualValues(t, 51, queue.Size(), "Size")

batch, err := queue.Take(false)
require.NoError(t, err, "Take")
batch := queue.GetBatch(false)
require.EqualValues(t, 10, len(batch), "Batch size")
require.EqualValues(t, 51, queue.Size(), "Size")

err = queue.RemoveBatch(batch)
require.NoError(t, err, "RemoveBatch")
require.EqualValues(t, 41, queue.Size(), "Size")

require.EqualValues(t, batch[0], []byte("hello world"))
Expand All @@ -55,8 +58,8 @@ func TestNoBatchReady(t *testing.T) {
err := queue.Add([]byte("hello world"))
require.NoError(t, err, "Add")

_, err = queue.Take(false)
require.Error(t, err, "Take error when no batch available and not forced")
batch := queue.GetBatch(false)
require.Empty(t, batch, "GetBatch empty if no batch available")
}

func TestForceBatch(t *testing.T) {
Expand All @@ -65,9 +68,12 @@ func TestForceBatch(t *testing.T) {
err := queue.Add([]byte("hello world"))
require.NoError(t, err, "Add")

batch, err := queue.Take(true)
require.NoError(t, err, "Take no error when no batch available and forced")
batch := queue.GetBatch(true)
require.EqualValues(t, 1, len(batch), "Batch size")
require.EqualValues(t, 1, queue.Size(), "Size")

err = queue.RemoveBatch(batch)
require.NoError(t, err, "RemoveBatch")
require.EqualValues(t, 0, queue.Size(), "Size")
}

Expand Down
15 changes: 1 addition & 14 deletions go/runtime/scheduling/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,10 @@ type scheduler struct {
}

func (s *scheduler) scheduleBatch(force bool) error {
batch, err := s.incomingQueue.Take(force)
if err != nil && err != errNoBatchAvailable {
s.logger.Error("failed to get batch from the queue",
"err", err,
)
return err
}

batch := s.incomingQueue.GetBatch(force)
if len(batch) > 0 {
// Try to dispatch batch.
if err := s.dispatcher.Dispatch(batch); err != nil {
// Put the batch back into the incoming queue in case this failed.
if errAB := s.incomingQueue.AddBatch(batch); errAB != nil {
s.logger.Warn("failed to add batch back into the incoming queue",
"err", errAB,
)
}
return err
}
}
Expand Down
31 changes: 30 additions & 1 deletion go/runtime/scheduling/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// testDispatcher is the dispatcher handed to the scheduler under test via
// Initialize; it records the batches it is asked to dispatch.
type testDispatcher struct {
// ShouldFail presumably makes Dispatch return an error instead of
// recording the batch — the check itself is outside the visible hunk,
// confirm against the full Dispatch body.
ShouldFail bool
// DispatchedBatches collects every batch passed to a successful Dispatch,
// in call order.
DispatchedBatches []transaction.RawBatch
// scheduler is the scheduler under test; Dispatch calls RemoveTxBatch on
// it after recording a batch.
scheduler api.Scheduler
}

func (t *testDispatcher) Clear() {
Expand All @@ -28,6 +29,7 @@ func (t *testDispatcher) Dispatch(batch transaction.RawBatch) error {
return errors.New("dispatch failed")
}
t.DispatchedBatches = append(t.DispatchedBatches, batch)
_ = t.scheduler.RemoveTxBatch(batch)
return nil
}

Expand All @@ -36,7 +38,7 @@ func SchedulerImplementationTests(
t *testing.T,
scheduler api.Scheduler,
) {
td := testDispatcher{ShouldFail: false}
td := testDispatcher{ShouldFail: false, scheduler: scheduler}

err := scheduler.Initialize(&td)
require.NoError(t, err, "Initialize(td)")
Expand Down Expand Up @@ -133,6 +135,33 @@ func testScheduleTransactions(t *testing.T, td *testDispatcher, scheduler api.Sc
require.NoError(t, err, "Flush(force=false)")
require.Equal(t, 0, scheduler.UnscheduledSize(), "transaction should get scheduled after a non-forced flush")

// Test update clear transactions.
// Update configuration back to BatchSize=10.
err = scheduler.UpdateParameters(
registry.TxnSchedulerParameters{
Algorithm: scheduler.Name(),
MaxBatchSize: 10,
MaxBatchSizeBytes: 10000,
},
)
require.NoError(t, err, "UpdateParameters")
// Insert a transaction.
err = scheduler.ScheduleTx(testTx)
require.NoError(t, err, "ScheduleTx(testTx)")
// Make sure transaction is scheduled.
require.Equal(t, 1, scheduler.UnscheduledSize(), "one transaction scheduled")
// Update configuration to MaxBatchSizeBytes=1.
err = scheduler.UpdateParameters(
registry.TxnSchedulerParameters{
Algorithm: scheduler.Name(),
MaxBatchSize: 10,
MaxBatchSizeBytes: 1,
},
)
require.NoError(t, err, "UpdateParameters")
// Make sure the transaction was removed.
require.Equal(t, 0, scheduler.UnscheduledSize(), "transaction should get scheduled after a non-forced flush")

// Test invalid update.
err = scheduler.UpdateParameters(
registry.TxnSchedulerParameters{
Expand Down
25 changes: 5 additions & 20 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,16 +447,6 @@ func (n *Node) HandleEpochTransitionLocked(epoch *committee.EpochSnapshot) {

switch {
case epoch.IsExecutorWorker():
if state, ok := n.state.(StateWaitingForFinalize); ok {
// In case we are finalizing a batch and epoch transition occurred,
// reinsert the failed transactions back into queue.
if err := n.scheduler.AppendTxBatch(state.raw); err != nil {
n.logger.Warn("failed reinserting aborted transactions in queue",
"err", err,
)
}
incomingQueueSize.With(n.getMetricLabels()).Set(float64(n.scheduler.UnscheduledSize()))
}
if !n.prevEpochWorker {
// Clear incoming queue and cache of any stale transactions in case
// we were not part of the compute committee in previous epoch.
Expand Down Expand Up @@ -541,6 +531,7 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) {
n.logger.Error("proposed batch was not finalized",
"header_io_root", header.IORoot,
"proposed_io_root", state.proposedIORoot,
"header_type", header.HeaderType,
"batch", state.raw,
)
return
Expand Down Expand Up @@ -752,6 +743,10 @@ func (n *Node) proposeTimeoutLocked() error {

// Dispatch dispatches a batch to the executor committee.
func (n *Node) Dispatch(batch transaction.RawBatch) error {
n.logger.Debug("dispatching a batch",
"batch", batch,
"size", len(batch),
)
lastHeader, err := func() (*block.Header, error) {
n.commonNode.CrossNode.Lock()
defer n.commonNode.CrossNode.Unlock()
Expand Down Expand Up @@ -1066,16 +1061,6 @@ func (n *Node) abortBatchLocked(reason error) {

crash.Here(crashPointBatchAbortAfter)

// In case the batch is resolved, return transactions back in the queue.
if state.batch != nil && len(state.batch.batch) > 0 {
if err := n.scheduler.AppendTxBatch(state.batch.batch); err != nil {
n.logger.Warn("failed reinserting aborted transactions in queue",
"err", err,
)
}
incomingQueueSize.With(n.getMetricLabels()).Set(float64(n.scheduler.UnscheduledSize()))
}

abortedBatchCount.With(n.getMetricLabels()).Inc()
// After the batch has been aborted, we must wait for the round to be
// finalized.
Expand Down

0 comments on commit 4e50be2

Please sign in to comment.