From 4ac0542bae2bb3da16794e173e85b6d40afce0a1 Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 2 Mar 2021 15:50:27 +0100 Subject: [PATCH] go/runtime/client: add SubmitTxNoWait method --- .changelog/3444.feature.md | 4 + .../scenario/e2e/runtime/client_expire.go | 20 +++++ .../scenario/e2e/runtime/late_start.go | 16 ++++ go/runtime/client/api/api.go | 7 +- go/runtime/client/api/grpc.go | 33 +++++++++ go/runtime/client/client.go | 44 +++++------ go/runtime/client/tests/tester.go | 39 ++++++++++ go/runtime/client/watcher.go | 73 +++++++++---------- 8 files changed, 174 insertions(+), 62 deletions(-) create mode 100644 .changelog/3444.feature.md diff --git a/.changelog/3444.feature.md b/.changelog/3444.feature.md new file mode 100644 index 00000000000..1f3f9d24dbc --- /dev/null +++ b/.changelog/3444.feature.md @@ -0,0 +1,4 @@ +go/runtime/client: add SubmitTxNoWait method + +SubmitTxNoWait method publishes the runtime transaction and doesn't wait for +results. diff --git a/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go b/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go index 5ffa6af5168..54cbe2e5886 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go @@ -5,10 +5,13 @@ import ( "errors" "fmt" + "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario" "github.com/oasisprotocol/oasis-core/go/runtime/client/api" + runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client/api" + runtimeTransaction "github.com/oasisprotocol/oasis-core/go/runtime/transaction" ) // ClientExpire is the ClientExpire node scenario. @@ -61,6 +64,23 @@ func (sc *clientExpireImpl) Run(childEnv *env.Env) error { return err } + err = nodeCtrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{ + RuntimeID: runtimeID, + Data: cbor.Marshal(&runtimeTransaction.TxnCall{ + Method: "insert", + Args: struct { + Key string `json:"key"` + Value string `json:"value"` + }{ + Key: "hello", + Value: "test", + }, + }), + }) + if err != nil { + return fmt.Errorf("SubmitTxNoWait expected no error, got: %b", err) + } + err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, "hello", "test") if !errors.Is(err, api.ErrTransactionExpired) { return fmt.Errorf("expected error: %v, got: %v", api.ErrTransactionExpired, err) diff --git a/go/oasis-test-runner/scenario/e2e/runtime/late_start.go b/go/oasis-test-runner/scenario/e2e/runtime/late_start.go index fd703c59247..8d618147766 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/late_start.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/late_start.go @@ -76,6 +76,22 @@ func (sc *lateStartImpl) Run(childEnv *env.Env) error { if err != nil { return fmt.Errorf("failed to create controller for client: %w", err) } + err = ctrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{ + RuntimeID: runtimeID, + Data: cbor.Marshal(&runtimeTransaction.TxnCall{ + Method: "insert", + Args: struct { + Key string `json:"key"` + Value string `json:"value"` + }{ + Key: "hello", + Value: "test", + }, + }), + }) + if !errors.Is(err, api.ErrNotSynced) { + return fmt.Errorf("expected error: %v, got: %v", api.ErrNotSynced, err) + } _, err = ctrl.RuntimeClient.SubmitTx(ctx, &runtimeClient.SubmitTxRequest{ RuntimeID: runtimeID, Data: cbor.Marshal(&runtimeTransaction.TxnCall{ diff --git a/go/runtime/client/api/api.go b/go/runtime/client/api/api.go index c39e7b4b6d5..e2fd1353a45 100644 --- a/go/runtime/client/api/api.go +++ b/go/runtime/client/api/api.go @@ -43,9 +43,14 @@ var ( type RuntimeClient interface { enclaverpc.Transport - // SubmitTx submits a transaction to the runtime transaction scheduler. + // SubmitTx submits a transaction to the runtime transaction scheduler and waits + // for transaction execution results. SubmitTx(ctx context.Context, request *SubmitTxRequest) ([]byte, error) + // SubmitTxNoWait submits a transaction to the runtime transaction scheduler but does + // not wait for transaction execution. + SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error + // CheckTx asks the local runtime to check the specified transaction. CheckTx(ctx context.Context, request *CheckTxRequest) error diff --git a/go/runtime/client/api/grpc.go b/go/runtime/client/api/grpc.go index fb402b8792c..8d9a1e5dd96 100644 --- a/go/runtime/client/api/grpc.go +++ b/go/runtime/client/api/grpc.go @@ -22,6 +22,8 @@ var ( // methodSubmitTx is the SubmitTx method. methodSubmitTx = serviceName.NewMethod("SubmitTx", SubmitTxRequest{}) + // methodSubmitTxNoWait is the SubmitTxNoWait method. + methodSubmitTxNoWait = serviceName.NewMethod("SubmitTxNoWait", SubmitTxRequest{}) // methodCheckTx is the CheckTx method. methodCheckTx = serviceName.NewMethod("CheckTx", CheckTxRequest{}) // methodGetGenesisBlock is the GetGenesisBlock method. @@ -59,6 +61,10 @@ var ( MethodName: methodSubmitTx.ShortName(), Handler: handlerSubmitTx, }, + { + MethodName: methodSubmitTxNoWait.ShortName(), + Handler: handlerSubmitTxNoWait, + }, { MethodName: methodCheckTx.ShortName(), Handler: handlerCheckTx, @@ -141,6 +147,29 @@ func handlerSubmitTx( // nolint: golint return interceptor(ctx, &rq, info, handler) } +func handlerSubmitTxNoWait( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var rq SubmitTxRequest + if err := dec(&rq); err != nil { + return nil, err + } + if interceptor == nil { + return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, &rq) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodSubmitTxNoWait.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, req.(*SubmitTxRequest)) + } + return interceptor(ctx, &rq, info, handler) +} + func handlerCheckTx( // nolint: golint srv interface{}, ctx context.Context, @@ -506,6 +535,10 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *SubmitTxRequest) return rsp, nil } +func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error { + return c.conn.Invoke(ctx, methodSubmitTxNoWait.FullName(), request, nil) +} + func (c *runtimeClient) CheckTx(ctx context.Context, request *CheckTxRequest) error { return c.conn.Invoke(ctx, methodCheckTx.FullName(), request, nil) } diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go index 7d35e0e39db..b3a89fcb660 100644 --- a/go/runtime/client/client.go +++ b/go/runtime/client/client.go @@ -28,7 +28,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/transaction" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" - executor "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api" ) const ( @@ -81,8 +80,7 @@ func (c *runtimeClient) tagIndexer(runtimeID common.Namespace) (tagindexer.Query return rt.TagIndexer(), nil } -// Implements api.RuntimeClient. -func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) { +func (c *runtimeClient) submitTx(ctx context.Context, request *api.SubmitTxRequest) (<-chan *watchResult, error) { if c.common.p2p == nil { return nil, fmt.Errorf("client: cannot submit transaction, p2p disabled") } @@ -130,6 +128,7 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque req := &watchRequest{ ctx: ctx, respCh: respCh, + req: request, } req.id.FromBytes(request.Data) select { @@ -142,7 +141,17 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque case watcher.newCh <- req: } - // Wait for response, handling retries if/when needed. + return respCh, nil +} + +// Implements api.RuntimeClient. +func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) { + respCh, err := c.submitTx(ctx, request) + if err != nil { + return nil, err + } + + // Wait for result. for { var resp *watchResult var ok bool @@ -158,27 +167,18 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque if !ok { return nil, fmt.Errorf("client: block watch channel closed unexpectedly (unknown error)") } - - if resp.err != nil { - return nil, resp.err - } - - // The main event is getting a response from the watcher, handled below. If there is - // no result yet, this means that we need to retry publish. - if resp.result == nil { - break - } - - return resp.result, nil + return resp.result, resp.err } + } +} - c.common.p2p.Publish(context.Background(), request.RuntimeID, &p2p.Message{ - Tx: &executor.Tx{ - Data: request.Data, - }, - GroupVersion: resp.groupVersion, - }) +// Implements api.RuntimeClient. +func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *api.SubmitTxRequest) error { + _, err := c.submitTx(ctx, request) + if err != nil { + return err } + return nil } // Implements api.RuntimeClient. diff --git a/go/runtime/client/tests/tester.go b/go/runtime/client/tests/tester.go index ca0d1433898..a79c426f0c0 100644 --- a/go/runtime/client/tests/tester.go +++ b/go/runtime/client/tests/tester.go @@ -39,6 +39,13 @@ func ClientImplementationTests( defer cancelFunc() testQuery(ctx, t, runtimeID, client, testInput) }) + + noWaitInput := "squid at: " + time.Now().String() + t.Run("SubmitTxNoWait", func(t *testing.T) { + ctx, cancelFunc := context.WithTimeout(context.Background(), timeout) + defer cancelFunc() + testSubmitTransactionNoWait(ctx, t, runtimeID, client, noWaitInput) + }) } func testSubmitTransaction( @@ -201,3 +208,35 @@ func testQuery( }) require.NoError(t, err, "CheckTx") } + +func testSubmitTransactionNoWait( + ctx context.Context, + t *testing.T, + runtimeID common.Namespace, + c api.RuntimeClient, + input string, +) { + // Based on SubmitTx and the mock worker. + testInput := []byte(input) + testOutput := testInput + + // Query current block. + blkLatest, err := c.GetBlock(ctx, &api.GetBlockRequest{RuntimeID: runtimeID, Round: api.RoundLatest}) + require.NoError(t, err, "GetBlock(RoundLatest)") + + // Submit a test transaction. + err = c.SubmitTxNoWait(ctx, &api.SubmitTxRequest{Data: testInput, RuntimeID: runtimeID}) + + // Check if everything is in order. + require.NoError(t, err, "SubmitTxNoWait") + + // Ensure transaction was executed. + err = c.WaitBlockIndexed(ctx, &api.WaitBlockIndexedRequest{RuntimeID: runtimeID, Round: blkLatest.Header.Round + 1}) + require.NoError(t, err, "WaitBlockIndexed") + + // Get transaction by latest round. + tx, err := c.GetTx(ctx, &api.GetTxRequest{RuntimeID: runtimeID, Round: api.RoundLatest, Index: 0}) + require.NoError(t, err, "GetTx(RoundLatest)") + require.EqualValues(t, testInput, tx.Input) + require.EqualValues(t, testOutput, tx.Output) +} diff --git a/go/runtime/client/watcher.go b/go/runtime/client/watcher.go index fdbfbc11830..c6921192a1c 100644 --- a/go/runtime/client/watcher.go +++ b/go/runtime/client/watcher.go @@ -12,32 +12,32 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/transaction" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" + executor "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api" ) +// TODO: submitRequest type watchRequest struct { - id hash.Hash - ctx context.Context - respCh chan *watchResult - height int64 -} + id hash.Hash + ctx context.Context + req *api.SubmitTxRequest + requestHeight int64 -func (w *watchRequest) send(res *watchResult, height int64) error { - w.height = height + respCh chan<- *watchResult +} +func (w *watchRequest) result(res *watchResult) { select { case <-w.ctx.Done(): - return w.ctx.Err() case w.respCh <- res: - return nil } } type watchResult struct { - err error - result []byte - groupVersion int64 + err error + result []byte } +// TODO: rename to transaction submitter? type blockWatcher struct { service.BaseBackgroundService @@ -54,6 +54,15 @@ type blockWatcher struct { stopCh chan struct{} } +func (w *blockWatcher) submit(wr *watchRequest, groupVersion int64) { + w.common.p2p.Publish(w.common.ctx, w.id, &p2p.Message{ + Tx: &executor.Tx{ + Data: wr.req.Data, + }, + GroupVersion: groupVersion, + }) +} + func (w *blockWatcher) checkBlock(blk *block.Block) error { if blk.Header.IORoot.IsEmpty() { return nil @@ -88,12 +97,9 @@ func (w *blockWatcher) checkBlock(blk *block.Block) error { for txHash, tx := range matches { watch := w.watched[txHash] - res := &watchResult{ + watch.result(&watchResult{ result: tx.Output, - } - - // Ignore errors, the watch is getting deleted anyway. - _ = watch.send(res, 0) + }) close(watch.respCh) delete(w.watched, txHash) } @@ -196,15 +202,10 @@ func (w *blockWatcher) watch() { continue } - // Tell every client to resubmit as messages with old groupVersion - // will be discarded. - for key, watch := range w.watched { - res := &watchResult{ - groupVersion: latestGroupVersion, - } - if watch.send(res, latestHeight) != nil { - delete(w.watched, key) - } + // Resubmit all messages as messages with old groupVersion will be + // discarded. + for _, watch := range w.watched { + w.submit(watch, latestGroupVersion) } case blk := <-consensusBlocks: if blk == nil { @@ -214,32 +215,26 @@ func (w *blockWatcher) watch() { // Check if any transaction is considered expired. for key, watch := range w.watched { - if (latestHeight - w.maxTransactionAge) < watch.height { + if (latestHeight - w.maxTransactionAge) < watch.requestHeight { continue } w.Logger.Debug("expired transaction", "key", key, "latest_height", latestHeight, "max_transaction_age", w.maxTransactionAge, - "watch_height", watch.height, + "watch_height", watch.requestHeight, ) - res := &watchResult{ - err: api.ErrTransactionExpired, - } // Ignore errors, the watch is getting deleted anyway. - _ = watch.send(res, 0) + watch.result(&watchResult{ + err: api.ErrTransactionExpired, + }) close(watch.respCh) delete(w.watched, key) } case newWatch := <-w.newCh: w.watched[newWatch.id] = newWatch - - res := &watchResult{ - groupVersion: latestGroupVersion, - } - if newWatch.send(res, latestHeight) != nil { - delete(w.watched, newWatch.id) - } + newWatch.requestHeight = latestHeight + w.submit(newWatch, latestGroupVersion) case <-w.stopCh: w.Logger.Info("stop requested, aborting watcher")