diff --git a/.changelog/2213.bugfix.md b/.changelog/2213.bugfix.md new file mode 100644 index 00000000000..e9891d4a13e --- /dev/null +++ b/.changelog/2213.bugfix.md @@ -0,0 +1 @@ +runtime: Reduce maximum RHP message size to 16 MiB diff --git a/.changelog/3139.bugfix.md b/.changelog/3139.bugfix.md new file mode 100644 index 00000000000..ca9a3bcf4f3 --- /dev/null +++ b/.changelog/3139.bugfix.md @@ -0,0 +1 @@ +go/worker/compute: Enforce maximum batch sizes from txn scheduler diff --git a/docs/runtime/runtime-host-protocol.md b/docs/runtime/runtime-host-protocol.md index 8cf54ffb572..a6f89e359d5 100644 --- a/docs/runtime/runtime-host-protocol.md +++ b/docs/runtime/runtime-host-protocol.md @@ -24,7 +24,7 @@ using [canonical CBOR]. The frames are serialized on the wire as follows: [4-byte message length (big endian)] [CBOR-serialized message] ``` -Maximum allowed message size is 104857600 bytes. +Maximum allowed message size is 16 MiB. [canonical CBOR]: ../encoding.md diff --git a/go/common/cbor/codec.go b/go/common/cbor/codec.go index d52796cd4f2..8ab02f56777 100644 --- a/go/common/cbor/codec.go +++ b/go/common/cbor/codec.go @@ -10,7 +10,7 @@ import ( ) // Maximum message size. -const maxMessageSize = 104857600 // 100 MiB +const maxMessageSize = 16 * 1024 * 1024 // 16 MiB var ( errMessageTooLarge = errors.New("codec: message too large") diff --git a/go/runtime/transaction/transaction.go b/go/runtime/transaction/transaction.go index 5618d2b9a05..9b1369e3b08 100644 --- a/go/runtime/transaction/transaction.go +++ b/go/runtime/transaction/transaction.go @@ -214,14 +214,17 @@ func (bo inBatchOrder) Swap(i, j int) { func (bo inBatchOrder) Less(i, j int) bool { return bo.order[i] < bo.order[j] } // GetInputBatch returns a batch of transaction input artifacts in batch order. -func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) { +func (t *Tree) GetInputBatch(ctx context.Context, maxBatchSize, maxBatchSizeBytes uint64) (RawBatch, error) { it := t.tree.NewIterator(ctx, mkvs.IteratorPrefetch(prefetchArtifactCount)) defer it.Close() var curTx hash.Hash curTx.Empty() - var bo inBatchOrder + var ( + bo inBatchOrder + batchSizeBytes uint64 + ) for it.Seek(txnKeyFmt.Encode()); it.Valid(); it.Next() { var decHash hash.Hash var decKind artifactKind @@ -240,6 +243,14 @@ func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) { bo.batch = append(bo.batch, ia.Input) bo.order = append(bo.order, ia.BatchOrder) + batchSizeBytes += uint64(len(ia.Input)) + + if maxBatchSize > 0 && uint64(len(bo.batch)) > maxBatchSize { + return nil, fmt.Errorf("transaction: input batch too large (max: %d txes)", maxBatchSize) + } + if maxBatchSizeBytes > 0 && batchSizeBytes > maxBatchSizeBytes { + return nil, fmt.Errorf("transaction: input batch too large (max: %d bytes)", maxBatchSizeBytes) + } } if it.Err() != nil { return nil, fmt.Errorf("transaction: get input batch failed: %w", it.Err()) diff --git a/go/runtime/transaction/transaction_test.go b/go/runtime/transaction/transaction_test.go index f287e639445..d8392f73af4 100644 --- a/go/runtime/transaction/transaction_test.go +++ b/go/runtime/transaction/transaction_test.go @@ -117,7 +117,7 @@ func TestTransaction(t *testing.T) { } // Get input batch. - batch, err := tree.GetInputBatch(ctx) + batch, err := tree.GetInputBatch(ctx, 0, 0) require.NoError(t, err, "GetInputBatch") require.Len(t, batch, len(testTxns)+1, "batch should have the same transactions") require.EqualValues(t, tx.Input, batch[0], "input batch transactions must be in correct order") @@ -125,6 +125,12 @@ func TestTransaction(t *testing.T) { require.EqualValues(t, checkTx.Input, batch[idx+1], "input batch transactions must be in correct order") } + // Get input batch with size limits. + _, err = tree.GetInputBatch(ctx, 5, 0) + require.Error(t, err, "GetInputBatch should fail with too many transactions") + _, err = tree.GetInputBatch(ctx, 0, 64) + require.Error(t, err, "GetInputBatch should fail with too large transactions") + // Commit. // NOTE: This root is synced with tests in runtime/src/transaction/tree.rs. writeLog, rootHash, err := tree.Commit(ctx) diff --git a/go/worker/compute/executor/committee/batch.go b/go/worker/compute/executor/committee/batch.go index 2d678aa4bdc..aff6229c46d 100644 --- a/go/worker/compute/executor/committee/batch.go +++ b/go/worker/compute/executor/committee/batch.go @@ -22,6 +22,9 @@ type unresolvedBatch struct { batch transaction.RawBatch spanCtx opentracing.SpanContext + + maxBatchSize uint64 + maxBatchSizeBytes uint64 } func (ub *unresolvedBatch) String() string { @@ -37,7 +40,7 @@ func (ub *unresolvedBatch) resolve(ctx context.Context, storage storage.Backend) txs := transaction.NewTree(storage, ub.ioRoot) defer txs.Close() - batch, err := txs.GetInputBatch(ctx) + batch, err := txs.GetInputBatch(ctx, ub.maxBatchSize, ub.maxBatchSizeBytes) if err != nil || len(batch) == 0 { return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err) } diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 4aafa4e5283..a230ea2d7e3 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -283,6 +283,8 @@ func (n *Node) queueBatchBlocking( }, txnSchedSignature: txnSchedSig, storageSignatures: storageSignatures, + maxBatchSize: rt.TxnScheduler.MaxBatchSize, + maxBatchSizeBytes: rt.TxnScheduler.MaxBatchSizeBytes, } if batchSpan := opentracing.SpanFromContext(ctx); batchSpan != nil { batch.spanCtx = batchSpan.Context() diff --git a/runtime/src/protocol.rs b/runtime/src/protocol.rs index 485c7d39a9b..7825f1ca7a9 100644 --- a/runtime/src/protocol.rs +++ b/runtime/src/protocol.rs @@ -31,7 +31,7 @@ pub type Stream = ::std::os::unix::net::UnixStream; pub type Stream = ::std::net::TcpStream; /// Maximum message size. -const MAX_MESSAGE_SIZE: usize = 104_857_600; // 100MB +const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; // 16MiB #[derive(Error, Debug)] pub enum ProtocolError {