Skip to content

Commit

Permalink
Merge pull request #4131 from oasisprotocol/ptrus/feature/checktx-queue
Browse files Browse the repository at this point in the history
go/worker/executor: batch CheckTx transactions
  • Loading branch information
ptrus authored Jul 19, 2021
2 parents 38fd6c5 + af41f8d commit fd9abd9
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 116 deletions.
1 change: 1 addition & 0 deletions .changelog/2548.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/executor: batch runtime CheckTx transactions
15 changes: 7 additions & 8 deletions go/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api"
"github.com/oasisprotocol/oasis-core/go/runtime/host"
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
"github.com/oasisprotocol/oasis-core/go/runtime/tagindexer"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
Expand Down Expand Up @@ -198,15 +197,15 @@ func (c *runtimeClient) CheckTx(ctx context.Context, request *api.CheckTxRequest
return fmt.Errorf("client: failed to get current epoch: %w", err)
}

_, err = rt.CheckTx(ctx, rs.CurrentBlock, lb, epoch, request.Data)
switch {
case err == nil:
return nil
case errors.Is(err, host.ErrCheckTxFailed):
return errors.WithContext(api.ErrCheckTxFailed, errors.Context(err))
default:
resp, err := rt.CheckTx(ctx, rs.CurrentBlock, lb, epoch, transaction.RawBatch{request.Data})
if err != nil {
return fmt.Errorf("client: local transaction check failed: %w", err)
}
if !resp[0].IsSuccess() {
return errors.WithContext(api.ErrCheckTxFailed, resp[0].Error.String())
}

return nil
}

