From cbaa6ef140f41ab4722a0098fdb097183d84bcd1 Mon Sep 17 00:00:00 2001 From: ptrus Date: Thu, 8 Jul 2021 16:29:09 +0200 Subject: [PATCH] go/worker/executor: batch CheckTx transactions --- .changelog/2548.feature.md | 1 + go/runtime/client/client.go | 15 +- go/runtime/client/tests/tester.go | 8 +- go/runtime/host/helpers.go | 21 +- .../simple/orderedmap/ordered_map.go | 195 ++++++++++++++++++ .../simple/orderedmap/ordered_map_test.go | 96 +++++++++ go/worker/compute/executor/committee/batch.go | 1 + go/worker/compute/executor/committee/node.go | 152 +++++++------- go/worker/compute/executor/init.go | 7 +- go/worker/compute/executor/tests/tester.go | 63 ++++-- go/worker/compute/executor/worker.go | 4 + 11 files changed, 447 insertions(+), 116 deletions(-) create mode 100644 .changelog/2548.feature.md create mode 100644 go/runtime/scheduling/simple/orderedmap/ordered_map.go create mode 100644 go/runtime/scheduling/simple/orderedmap/ordered_map_test.go diff --git a/.changelog/2548.feature.md b/.changelog/2548.feature.md new file mode 100644 index 00000000000..443a0d04a59 --- /dev/null +++ b/.changelog/2548.feature.md @@ -0,0 +1 @@ +go/worker/executor: batch runtime CheckTx transactions diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go index ce990f05de3..2d288c62f6f 100644 --- a/go/runtime/client/client.go +++ b/go/runtime/client/client.go @@ -22,7 +22,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/runtime/client/api" enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api" - "github.com/oasisprotocol/oasis-core/go/runtime/host" runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" "github.com/oasisprotocol/oasis-core/go/runtime/tagindexer" "github.com/oasisprotocol/oasis-core/go/runtime/transaction" @@ -198,15 +197,15 @@ func (c *runtimeClient) CheckTx(ctx context.Context, request *api.CheckTxRequest return fmt.Errorf("client: failed to get current epoch: %w", err) } - _, err = rt.CheckTx(ctx, rs.CurrentBlock, lb, epoch, request.Data) - switch { - case err == nil: - return nil - case errors.Is(err, host.ErrCheckTxFailed): - return errors.WithContext(api.ErrCheckTxFailed, errors.Context(err)) - default: + resp, err := rt.CheckTx(ctx, rs.CurrentBlock, lb, epoch, transaction.RawBatch{request.Data}) + if err != nil { return fmt.Errorf("client: local transaction check failed: %w", err) } + if !resp[0].IsSuccess() { + return errors.WithContext(api.ErrCheckTxFailed, resp[0].Error.String()) + } + + return nil } // Implements api.RuntimeClient. diff --git a/go/runtime/client/tests/tester.go b/go/runtime/client/tests/tester.go index e930b2b3c81..a00c54c9ea0 100644 --- a/go/runtime/client/tests/tester.go +++ b/go/runtime/client/tests/tester.go @@ -141,7 +141,7 @@ func testQuery( // Check that indexer has indexed txn keys (check the mock worker for key/values). tx, err = c.QueryTx(ctx, &api.QueryTxRequest{RuntimeID: runtimeID, Key: []byte("txn_foo"), Value: []byte("txn_bar")}) require.NoError(t, err, "QueryTx") - require.EqualValues(t, 2, tx.Block.Header.Round) + require.EqualValues(t, 3, tx.Block.Header.Round) require.EqualValues(t, 0, tx.Index) // Check for values from TestNode/ExecutorWorker/QueueTx require.True(t, strings.HasPrefix(string(tx.Input), "hello world")) @@ -161,7 +161,7 @@ func testQuery( require.EqualValues(t, testInput, txns[0]) // Check events query (see mock worker for emitted events). - events, err := c.GetEvents(ctx, &api.GetEventsRequest{RuntimeID: runtimeID, Round: 2}) + events, err := c.GetEvents(ctx, &api.GetEventsRequest{RuntimeID: runtimeID, Round: 3}) require.NoError(t, err, "GetEvents") require.Len(t, events, 1) require.EqualValues(t, []byte("txn_foo"), events[0].Key) @@ -170,7 +170,7 @@ func testQuery( // Test advanced transaction queries. query := api.Query{ RoundMin: 0, - RoundMax: 3, + RoundMax: 4, Conditions: []api.QueryCondition{ {Key: []byte("txn_foo"), Values: [][]byte{[]byte("txn_bar")}}, }, @@ -182,7 +182,7 @@ func testQuery( sort.Slice(results, func(i, j int) bool { return bytes.Compare(results[i].Input, results[j].Input) < 0 }) - require.EqualValues(t, 2, results[0].Block.Header.Round) + require.EqualValues(t, 3, results[0].Block.Header.Round) require.EqualValues(t, 0, results[0].Index) // Check for values from TestNode/ExecutorWorker/QueueTx require.True(t, strings.HasPrefix(string(results[0].Input), "hello world")) diff --git a/go/runtime/host/helpers.go b/go/runtime/host/helpers.go index a70932f4b2c..baef16c78a3 100644 --- a/go/runtime/host/helpers.go +++ b/go/runtime/host/helpers.go @@ -32,8 +32,8 @@ type RichRuntime interface { rb *block.Block, lb *consensus.LightBlock, epoch beacon.EpochTime, - tx []byte, - ) (*transaction.CheckedTransaction, error) + batch transaction.RawBatch, + ) ([]protocol.CheckTxResult, error) // Query requests the runtime to answer a runtime-specific query. Query( @@ -64,8 +64,8 @@ func (r *richRuntime) CheckTx( rb *block.Block, lb *consensus.LightBlock, epoch beacon.EpochTime, - tx []byte, -) (*transaction.CheckedTransaction, error) { + batch transaction.RawBatch, +) ([]protocol.CheckTxResult, error) { if rb == nil || lb == nil { return nil, ErrInvalidArgument } @@ -73,7 +73,7 @@ func (r *richRuntime) CheckTx( resp, err := r.Call(ctx, &protocol.Body{ RuntimeCheckTxBatchRequest: &protocol.RuntimeCheckTxBatchRequest{ ConsensusBlock: *lb, - Inputs: transaction.RawBatch{tx}, + Inputs: batch, Block: *rb, Epoch: epoch, }, @@ -83,17 +83,10 @@ func (r *richRuntime) CheckTx( return nil, errors.WithContext(ErrInternal, err.Error()) case resp.RuntimeCheckTxBatchResponse == nil: return nil, errors.WithContext(ErrInternal, "malformed runtime response") - case len(resp.RuntimeCheckTxBatchResponse.Results) != 1: + case len(resp.RuntimeCheckTxBatchResponse.Results) != len(batch): return nil, errors.WithContext(ErrInternal, "malformed runtime response: incorrect number of results") } - - // Interpret CheckTx result. - result := resp.RuntimeCheckTxBatchResponse.Results[0] - if !result.IsSuccess() { - return nil, errors.WithContext(ErrCheckTxFailed, result.Error.String()) - } - - return result.ToCheckedTransaction(tx), nil + return resp.RuntimeCheckTxBatchResponse.Results, nil } // Implements RichRuntime. diff --git a/go/runtime/scheduling/simple/orderedmap/ordered_map.go b/go/runtime/scheduling/simple/orderedmap/ordered_map.go new file mode 100644 index 00000000000..bf84d02d88e --- /dev/null +++ b/go/runtime/scheduling/simple/orderedmap/ordered_map.go @@ -0,0 +1,195 @@ +// Package orderedmap implements a queue backed by an ordered map. +package orderedmap + +import ( + "container/list" + "fmt" + "sync" + + "github.com/hashicorp/go-multierror" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/runtime/scheduling/simple/txpool/api" +) + +type pair struct { + Key hash.Hash + Value []byte + + element *list.Element +} + +// OrderedMap is a queue backed by an ordered map. +type OrderedMap struct { + sync.Mutex + + transactions map[hash.Hash]*pair + queue *list.List + + maxTxPoolSize uint64 + maxBatchSize uint64 +} + +// Add adds transaction into the queue. +func (q *OrderedMap) Add(tx []byte) error { + txHash := hash.NewFromBytes(tx) + + q.Lock() + defer q.Unlock() + + // Check if there is room in the queue. + if uint64(q.queue.Len()) >= q.maxTxPoolSize { + return api.ErrFull + } + + if err := q.checkTxLocked(tx, txHash); err != nil { + return err + } + + q.addTxLocked(tx, txHash) + + return nil +} + +// AddBatch adds a batch of transactions into the queue. +func (q *OrderedMap) AddBatch(batch [][]byte) error { + // Compute all hashes before taking the lock. + var txHashes []hash.Hash + for _, tx := range batch { + txHash := hash.NewFromBytes(tx) + txHashes = append(txHashes, txHash) + } + + q.Lock() + defer q.Unlock() + + var errs error + for i, tx := range batch { + if err := q.checkTxLocked(tx, txHashes[i]); err != nil { + errs = multierror.Append(errs, fmt.Errorf("failed inserting tx: %d, error: %w", i, err)) + continue + } + + // Check if there is room in the queue. + if uint64(q.queue.Len()) >= q.maxTxPoolSize { + errs = multierror.Append(errs, fmt.Errorf("failed inserting tx: %d, error: %w", i, api.ErrFull)) + return errs + } + + // Add the tx if checks passed. + q.addTxLocked(tx, txHashes[i]) + } + + if len(q.transactions) != q.queue.Len() { + panic(fmt.Errorf("inconsistent sizes of the underlying list (%v) and map (%v), after AddBatch", q.queue.Len(), len(q.transactions))) + } + + return errs +} + +// GetBatch gets a batch of transactions from the queue. +func (q *OrderedMap) GetBatch() [][]byte { + q.Lock() + defer q.Unlock() + + var batch [][]byte + current := q.queue.Back() + for { + if current == nil { + break + } + // Check if the batch already has enough transactions. + if uint64(len(batch)) >= q.maxBatchSize { + break + } + + el := current.Value.(*pair) + + batch = append(batch, el.Value) + current = current.Prev() + } + + return batch +} + +// RemoveBatch removes a batch of transactions from the queue. +func (q *OrderedMap) RemoveBatch(batch [][]byte) { + q.Lock() + defer q.Unlock() + + for _, tx := range batch { + txHash := hash.NewFromBytes(tx) + if pair, ok := q.transactions[txHash]; ok { + q.queue.Remove(pair.element) + delete(q.transactions, pair.Key) + } + } + if len(q.transactions) != q.queue.Len() { + panic(fmt.Errorf("inconsistent sizes of the underlying list (%v) and map (%v) after RemoveBatch", q.queue.Len(), len(q.transactions))) + } +} + +// IsQueued checks if a transactions is already queued. +func (q *OrderedMap) IsQueued(txHash hash.Hash) bool { + q.Lock() + defer q.Unlock() + + return q.isQueuedLocked(txHash) +} + +// Size returns size of the queue. +func (q *OrderedMap) Size() uint64 { + q.Lock() + defer q.Unlock() + + return uint64(q.queue.Len()) +} + +// Clear empties the queue. +func (q *OrderedMap) Clear() { + q.Lock() + defer q.Unlock() + + q.queue = list.New() + q.transactions = make(map[hash.Hash]*pair) +} + +// NOTE: Assumes lock is held. +func (q *OrderedMap) isQueuedLocked(txHash hash.Hash) bool { + _, ok := q.transactions[txHash] + return ok +} + +// NOTE: Assumes lock is held. +func (q *OrderedMap) checkTxLocked(tx []byte, txHash hash.Hash) error { + if q.isQueuedLocked(txHash) { + return api.ErrCallAlreadyExists + } + + return nil +} + +// NOTE: Assumes lock is held and that checkTxLocked has been called. +func (q *OrderedMap) addTxLocked(tx []byte, txHash hash.Hash) { + // Assuming checkTxLocked has been called before, this can happen if + // duplicate transactions are in the same batch -- just ignore them. + if _, exists := q.transactions[txHash]; exists { + return + } + p := &pair{ + Key: txHash, + Value: tx, + } + p.element = q.queue.PushFront(p) + q.transactions[txHash] = p +} + +// New returns a new incoming queue. +func New(maxPoolSize, maxBatchSize uint64) *OrderedMap { + return &OrderedMap{ + transactions: make(map[hash.Hash]*pair), + queue: list.New(), + maxTxPoolSize: maxPoolSize, + maxBatchSize: maxBatchSize, + } +} diff --git a/go/runtime/scheduling/simple/orderedmap/ordered_map_test.go b/go/runtime/scheduling/simple/orderedmap/ordered_map_test.go new file mode 100644 index 00000000000..88d151c8c71 --- /dev/null +++ b/go/runtime/scheduling/simple/orderedmap/ordered_map_test.go @@ -0,0 +1,96 @@ +package orderedmap + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestOrderedMapBasic(t *testing.T) { + queue := New(51, 10) + + err := queue.Add([]byte("hello world")) + require.NoError(t, err, "Add") + + err = queue.Add([]byte("hello world")) + require.Error(t, err, "Add error on duplicates") + + // Add some more calls. + for i := 0; i < 50; i++ { + err = queue.Add([]byte(fmt.Sprintf("call %d", i))) + require.NoError(t, err, "Add") + } + + err = queue.Add([]byte("another call")) + require.Error(t, err, "Add error on queue full") + + require.EqualValues(t, 51, queue.Size(), "Size") + + batch := queue.GetBatch() + require.EqualValues(t, 10, len(batch), "Batch size") + require.EqualValues(t, 51, queue.Size(), "Size") + + queue.RemoveBatch(batch) + require.EqualValues(t, 41, queue.Size(), "Size") + + require.EqualValues(t, batch[0], []byte("hello world")) + for i := 0; i < 9; i++ { + require.EqualValues(t, batch[i+1], []byte(fmt.Sprintf("call %d", i))) + } + // Not a duplicate anymore. + err = queue.Add([]byte("hello world")) + require.NoError(t, err, "Add") + require.EqualValues(t, 42, queue.Size(), "Size") + + queue.Clear() + require.EqualValues(t, 0, queue.Size(), "Size") +} + +func TestOrderedMapGetBatch(t *testing.T) { + queue := New(51, 10) + + batch := queue.GetBatch() + require.EqualValues(t, 0, len(batch), "Batch size") + require.EqualValues(t, 0, queue.Size(), "Size") + + err := queue.Add([]byte("hello world")) + require.NoError(t, err, "Add") + + batch = queue.GetBatch() + require.EqualValues(t, 1, len(batch), "Batch size") + require.EqualValues(t, 1, queue.Size(), "Size") + + queue.RemoveBatch(batch) + require.EqualValues(t, 0, queue.Size(), "Size") +} + +func TestOrderedMapRemoveBatch(t *testing.T) { + queue := New(51, 10) + + queue.RemoveBatch([][]byte{}) + + for _, tx := range [][]byte{ + []byte("hello world"), + []byte("one"), + []byte("two"), + []byte("three"), + } { + require.NoError(t, queue.Add(tx), "Add") + } + require.EqualValues(t, 4, queue.Size(), "Size") + + queue.RemoveBatch([][]byte{}) + require.EqualValues(t, 4, queue.Size(), "Size") + + queue.RemoveBatch([][]byte{ + []byte("hello world"), + []byte("two"), + }) + require.EqualValues(t, 2, queue.Size(), "Size") + + queue.RemoveBatch([][]byte{ + []byte("hello world"), + }) + require.EqualValues(t, 2, queue.Size(), "Size") +} diff --git a/go/worker/compute/executor/committee/batch.go b/go/worker/compute/executor/committee/batch.go index 6da8324f0d5..37117af94bd 100644 --- a/go/worker/compute/executor/committee/batch.go +++ b/go/worker/compute/executor/committee/batch.go @@ -44,5 +44,6 @@ func (ub *unresolvedBatch) resolve(ctx context.Context, sb storage.Backend) (tra if err != nil { return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err) } + ub.batch = batch return batch, nil } diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 380bb0f75b2..9c279c24814 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -31,6 +31,7 @@ import ( runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" "github.com/oasisprotocol/oasis-core/go/runtime/scheduling" schedulingAPI "github.com/oasisprotocol/oasis-core/go/runtime/scheduling/api" + "github.com/oasisprotocol/oasis-core/go/runtime/scheduling/simple/orderedmap" "github.com/oasisprotocol/oasis-core/go/runtime/transaction" scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" @@ -52,9 +53,7 @@ var ( // Transaction scheduling errors. errNoBlocks = fmt.Errorf("executor: no blocks") - errNotReady = fmt.Errorf("executor: runtime not ready") errNotTxnScheduler = fmt.Errorf("executor: not transaction scheduler in this round") - errDuplicateTx = p2pError.Permanent(p2pError.Relayable(fmt.Errorf("executor: duplicate transaction"))) // proposeTimeoutDelay is the duration to wait before submitting the propose timeout request. proposeTimeoutDelay = 2 * time.Second @@ -141,7 +140,9 @@ type Node struct { // nolint: maligned lastScheduledCache *lru.Cache scheduleMaxTxPoolSize uint64 - scheduleCh *channels.RingChannel + + checkTxCh *channels.RingChannel + checkTxQueue *orderedmap.OrderedMap // The scheduler mutex is here to protect the initialization // of the scheduler variable and updates to scheduler parameters. @@ -258,23 +259,25 @@ func (n *Node) HandlePeerMessage(ctx context.Context, message *p2p.Message, isOw return true, nil } - // Check transaction before queuing it. - tx, err := n.checkTx(ctx, rawTx) - if err != nil { - return true, err + // Skip recently scheduled transactions. + if n.lastScheduledCache != nil { + if _, b := n.lastScheduledCache.Get(hash.NewFromBytes(rawTx)); b { + n.logger.Debug("not scheduling duplicate transaction", "tx", rawTx) + return true, nil + } } - - n.logger.Debug("queuing transaction", - "transaction", tx, + // Queue transaction for checks. + n.logger.Debug("queuing transaction for check", + "tx", rawTx, ) - err = n.QueueTx(tx) - if err != nil { + if err := n.checkTxQueue.Add(rawTx); err != nil { n.logger.Error("unable to queue transaction", - "tx", tx, + "tx", rawTx, "err", err, ) return true, err } + n.checkTxCh.In() <- struct{}{} return true, nil case message.ProposedBatch != nil: @@ -542,7 +545,7 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { "header_io_root", header.IORoot, "proposed_io_root", state.proposedIORoot, "header_type", header.HeaderType, - "batch", state.raw, + "batch_size", len(state.raw), ) return } @@ -551,14 +554,14 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { batchProcessingTime.With(n.getMetricLabels()).Observe(time.Since(state.batchStartTime).Seconds()) n.logger.Debug("removing processed batch from queue", - "batch", state.raw, + "batch_size", len(state.raw), "io_root", header.IORoot, ) // Removed processed transactions from queue. if err := n.removeTxBatch(state.raw); err != nil { n.logger.Warn("failed removing processed batch from queue", "err", err, - "batch", state.raw, + "batch_size", len(state.raw), ) } }() @@ -581,74 +584,72 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { "round", blk.Header.Round, ) - n.scheduleCh.In() <- struct{}{} + n.checkTxCh.In() <- struct{}{} } } -// checkTx requests the runtime to check the validity of the given transaction. -func (n *Node) checkTx(ctx context.Context, rawTx []byte) (*transaction.CheckedTransaction, error) { +// checkTxBatch requests the runtime to check the validity of a transaction batch. +// Transactions that pass the check are queued for scheduling. +func (n *Node) checkTxBatch() { + batch := n.checkTxQueue.GetBatch() + if len(batch) == 0 { + return + } + rt := n.GetHostedRuntime() + if rt == nil { + n.logger.Debug("CheckTx: hosted runtime not initialized") + return + } + n.commonNode.CrossNode.Lock() currentBlock := n.commonNode.CurrentBlock currentConsensusBlock := n.commonNode.CurrentConsensusBlock currentEpoch := n.commonNode.Group.GetEpochSnapshot().GetEpochNumber() n.commonNode.CrossNode.Unlock() - rt := n.GetHostedRuntime() - if rt == nil { - n.logger.Error("CheckTx: hosted runtime not initialized") - return nil, errNotReady - } - - tx, err := rt.CheckTx(ctx, currentBlock, currentConsensusBlock, currentEpoch, rawTx) - switch { - case err == nil: - case errors.Is(err, host.ErrInvalidArgument): - return nil, errNotReady - case errors.Is(err, host.ErrInternal): - return nil, err - case errors.Is(err, host.ErrCheckTxFailed): - return nil, p2pError.Permanent(err) - default: - return nil, err + // Check transaction batch. + results, err := rt.CheckTx(n.ctx, currentBlock, currentConsensusBlock, currentEpoch, batch) + if err != nil { + n.logger.Error("transaction batch check tx error", "err", err) + return } - return tx, nil -} -// QueueTx queues a runtime transaction for scheduling. -func (n *Node) QueueTx(tx *transaction.CheckedTransaction) error { - n.schedulerMutex.RLock() - defer n.schedulerMutex.RUnlock() + txs := make([]*transaction.CheckedTransaction, 0, len(results)) + for i, res := range results { + if !res.IsSuccess() { + n.logger.Warn("check tx failed", "tx", batch[i], "result", res) + continue + } - if n.scheduler == nil { - return errNotReady + txs = append(txs, res.ToCheckedTransaction(batch[i])) } - // Check if transaction was recently scheduled. - if n.lastScheduledCache != nil { - if _, b := n.lastScheduledCache.Get(tx.Hash()); b { - return errDuplicateTx - } - } + // Remove the checked transaction batch. + n.checkTxQueue.RemoveBatch(batch) - if err := n.scheduler.QueueTx(tx); err != nil { - return err - } + // Queue checked transactions for scheduling. + n.queueTxBatch(txs) +} - if n.lastScheduledCache != nil { - if err := n.lastScheduledCache.Put(tx.Hash(), true); err != nil { - // cache.Put can only error if capacity in bytes is used and the - // inserted value is too large. This should never happen in here. - n.logger.Error("cache put error", - "err", err, - ) +// queueTxBatch queues a runtime transaction batch for scheduling. +func (n *Node) queueTxBatch(txs []*transaction.CheckedTransaction) { + for _, tx := range txs { + if err := n.scheduler.QueueTx(tx); err != nil { + n.logger.Error("unable to schedule transaction", "tx", tx) + continue + } + if n.lastScheduledCache != nil { + if err := n.lastScheduledCache.Put(tx.Hash(), true); err != nil { + // cache.Put can only error if capacity in bytes is used and the + // inserted value is too large. This should never happen in here. + n.logger.Error("cache put error", + "err", err, + ) + } } } - incomingQueueSize.With(n.getMetricLabels()).Set(float64(n.scheduler.UnscheduledSize())) - - // Notify event loop to attempt to schedule a batch. - n.scheduleCh.In() <- struct{}{} - return nil + incomingQueueSize.With(n.getMetricLabels()).Set(float64(n.scheduler.UnscheduledSize())) } // removeTxBatch removes a batch from scheduling queue. @@ -905,7 +906,7 @@ func (n *Node) handleScheduleBatch(force bool) { if !epoch.IsTransactionScheduler(blk.Header.Round) { n.logger.Debug("proposing a timeout", "round", blk.Header.Round, - "batch", batch, + "batch_size", len(batch), "round_results", roundResults, ) @@ -935,7 +936,7 @@ func (n *Node) handleScheduleBatch(force bool) { } n.logger.Debug("scheduling a batch", - "batch_size", batch, + "batch_size", len(batch), "round_results", roundResults, ) @@ -1024,7 +1025,7 @@ func (n *Node) handleScheduleBatch(force bool) { n.logger.Debug("dispatching a new batch proposal", "io_root", ioRoot, - "batch", batch, + "batch_size", len(batch), ) err = n.commonNode.Group.Publish( @@ -1198,7 +1199,7 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { } n.logger.Debug("processing batch", - "batch", batch, + "batch_size", len(batch.batch), ) // Create batch processing context and channel for receiving the response. @@ -1243,7 +1244,7 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { if err != nil { n.logger.Error("failed to resolve batch", "err", err, - "batch", batch, + "batch_size", len(batch.batch), ) return } @@ -1371,7 +1372,7 @@ func (n *Node) proposeBatch( epoch := n.commonNode.Group.GetEpochSnapshot() n.logger.Debug("proposing batch", - "batch", batch, + "batch_size", len(processed.raw), ) // Generate proposed compute results. @@ -1848,8 +1849,9 @@ func (n *Node) worker() { case <-txnScheduleTicker.C: // Force scheduling a batch if possible. n.handleScheduleBatch(true) - case <-n.scheduleCh.Out(): - // Regular scheduling of a batch. + case <-n.checkTxCh.Out(): + // Check any queued transactions and attempt regular scheduling. + n.checkTxBatch() n.handleScheduleBatch(false) case <-n.reselect: // Recalculate select set. @@ -1864,6 +1866,7 @@ func NewNode( roleProvider registration.RoleProvider, scheduleMaxTxPoolSize uint64, lastScheduledCacheSize uint64, + checkTxMaxBatchSize uint64, ) (*Node, error) { metricsOnce.Do(func() { prometheus.MustRegister(nodeCollectors...) @@ -1892,8 +1895,9 @@ func NewNode( roleProvider: roleProvider, scheduleMaxTxPoolSize: scheduleMaxTxPoolSize, lastScheduledCache: cache, + checkTxQueue: orderedmap.New(scheduleMaxTxPoolSize, checkTxMaxBatchSize), roundWeightLimits: make(map[transaction.Weight]uint64), - scheduleCh: channels.NewRingChannel(1), + checkTxCh: channels.NewRingChannel(1), ctx: ctx, cancelCtx: cancel, stopCh: make(chan struct{}), diff --git a/go/worker/compute/executor/init.go b/go/worker/compute/executor/init.go index e4227fba4c3..60991183f37 100644 --- a/go/worker/compute/executor/init.go +++ b/go/worker/compute/executor/init.go @@ -12,6 +12,7 @@ import ( const ( cfgMaxTxPoolSize = "worker.executor.schedule_max_tx_pool_size" cfgScheduleTxCacheSize = "worker.executor.schedule_tx_cache_size" + cfgCheckTxMaxBatchSize = "worker.executor.check_tx_max_batch_size" ) // Flags has the configuration flags. @@ -30,12 +31,14 @@ func New( registration, viper.GetUint64(cfgMaxTxPoolSize), viper.GetUint64(cfgScheduleTxCacheSize), + viper.GetUint64(cfgCheckTxMaxBatchSize), ) } func init() { - Flags.Uint64(cfgMaxTxPoolSize, 10000, "Maximum size of the scheduling transaction pool") - Flags.Uint64(cfgScheduleTxCacheSize, 1000, "Cache size of recently scheduled transactions to prevent re-scheduling") + Flags.Uint64(cfgMaxTxPoolSize, 10_000, "Maximum size of the scheduling transaction pool") + Flags.Uint64(cfgScheduleTxCacheSize, 10_000, "Cache size of recently scheduled transactions to prevent re-scheduling") + Flags.Uint64(cfgCheckTxMaxBatchSize, 10_000, "Maximum check tx batch size") _ = viper.BindPFlags(Flags) } diff --git a/go/worker/compute/executor/tests/tester.go b/go/worker/compute/executor/tests/tester.go index 566e35f5927..503a40faf62 100644 --- a/go/worker/compute/executor/tests/tester.go +++ b/go/worker/compute/executor/tests/tester.go @@ -16,7 +16,9 @@ import ( "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/runtime/transaction" storage "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" "github.com/oasisprotocol/oasis-core/go/worker/compute/executor" + executorAPI "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api" "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/committee" ) @@ -86,8 +88,10 @@ func testQueueTx( roothash roothash.Backend, st storage.Backend, ) { + ctx := context.Background() + // Subscribe to roothash blocks. - blocksCh, sub, err := roothash.WatchBlocks(context.Background(), runtimeID) + blocksCh, sub, err := roothash.WatchBlocks(ctx, runtimeID) require.NoError(t, err, "WatchBlocks") defer sub.Close() @@ -97,11 +101,20 @@ func testQueueTx( t.Fatalf("failed to receive block") } - // Queue a test call. - // Include a timestamp so each test invocation uses an unique key. - testCall := transaction.RawCheckedTransaction([]byte("hello world at: " + time.Now().String())) - err = rtNode.QueueTx(testCall) - require.NoError(t, err, "QueueCall") + // Include a timestamp so each test invocation uses a unique transaction. + testTx := []byte("hello world at: " + time.Now().String()) + // Submit a test transaction. + handled, err := rtNode.HandlePeerMessage( + ctx, + &p2p.Message{ + Tx: &executorAPI.Tx{ + Data: testTx, + }, + }, + false, + ) + require.NoError(t, err, "tx message should be handled") + require.True(t, handled, "tx message should be handled") // Node should transition to ProcessingBatch state. waitForNodeTransition(t, stateCh, committee.ProcessingBatch) @@ -118,11 +131,14 @@ blockLoop: select { case annBlk := <-blocksCh: blk := annBlk.Block - - // Check that correct block was generated. require.EqualValues(t, block.Normal, blk.Header.HeaderType) - ctx := context.Background() + if blk.Header.Round <= 2 { + // Round <= 2 - genesis round. + continue + } + + // Check that correct block was generated. tree := transaction.NewTree(st, storage.Root{ Namespace: blk.Header.Namespace, Version: blk.Header.Round, @@ -135,9 +151,9 @@ blockLoop: txs, err = tree.GetTransactions(ctx) require.NoError(t, err, "GetTransactions") require.Len(t, txs, 1, "there should be one transaction") - require.EqualValues(t, testCall.Raw(), txs[0].Input) + require.EqualValues(t, testTx, txs[0].Input) // NOTE: Mock host produces output equal to input. - require.EqualValues(t, testCall.Raw(), txs[0].Output) + require.EqualValues(t, testTx, txs[0].Output) // NOTE: Mock host produces an empty state root. var stateRoot hash.Hash @@ -149,7 +165,26 @@ blockLoop: } } - // Requeuing same call should fail. - err = rtNode.QueueTx(testCall) - require.Error(t, err, "QueueCall duplicate transaction") + // Submitting the same transaction should not result in a new block. + handled, err = rtNode.HandlePeerMessage( + ctx, + &p2p.Message{ + Tx: &executorAPI.Tx{ + Data: testTx, + }, + }, + false, + ) + require.NoError(t, err, "tx message should be handled") + require.True(t, handled, "tx message should be handled") + +blockLoop2: + for { + select { + case <-blocksCh: + t.Fatal("unexpected block as a result of a duplicate transaction") + case <-time.After(recvTimeout): + break blockLoop2 + } + } } diff --git a/go/worker/compute/executor/worker.go b/go/worker/compute/executor/worker.go index 36bb6e5f35f..348cbc4956d 100644 --- a/go/worker/compute/executor/worker.go +++ b/go/worker/compute/executor/worker.go @@ -19,6 +19,7 @@ type Worker struct { scheduleMaxTxPoolSize uint64 scheduleTxCacheSize uint64 + checkTxMaxBatchSize uint64 commonWorker *workerCommon.Worker registration *registration.Worker @@ -154,6 +155,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { rp, w.scheduleMaxTxPoolSize, w.scheduleTxCacheSize, + w.checkTxMaxBatchSize, ) if err != nil { return err @@ -176,6 +178,7 @@ func newWorker( registration *registration.Worker, scheduleMaxTxPoolSize uint64, scheduleTxCacheSize uint64, + checkTxMaxBatchSize uint64, ) (*Worker, error) { ctx, cancelCtx := context.WithCancel(context.Background()) @@ -184,6 +187,7 @@ func newWorker( commonWorker: commonWorker, scheduleMaxTxPoolSize: scheduleMaxTxPoolSize, scheduleTxCacheSize: scheduleTxCacheSize, + checkTxMaxBatchSize: checkTxMaxBatchSize, registration: registration, runtimes: make(map[common.Namespace]*committee.Node), ctx: ctx,