runtime: Reduce maximum RHP message size #3139

Merged
merged 2 commits on Jul 29, 2020
1 change: 1 addition & 0 deletions .changelog/2213.bugfix.md
@@ -0,0 +1 @@
runtime: Reduce maximum RHP message size to 16 MiB
1 change: 1 addition & 0 deletions .changelog/3139.bugfix.md
@@ -0,0 +1 @@
go/worker/compute: Enforce maximum batch sizes from txn scheduler
2 changes: 1 addition & 1 deletion docs/runtime/runtime-host-protocol.md
@@ -24,7 +24,7 @@ using [canonical CBOR]. The frames are serialized on the wire as follows:
[4-byte message length (big endian)] [CBOR-serialized message]
```

-Maximum allowed message size is 104857600 bytes.
+Maximum allowed message size is 16 MiB.

[canonical CBOR]: ../encoding.md

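As a reference for the framing described above, here is a minimal, hypothetical sketch of a writer and reader for such frames with the lowered cap; `writeFrame`/`readFrame` are illustrative names only and are not part of this PR or the oasis-core codebase:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const maxMessageSize = 16 * 1024 * 1024 // 16 MiB cap introduced by this PR

var errMessageTooLarge = errors.New("codec: message too large")

// writeFrame prefixes an already CBOR-serialized payload with a 4-byte
// big-endian length, refusing payloads over the 16 MiB limit.
func writeFrame(w io.Writer, payload []byte) error {
	if len(payload) > maxMessageSize {
		return errMessageTooLarge
	}
	var length [4]byte
	binary.BigEndian.PutUint32(length[:], uint32(len(payload)))
	if _, err := w.Write(length[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads the length prefix, rejects oversized frames before
// allocating a buffer, and returns the raw CBOR payload.
func readFrame(r io.Reader) ([]byte, error) {
	var length [4]byte
	if _, err := io.ReadFull(r, length[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(length[:])
	if n > maxMessageSize {
		return nil, errMessageTooLarge
	}
	payload := make([]byte, n)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

func main() {
	var buf bytes.Buffer
	if err := writeFrame(&buf, []byte{0xa0}); err != nil { // 0xa0 is an empty CBOR map
		panic(err)
	}
	payload, err := readFrame(&buf)
	if err != nil {
		panic(err)
	}
	fmt.Printf("round-tripped a %d-byte frame payload\n", len(payload))
}
```

Checking the length prefix before allocating the payload buffer is what makes the lower limit effective: an oversized or malicious length is rejected without reserving up to 100 MiB per message.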
2 changes: 1 addition & 1 deletion go/common/cbor/codec.go
@@ -10,7 +10,7 @@ import (
)

// Maximum message size.
-const maxMessageSize = 104857600 // 100 MiB
+const maxMessageSize = 16 * 1024 * 1024 // 16 MiB

var (
errMessageTooLarge = errors.New("codec: message too large")
15 changes: 13 additions & 2 deletions go/runtime/transaction/transaction.go
@@ -214,14 +214,17 @@ func (bo inBatchOrder) Swap(i, j int) {
func (bo inBatchOrder) Less(i, j int) bool { return bo.order[i] < bo.order[j] }

// GetInputBatch returns a batch of transaction input artifacts in batch order.
-func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) {
+func (t *Tree) GetInputBatch(ctx context.Context, maxBatchSize, maxBatchSizeBytes uint64) (RawBatch, error) {
it := t.tree.NewIterator(ctx, mkvs.IteratorPrefetch(prefetchArtifactCount))
defer it.Close()

var curTx hash.Hash
curTx.Empty()

-var bo inBatchOrder
+var (
+	bo inBatchOrder
+	batchSizeBytes uint64
+)
for it.Seek(txnKeyFmt.Encode()); it.Valid(); it.Next() {
var decHash hash.Hash
var decKind artifactKind
@@ -240,6 +243,14 @@ func (t *Tree) GetInputBatch(ctx context.Context) (RawBatch, error) {

bo.batch = append(bo.batch, ia.Input)
bo.order = append(bo.order, ia.BatchOrder)
batchSizeBytes += uint64(len(ia.Input))

if maxBatchSize > 0 && uint64(len(bo.batch)) > maxBatchSize {
return nil, fmt.Errorf("transaction: input batch too large (max: %d txes)", maxBatchSize)
}
if maxBatchSizeBytes > 0 && batchSizeBytes > maxBatchSizeBytes {
return nil, fmt.Errorf("transaction: input batch too large (max: %d bytes)", maxBatchSizeBytes)
}
}
if it.Err() != nil {
return nil, fmt.Errorf("transaction: get input batch failed: %w", it.Err())
8 changes: 7 additions & 1 deletion go/runtime/transaction/transaction_test.go
@@ -117,14 +117,20 @@ func TestTransaction(t *testing.T) {
}

// Get input batch.
-batch, err := tree.GetInputBatch(ctx)
+batch, err := tree.GetInputBatch(ctx, 0, 0)
require.NoError(t, err, "GetInputBatch")
require.Len(t, batch, len(testTxns)+1, "batch should have the same transactions")
require.EqualValues(t, tx.Input, batch[0], "input batch transactions must be in correct order")
for idx, checkTx := range testTxns {
require.EqualValues(t, checkTx.Input, batch[idx+1], "input batch transactions must be in correct order")
}

// Get input batch with size limits.
_, err = tree.GetInputBatch(ctx, 5, 0)
require.Error(t, err, "GetInputBatch should fail with too many transactions")
_, err = tree.GetInputBatch(ctx, 0, 64)
require.Error(t, err, "GetInputBatch should fail with too large transactions")

// Commit.
// NOTE: This root is synced with tests in runtime/src/transaction/tree.rs.
writeLog, rootHash, err := tree.Commit(ctx)
5 changes: 4 additions & 1 deletion go/worker/compute/executor/committee/batch.go
@@ -22,6 +22,9 @@ type unresolvedBatch struct {

batch transaction.RawBatch
spanCtx opentracing.SpanContext

maxBatchSize uint64
maxBatchSizeBytes uint64
}

func (ub *unresolvedBatch) String() string {
@@ -37,7 +40,7 @@ func (ub *unresolvedBatch) resolve(ctx context.Context, storage storage.Backend)
txs := transaction.NewTree(storage, ub.ioRoot)
defer txs.Close()

-batch, err := txs.GetInputBatch(ctx)
+batch, err := txs.GetInputBatch(ctx, ub.maxBatchSize, ub.maxBatchSizeBytes)
if err != nil || len(batch) == 0 {
return nil, fmt.Errorf("failed to fetch inputs from storage: %w", err)
}
2 changes: 2 additions & 0 deletions go/worker/compute/executor/committee/node.go
@@ -283,6 +283,8 @@ func (n *Node) queueBatchBlocking(
},
txnSchedSignature: txnSchedSig,
storageSignatures: storageSignatures,
maxBatchSize: rt.TxnScheduler.MaxBatchSize,
maxBatchSizeBytes: rt.TxnScheduler.MaxBatchSizeBytes,
}
if batchSpan := opentracing.SpanFromContext(ctx); batchSpan != nil {
batch.spanCtx = batchSpan.Context()
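To summarize how the pieces above fit together: the executor node copies the txn scheduler's `MaxBatchSize`/`MaxBatchSizeBytes` parameters into the `unresolvedBatch`, and `resolve` passes them to `GetInputBatch`, which rejects oversized batches. Below is a small, self-contained sketch of the same limit semantics; the `checkBatchLimits` helper is hypothetical and not an oasis-core API:

```go
package main

import "fmt"

// checkBatchLimits mirrors the semantics GetInputBatch now enforces while it
// iterates: a zero limit disables the check, the count limit bounds the number
// of transactions, and the byte limit bounds the cumulative input size.
func checkBatchLimits(batch [][]byte, maxBatchSize, maxBatchSizeBytes uint64) error {
	var batchSizeBytes uint64
	for _, tx := range batch {
		batchSizeBytes += uint64(len(tx))
	}
	if maxBatchSize > 0 && uint64(len(batch)) > maxBatchSize {
		return fmt.Errorf("transaction: input batch too large (max: %d txes)", maxBatchSize)
	}
	if maxBatchSizeBytes > 0 && batchSizeBytes > maxBatchSizeBytes {
		return fmt.Errorf("transaction: input batch too large (max: %d bytes)", maxBatchSizeBytes)
	}
	return nil
}

func main() {
	batch := [][]byte{[]byte("tx-1"), []byte("tx-2"), []byte("tx-3")}

	// Limits as they would arrive from the runtime descriptor's transaction
	// scheduler parameters (rt.TxnScheduler.MaxBatchSize / MaxBatchSizeBytes).
	fmt.Println(checkBatchLimits(batch, 5, 1024)) // <nil>: within both limits
	fmt.Println(checkBatchLimits(batch, 2, 0))    // error: more than 2 transactions
	fmt.Println(checkBatchLimits(batch, 0, 8))    // error: more than 8 bytes of inputs
}
```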
2 changes: 1 addition & 1 deletion runtime/src/protocol.rs
@@ -31,7 +31,7 @@ pub type Stream = ::std::net::TcpStream;
pub type Stream = ::std::net::TcpStream;

/// Maximum message size.
-const MAX_MESSAGE_SIZE: usize = 104_857_600; // 100MB
+const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024; // 16MiB

#[derive(Error, Debug)]
pub enum ProtocolError {