Skip to content

Commit

Permalink
Merge pull request #3139 from oasisprotocol/kostko/fix/reduce-rhp-cod…
Browse files Browse the repository at this point in the history
…ec-limit

runtime: Reduce maximum RHP message size
  • Loading branch information
kostko authored Jul 29, 2020
2 parents 7ab5ad2 + 8dbaf39 commit 683eb0d
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 7 deletions.
1 change: 1 addition & 0 deletions .changelog/2213.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
runtime: Reduce maximum RHP message size to 16 MiB
1 change: 1 addition & 0 deletions .changelog/3139.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute: Enforce maximum batch sizes from txn scheduler
2 changes: 1 addition & 1 deletion docs/runtime/runtime-host-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ using [canonical CBOR]. The frames are serialized on the wire as follows:
[4-byte message length (big endian)] [CBOR-serialized message]
```

Maximum allowed message size is 104857600 bytes.
Maximum allowed message size is 16 MiB.

[canonical CBOR]: ../encoding.md

Expand Down
2 changes: 1 addition & 1 deletion go/common/cbor/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// Maximum message size.
const maxMessageSize = 104857600 // 100 MiB
const maxMessageSize = 16 * 1024 * 1024 // 16 MiB

var (
errMessageTooLarge = errors.New("codec: message too large")
Expand Down
15 changes: 13 additions & 2 deletions go/runtime/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,17 @@ func (bo inBatchOrder) Swap(i, j int) {
func (bo inBatchOrder) Less(i, j int) bool { return bo.order[i] < bo.order[j] }

// GetInputBatch returns a batch of transaction input artifacts in batch order.
func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) {
func (t *Tree) GetInputBatch(ctx context.Context, maxBatchSize, maxBatchSizeBytes uint64) (RawBatch, error) {
it := t.tree.NewIterator(ctx, mkvs.IteratorPrefetch(prefetchArtifactCount))
defer it.Close()

var curTx hash.Hash
curTx.Empty()

var bo inBatchOrder
var (
bo inBatchOrder
batchSizeBytes uint64
)
for it.Seek(txnKeyFmt.Encode()); it.Valid(); it.Next() {
var decHash hash.Hash
var decKind artifactKind
Expand All @@ -240,6 +243,14 @@ func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) {

bo.batch = append(bo.batch, ia.Input)
bo.order = append(bo.order, ia.BatchOrder)
batchSizeBytes += uint64(len(ia.Input))

if maxBatchSize > 0 && uint64(len(bo.batch)) > maxBatchSize {
return nil, fmt.Errorf("transaction: input batch too large (max: %d txes)", maxBatchSize)
}
if maxBatchSizeBytes > 0 && batchSizeBytes > maxBatchSizeBytes {
return nil, fmt.Errorf("transaction: input batch too large (max: %d bytes)", maxBatchSizeBytes)
}
}
if it.Err() != nil {
return nil, fmt.Errorf("transaction: get input batch failed: %w", it.Err())
Expand Down
8 changes: 7 additions & 1 deletion go/runtime/transaction/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,20 @@ func TestTransaction(t *testing.T) {
}

// Get input batch.
batch, err := tree.GetInputBatch(ctx)
batch, err := tree.GetInputBatch(ctx, 0, 0)
require.NoError(t, err, "GetInputBatch")
require.Len(t, batch, len(testTxns)+1, "batch should have the same transactions")
require.EqualValues(t, tx.Input, batch[0], "input batch transactions must be in correct order")
for idx, checkTx := range testTxns {
require.EqualValues(t, checkTx.Input, batch[idx+1], "input batch transactions must be in correct order")
}

// Get input batch with size limits.
_, err = tree.GetInputBatch(ctx, 5, 0)
require.Error(t, err, "GetInputBatch should fail with too many transactions")
_, err = tree.GetInputBatch(ctx, 0, 64)
require.Error(t, err, "GetInputBatch should fail with too large transactions")

// Commit.
// NOTE: This root is synced with tests in runtime/src/transaction/tree.rs.
writeLog, rootHash, err := tree.Commit(ctx)
Expand Down
5 changes: 4 additions & 1 deletion go/worker/compute/executor/committee/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type unresolvedBatch struct {

batch transaction.RawBatch
spanCtx opentracing.SpanContext

maxBatchSize uint64
maxBatchSizeBytes uint64
}

func (ub *unresolvedBatch) String() string {
Expand All @@ -37,7 +40,7 @@ func (ub *unresolvedBatch) resolve(ctx context.Context, storage storage.Backend)
txs := transaction.NewTree(storage, ub.ioRoot)
defer txs.Close()

batch, err := txs.GetInputBatch(ctx)
batch, err := txs.GetInputBatch(ctx, ub.maxBatchSize, ub.maxBatchSizeBytes)
if err != nil || len(batch) == 0 {
return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (n *Node) queueBatchBlocking(
},
txnSchedSignature: txnSchedSig,
storageSignatures: storageSignatures,
maxBatchSize: rt.TxnScheduler.MaxBatchSize,
maxBatchSizeBytes: rt.TxnScheduler.MaxBatchSizeBytes,
}
if batchSpan := opentracing.SpanFromContext(ctx); batchSpan != nil {
batch.spanCtx = batchSpan.Context()
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub type Stream = ::std::os::unix::net::UnixStream;
pub type Stream = ::std::net::TcpStream;

/// Maximum message size.
const MAX_MESSAGE_SIZE: usize = 104_857_600; // 100MB
const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; // 16MiB

#[derive(Error, Debug)]
pub enum ProtocolError {
Expand Down

0 comments on commit 683eb0d

Please sign in to comment.