Skip to content

Commit

Permalink
go/runtime/client: add SubmitTxNoWait method
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Mar 2, 2021
1 parent 917b9aa commit 4ac0542
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 62 deletions.
4 changes: 4 additions & 0 deletions .changelog/3444.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/runtime/client: add SubmitTxNoWait method

SubmitTxNoWait method publishes the runtime transaction and doesn't wait for
results.
20 changes: 20 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/client_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"errors"
"fmt"

"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client/api"
runtimeTransaction "github.com/oasisprotocol/oasis-core/go/runtime/transaction"
)

// ClientExpire is the ClientExpire node scenario.
Expand Down Expand Up @@ -61,6 +64,23 @@ func (sc *clientExpireImpl) Run(childEnv *env.Env) error {
return err
}

err = nodeCtrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{
RuntimeID: runtimeID,
Data: cbor.Marshal(&runtimeTransaction.TxnCall{
Method: "insert",
Args: struct {
Key string `json:"key"`
Value string `json:"value"`
}{
Key: "hello",
Value: "test",
},
}),
})
if err != nil {
return fmt.Errorf("SubmitTxNoWait expected no error, got: %b", err)
}

err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, "hello", "test")
if !errors.Is(err, api.ErrTransactionExpired) {
return fmt.Errorf("expected error: %v, got: %v", api.ErrTransactionExpired, err)
Expand Down
16 changes: 16 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/late_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ func (sc *lateStartImpl) Run(childEnv *env.Env) error {
if err != nil {
return fmt.Errorf("failed to create controller for client: %w", err)
}
err = ctrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{
RuntimeID: runtimeID,
Data: cbor.Marshal(&runtimeTransaction.TxnCall{
Method: "insert",
Args: struct {
Key string `json:"key"`
Value string `json:"value"`
}{
Key: "hello",
Value: "test",
},
}),
})
if !errors.Is(err, api.ErrNotSynced) {
return fmt.Errorf("expected error: %v, got: %v", api.ErrNotSynced, err)
}
_, err = ctrl.RuntimeClient.SubmitTx(ctx, &runtimeClient.SubmitTxRequest{
RuntimeID: runtimeID,
Data: cbor.Marshal(&runtimeTransaction.TxnCall{
Expand Down
7 changes: 6 additions & 1 deletion go/runtime/client/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ var (
type RuntimeClient interface {
enclaverpc.Transport

// SubmitTx submits a transaction to the runtime transaction scheduler.
// SubmitTx submits a transaction to the runtime transaction scheduler and waits
// for transaction execution results.
SubmitTx(ctx context.Context, request *SubmitTxRequest) ([]byte, error)

// SubmitTxNoWait submits a transaction to the runtime transaction scheduler but does
// not wait for transaction execution.
SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error

// CheckTx asks the local runtime to check the specified transaction.
CheckTx(ctx context.Context, request *CheckTxRequest) error

Expand Down
33 changes: 33 additions & 0 deletions go/runtime/client/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var (

// methodSubmitTx is the SubmitTx method.
methodSubmitTx = serviceName.NewMethod("SubmitTx", SubmitTxRequest{})
// methodSubmitTxNoWait is the SubmitTxNoWait method.
methodSubmitTxNoWait = serviceName.NewMethod("SubmitTxNoWait", SubmitTxRequest{})
// methodCheckTx is the CheckTx method.
methodCheckTx = serviceName.NewMethod("CheckTx", CheckTxRequest{})
// methodGetGenesisBlock is the GetGenesisBlock method.
Expand Down Expand Up @@ -59,6 +61,10 @@ var (
MethodName: methodSubmitTx.ShortName(),
Handler: handlerSubmitTx,
},
{
MethodName: methodSubmitTxNoWait.ShortName(),
Handler: handlerSubmitTxNoWait,
},
{
MethodName: methodCheckTx.ShortName(),
Handler: handlerCheckTx,
Expand Down Expand Up @@ -141,6 +147,29 @@ func handlerSubmitTx( // nolint: golint
return interceptor(ctx, &rq, info, handler)
}

func handlerSubmitTxNoWait( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var rq SubmitTxRequest
if err := dec(&rq); err != nil {
return nil, err
}
if interceptor == nil {
return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, &rq)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodSubmitTxNoWait.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, req.(*SubmitTxRequest))
}
return interceptor(ctx, &rq, info, handler)
}

func handlerCheckTx( // nolint: golint
srv interface{},
ctx context.Context,
Expand Down Expand Up @@ -506,6 +535,10 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *SubmitTxRequest)
return rsp, nil
}

func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error {
return c.conn.Invoke(ctx, methodSubmitTxNoWait.FullName(), request, nil)
}

func (c *runtimeClient) CheckTx(ctx context.Context, request *CheckTxRequest) error {
return c.conn.Invoke(ctx, methodCheckTx.FullName(), request, nil)
}
Expand Down
44 changes: 22 additions & 22 deletions go/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/common/p2p"
executor "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api"
)

const (
Expand Down Expand Up @@ -81,8 +80,7 @@ func (c *runtimeClient) tagIndexer(runtimeID common.Namespace) (tagindexer.Query
return rt.TagIndexer(), nil
}

// Implements api.RuntimeClient.
func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) {
func (c *runtimeClient) submitTx(ctx context.Context, request *api.SubmitTxRequest) (<-chan *watchResult, error) {
if c.common.p2p == nil {
return nil, fmt.Errorf("client: cannot submit transaction, p2p disabled")
}
Expand Down Expand Up @@ -130,6 +128,7 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
req := &watchRequest{
ctx: ctx,
respCh: respCh,
req: request,
}
req.id.FromBytes(request.Data)
select {
Expand All @@ -142,7 +141,17 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
case watcher.newCh <- req:
}

// Wait for response, handling retries if/when needed.
return respCh, nil
}

// Implements api.RuntimeClient.
func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) {
respCh, err := c.submitTx(ctx, request)
if err != nil {
return nil, err
}

// Wait for result.
for {
var resp *watchResult
var ok bool
Expand All @@ -158,27 +167,18 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
if !ok {
return nil, fmt.Errorf("client: block watch channel closed unexpectedly (unknown error)")
}

if resp.err != nil {
return nil, resp.err
}

// The main event is getting a response from the watcher, handled below. If there is
// no result yet, this means that we need to retry publish.
if resp.result == nil {
break
}

return resp.result, nil
return resp.result, resp.err
}
}
}

c.common.p2p.Publish(context.Background(), request.RuntimeID, &p2p.Message{
Tx: &executor.Tx{
Data: request.Data,
},
GroupVersion: resp.groupVersion,
})
// Implements api.RuntimeClient.
func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *api.SubmitTxRequest) error {
_, err := c.submitTx(ctx, request)
if err != nil {
return err
}
return nil
}

// Implements api.RuntimeClient.
Expand Down
39 changes: 39 additions & 0 deletions go/runtime/client/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func ClientImplementationTests(
defer cancelFunc()
testQuery(ctx, t, runtimeID, client, testInput)
})

noWaitInput := "squid at: " + time.Now().String()
t.Run("SubmitTxNoWait", func(t *testing.T) {
ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
defer cancelFunc()
testSubmitTransactionNoWait(ctx, t, runtimeID, client, noWaitInput)
})
}

func testSubmitTransaction(
Expand Down Expand Up @@ -201,3 +208,35 @@ func testQuery(
})
require.NoError(t, err, "CheckTx")
}

