Skip to content

Commit

Permalink
go/worker/compute: Enforce maximum batch sizes from txn scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jul 29, 2020
1 parent 0ceb8e8 commit e1ba093
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 4 deletions.
15 changes: 13 additions & 2 deletions go/runtime/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,17 @@ func (bo inBatchOrder) Swap(i, j int) {
func (bo inBatchOrder) Less(i, j int) bool { return bo.order[i] < bo.order[j] }

// GetInputBatch returns a batch of transaction input artifacts in batch order.
func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) {
func (t *Tree) GetInputBatch(ctx context.Context, maxBatchSize, maxBatchSizeBytes uint64) (RawBatch, error) {
it := t.tree.NewIterator(ctx, mkvs.IteratorPrefetch(prefetchArtifactCount))
defer it.Close()

var curTx hash.Hash
curTx.Empty()

var bo inBatchOrder
var (
bo inBatchOrder
batchSizeBytes uint64
)
for it.Seek(txnKeyFmt.Encode()); it.Valid(); it.Next() {
var decHash hash.Hash
var decKind artifactKind
Expand All @@ -240,6 +243,14 @@ func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) {

bo.batch = append(bo.batch, ia.Input)
bo.order = append(bo.order, ia.BatchOrder)
batchSizeBytes += uint64(len(ia.Input))

if maxBatchSize > 0 && uint64(len(bo.batch)) > maxBatchSize {
return nil, fmt.Errorf("transaction: input batch too large (max: %d txes)", maxBatchSize)
}
if maxBatchSizeBytes > 0 && batchSizeBytes > maxBatchSizeBytes {
return nil, fmt.Errorf("transaction: input batch too large (max: %d bytes)", maxBatchSizeBytes)
}
}
if it.Err() != nil {
return nil, fmt.Errorf("transaction: get input batch failed: %w", it.Err())
Expand Down
8 changes: 7 additions & 1 deletion go/runtime/transaction/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,20 @@ func TestTransaction(t *testing.T) {
}

// Get input batch.
batch, err := tree.GetInputBatch(ctx)
batch, err := tree.GetInputBatch(ctx, 0, 0)
require.NoError(t, err, "GetInputBatch")
require.Len(t, batch, len(testTxns)+1, "batch should have the same transactions")
require.EqualValues(t, tx.Input, batch[0], "input batch transactions must be in correct order")
for idx, checkTx := range testTxns {
require.EqualValues(t, checkTx.Input, batch[idx+1], "input batch transactions must be in correct order")
}

// Get input batch with size limits.
_, err = tree.GetInputBatch(ctx, 5, 0)
require.Error(t, err, "GetInputBatch should fail with too many transactions")
_, err = tree.GetInputBatch(ctx, 0, 64)
require.Error(t, err, "GetInputBatch should fail with too large transactions")

// Commit.
// NOTE: This root is synced with tests in runtime/src/transaction/tree.rs.
writeLog, rootHash, err := tree.Commit(ctx)
Expand Down
5 changes: 4 additions & 1 deletion go/worker/compute/executor/committee/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type unresolvedBatch struct {

batch transaction.RawBatch
spanCtx opentracing.SpanContext

maxBatchSize uint64
maxBatchSizeBytes uint64
}

func (ub *unresolvedBatch) String() string {
Expand All @@ -37,7 +40,7 @@ func (ub *unresolvedBatch) resolve(ctx context.Context, storage storage.Backend)
txs := transaction.NewTree(storage, ub.ioRoot)
defer txs.Close()

batch, err := txs.GetInputBatch(ctx)
batch, err := txs.GetInputBatch(ctx, ub.maxBatchSize, ub.maxBatchSizeBytes)
if err != nil || len(batch) == 0 {
return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (n *Node) queueBatchBlocking(
},
txnSchedSignature: txnSchedSig,
storageSignatures: storageSignatures,
maxBatchSize: rt.TxnScheduler.MaxBatchSize,
maxBatchSizeBytes: rt.TxnScheduler.MaxBatchSizeBytes,
}
if batchSpan := opentracing.SpanFromContext(ctx); batchSpan != nil {
batch.spanCtx = batchSpan.Context()
Expand Down

0 comments on commit e1ba093

Please sign in to comment.