From 37ca8ef401c968ec6a9c2d602703c874371b5672 Mon Sep 17 00:00:00 2001
From: ptrus
Date: Tue, 2 Mar 2021 15:50:27 +0100
Subject: [PATCH] go/runtime/client: add SubmitTxNoWait method

---
 .changelog/3444.feature.md                    |   4 +
 .../scenario/e2e/runtime/client_expire.go     |  20 +++
 .../scenario/e2e/runtime/late_start.go        |  16 ++
 go/runtime/client/api/api.go                  |   7 +-
 go/runtime/client/api/grpc.go                 |  33 ++++
 go/runtime/client/client.go                   |  89 +++++-----
 .../client/{watcher.go => submitter.go}       | 168 +++++++++---------
 go/runtime/client/tests/tester.go             |  39 ++++
 8 files changed, 239 insertions(+), 137 deletions(-)
 create mode 100644 .changelog/3444.feature.md
 rename go/runtime/client/{watcher.go => submitter.go} (56%)

diff --git a/.changelog/3444.feature.md b/.changelog/3444.feature.md
new file mode 100644
index 00000000000..1f3f9d24dbc
--- /dev/null
+++ b/.changelog/3444.feature.md
@@ -0,0 +1,4 @@
+go/runtime/client: add SubmitTxNoWait method
+
+SubmitTxNoWait method publishes the runtime transaction and doesn't wait for
+results.
diff --git a/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go b/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go
index 5ffa6af5168..54cbe2e5886 100644
--- a/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go
+++ b/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go
@@ -5,10 +5,13 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/oasisprotocol/oasis-core/go/common/cbor"
 	"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
 	"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
 	"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
 	"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
+	runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client/api"
+	runtimeTransaction "github.com/oasisprotocol/oasis-core/go/runtime/transaction"
 )
 
 // ClientExpire is the ClientExpire node scenario.
@@ -61,6 +64,23 @@ func (sc *clientExpireImpl) Run(childEnv *env.Env) error {
 		return err
 	}
 
+	err = nodeCtrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{
+		RuntimeID: runtimeID,
+		Data: cbor.Marshal(&runtimeTransaction.TxnCall{
+			Method: "insert",
+			Args: struct {
+				Key   string `json:"key"`
+				Value string `json:"value"`
+			}{
+				Key:   "hello",
+				Value: "test",
+			},
+		}),
+	})
+	if err != nil {
+		return fmt.Errorf("SubmitTxNoWait expected no error, got: %w", err)
+	}
+
 	err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, "hello", "test")
 	if !errors.Is(err, api.ErrTransactionExpired) {
 		return fmt.Errorf("expected error: %v, got: %v", api.ErrTransactionExpired, err)
diff --git a/go/oasis-test-runner/scenario/e2e/runtime/late_start.go b/go/oasis-test-runner/scenario/e2e/runtime/late_start.go
index 6bfc1e108d4..03895b0d1bf 100644
--- a/go/oasis-test-runner/scenario/e2e/runtime/late_start.go
+++ b/go/oasis-test-runner/scenario/e2e/runtime/late_start.go
@@ -78,6 +78,22 @@ func (sc *lateStartImpl) Run(childEnv *env.Env) error {
 	if err != nil {
 		return fmt.Errorf("failed to create controller for client: %w", err)
 	}
+	err = ctrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{
+		RuntimeID: runtimeID,
+		Data: cbor.Marshal(&runtimeTransaction.TxnCall{
+			Method: "insert",
+			Args: struct {
+				Key   string `json:"key"`
+				Value string `json:"value"`
+			}{
+				Key:   "hello",
+				Value: "test",
+			},
+		}),
+	})
+	if !errors.Is(err, api.ErrNotSynced) {
+		return fmt.Errorf("expected error: %v, got: %v", api.ErrNotSynced, err)
+	}
 	_, err = ctrl.RuntimeClient.SubmitTx(ctx, &runtimeClient.SubmitTxRequest{
 		RuntimeID: runtimeID,
 		Data: cbor.Marshal(&runtimeTransaction.TxnCall{
diff --git a/go/runtime/client/api/api.go b/go/runtime/client/api/api.go
index c39e7b4b6d5..e2fd1353a45 100644
--- a/go/runtime/client/api/api.go
+++ b/go/runtime/client/api/api.go
@@ -43,9 +43,14 @@ var (
 type RuntimeClient interface {
 	enclaverpc.Transport
 
-	// SubmitTx submits a transaction to the runtime transaction scheduler.
+	// SubmitTx submits a transaction to the runtime transaction scheduler and waits
+	// for transaction execution results.
 	SubmitTx(ctx context.Context, request *SubmitTxRequest) ([]byte, error)
 
+	// SubmitTxNoWait submits a transaction to the runtime transaction scheduler but does
+	// not wait for transaction execution.
+	SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error
+
 	// CheckTx asks the local runtime to check the specified transaction.
 	CheckTx(ctx context.Context, request *CheckTxRequest) error
 
diff --git a/go/runtime/client/api/grpc.go b/go/runtime/client/api/grpc.go
index fb402b8792c..8d9a1e5dd96 100644
--- a/go/runtime/client/api/grpc.go
+++ b/go/runtime/client/api/grpc.go
@@ -22,6 +22,8 @@ var (
 
 	// methodSubmitTx is the SubmitTx method.
 	methodSubmitTx = serviceName.NewMethod("SubmitTx", SubmitTxRequest{})
+	// methodSubmitTxNoWait is the SubmitTxNoWait method.
+	methodSubmitTxNoWait = serviceName.NewMethod("SubmitTxNoWait", SubmitTxRequest{})
 	// methodCheckTx is the CheckTx method.
 	methodCheckTx = serviceName.NewMethod("CheckTx", CheckTxRequest{})
 	// methodGetGenesisBlock is the GetGenesisBlock method.
@@ -59,6 +61,10 @@ var (
 			MethodName: methodSubmitTx.ShortName(),
 			Handler:    handlerSubmitTx,
 		},
+		{
+			MethodName: methodSubmitTxNoWait.ShortName(),
+			Handler:    handlerSubmitTxNoWait,
+		},
 		{
 			MethodName: methodCheckTx.ShortName(),
 			Handler:    handlerCheckTx,
 		},
@@ -141,6 +147,29 @@ func handlerSubmitTx( // nolint: golint
 	return interceptor(ctx, &rq, info, handler)
 }
 
+func handlerSubmitTxNoWait( // nolint: golint
+	srv interface{},
+	ctx context.Context,
+	dec func(interface{}) error,
+	interceptor grpc.UnaryServerInterceptor,
+) (interface{}, error) {
+	var rq SubmitTxRequest
+	if err := dec(&rq); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, &rq)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: methodSubmitTxNoWait.FullName(),
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, req.(*SubmitTxRequest))
+	}
+	return interceptor(ctx, &rq, info, handler)
+}
+
 func handlerCheckTx( // nolint: golint
 	srv interface{},
 	ctx context.Context,
@@ -506,6 +535,10 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *SubmitTxRequest)
 	return rsp, nil
 }
 
+func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error {
+	return c.conn.Invoke(ctx, methodSubmitTxNoWait.FullName(), request, nil)
+}
+
 func (c *runtimeClient) CheckTx(ctx context.Context, request *CheckTxRequest) error {
 	return c.conn.Invoke(ctx, methodCheckTx.FullName(), request, nil)
 }
diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go
index 7d35e0e39db..83ae694ef0a 100644
--- a/go/runtime/client/client.go
+++ b/go/runtime/client/client.go
@@ -28,7 +28,6 @@ import (
 	"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
 	storage "github.com/oasisprotocol/oasis-core/go/storage/api"
 	"github.com/oasisprotocol/oasis-core/go/worker/common/p2p"
-	executor "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api"
 )
 
 const (
@@ -63,9 +62,9 @@ type runtimeClient struct {
 	common *clientCommon
 	quitCh chan struct{}
 
-	hosts     map[common.Namespace]*clientHost
-	watchers  map[common.Namespace]*blockWatcher
-	kmClients map[common.Namespace]*keymanager.Client
+	hosts        map[common.Namespace]*clientHost
+	txSubmitters map[common.Namespace]*txSubmitter
+	kmClients    map[common.Namespace]*keymanager.Client
 
 	maxTransactionAge int64
 
@@ -81,8 +80,7 @@ func (c *runtimeClient) tagIndexer(runtimeID common.Namespace) (tagindexer.Query
 	return rt.TagIndexer(), nil
 }
 
-// Implements api.RuntimeClient.
-func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) {
+func (c *runtimeClient) submitTx(ctx context.Context, request *api.SubmitTxRequest) (<-chan *txResult, error) {
 	if c.common.p2p == nil {
 		return nil, fmt.Errorf("client: cannot submit transaction, p2p disabled")
 	}
@@ -107,29 +105,22 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
 		}
 	}
 
-	var watcher *blockWatcher
+	var submitter *txSubmitter
 	var ok bool
-	var err error
 	c.Lock()
-	if watcher, ok = c.watchers[request.RuntimeID]; !ok {
-		watcher, err = newWatcher(c.common, request.RuntimeID, c.common.p2p, c.maxTransactionAge)
-		if err != nil {
-			c.Unlock()
-			return nil, err
-		}
-		if err = watcher.Start(); err != nil {
-			c.Unlock()
-			return nil, err
-		}
-		c.watchers[request.RuntimeID] = watcher
+	if submitter, ok = c.txSubmitters[request.RuntimeID]; !ok {
+		submitter = newTxSubmitter(c.common, request.RuntimeID, c.common.p2p, c.maxTransactionAge)
+		submitter.Start()
+		c.txSubmitters[request.RuntimeID] = submitter
 	}
 	c.Unlock()
 
 	// Send a request for watching a new runtime transaction.
-	respCh := make(chan *watchResult)
-	req := &watchRequest{
+	respCh := make(chan *txResult)
+	req := &txRequest{
 		ctx:    ctx,
 		respCh: respCh,
+		req:    request,
 	}
 	req.id.FromBytes(request.Data)
 	select {
@@ -139,12 +130,22 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
 	case <-c.common.ctx.Done():
 		// Client is shutting down.
 		return nil, fmt.Errorf("client: shutting down")
-	case watcher.newCh <- req:
+	case submitter.newCh <- req:
+	}
+
+	return respCh, nil
+}
+
+// Implements api.RuntimeClient.
+func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) {
+	respCh, err := c.submitTx(ctx, request)
+	if err != nil {
+		return nil, err
 	}
 
-	// Wait for response, handling retries if/when needed.
+	// Wait for result.
 	for {
-		var resp *watchResult
+		var resp *txResult
 		var ok bool
 
 		select {
@@ -158,29 +159,17 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
 			if !ok {
 				return nil, fmt.Errorf("client: block watch channel closed unexpectedly (unknown error)")
 			}
-
-			if resp.err != nil {
-				return nil, resp.err
-			}
-
-			// The main event is getting a response from the watcher, handled below. If there is
-			// no result yet, this means that we need to retry publish.
-			if resp.result == nil {
-				break
-			}
-
-			return resp.result, nil
+			return resp.result, resp.err
 		}
-
-		c.common.p2p.Publish(context.Background(), request.RuntimeID, &p2p.Message{
-			Tx: &executor.Tx{
-				Data: request.Data,
-			},
-			GroupVersion: resp.groupVersion,
-		})
 	}
 }
 
+// Implements api.RuntimeClient.
+func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *api.SubmitTxRequest) error {
+	_, err := c.submitTx(ctx, request)
+	return err
+}
+
 // Implements api.RuntimeClient.
 func (c *runtimeClient) CheckTx(ctx context.Context, request *api.CheckTxRequest) error {
 	hrt, ok := c.hosts[request.RuntimeID]
@@ -561,9 +550,11 @@ func (c *runtimeClient) Start() error {
 // Implements service.BackgroundService.
 func (c *runtimeClient) Stop() {
 	// Watchers.
-	for _, watcher := range c.watchers {
-		watcher.Stop()
+	c.Lock()
+	for _, submitter := range c.txSubmitters {
+		submitter.Stop()
 	}
+	c.Unlock()
 	// Hosts.
 	for _, host := range c.hosts {
 		host.Stop()
@@ -578,9 +569,11 @@ func (c *runtimeClient) Quit() <-chan struct{} {
 // Cleanup waits for all block watchers to finish.
 func (c *runtimeClient) Cleanup() {
 	// Watchers.
-	for _, watcher := range c.watchers {
-		<-watcher.Quit()
+	c.Lock()
+	for _, submitter := range c.txSubmitters {
+		<-submitter.Quit()
 	}
+	c.Unlock()
 }
 
 // New returns a new runtime client instance.
@@ -606,7 +599,7 @@ func New(
 		},
 		quitCh:            make(chan struct{}),
 		hosts:             make(map[common.Namespace]*clientHost),
-		watchers:          make(map[common.Namespace]*blockWatcher),
+		txSubmitters:      make(map[common.Namespace]*txSubmitter),
 		kmClients:         make(map[common.Namespace]*keymanager.Client),
 		maxTransactionAge: maxTransactionAge,
 		logger:            logging.GetLogger("runtime/client"),
diff --git a/go/runtime/client/watcher.go b/go/runtime/client/submitter.go
similarity index 56%
rename from go/runtime/client/watcher.go
rename to go/runtime/client/submitter.go
index fdbfbc11830..e7d0b01921e 100644
--- a/go/runtime/client/watcher.go
+++ b/go/runtime/client/submitter.go
@@ -6,61 +6,68 @@ import (
 	"github.com/oasisprotocol/oasis-core/go/common"
 	"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
-	"github.com/oasisprotocol/oasis-core/go/common/service"
+	"github.com/oasisprotocol/oasis-core/go/common/logging"
 	"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
 	"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
 	"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
 	storage "github.com/oasisprotocol/oasis-core/go/storage/api"
 	"github.com/oasisprotocol/oasis-core/go/worker/common/p2p"
+	executor "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api"
 )
 
-type watchRequest struct {
+type txRequest struct {
 	id     hash.Hash
 	ctx    context.Context
-	respCh chan *watchResult
+	req    *api.SubmitTxRequest
 
 	height int64
-}
 
-func (w *watchRequest) send(res *watchResult, height int64) error {
-	w.height = height
+	respCh chan<- *txResult
+}
 
+func (w *txRequest) result(res *txResult) {
 	select {
 	case <-w.ctx.Done():
-		return w.ctx.Err()
 	case w.respCh <- res:
-		return nil
 	}
 }
 
-type watchResult struct {
-	err          error
-	result       []byte
-	groupVersion int64
+type txResult struct {
+	err    error
+	result []byte
 }
 
-type blockWatcher struct {
-	service.BaseBackgroundService
+type txSubmitter struct {
+	logger *logging.Logger
 
 	common *clientCommon
 	id     common.Namespace
 
-	watched map[hash.Hash]*watchRequest
-	newCh   chan *watchRequest
+	transactions map[hash.Hash]*txRequest
+	newCh        chan *txRequest
 
 	maxTransactionAge int64
-
-	toBeChecked []*block.Block
+	toBeChecked       []*block.Block
 
 	stopCh chan struct{}
+	quitCh chan struct{}
 }
 
-func (w *blockWatcher) checkBlock(blk *block.Block) error {
+func (w *txSubmitter) publishTx(tx *txRequest, groupVersion int64) {
+	w.common.p2p.Publish(w.common.ctx, w.id, &p2p.Message{
+		Tx: &executor.Tx{
+			Data: tx.req.Data,
+		},
+		GroupVersion: groupVersion,
+	})
+}
+
+func (w *txSubmitter) checkBlock(blk *block.Block) error {
 	if blk.Header.IORoot.IsEmpty() {
 		return nil
 	}
 
 	// If there's no pending transactions, we can skip the check.
-	if len(w.watched) == 0 {
+	if len(w.transactions) == 0 {
 		return nil
 	}
@@ -77,7 +84,7 @@ func (w *blockWatcher) checkBlock(blk *block.Block) error {
 
 	// Check if there's anything interesting in this block.
 	var txHashes []hash.Hash
-	for txHash := range w.watched {
+	for txHash := range w.transactions {
 		txHashes = append(txHashes, txHash)
 	}
 
@@ -87,21 +94,18 @@ func (w *blockWatcher) checkBlock(blk *block.Block) error {
 	}
 
 	for txHash, tx := range matches {
-		watch := w.watched[txHash]
-		res := &watchResult{
+		txReq := w.transactions[txHash]
+		txReq.result(&txResult{
 			result: tx.Output,
-		}
-
-		// Ignore errors, the watch is getting deleted anyway.
-		_ = watch.send(res, 0)
-		close(watch.respCh)
-		delete(w.watched, txHash)
+		})
+		close(txReq.respCh)
+		delete(w.transactions, txHash)
 	}
 
 	return nil
 }
 
-func (w *blockWatcher) getGroupVersion(height int64) (int64, error) {
+func (w *txSubmitter) getGroupVersion(height int64) (int64, error) {
 	epoch, err := w.common.consensus.Beacon().GetEpoch(w.common.ctx, height)
 	if err != nil {
 		return 0, fmt.Errorf("failed querying for epoch: %w", err)
@@ -109,18 +113,18 @@ func (w *blockWatcher) getGroupVersion(height int64) (int64, error) {
 	return w.common.consensus.Beacon().GetEpochBlock(w.common.ctx, epoch)
 }
 
-func (w *blockWatcher) watch() {
+func (w *txSubmitter) start() {
 	defer func() {
-		for _, watch := range w.watched {
-			close(watch.respCh)
+		for _, txReq := range w.transactions {
+			close(txReq.respCh)
 		}
-		w.BaseBackgroundService.Stop()
+		close(w.quitCh)
 	}()
 
 	// Start watching roothash blocks.
 	blocks, blocksSub, err := w.common.consensus.RootHash().WatchBlocks(w.id)
 	if err != nil {
-		w.Logger.Error("failed to subscribe to roothash blocks",
+		w.logger.Error("failed to subscribe to roothash blocks",
 			"err", err,
 		)
 		return
@@ -130,7 +134,7 @@ func (w *blockWatcher) watch() {
 	// Start watching consensus blocks.
 	consensusBlocks, consensusBlocksSub, err := w.common.consensus.WatchBlocks(w.common.ctx)
 	if err != nil {
-		w.Logger.Error("failed to subscribe to consensus blocks",
+		w.logger.Error("failed to subscribe to consensus blocks",
 			"err", err,
 		)
 		return
@@ -151,7 +155,7 @@ func (w *blockWatcher) watch() {
 			latestHeight = blk.Height
 			latestGroupVersion, err = w.getGroupVersion(blk.Height)
 			if err != nil {
-				w.Logger.Error("failed querying for latest group version",
+				w.logger.Error("failed querying for latest group version",
 					"err", err,
 				)
 				return
@@ -167,7 +171,7 @@ func (w *blockWatcher) watch() {
 			var failedBlocks []*block.Block
 			for _, b := range w.toBeChecked {
 				if err = w.checkBlock(b); err != nil {
-					w.Logger.Error("error checking block",
+					w.logger.Error("error checking block",
 						"err", err,
 						"round", b.Header.Round,
 					)
@@ -175,7 +179,7 @@ func (w *blockWatcher) watch() {
 				}
 			}
 			if len(failedBlocks) > 0 {
-				w.Logger.Warn("failed roothash blocks",
+				w.logger.Warn("failed roothash blocks",
 					"num_failed_blocks", len(failedBlocks),
 				)
 			}
@@ -190,21 +194,16 @@ func (w *blockWatcher) watch() {
 			// Get group version.
 			latestGroupVersion, err = w.getGroupVersion(blk.Height)
 			if err != nil {
-				w.Logger.Error("failed querying for latest group version",
+				w.logger.Error("failed querying for latest group version",
 					"err", err,
 				)
 				continue
 			}
 
-			// Tell every client to resubmit as messages with old groupVersion
-			// will be discarded.
-			for key, watch := range w.watched {
-				res := &watchResult{
-					groupVersion: latestGroupVersion,
-				}
-				if watch.send(res, latestHeight) != nil {
-					delete(w.watched, key)
-				}
+			// Republish all transactions as messages with old groupVersion will
+			// be discarded.
+			for _, req := range w.transactions {
+				w.publishTx(req, latestGroupVersion)
 			}
 		case blk := <-consensusBlocks:
 			if blk == nil {
@@ -213,68 +212,61 @@ func (w *blockWatcher) watch() {
 			latestHeight = blk.Height
 
 			// Check if any transaction is considered expired.
-			for key, watch := range w.watched {
-				if (latestHeight - w.maxTransactionAge) < watch.height {
+			for key, req := range w.transactions {
+				if (latestHeight - w.maxTransactionAge) < req.height {
 					continue
 				}
-				w.Logger.Debug("expired transaction",
+				w.logger.Debug("expired transaction",
 					"key", key,
 					"latest_height", latestHeight,
 					"max_transaction_age", w.maxTransactionAge,
-					"watch_height", watch.height,
+					"initial_height", req.height,
 				)
-				res := &watchResult{
+				req.result(&txResult{
 					err: api.ErrTransactionExpired,
-				}
-				// Ignore errors, the watch is getting deleted anyway.
-				_ = watch.send(res, 0)
-				close(watch.respCh)
-				delete(w.watched, key)
-			}
-		case newWatch := <-w.newCh:
-			w.watched[newWatch.id] = newWatch
-
-			res := &watchResult{
-				groupVersion: latestGroupVersion,
-			}
-			if newWatch.send(res, latestHeight) != nil {
-				delete(w.watched, newWatch.id)
+				})
+				close(req.respCh)
+				delete(w.transactions, key)
 			}
-
+		case newRequest := <-w.newCh:
+			w.transactions[newRequest.id] = newRequest
+			newRequest.height = latestHeight
+			w.publishTx(newRequest, latestGroupVersion)
 		case <-w.stopCh:
-			w.Logger.Info("stop requested, aborting watcher")
+			w.logger.Info("stop requested, aborting watcher")
 			return
 		case <-w.common.ctx.Done():
-			w.Logger.Info("context cancelled, aborting watcher")
+			w.logger.Info("context cancelled, aborting watcher")
 			return
 		}
 	}
 }
 
-// Start starts a new per-runtime block watcher.
-func (w *blockWatcher) Start() error {
-	go w.watch()
-	return nil
+func (w *txSubmitter) Quit() <-chan struct{} {
+	return w.quitCh
+}
+
+func (w *txSubmitter) Start() {
+	go w.start()
 }
 
-// Stop initiates watcher shutdown.
-func (w *blockWatcher) Stop() {
+func (w *txSubmitter) Stop() {
 	close(w.stopCh)
 }
 
-func newWatcher(common *clientCommon, id common.Namespace, p2pSvc *p2p.P2P, maxTransactionAge int64) (*blockWatcher, error) {
+func newTxSubmitter(common *clientCommon, id common.Namespace, p2pSvc *p2p.P2P, maxTransactionAge int64) *txSubmitter {
 	// Register handler.
 	p2pSvc.RegisterHandler(id, &p2p.BaseHandler{})
 
-	svc := service.NewBaseBackgroundService("client/watcher")
-	watcher := &blockWatcher{
-		BaseBackgroundService: *svc,
-		common:                common,
-		id:                    id,
-		maxTransactionAge:     maxTransactionAge,
-		watched:               make(map[hash.Hash]*watchRequest),
-		newCh:                 make(chan *watchRequest),
-		stopCh:                make(chan struct{}),
+	txSubmitter := &txSubmitter{
+		logger:            logging.GetLogger("client/txsubmitter"),
+		common:            common,
+		id:                id,
+		maxTransactionAge: maxTransactionAge,
+		transactions:      make(map[hash.Hash]*txRequest),
+		newCh:             make(chan *txRequest),
+		stopCh:            make(chan struct{}),
+		quitCh:            make(chan struct{}),
 	}
-	return watcher, nil
+	return txSubmitter
 }
diff --git a/go/runtime/client/tests/tester.go b/go/runtime/client/tests/tester.go
index ca0d1433898..a79c426f0c0 100644
--- a/go/runtime/client/tests/tester.go
+++ b/go/runtime/client/tests/tester.go
@@ -39,6 +39,13 @@ func ClientImplementationTests(
 		defer cancelFunc()
 		testQuery(ctx, t, runtimeID, client, testInput)
 	})
+
+	noWaitInput := "squid at: " + time.Now().String()
+	t.Run("SubmitTxNoWait", func(t *testing.T) {
+		ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
+		defer cancelFunc()
+		testSubmitTransactionNoWait(ctx, t, runtimeID, client, noWaitInput)
+	})
 }
 
 func testSubmitTransaction(
@@ -201,3 +208,35 @@ func testQuery(
 	})
 	require.NoError(t, err, "CheckTx")
 }
+
+func testSubmitTransactionNoWait(
+	ctx context.Context,
+	t *testing.T,
+	runtimeID common.Namespace,
+	c api.RuntimeClient,
+	input string,
+) {
+	// Based on SubmitTx and the mock worker.
+	testInput := []byte(input)
+	testOutput := testInput
+
+	// Query current block.
+	blkLatest, err := c.GetBlock(ctx, &api.GetBlockRequest{RuntimeID: runtimeID, Round: api.RoundLatest})
+	require.NoError(t, err, "GetBlock(RoundLatest)")
+
+	// Submit a test transaction.
+	err = c.SubmitTxNoWait(ctx, &api.SubmitTxRequest{Data: testInput, RuntimeID: runtimeID})
+
+	// Check if everything is in order.
+	require.NoError(t, err, "SubmitTxNoWait")
+
+	// Ensure transaction was executed.
+	err = c.WaitBlockIndexed(ctx, &api.WaitBlockIndexedRequest{RuntimeID: runtimeID, Round: blkLatest.Header.Round + 1})
+	require.NoError(t, err, "WaitBlockIndexed")
+
+	// Get transaction by latest round.
+	tx, err := c.GetTx(ctx, &api.GetTxRequest{RuntimeID: runtimeID, Round: api.RoundLatest, Index: 0})
+	require.NoError(t, err, "GetTx(RoundLatest)")
+	require.EqualValues(t, testInput, tx.Input)
+	require.EqualValues(t, testOutput, tx.Output)
+}
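
Usage sketch (illustrative only, not part of the diff above): the snippet below shows how a caller might invoke the new SubmitTxNoWait method over a node's internal gRPC socket. It assumes the NewRuntimeClient constructor exposed by go/runtime/client/api and cmnGrpc.Dial from go/common/grpc (which installs the CBOR codec); the socket path, the "insert" method and its arguments are placeholders for whatever the target runtime expects.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc"

	"github.com/oasisprotocol/oasis-core/go/common"
	"github.com/oasisprotocol/oasis-core/go/common/cbor"
	cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc"
	runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client/api"
	runtimeTransaction "github.com/oasisprotocol/oasis-core/go/runtime/transaction"
)

// submitNoWait publishes a runtime transaction to the given node and returns
// without waiting for its execution result.
func submitNoWait(ctx context.Context, sockPath string, runtimeID common.Namespace) error {
	// Dial the node's internal socket (assumed path); cmnGrpc.Dial sets up the
	// CBOR codec used by oasis-core gRPC services.
	conn, err := cmnGrpc.Dial("unix:"+sockPath, grpc.WithInsecure())
	if err != nil {
		return fmt.Errorf("failed to dial node: %w", err)
	}
	defer conn.Close()

	rc := runtimeClient.NewRuntimeClient(conn)

	// The call returns as soon as the transaction has been published; any
	// execution failure has to be discovered later (e.g. via GetTx or a query).
	return rc.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{
		RuntimeID: runtimeID,
		Data: cbor.Marshal(&runtimeTransaction.TxnCall{
			Method: "insert",
			Args: map[string]string{
				"key":   "hello",
				"value": "world",
			},
		}),
	})
}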