func testSubmitTransactionNoWait(
ctx context.Context,
t *testing.T,
runtimeID common.Namespace,
c api.RuntimeClient,
input string,
) {
// Based on SubmitTx and the mock worker.
testInput := []byte(input)
testOutput := testInput

// Query current block.
blkLatest, err := c.GetBlock(ctx, &api.GetBlockRequest{RuntimeID: runtimeID, Round: api.RoundLatest})
require.NoError(t, err, "GetBlock(RoundLatest)")

// Submit a test transaction.
err = c.SubmitTxNoWait(ctx, &api.SubmitTxRequest{Data: testInput, RuntimeID: runtimeID})

// Check if everything is in order.
require.NoError(t, err, "SubmitTxNoWait")

// Ensure transaction was executed.
err = c.WaitBlockIndexed(ctx, &api.WaitBlockIndexedRequest{RuntimeID: runtimeID, Round: blkLatest.Header.Round + 1})
require.NoError(t, err, "WaitBlockIndexed")

// Get transaction by latest round.
tx, err := c.GetTx(ctx, &api.GetTxRequest{RuntimeID: runtimeID, Round: api.RoundLatest, Index: 0})
require.NoError(t, err, "GetTx(RoundLatest)")
require.EqualValues(t, testInput, tx.Input)
require.EqualValues(t, testOutput, tx.Output)
}
Loading

0 comments on commit 4ac0542

Please sign in to comment.