// Implements api.RuntimeClient.
Expand Down
8 changes: 4 additions & 4 deletions go/runtime/client/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func testQuery(
// Check that indexer has indexed txn keys (check the mock worker for key/values).
tx, err = c.QueryTx(ctx, &api.QueryTxRequest{RuntimeID: runtimeID, Key: []byte("txn_foo"), Value: []byte("txn_bar")})
require.NoError(t, err, "QueryTx")
require.EqualValues(t, 2, tx.Block.Header.Round)
require.EqualValues(t, 3, tx.Block.Header.Round)
require.EqualValues(t, 0, tx.Index)
// Check for values from TestNode/ExecutorWorker/QueueTx
require.True(t, strings.HasPrefix(string(tx.Input), "hello world"))
Expand All @@ -161,7 +161,7 @@ func testQuery(
require.EqualValues(t, testInput, txns[0])

// Check events query (see mock worker for emitted events).
events, err := c.GetEvents(ctx, &api.GetEventsRequest{RuntimeID: runtimeID, Round: 2})
events, err := c.GetEvents(ctx, &api.GetEventsRequest{RuntimeID: runtimeID, Round: 3})
require.NoError(t, err, "GetEvents")
require.Len(t, events, 1)
require.EqualValues(t, []byte("txn_foo"), events[0].Key)
Expand All @@ -170,7 +170,7 @@ func testQuery(
// Test advanced transaction queries.
query := api.Query{
RoundMin: 0,
RoundMax: 3,
RoundMax: 4,
Conditions: []api.QueryCondition{
{Key: []byte("txn_foo"), Values: [][]byte{[]byte("txn_bar")}},
},
Expand All @@ -182,7 +182,7 @@ func testQuery(
sort.Slice(results, func(i, j int) bool {
return bytes.Compare(results[i].Input, results[j].Input) < 0
})
require.EqualValues(t, 2, results[0].Block.Header.Round)
require.EqualValues(t, 3, results[0].Block.Header.Round)
require.EqualValues(t, 0, results[0].Index)
// Check for values from TestNode/ExecutorWorker/QueueTx
require.True(t, strings.HasPrefix(string(results[0].Input), "hello world"))
Expand Down
21 changes: 7 additions & 14 deletions go/runtime/host/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type RichRuntime interface {
rb *block.Block,
lb *consensus.LightBlock,
epoch beacon.EpochTime,
tx []byte,
) (*transaction.CheckedTransaction, error)
batch transaction.RawBatch,
) ([]protocol.CheckTxResult, error)

// Query requests the runtime to answer a runtime-specific query.
Query(
Expand Down Expand Up @@ -64,16 +64,16 @@ func (r *richRuntime) CheckTx(
rb *block.Block,
lb *consensus.LightBlock,
epoch beacon.EpochTime,
tx []byte,
) (*transaction.CheckedTransaction, error) {
batch transaction.RawBatch,
) ([]protocol.CheckTxResult, error) {
if rb == nil || lb == nil {
return nil, ErrInvalidArgument
}

resp, err := r.Call(ctx, &protocol.Body{
RuntimeCheckTxBatchRequest: &protocol.RuntimeCheckTxBatchRequest{
ConsensusBlock: *lb,
Inputs: transaction.RawBatch{tx},
Inputs: batch,
Block: *rb,
Epoch: epoch,
},
Expand All @@ -83,17 +83,10 @@ func (r *richRuntime) CheckTx(
return nil, errors.WithContext(ErrInternal, err.Error())
case resp.RuntimeCheckTxBatchResponse == nil:
return nil, errors.WithContext(ErrInternal, "malformed runtime response")
case len(resp.RuntimeCheckTxBatchResponse.Results) != 1:
case len(resp.RuntimeCheckTxBatchResponse.Results) != len(batch):
return nil, errors.WithContext(ErrInternal, "malformed runtime response: incorrect number of results")
}

// Interpret CheckTx result.
result := resp.RuntimeCheckTxBatchResponse.Results[0]
if !result.IsSuccess() {
return nil, errors.WithContext(ErrCheckTxFailed, result.Error.String())
}

return result.ToCheckedTransaction(tx), nil
return resp.RuntimeCheckTxBatchResponse.Results, nil
}

// Implements RichRuntime.
Expand Down
195 changes: 195 additions & 0 deletions go/runtime/scheduling/simple/orderedmap/ordered_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Package orderedmap implements a queue backed by an ordered map.
package orderedmap

import (
"container/list"
"fmt"
"sync"

"github.com/hashicorp/go-multierror"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/scheduling/simple/txpool/api"
)

type pair struct {
Key hash.Hash
Value []byte

element *list.Element
}

// OrderedMap is a queue backed by an ordered map.
type OrderedMap struct {
sync.Mutex

transactions map[hash.Hash]*pair
queue *list.List

maxTxPoolSize uint64
maxBatchSize uint64
}

// Add adds transaction into the queue.
func (q *OrderedMap) Add(tx []byte) error {
txHash := hash.NewFromBytes(tx)

q.Lock()
defer q.Unlock()

// Check if there is room in the queue.
if uint64(q.queue.Len()) >= q.maxTxPoolSize {
return api.ErrFull
}

if err := q.checkTxLocked(tx, txHash); err != nil {
return err
}

q.addTxLocked(tx, txHash)

return nil
}

// AddBatch adds a batch of transactions into the queue.
func (q *OrderedMap) AddBatch(batch [][]byte) error {
// Compute all hashes before taking the lock.
var txHashes []hash.Hash
for _, tx := range batch {
txHash := hash.NewFromBytes(tx)
txHashes = append(txHashes, txHash)
}

q.Lock()
defer q.Unlock()

var errs error
for i, tx := range batch {
if err := q.checkTxLocked(tx, txHashes[i]); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed inserting tx: %d, error: %w", i, err))
continue
}

// Check if there is room in the queue.
if uint64(q.queue.Len()) >= q.maxTxPoolSize {
errs = multierror.Append(errs, fmt.Errorf("failed inserting tx: %d, error: %w", i, api.ErrFull))
return errs
}

// Add the tx if checks passed.
q.addTxLocked(tx, txHashes[i])
}

if len(q.transactions) != q.queue.Len() {
panic(fmt.Errorf("inconsistent sizes of the underlying list (%v) and map (%v), after AddBatch", q.queue.Len(), len(q.transactions)))
}

return errs
}

// GetBatch gets a batch of transactions from the queue.
func (q *OrderedMap) GetBatch() [][]byte {
q.Lock()
defer q.Unlock()

var batch [][]byte
current := q.queue.Back()
for {
if current == nil {
break
}
// Check if the batch already has enough transactions.
if uint64(len(batch)) >= q.maxBatchSize {
break
}

el := current.Value.(*pair)

batch = append(batch, el.Value)
current = current.Prev()
}

return batch
}

// RemoveBatch removes a batch of transactions from the queue.
func (q *OrderedMap) RemoveBatch(batch [][]byte) {
q.Lock()
defer q.Unlock()

for _, tx := range batch {
txHash := hash.NewFromBytes(tx)
if pair, ok := q.transactions[txHash]; ok {
q.queue.Remove(pair.element)
delete(q.transactions, pair.Key)
}
}
if len(q.transactions) != q.queue.Len() {
panic(fmt.Errorf("inconsistent sizes of the underlying list (%v) and map (%v) after RemoveBatch", q.queue.Len(), len(q.transactions)))
}
}

// IsQueued checks if a transactions is already queued.
func (q *OrderedMap) IsQueued(txHash hash.Hash) bool {
q.Lock()
defer q.Unlock()

return q.isQueuedLocked(txHash)
}

// Size returns size of the queue.
func (q *OrderedMap) Size() uint64 {
q.Lock()
defer q.Unlock()

return uint64(q.queue.Len())
}

// Clear empties the queue.
func (q *OrderedMap) Clear() {
q.Lock()
defer q.Unlock()

q.queue = list.New()
q.transactions = make(map[hash.Hash]*pair)
}

// NOTE: Assumes lock is held.
func (q *OrderedMap) isQueuedLocked(txHash hash.Hash) bool {
_, ok := q.transactions[txHash]
return ok
}

// NOTE: Assumes lock is held.
func (q *OrderedMap) checkTxLocked(tx []byte, txHash hash.Hash) error {
if q.isQueuedLocked(txHash) {
return api.ErrCallAlreadyExists
}

return nil
}

// NOTE: Assumes lock is held and that checkTxLocked has been called.
func (q *OrderedMap) addTxLocked(tx []byte, txHash hash.Hash) {
// Assuming checkTxLocked has been called before, this can happen if
// duplicate transactions are in the same batch -- just ignore them.
if _, exists := q.transactions[txHash]; exists {
return
}
p := &pair{
Key: txHash,
Value: tx,
}
p.element = q.queue.PushFront(p)
q.transactions[txHash] = p
}

// New returns a new incoming queue.
func New(maxPoolSize, maxBatchSize uint64) *OrderedMap {
return &OrderedMap{
transactions: make(map[hash.Hash]*pair),
queue: list.New(),
maxTxPoolSize: maxPoolSize,
maxBatchSize: maxBatchSize,
}
}
Loading

0 comments on commit fd9abd9

Please sign in to comment.