From b2f9626a8fe348ed39adf324b6d069ff3629d451 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 18 Mar 2022 12:37:46 +0100 Subject: [PATCH] Don't clear tx pool on epoch transitions, add txsync protocol --- .changelog/4579.bugfix.md | 1 + go/oasis-net-runner/fixtures/default.go | 4 +- .../cmd/debug/txsource/workload/runtime.go | 34 +++--- .../scenario/e2e/runtime/multiple_runtimes.go | 4 +- .../scenario/e2e/runtime/runtime.go | 32 ++++-- .../scenario/e2e/runtime/runtime_client_kv.go | 23 ++-- .../e2e/runtime/runtime_client_kv_enc.go | 50 ++------- .../e2e/runtime/runtime_governance.go | 12 +-- go/runtime/txpool/txpool.go | 50 +++------ go/worker/common/committee/node.go | 3 + go/worker/common/config.go | 9 +- go/worker/common/p2p/rpc/client.go | 24 +++++ go/worker/common/p2p/txsync/client.go | 63 +++++++++++ go/worker/common/p2p/txsync/protocol.go | 31 ++++++ go/worker/common/p2p/txsync/server.go | 55 ++++++++++ go/worker/compute/executor/committee/node.go | 35 +----- .../executor/committee/transactions.go | 102 ++++++++++++++++++ tests/runtimes/simple-keyvalue/src/main.rs | 48 +++++++-- tests/runtimes/simple-keyvalue/src/methods.rs | 25 +---- tests/runtimes/simple-keyvalue/src/types.rs | 9 +- 20 files changed, 402 insertions(+), 212 deletions(-) create mode 100644 .changelog/4579.bugfix.md create mode 100644 go/worker/common/p2p/txsync/client.go create mode 100644 go/worker/common/p2p/txsync/protocol.go create mode 100644 go/worker/common/p2p/txsync/server.go create mode 100644 go/worker/compute/executor/committee/transactions.go diff --git a/.changelog/4579.bugfix.md b/.changelog/4579.bugfix.md new file mode 100644 index 00000000000..a3b75f15542 --- /dev/null +++ b/.changelog/4579.bugfix.md @@ -0,0 +1 @@ +Don't clear tx pool on epoch transitions, add txsync protocol diff --git a/go/oasis-net-runner/fixtures/default.go b/go/oasis-net-runner/fixtures/default.go index c975d24f67e..1df3b2af98a 100644 --- a/go/oasis-net-runner/fixtures/default.go +++ b/go/oasis-net-runner/fixtures/default.go @@ -182,9 +182,9 @@ func newDefaultFixture() (*oasis.NetworkFixture, error) { MaxMessages: 128, }, TxnScheduler: registry.TxnSchedulerParameters{ - MaxBatchSize: 1, + MaxBatchSize: 1000, MaxBatchSizeBytes: 16 * 1024 * 1024, // 16 MiB - BatchFlushTimeout: 20 * time.Second, + BatchFlushTimeout: 1 * time.Second, ProposerTimeout: 20, }, AdmissionPolicy: registry.RuntimeAdmissionPolicy{ diff --git a/go/oasis-node/cmd/debug/txsource/workload/runtime.go b/go/oasis-node/cmd/debug/txsource/workload/runtime.go index f6bfa0ea69f..f1306e9ae18 100644 --- a/go/oasis-node/cmd/debug/txsource/workload/runtime.go +++ b/go/oasis-node/cmd/debug/txsource/workload/runtime.go @@ -76,6 +76,8 @@ var RuntimeFlags = flag.NewFlagSet("", flag.ContinueOnError) // TxnCall is a transaction call in the test runtime. type TxnCall struct { + // Nonce is a nonce. + Nonce uint64 `json:"nonce"` // Method is the called method name. Method string `json:"method"` // Args are the method arguments. @@ -231,15 +233,14 @@ func (r *runtime) doInsertRequest(ctx context.Context, rng *rand.Rand, rtc runti // Submit request. req := &TxnCall{ + Nonce: rng.Uint64(), Method: "insert", Args: struct { Key string `json:"key"` Value string `json:"value"` - Nonce uint64 `json:"nonce"` }{ Key: key, Value: value, - Nonce: rng.Uint64(), }, } rsp, round, err := r.submitRuntimeRquest(ctx, rtc, req) @@ -284,13 +285,12 @@ func (r *runtime) doGetRequest(ctx context.Context, rng *rand.Rand, rtc runtimeC // Submit request. req := &TxnCall{ + Nonce: rng.Uint64(), Method: "get", Args: struct { - Key string `json:"key"` - Nonce uint64 `json:"nonce"` + Key string `json:"key"` }{ - Key: key, - Nonce: rng.Uint64(), + Key: key, }, } rsp, round, err := r.submitRuntimeRquest(ctx, rtc, req) @@ -332,13 +332,12 @@ func (r *runtime) doRemoveRequest(ctx context.Context, rng *rand.Rand, rtc runti // Submit request. req := &TxnCall{ + Nonce: rng.Uint64(), Method: "remove", Args: struct { - Key string `json:"key"` - Nonce uint64 `json:"nonce"` + Key string `json:"key"` }{ - Key: key, - Nonce: rng.Uint64(), + Key: key, }, } rsp, round, err := r.submitRuntimeRquest(ctx, rtc, req) @@ -386,15 +385,14 @@ func (r *runtime) doInMsgRequest(ctx context.Context, rng *rand.Rand, rtc runtim ID: r.runtimeID, Tag: 42, Data: cbor.Marshal(&TxnCall{ + Nonce: rng.Uint64(), Method: "insert", Args: struct { Key string `json:"key"` Value string `json:"value"` - Nonce uint64 `json:"nonce"` }{ Key: key, Value: value, - Nonce: rng.Uint64(), }, }), }) @@ -568,16 +566,15 @@ func (r *runtime) doWithdrawRequest(ctx context.Context, rng *rand.Rand, rtc run // Submit message request. amount := *quantity.NewFromUint64(1) req := &TxnCall{ + Nonce: rng.Uint64(), Method: "consensus_withdraw", Args: struct { Withdraw staking.Withdraw `json:"withdraw"` - Nonce uint64 `json:"nonce"` }{ Withdraw: staking.Withdraw{ From: r.testAddress, Amount: amount, }, - Nonce: rng.Uint64(), }, } rsp, round, err := r.submitRuntimeRquest(ctx, rtc, req) @@ -613,16 +610,15 @@ func (r *runtime) doTransferRequest(ctx context.Context, rng *rand.Rand, rtc run // Submit message request. amount := *quantity.NewFromUint64(1) req := &TxnCall{ + Nonce: rng.Uint64(), Method: "consensus_transfer", Args: struct { Transfer staking.Transfer `json:"transfer"` - Nonce uint64 `json:"nonce"` }{ Transfer: staking.Transfer{ To: r.testAddress, Amount: amount, }, - Nonce: rng.Uint64(), }, } rsp, round, err := r.submitRuntimeRquest(ctx, rtc, req) @@ -658,16 +654,15 @@ func (r *runtime) doAddEscrowRequest(ctx context.Context, rng *rand.Rand, rtc ru // Submit message request. amount := *quantity.NewFromUint64(1) req := &TxnCall{ + Nonce: rng.Uint64(), Method: "consensus_add_escrow", Args: struct { Escrow staking.Escrow `json:"escrow"` - Nonce uint64 `json:"nonce"` }{ Escrow: staking.Escrow{ Account: r.testAddress, Amount: amount, }, - Nonce: rng.Uint64(), }, } rsp, round, err := r.submitRuntimeRquest(ctx, rtc, req) @@ -705,16 +700,15 @@ func (r *runtime) doReclaimEscrowRequest(ctx context.Context, rng *rand.Rand, rt // getting any rewards or is being slashed. amount := *quantity.NewFromUint64(1) req := &TxnCall{ + Nonce: rng.Uint64(), Method: "consensus_reclaim_escrow", Args: struct { ReclaimEscrow staking.ReclaimEscrow `json:"reclaim_escrow"` - Nonce uint64 `json:"nonce"` }{ ReclaimEscrow: staking.ReclaimEscrow{ Account: r.testAddress, Shares: amount, }, - Nonce: rng.Uint64(), }, } rsp, round, err := r.submitRuntimeRquest(ctx, rtc, req) diff --git a/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go b/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go index ca7360df404..f537aca3d2f 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/multiple_runtimes.go @@ -92,8 +92,8 @@ func (sc *multipleRuntimesImpl) Fixture() (*oasis.NetworkFixture, error) { RoundTimeout: 20, }, TxnScheduler: registry.TxnSchedulerParameters{ - MaxBatchSize: 1, - MaxBatchSizeBytes: 1024, + MaxBatchSize: 100, + MaxBatchSizeBytes: 1024 * 1024, BatchFlushTimeout: 1 * time.Second, ProposerTimeout: 10, }, diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go index d78acc727d3..009ada36756 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go @@ -61,6 +61,8 @@ var ( // TxnCall is a transaction call in the test runtime. type TxnCall struct { + // Nonce is a nonce. + Nonce uint64 `json:"nonce"` // Method is the called method name. Method string `json:"method"` // Args are the method arguments. @@ -202,8 +204,8 @@ func (sc *runtimeImpl) Fixture() (*oasis.NetworkFixture, error) { MaxMessages: 128, }, TxnScheduler: registry.TxnSchedulerParameters{ - MaxBatchSize: 1, - MaxBatchSizeBytes: 1024, + MaxBatchSize: 100, + MaxBatchSizeBytes: 1024 * 1024, BatchFlushTimeout: 1 * time.Second, ProposerTimeout: 20, MaxInMessages: 128, @@ -362,9 +364,15 @@ func (sc *runtimeImpl) Run(childEnv *env.Env) error { return sc.waitTestClient() } -func (sc *runtimeImpl) submitRuntimeTx(ctx context.Context, id common.Namespace, method string, args interface{}) (cbor.RawMessage, error) { +func (sc *runtimeImpl) submitRuntimeTx( + ctx context.Context, + id common.Namespace, + nonce uint64, + method string, + args interface{}, +) (cbor.RawMessage, error) { // Submit a transaction and check the result. - metaResp, err := sc.submitRuntimeTxMeta(ctx, id, method, args) + metaResp, err := sc.submitRuntimeTxMeta(ctx, id, nonce, method, args) if err != nil { return nil, err } @@ -378,6 +386,7 @@ func (sc *runtimeImpl) submitRuntimeTx(ctx context.Context, id common.Namespace, func (sc *runtimeImpl) submitRuntimeTxMeta( ctx context.Context, id common.Namespace, + nonce uint64, method string, args interface{}, ) (*runtimeClient.SubmitTxMetaResponse, error) { @@ -390,6 +399,7 @@ func (sc *runtimeImpl) submitRuntimeTxMeta( resp, err := c.SubmitTxMeta(ctx, &runtimeClient.SubmitTxRequest{ RuntimeID: id, Data: cbor.Marshal(&TxnCall{ + Nonce: nonce, Method: method, Args: args, }), @@ -397,6 +407,9 @@ func (sc *runtimeImpl) submitRuntimeTxMeta( if err != nil { return nil, fmt.Errorf("failed to submit runtime meta tx: %w", err) } + if resp.CheckTxError != nil { + return nil, fmt.Errorf("check tx failed: %s", resp.CheckTxError.Message) + } return resp, nil } @@ -418,12 +431,10 @@ func (sc *runtimeImpl) submitConsensusXferTx( xfer staking.Transfer, nonce uint64, ) error { - _, err := sc.submitRuntimeTx(ctx, runtimeID, "consensus_transfer", struct { + _, err := sc.submitRuntimeTx(ctx, runtimeID, nonce, "consensus_transfer", struct { Transfer staking.Transfer `json:"transfer"` - Nonce uint64 `json:"nonce"` }{ Transfer: xfer, - Nonce: nonce, }) return err } @@ -434,16 +445,14 @@ func (sc *runtimeImpl) submitConsensusXferTxMeta( xfer staking.Transfer, nonce uint64, ) (*runtimeClient.SubmitTxMetaResponse, error) { - return sc.submitRuntimeTxMeta(ctx, runtimeID, "consensus_transfer", struct { + return sc.submitRuntimeTxMeta(ctx, runtimeID, nonce, "consensus_transfer", struct { Transfer staking.Transfer `json:"transfer"` - Nonce uint64 `json:"nonce"` }{ Transfer: xfer, - Nonce: nonce, }) } -func (sc *runtimeImpl) submitRuntimeInMsg(ctx context.Context, id common.Namespace, method string, args interface{}) error { +func (sc *runtimeImpl) submitRuntimeInMsg(ctx context.Context, id common.Namespace, nonce uint64, method string, args interface{}) error { ctrl := sc.Net.ClientController() if ctrl == nil { return fmt.Errorf("client controller not available") @@ -454,6 +463,7 @@ func (sc *runtimeImpl) submitRuntimeInMsg(ctx context.Context, id common.Namespa ID: id, Tag: 42, Data: cbor.Marshal(&TxnCall{ + Nonce: nonce, Method: method, Args: args, }), diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv.go index 7e8c7d934b4..a2d0bafd4b8 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv.go @@ -122,7 +122,7 @@ func (cli *KeyValueTestClient) workload(ctx context.Context) error { // side `to_string()` returns `8000…0000`, and the original Rust // test client was doing a string compare so no one ever noticed // that truncated values were being compared. - if _, err = cli.sc.submitKeyValueRuntimeGetRuntimeIDTx(ctx, runtimeID); err != nil { + if _, err = cli.sc.submitKeyValueRuntimeGetRuntimeIDTx(ctx, runtimeID, rng.Uint64()); err != nil { return fmt.Errorf("failed to query remote runtime ID: %w", err) } @@ -217,14 +217,12 @@ func (cli *KeyValueTestClient) workload(ctx context.Context) error { inMsgKey = "in_msg" inMsgValue = "hello world from inmsg" ) - err = cli.sc.submitRuntimeInMsg(ctx, runtimeID, "insert", struct { + err = cli.sc.submitRuntimeInMsg(ctx, runtimeID, rng.Uint64(), "insert", struct { Key string `json:"key"` Value string `json:"value"` - Nonce uint64 `json:"nonce"` }{ Key: inMsgKey, Value: inMsgValue, - Nonce: rng.Uint64(), }) if err != nil { return fmt.Errorf("failed to submit 'insert' incoming runtime message: %w", err) @@ -239,7 +237,7 @@ func (cli *KeyValueTestClient) workload(ctx context.Context) error { } cli.sc.Logger.Info("testing consensus queries") - if _, err = cli.sc.submitRuntimeTx(ctx, runtimeID, "consensus_accounts", nil); err != nil { + if _, err = cli.sc.submitRuntimeTx(ctx, runtimeID, rng.Uint64(), "consensus_accounts", nil); err != nil { return fmt.Errorf("failed to submit consensus_accounts query: %w", err) } // TODO: The old test printed out the accounts and delegations, but @@ -262,14 +260,12 @@ func (sc *runtimeImpl) submitKeyValueRuntimeInsertTx( key, value string, nonce uint64, ) (string, error) { - rawRsp, err := sc.submitRuntimeTx(ctx, id, "insert", struct { + rawRsp, err := sc.submitRuntimeTx(ctx, id, nonce, "insert", struct { Key string `json:"key"` Value string `json:"value"` - Nonce uint64 `json:"nonce"` }{ Key: key, Value: value, - Nonce: nonce, }) if err != nil { return "", fmt.Errorf("failed to submit insert tx to runtime: %w", err) @@ -289,12 +285,10 @@ func (sc *runtimeImpl) submitKeyValueRuntimeGetTx( key string, nonce uint64, ) (string, error) { - rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, "get", struct { - Key string `json:"key"` - Nonce uint64 `json:"nonce"` + rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, nonce, "get", struct { + Key string `json:"key"` }{ - Key: key, - Nonce: nonce, + Key: key, }) if err != nil { return "", fmt.Errorf("failed to submit get tx to runtime: %w", err) @@ -311,8 +305,9 @@ func (sc *runtimeImpl) submitKeyValueRuntimeGetTx( func (sc *runtimeImpl) submitKeyValueRuntimeGetRuntimeIDTx( ctx context.Context, id common.Namespace, + nonce uint64, ) (string, error) { - rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, "get_runtime_id", nil) + rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, nonce, "get_runtime_id", nil) if err != nil { return "", fmt.Errorf("failed to submit get_runtime_id tx to runtime: %w", err) } diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv_enc.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv_enc.go index 94a6069c01c..17d8ebbbf60 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv_enc.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime_client_kv_enc.go @@ -124,13 +124,12 @@ func (cli *KeyValueEncTestClient) workload(ctx context.Context) error { if _, err = cli.sc.submitRuntimeTx( ctx, runtimeID, + rng.Uint64(), "enc_remove", struct { - Key string `json:"key"` - Nonce uint64 `json:"nonce"` + Key string `json:"key"` }{ - Key: cli.key, - Nonce: rng.Uint64(), + Key: cli.key, }, ); err != nil { return fmt.Errorf("failed to remove k/v pair: %w", err) @@ -150,37 +149,6 @@ func (cli *KeyValueEncTestClient) workload(ctx context.Context) error { return fmt.Errorf("key still exists in database after removal: '%v'", resp) } - // TODO: Someone that cares, that thinks this is still relevant - // should convert the test over, but getting rid of grpc from - // the Rust side of oasis-core would be really nice. - - /* - // Test that key manager connection via EnclaveRPC works. - println!("Testing key manager connection via gRPC transport..."); - // TODO: Key manager MRENCLAVE. - let km_client = Arc::new(oasis_core_keymanager_client::RemoteClient::new_grpc( - runtime_id, - None, - node.channel(), - 1024, - )); - - // Request public key for some "key pair id". - let key_pair_id = KeyPairId::from(Hash::empty_hash().as_ref()); - let r = rt - .block_on(km_client.get_public_key(Context::background(), key_pair_id)) - .unwrap(); - assert!(r.is_some(), "get_public_key should return a public key"); - let pkey = r; - - let r = rt - .block_on(km_client.get_public_key(Context::background(), key_pair_id)) - .unwrap(); - assert_eq!(r, pkey, "get_public_key should return the same public key"); - - println!("Simple key/value client finished."); - */ - cli.sc.Logger.Info("simple k/v (enc) client finished") return nil @@ -199,14 +167,12 @@ func (sc *runtimeImpl) submitKeyValueRuntimeEncInsertTx( key, value string, nonce uint64, ) (string, error) { - rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, "enc_insert", struct { + rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, nonce, "enc_insert", struct { Key string `json:"key"` Value string `json:"value"` - Nonce uint64 `json:"nonce"` }{ Key: key, Value: value, - Nonce: nonce, }) if err != nil { return "", fmt.Errorf("failed to submit enc_insert tx to runtime: %w", err) @@ -226,12 +192,10 @@ func (sc *runtimeImpl) submitKeyValueRuntimeEncGetTx( key string, nonce uint64, ) (string, error) { - rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, "enc_get", struct { - Key string `json:"key"` - Nonce uint64 `json:"nonce"` + rawRsp, err := sc.submitRuntimeTx(ctx, runtimeID, nonce, "enc_get", struct { + Key string `json:"key"` }{ - Key: key, - Nonce: nonce, + Key: key, }) if err != nil { return "", fmt.Errorf("failed to submit get tx to runtime: %w", err) diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go index c9fb3a95a76..13ce90c9cde 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime_governance.go @@ -93,8 +93,8 @@ func (sc *runtimeGovernanceImpl) Fixture() (*oasis.NetworkFixture, error) { MaxMessages: 128, }, TxnScheduler: registry.TxnSchedulerParameters{ - MaxBatchSize: 1, - MaxBatchSizeBytes: 1024, + MaxBatchSize: 100, + MaxBatchSizeBytes: 1024 * 1024, BatchFlushTimeout: 1 * time.Second, ProposerTimeout: 10, }, @@ -274,12 +274,10 @@ func (sc *runtimeGovernanceImpl) Run(childEnv *env.Env) error { newRT.Executor.MaxMessages = 64 newRT.Genesis.StateRoot.Empty() - meta, err := sc.submitRuntimeTxMeta(ctx, rt.ID, "update_runtime", struct { + meta, err := sc.submitRuntimeTxMeta(ctx, rt.ID, rtNonce, "update_runtime", struct { UpdateRuntime registry.Runtime `json:"update_runtime"` - Nonce uint64 `json:"nonce"` }{ UpdateRuntime: newRT, - Nonce: rtNonce, }) if err != nil { return err @@ -335,12 +333,10 @@ func (sc *runtimeGovernanceImpl) Run(childEnv *env.Env) error { "src_runtime", rt.ID, "target_runtime", otherRT.ID, ) - meta, err = sc.submitRuntimeTxMeta(ctx, rt.ID, "update_runtime", struct { + meta, err = sc.submitRuntimeTxMeta(ctx, rt.ID, rtNonce, "update_runtime", struct { UpdateRuntime registry.Runtime `json:"update_runtime"` - Nonce uint64 `json:"nonce"` }{ UpdateRuntime: newRT, - Nonce: rtNonce, }) if err != nil { return err diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 76ee7ae35bf..ce006718405 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -30,6 +30,8 @@ const ( checkTxRetryDelay = 1 * time.Second // abortTimeout is the maximum time the runtime can spend aborting. abortTimeout = 5 * time.Second + // maxRepublishTxs is the maximum amount of transactions to republish. + maxRepublishTxs = 32 ) // Config is the transaction pool configuration. @@ -37,7 +39,6 @@ type Config struct { MaxPoolSize uint64 MaxCheckTxBatchSize uint64 MaxLastSeenCacheSize uint64 - MaxStaleCacheSize uint64 RepublishInterval time.Duration @@ -171,9 +172,6 @@ type txPool struct { // seenCache maps from transaction hashes to time.Time that specifies when the transaction was // last published. seenCache *lru.Cache - // staleCache maps from transaction hashes to *transaction.CheckedTransaction. It is populated - // when clearing the txpool and consulted only when fetching known batches. - staleCache *lru.Cache checkTxCh *channels.RingChannel checkTxQueue *checkTxQueue @@ -276,9 +274,6 @@ func (t *txPool) RemoveTxBatch(txs []hash.Hash) { defer t.schedulerLock.Unlock() t.schedulerQueue.RemoveTxBatch(txs) - for _, txHash := range txs { - _ = t.staleCache.Remove(txHash) - } pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.Size())) } @@ -301,20 +296,7 @@ func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*transaction.CheckedTransac t.schedulerLock.Lock() defer t.schedulerLock.Unlock() - // First consult the scheduler. - txs, missing := t.schedulerQueue.GetKnownBatch(batch) - - // Check the stale cache for any missing transactions. - for txHash, idx := range missing { - tx, exists := t.staleCache.Get(txHash) - if !exists { - continue - } - - txs[idx] = tx.(*transaction.CheckedTransaction) - delete(missing, txHash) - } - return txs, missing + return t.schedulerQueue.GetKnownBatch(batch) } func (t *txPool) ProcessBlock(bi *BlockInfo) error { @@ -412,12 +394,6 @@ func (t *txPool) Clear() { defer t.schedulerLock.Unlock() if t.schedulerQueue != nil { - // Before clearing, move some transactions to the stale cache. - txs := t.schedulerQueue.GetTransactions(int(t.cfg.MaxStaleCacheSize)) - for _, tx := range txs { - _ = t.staleCache.Put(tx.Hash(), tx) - } - t.schedulerQueue.Clear() } t.seenCache.Clear() @@ -555,6 +531,11 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { // Unschedule any transactions that are being rechecked and have failed checks. t.RemoveTxBatch(unschedule) + // If there are more transactions to check, make sure we check them next. + if t.checkTxQueue.Size() > 0 { + t.checkTxCh.In() <- struct{}{} + } + if len(txs) == 0 { return } @@ -689,7 +670,6 @@ func (t *txPool) republishWorker() { }() for { - var force bool select { case <-t.stopCh: return @@ -710,9 +690,6 @@ func (t *txPool) republishWorker() { } case <-debounceCh: debounceCh = nil - case <-t.epoCh.Out(): - // Force republish on epoch transitions. - force = true } lastRepublish = time.Now() @@ -727,7 +704,7 @@ func (t *txPool) republishWorker() { nextPendingRepublish := republishInterval for _, tx := range txs { ts, seen := t.seenCache.Peek(tx.Hash()) - if !force && seen { + if seen { sinceLast := time.Since(ts.(time.Time)) if sinceLast < republishInterval { if remaining := republishInterval - sinceLast; remaining < nextPendingRepublish { @@ -750,6 +727,9 @@ func (t *txPool) republishWorker() { _ = t.seenCache.Put(tx.Hash(), time.Now()) republishedCount++ + if republishedCount > maxRepublishTxs { + break + } } // Reschedule ticker for next republish. @@ -831,11 +811,6 @@ func New( return nil, fmt.Errorf("error creating seen cache: %w", err) } - staleCache, err := lru.New(lru.Capacity(cfg.MaxStaleCacheSize, false)) - if err != nil { - return nil, fmt.Errorf("error creating stale cache: %w", err) - } - return &txPool{ logger: logging.GetLogger("runtime/txpool"), stopCh: make(chan struct{}), @@ -846,7 +821,6 @@ func New( host: host, txPublisher: txPublisher, seenCache: seenCache, - staleCache: staleCache, checkTxQueue: newCheckTxQueue(cfg.MaxPoolSize, cfg.MaxCheckTxBatchSize), checkTxCh: channels.NewRingChannel(1), checkTxNotifier: pubsub.NewBroker(false), diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index 304c2e2594a..d73c3273358 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -22,6 +22,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/txpool" "github.com/oasisprotocol/oasis-core/go/worker/common/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync" keymanagerP2P "github.com/oasisprotocol/oasis-core/go/worker/keymanager/p2p" ) @@ -704,6 +705,8 @@ func NewNode( // Register transaction message handler as that is something that all workers must handle. p2pHost.RegisterHandler(runtime.ID(), p2p.TopicKindTx, &txMsgHandler{n}) + // Register transaction sync service. + p2pHost.RegisterProtocolServer(txsync.NewServer(runtime.ID(), txPool)) return n, nil } diff --git a/go/worker/common/config.go b/go/worker/common/config.go index 0fe192a5a57..a8686de13cc 100644 --- a/go/worker/common/config.go +++ b/go/worker/common/config.go @@ -26,7 +26,6 @@ var ( cfgMaxTxPoolSize = "worker.tx_pool.schedule_max_tx_pool_size" cfgScheduleTxCacheSize = "worker.tx_pool.schedule_tx_cache_size" - cfgStaleTxCacheSize = "worker.tx_pool.stale_tx_cache_size" cfgCheckTxMaxBatchSize = "worker.tx_pool.check_tx_max_batch_size" cfgRecheckInterval = "worker.tx_pool.recheck_interval" @@ -96,7 +95,6 @@ func NewConfig() (*Config, error) { MaxPoolSize: viper.GetUint64(cfgMaxTxPoolSize), MaxCheckTxBatchSize: viper.GetUint64(cfgCheckTxMaxBatchSize), MaxLastSeenCacheSize: viper.GetUint64(cfgScheduleTxCacheSize), - MaxStaleCacheSize: viper.GetUint64(cfgStaleTxCacheSize), // TODO: Make these configurable. RepublishInterval: 60 * time.Second, @@ -114,10 +112,9 @@ func init() { Flags.StringSlice(cfgClientAddresses, []string{}, "Address/port(s) to use for client connections when registering this node (if not set, all non-loopback local interfaces will be used)") Flags.StringSlice(CfgSentryAddresses, []string{}, "Address(es) of sentry node(s) to connect to of the form [PubKey@]ip:port (where PubKey@ part represents base64 encoded node TLS public key)") - Flags.Uint64(cfgMaxTxPoolSize, 10_000, "Maximum size of the scheduling transaction pool") - Flags.Uint64(cfgScheduleTxCacheSize, 10_000, "Maximum cache size of recently scheduled transactions to prevent re-scheduling") - Flags.Uint64(cfgStaleTxCacheSize, 64, "Maximum cache size of recently cleared transactions") - Flags.Uint64(cfgCheckTxMaxBatchSize, 10_000, "Maximum check tx batch size") + Flags.Uint64(cfgMaxTxPoolSize, 50_000, "Maximum size of the scheduling transaction pool") + Flags.Uint64(cfgScheduleTxCacheSize, 100_000, "Maximum cache size of recently scheduled transactions to prevent re-scheduling") + Flags.Uint64(cfgCheckTxMaxBatchSize, 1000, "Maximum check tx batch size") Flags.Uint64(cfgRecheckInterval, 32, "Transaction recheck interval (in rounds)") _ = viper.BindPFlags(Flags) diff --git a/go/worker/common/p2p/rpc/client.go b/go/worker/common/p2p/rpc/client.go index 25875e6ac76..c74de10503e 100644 --- a/go/worker/common/p2p/rpc/client.go +++ b/go/worker/common/p2p/rpc/client.go @@ -110,10 +110,14 @@ func WithPeerFilter(filter PeerFilter) ClientOption { } } +// ValidationFunc is a call response validation function. +type ValidationFunc func(pf PeerFeedback) error + // CallOptions are per-call options. type CallOptions struct { retryInterval time.Duration maxRetries uint64 + validationFn ValidationFunc } // CallOption is a per-call option setter. @@ -133,6 +137,15 @@ func WithRetryInterval(retryInterval time.Duration) CallOption { } } +// WithValidationFn configures the response validation function to use for the call. +// +// When the function is called, the decoded response value will be set. +func WithValidationFn(fn ValidationFunc) CallOption { + return func(opts *CallOptions) { + opts.validationFn = fn + } +} + // Client is an RPC client for a given protocol. type Client interface { PeerManager @@ -224,6 +237,17 @@ func (c *client) Call( if err != nil { continue } + if co.validationFn != nil { + err := co.validationFn(pf) + if err != nil { + c.logger.Debug("failed to validate peer response", + "method", method, + "peer_id", peer, + "err", err, + ) + continue + } + } return nil } diff --git a/go/worker/common/p2p/txsync/client.go b/go/worker/common/p2p/txsync/client.go new file mode 100644 index 00000000000..0312136cbe4 --- /dev/null +++ b/go/worker/common/p2p/txsync/client.go @@ -0,0 +1,63 @@ +package txsync + +import ( + "context" + "fmt" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" +) + +// Client is a transaction sync protocol client. +type Client interface { + // GetTxs queries peers for transaction data. + GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsResponse, rpc.PeerFeedback, error) +} + +type client struct { + rc rpc.Client +} + +func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsResponse, rpc.PeerFeedback, error) { + // Make sure we don't request too many transactions. + if len(request.Txs) > MaxGetTxsCount { + request.Txs = request.Txs[:MaxGetTxsCount] + } + txHashMap := make(map[hash.Hash]struct{}, len(request.Txs)) + for _, txHash := range request.Txs { + txHashMap[txHash] = struct{}{} + } + + var rsp GetTxsResponse + pf, err := c.rc.Call(ctx, MethodGetTxs, request, &rsp, MaxGetTxsResponseTime, + rpc.WithValidationFn(func(pf rpc.PeerFeedback) error { + // If we received more transactions than we requested, this is an error. + if len(rsp.Txs) > len(request.Txs) { + pf.RecordFailure() + return fmt.Errorf("more transactions than requested (expected: %d got: %d)", len(request.Txs), len(rsp.Txs)) + } + + // If we received transactions that we didn't request, this is an error. + for _, tx := range rsp.Txs { + txHash := hash.NewFromBytes(tx) + if _, valid := txHashMap[txHash]; !valid { + pf.RecordFailure() + return fmt.Errorf("unsolicited transaction: %s", txHash) + } + } + return nil + }), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +// NewClient creates a new transaction sync protocol client. +func NewClient(p2p rpc.P2P, runtimeID common.Namespace) Client { + return &client{ + rc: rpc.NewClient(p2p, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion), + } +} diff --git a/go/worker/common/p2p/txsync/protocol.go b/go/worker/common/p2p/txsync/protocol.go new file mode 100644 index 00000000000..2d41e5d535b --- /dev/null +++ b/go/worker/common/p2p/txsync/protocol.go @@ -0,0 +1,31 @@ +package txsync + +import ( + "time" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/common/version" +) + +// TxSyncProtocolID is a unique protocol identifier for the transaction sync protocol. +const TxSyncProtocolID = "txsync" + +// TxSyncProtocolVersion is the supported version of the transaction sync protocol. +var TxSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// Constants related to the GetTxs method. +const ( + MethodGetTxs = "GetTxs" + MaxGetTxsResponseTime = 5 * time.Second + MaxGetTxsCount = 128 +) + +// GetTxsRequest is a GetTxs request. +type GetTxsRequest struct { + Txs []hash.Hash `json:"txs"` +} + +// GetTxsResponse is a response to a GetTxs request. +type GetTxsResponse struct { + Txs [][]byte `json:"txs,omitempty"` +} diff --git a/go/worker/common/p2p/txsync/server.go b/go/worker/common/p2p/txsync/server.go new file mode 100644 index 00000000000..84a6e9209e1 --- /dev/null +++ b/go/worker/common/p2p/txsync/server.go @@ -0,0 +1,55 @@ +package txsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/runtime/txpool" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/rpc" +) + +type service struct { + txPool txpool.TransactionPool +} + +func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (interface{}, error) { + switch method { + case MethodGetTxs: + var rq GetTxsRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetTxs(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsResponse, error) { + var rsp GetTxsResponse + switch { + case len(request.Txs) == 0: + return &rsp, nil + case len(request.Txs) > MaxGetTxsCount: + // TODO: Could punish calling peer. + request.Txs = request.Txs[:MaxGetTxsCount] + default: + } + + txs, _ := s.txPool.GetKnownBatch(request.Txs) + rsp.Txs = make([][]byte, 0, len(txs)) + for _, tx := range txs { + if tx == nil { + continue + } + rsp.Txs = append(rsp.Txs, tx.Raw()) + } + return &rsp, nil +} + +// NewServer creates a new transaction sync protocol server. +func NewServer(runtimeID common.Namespace, txPool txpool.TransactionPool) rpc.Server { + return rpc.NewServer(runtimeID, TxSyncProtocolID, TxSyncProtocolVersion, &service{txPool}) +} diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index e34c99fb18e..e2c595301e4 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -28,6 +28,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" p2pError "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/error" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync" "github.com/oasisprotocol/oasis-core/go/worker/registration" ) @@ -125,7 +126,6 @@ type Node struct { // nolint: maligned // Guarded by .commonNode.CrossNode. proposingTimeout bool - prevEpochWorker bool commonNode *committee.Node commonCfg commonWorker.Config @@ -147,6 +147,7 @@ type Node struct { // nolint: maligned roundCancelCtx context.CancelFunc storage storage.LocalBackend + txSync txsync.Client stateTransitions *pubsub.Broker // Bump this when we need to change what the worker selects over. @@ -280,19 +281,11 @@ func (n *Node) transitionLocked(state NodeState) { // Guarded by n.commonNode.CrossNode. func (n *Node) HandleEpochTransitionLocked(epoch *committee.EpochSnapshot) { switch { - case epoch.IsExecutorWorker(): - if !n.prevEpochWorker { - // Clear incoming queue and cache of any stale transactions in case - // we were not part of the compute committee in previous epoch. - n.commonNode.TxPool.Clear() - } - fallthrough - case epoch.IsExecutorBackupWorker(): + case epoch.IsExecutorWorker(), epoch.IsExecutorBackupWorker(): n.transitionLocked(StateWaitingForBatch{}) default: n.transitionLocked(StateNotReady{}) } - n.prevEpochWorker = epoch.IsExecutorWorker() } // HandleNewBlockEarlyLocked implements NodeHooks. @@ -412,26 +405,6 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { } } -func (n *Node) handleNewCheckedTransactions(txs []*transaction.CheckedTransaction) { - // Check if we are waiting for new transactions. - n.commonNode.CrossNode.Lock() - defer n.commonNode.CrossNode.Unlock() - - state, ok := n.state.(StateWaitingForTxs) - if !ok { - return - } - - for _, tx := range txs { - delete(state.batch.missingTxs, tx.Hash()) - } - if len(state.batch.missingTxs) == 0 { - // We have all transactions, signal the node to start processing the batch. - n.logger.Info("received all transactions needed for batch processing") - n.startProcessingBatchLocked(state.batch) - } -} - func (n *Node) updateBatchWeightLimits(ctx context.Context, blk *block.Block, lb *consensus.LightBlock, epoch beacon.EpochTime) error { n.limitsLastUpdateLock.Lock() defer n.limitsLastUpdateLock.Unlock() @@ -1177,6 +1150,7 @@ func (n *Node) startProcessingBatchLocked(batch *unresolvedBatch) { // Transition into StateWaitingForTxs and wait for peers to republish transactions. n.logger.Debug("some transactions are missing", "num_missing", len(batch.missingTxs)) n.transitionLocked(StateWaitingForTxs{batch}) + go n.requestMissingTransactions() return } @@ -1749,6 +1723,7 @@ func NewNode( quitCh: make(chan struct{}), initCh: make(chan struct{}), state: StateNotReady{}, + txSync: txsync.NewClient(commonNode.P2P, commonNode.Runtime.ID()), stateTransitions: pubsub.NewBroker(false), reselect: make(chan struct{}, 1), logger: logging.GetLogger("worker/executor/committee").With("runtime_id", commonNode.Runtime.ID()), diff --git a/go/worker/compute/executor/committee/transactions.go b/go/worker/compute/executor/committee/transactions.go new file mode 100644 index 00000000000..d1fd80cf024 --- /dev/null +++ b/go/worker/compute/executor/committee/transactions.go @@ -0,0 +1,102 @@ +package committee + +import ( + "context" + "fmt" + + "github.com/cenkalti/backoff/v4" + + cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff" + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/runtime/transaction" + "github.com/oasisprotocol/oasis-core/go/runtime/txpool" + "github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync" +) + +func (n *Node) handleNewCheckedTransactions(txs []*transaction.CheckedTransaction) { + // Check if we are waiting for new transactions. + n.commonNode.CrossNode.Lock() + defer n.commonNode.CrossNode.Unlock() + + state, ok := n.state.(StateWaitingForTxs) + if !ok { + return + } + + for _, tx := range txs { + delete(state.batch.missingTxs, tx.Hash()) + } + if len(state.batch.missingTxs) == 0 { + // We have all transactions, signal the node to start processing the batch. + n.logger.Info("received all transactions needed for batch processing") + n.startProcessingBatchLocked(state.batch) + } +} + +func (n *Node) requestMissingTransactions() { + requestOp := func() error { + // Determine what transactions are missing. + txHashes := func() []hash.Hash { + n.commonNode.CrossNode.Lock() + defer n.commonNode.CrossNode.Unlock() + + state, ok := n.state.(StateWaitingForTxs) + if !ok { + return nil + } + + txHashes := make([]hash.Hash, 0, len(state.batch.missingTxs)) + for txHash := range state.batch.missingTxs { + txHashes = append(txHashes, txHash) + } + return txHashes + }() + if len(txHashes) == 0 { + return nil + } + + txCtx, cancel := context.WithCancel(n.roundCtx) + defer cancel() + + rsp, pf, err := n.txSync.GetTxs(txCtx, &txsync.GetTxsRequest{ + Txs: txHashes, + }) + if err != nil { + n.logger.Warn("failed to request missing transactions from peers", + "err", err, + ) + return err + } + + n.logger.Debug("resolved (some) missing transactions", + "resolved", len(rsp.Txs), + "missing", len(txHashes), + ) + + // If we received at least some of the requested transactions, count as success. + if len(rsp.Txs) > 0 { + pf.RecordSuccess() + } + + // Queue all transactions in the transaction pool. + for _, tx := range rsp.Txs { + _ = n.commonNode.TxPool.SubmitTxNoWait(txCtx, tx, &txpool.TransactionMeta{Local: false}) + } + + // Check if there are still missing transactions and perform another request. + if len(txHashes) > len(rsp.Txs) { + return fmt.Errorf("need to resolve more transactions") + } + + return nil + } + + // Retry until we have resolved all transactions (or round context expires). + err := backoff.Retry(requestOp, backoff.WithContext(cmnBackoff.NewExponentialBackOff(), n.roundCtx)) + if err != nil { + n.logger.Warn("failed to resolve missing transactions", + "err", err, + ) + return + } +} diff --git a/tests/runtimes/simple-keyvalue/src/main.rs b/tests/runtimes/simple-keyvalue/src/main.rs index e71632763aa..3c52204ddc5 100644 --- a/tests/runtimes/simple-keyvalue/src/main.rs +++ b/tests/runtimes/simple-keyvalue/src/main.rs @@ -4,11 +4,14 @@ pub mod crypto; pub mod methods; pub mod types; -use std::sync::{atomic::AtomicBool, Arc}; +use std::{ + convert::TryInto, + sync::{atomic::AtomicBool, Arc}, +}; use oasis_core_keymanager_client::KeyManagerClient; use oasis_core_runtime::{ - common::version::Version, + common::{crypto::hash::Hash, version::Version}, config::Config, consensus::{ roothash::{IncomingMessage, Message}, @@ -30,6 +33,10 @@ use simple_keymanager::trusted_policy_signers; use methods::{BlockHandler, Methods}; use types::*; +/// Maximum number of transactions in a batch. Should be less than or equal to what is set in the +/// runtime descriptor to avoid batches being rejected. +const MAX_BATCH_SIZE: usize = 100; + /// A simple context wrapper for processing test transaction batches. /// /// For a proper dispatcher see the [Oasis SDK](https://github.com/oasisprotocol/oasis-sdk). @@ -46,18 +53,21 @@ pub struct Context<'a, 'core> { pub struct TxContext<'a, 'b, 'core> { pub parent: &'a mut Context<'b, 'core>, pub tags: Tags, + + check_only: bool, } impl<'a, 'b, 'core> TxContext<'a, 'b, 'core> { - fn new(parent: &'a mut Context<'b, 'core>) -> Self { + fn new(parent: &'a mut Context<'b, 'core>, check_only: bool) -> Self { Self { parent, tags: vec![], + check_only, } } fn is_check_only(&self) -> bool { - self.parent.core.check_only + self.check_only } fn emit_message(&mut self, message: Message) -> u32 { @@ -113,6 +123,8 @@ impl Dispatcher { } fn dispatch_tx(ctx: &mut TxContext, tx: Call) -> Result { + Methods::check_nonce(ctx, tx.nonce)?; + match tx.method.as_str() { "get_runtime_id" => Self::dispatch_call(ctx, tx.args, Methods::get_runtime_id), "consensus_accounts" => Self::dispatch_call(ctx, tx.args, Methods::consensus_accounts), @@ -148,7 +160,7 @@ impl Dispatcher { message: "malformed transaction batch".to_string(), })?; - let mut tx_ctx = TxContext::new(ctx); + let mut tx_ctx = TxContext::new(ctx, false); match Self::dispatch_tx(&mut tx_ctx, tx) { Ok(result) => Ok(ExecuteTxResult { @@ -168,7 +180,7 @@ impl Dispatcher { } fn check_tx(ctx: &mut Context<'_, '_>, tx: &[u8]) -> Result { - let mut tx_ctx = TxContext::new(ctx); + let mut tx_ctx = TxContext::new(ctx, true); match Self::decode_and_dispatch_tx(&mut tx_ctx, tx) { Ok(_) => Ok(CheckTxResult::default()), @@ -269,18 +281,34 @@ impl TxnDispatcher for Dispatcher { // Execute transactions. // TODO: Actually do some batch reordering. + let mut new_batch = vec![]; let mut results = vec![]; - for tx in batch.iter() { - results.push(Self::execute_tx(&mut ctx, tx)?); + let mut tx_reject_hashes = vec![]; + for tx in batch.drain(..) { + if new_batch.len() >= MAX_BATCH_SIZE { + break; + } + + // Reject any transactions that don't pass check tx. + if Self::check_tx(&mut ctx, &tx)?.error.code != 0 { + tx_reject_hashes.push(Hash::digest_bytes(&tx)); + continue; + } + + results.push(Self::execute_tx(&mut ctx, &tx)?); + new_batch.push(tx); } + // Replace input batch with newly generated batch. + *batch = new_batch.into(); + Ok(ExecuteBatchResult { results, messages: ctx.messages, in_msgs_count: in_msgs.len(), block_tags: vec![], batch_weight_limits: None, - tx_reject_hashes: vec![], + tx_reject_hashes, }) } @@ -341,7 +369,7 @@ pub fn main_with_version(version: Version) { features: Some(Features { // Enable the schedule control feature. schedule_control: Some(FeatureScheduleControl { - initial_batch_size: 10, + initial_batch_size: MAX_BATCH_SIZE.try_into().unwrap(), }), }), ..Default::default() diff --git a/tests/runtimes/simple-keyvalue/src/methods.rs b/tests/runtimes/simple-keyvalue/src/methods.rs index ec16f169dfc..55fea35245b 100644 --- a/tests/runtimes/simple-keyvalue/src/methods.rs +++ b/tests/runtimes/simple-keyvalue/src/methods.rs @@ -48,7 +48,7 @@ impl Methods { ), String, > { - if ctx.parent.core.check_only { + if ctx.is_check_only() { return Ok((Default::default(), Default::default())); } @@ -73,7 +73,7 @@ impl Methods { Ok((result, delegations)) } - fn check_nonce(ctx: &mut TxContext, nonce: u64) -> Result<(), String> { + pub fn check_nonce(ctx: &mut TxContext, nonce: u64) -> Result<(), String> { let nonce_key = NonceKeyFormat { nonce }.encode(); match ctx .parent @@ -83,7 +83,7 @@ impl Methods { { Some(_) => Err(format!("Duplicate nonce: {}", nonce)), None => { - if !ctx.parent.core.check_only { + if !ctx.is_check_only() { ctx.parent.core.runtime_state.insert( IoContext::create_child(&ctx.parent.core.io_ctx), &nonce_key, @@ -104,10 +104,9 @@ impl Methods { /// Withdraw from the consensus layer into the runtime account. pub fn consensus_withdraw(ctx: &mut TxContext, args: Withdraw) -> Result<(), String> { - Self::check_nonce(ctx, args.nonce)?; Self::check_max_messages(ctx)?; - if ctx.parent.core.check_only { + if ctx.is_check_only() { return Ok(()); } @@ -127,7 +126,6 @@ impl Methods { /// Transfer from the runtime account to another account in the consensus layer. pub fn consensus_transfer(ctx: &mut TxContext, args: Transfer) -> Result<(), String> { - Self::check_nonce(ctx, args.nonce)?; Self::check_max_messages(ctx)?; if ctx.is_check_only() { @@ -150,7 +148,6 @@ impl Methods { /// Add escrow from the runtime account to an account in the consensus layer. pub fn consensus_add_escrow(ctx: &mut TxContext, args: AddEscrow) -> Result<(), String> { - Self::check_nonce(ctx, args.nonce)?; Self::check_max_messages(ctx)?; if ctx.is_check_only() { @@ -176,7 +173,6 @@ impl Methods { ctx: &mut TxContext, args: ReclaimEscrow, ) -> Result<(), String> { - Self::check_nonce(ctx, args.nonce)?; Self::check_max_messages(ctx)?; if ctx.is_check_only() { @@ -199,7 +195,6 @@ impl Methods { /// Update existing runtime with given descriptor. pub fn update_runtime(ctx: &mut TxContext, args: UpdateRuntime) -> Result<(), String> { - Self::check_nonce(ctx, args.nonce)?; Self::check_max_messages(ctx)?; if ctx.is_check_only() { @@ -222,8 +217,6 @@ impl Methods { /// Insert a key/value pair. pub fn insert(ctx: &mut TxContext, args: KeyValue) -> Result, String> { - Self::check_nonce(ctx, args.nonce)?; - if args.value.as_bytes().len() > 128 { return Err("Value too big to be inserted.".to_string()); } @@ -246,8 +239,6 @@ impl Methods { /// Retrieve a key/value pair. pub fn get(ctx: &mut TxContext, args: Key) -> Result, String> { - Self::check_nonce(ctx, args.nonce)?; - if ctx.is_check_only() { return Ok(None); } @@ -266,8 +257,6 @@ impl Methods { /// Remove a key/value pair. pub fn remove(ctx: &mut TxContext, args: Key) -> Result, String> { - Self::check_nonce(ctx, args.nonce)?; - if ctx.is_check_only() { return Ok(None); } @@ -307,8 +296,6 @@ impl Methods { /// (encrypted) Insert a key/value pair. pub fn enc_insert(ctx: &mut TxContext, args: KeyValue) -> Result, String> { - Self::check_nonce(ctx, args.nonce)?; - if ctx.is_check_only() { return Ok(None); } @@ -332,8 +319,6 @@ impl Methods { /// (encrypted) Retrieve a key/value pair. pub fn enc_get(ctx: &mut TxContext, args: Key) -> Result, String> { - Self::check_nonce(ctx, args.nonce)?; - if ctx.is_check_only() { return Ok(None); } @@ -351,8 +336,6 @@ impl Methods { /// (encrypted) Remove a key/value pair. pub fn enc_remove(ctx: &mut TxContext, args: Key) -> Result, String> { - Self::check_nonce(ctx, args.nonce)?; - if ctx.is_check_only() { return Ok(None); } diff --git a/tests/runtimes/simple-keyvalue/src/types.rs b/tests/runtimes/simple-keyvalue/src/types.rs index fb775e2bc2e..2b4ae5521c2 100644 --- a/tests/runtimes/simple-keyvalue/src/types.rs +++ b/tests/runtimes/simple-keyvalue/src/types.rs @@ -11,6 +11,8 @@ use oasis_core_runtime::{ /// Test transaction call. #[derive(Clone, Debug, cbor::Encode, cbor::Decode)] pub struct Call { + /// Nonce. + pub nonce: u64, /// Method name. pub method: String, /// Method arguments. @@ -29,44 +31,37 @@ pub enum CallOutput { #[derive(Clone, cbor::Encode, cbor::Decode)] pub struct Key { pub key: String, - pub nonce: u64, } #[derive(Clone, cbor::Encode, cbor::Decode)] pub struct KeyValue { pub key: String, pub value: String, - pub nonce: u64, } #[derive(Clone, cbor::Encode, cbor::Decode)] pub struct Withdraw { - pub nonce: u64, pub withdraw: staking::Withdraw, } #[derive(Clone, cbor::Encode, cbor::Decode)] pub struct Transfer { - pub nonce: u64, pub transfer: staking::Transfer, } #[derive(Clone, cbor::Encode, cbor::Decode)] pub struct AddEscrow { - pub nonce: u64, pub escrow: staking::Escrow, } #[derive(Clone, cbor::Encode, cbor::Decode)] pub struct ReclaimEscrow { - pub nonce: u64, pub reclaim_escrow: staking::ReclaimEscrow, } #[derive(Clone, cbor::Encode, cbor::Decode)] pub struct UpdateRuntime { pub update_runtime: registry::Runtime, - pub nonce: u64, } /// Key format used for transaction artifacts.