Skip to content

Commit

Permalink
go/runtime/client: add SubmitTxNoWait method
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Mar 4, 2021
1 parent dde272f commit 37ca8ef
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 137 deletions.
4 changes: 4 additions & 0 deletions .changelog/3444.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/runtime/client: add SubmitTxNoWait method

SubmitTxNoWait method publishes the runtime transaction and doesn't wait for
results.
20 changes: 20 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/client_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"errors"
"fmt"

"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client/api"
runtimeTransaction "github.com/oasisprotocol/oasis-core/go/runtime/transaction"
)

// ClientExpire is the ClientExpire node scenario.
Expand Down Expand Up @@ -61,6 +64,23 @@ func (sc *clientExpireImpl) Run(childEnv *env.Env) error {
return err
}

err = nodeCtrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{
RuntimeID: runtimeID,
Data: cbor.Marshal(&runtimeTransaction.TxnCall{
Method: "insert",
Args: struct {
Key string `json:"key"`
Value string `json:"value"`
}{
Key: "hello",
Value: "test",
},
}),
})
if err != nil {
return fmt.Errorf("SubmitTxNoWait expected no error, got: %b", err)
}

err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, "hello", "test")
if !errors.Is(err, api.ErrTransactionExpired) {
return fmt.Errorf("expected error: %v, got: %v", api.ErrTransactionExpired, err)
Expand Down
16 changes: 16 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/late_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ func (sc *lateStartImpl) Run(childEnv *env.Env) error {
if err != nil {
return fmt.Errorf("failed to create controller for client: %w", err)
}
err = ctrl.RuntimeClient.SubmitTxNoWait(ctx, &runtimeClient.SubmitTxRequest{
RuntimeID: runtimeID,
Data: cbor.Marshal(&runtimeTransaction.TxnCall{
Method: "insert",
Args: struct {
Key string `json:"key"`
Value string `json:"value"`
}{
Key: "hello",
Value: "test",
},
}),
})
if !errors.Is(err, api.ErrNotSynced) {
return fmt.Errorf("expected error: %v, got: %v", api.ErrNotSynced, err)
}
_, err = ctrl.RuntimeClient.SubmitTx(ctx, &runtimeClient.SubmitTxRequest{
RuntimeID: runtimeID,
Data: cbor.Marshal(&runtimeTransaction.TxnCall{
Expand Down
7 changes: 6 additions & 1 deletion go/runtime/client/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ var (
type RuntimeClient interface {
enclaverpc.Transport

// SubmitTx submits a transaction to the runtime transaction scheduler.
// SubmitTx submits a transaction to the runtime transaction scheduler and waits
// for transaction execution results.
SubmitTx(ctx context.Context, request *SubmitTxRequest) ([]byte, error)

// SubmitTxNoWait submits a transaction to the runtime transaction scheduler but does
// not wait for transaction execution.
SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error

// CheckTx asks the local runtime to check the specified transaction.
CheckTx(ctx context.Context, request *CheckTxRequest) error

Expand Down
33 changes: 33 additions & 0 deletions go/runtime/client/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var (

// methodSubmitTx is the SubmitTx method.
methodSubmitTx = serviceName.NewMethod("SubmitTx", SubmitTxRequest{})
// methodSubmitTxNoWait is the SubmitTxNoWait method.
methodSubmitTxNoWait = serviceName.NewMethod("SubmitTxNoWait", SubmitTxRequest{})
// methodCheckTx is the CheckTx method.
methodCheckTx = serviceName.NewMethod("CheckTx", CheckTxRequest{})
// methodGetGenesisBlock is the GetGenesisBlock method.
Expand Down Expand Up @@ -59,6 +61,10 @@ var (
MethodName: methodSubmitTx.ShortName(),
Handler: handlerSubmitTx,
},
{
MethodName: methodSubmitTxNoWait.ShortName(),
Handler: handlerSubmitTxNoWait,
},
{
MethodName: methodCheckTx.ShortName(),
Handler: handlerCheckTx,
Expand Down Expand Up @@ -141,6 +147,29 @@ func handlerSubmitTx( // nolint: golint
return interceptor(ctx, &rq, info, handler)
}

func handlerSubmitTxNoWait( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var rq SubmitTxRequest
if err := dec(&rq); err != nil {
return nil, err
}
if interceptor == nil {
return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, &rq)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodSubmitTxNoWait.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, srv.(RuntimeClient).SubmitTxNoWait(ctx, req.(*SubmitTxRequest))
}
return interceptor(ctx, &rq, info, handler)
}

func handlerCheckTx( // nolint: golint
srv interface{},
ctx context.Context,
Expand Down Expand Up @@ -506,6 +535,10 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *SubmitTxRequest)
return rsp, nil
}

func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *SubmitTxRequest) error {
return c.conn.Invoke(ctx, methodSubmitTxNoWait.FullName(), request, nil)
}

func (c *runtimeClient) CheckTx(ctx context.Context, request *CheckTxRequest) error {
return c.conn.Invoke(ctx, methodCheckTx.FullName(), request, nil)
}
Expand Down
89 changes: 41 additions & 48 deletions go/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/common/p2p"
executor "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api"
)

const (
Expand Down Expand Up @@ -63,9 +62,9 @@ type runtimeClient struct {
common *clientCommon
quitCh chan struct{}

hosts map[common.Namespace]*clientHost
watchers map[common.Namespace]*blockWatcher
kmClients map[common.Namespace]*keymanager.Client
hosts map[common.Namespace]*clientHost
txSubmitters map[common.Namespace]*txSubmitter
kmClients map[common.Namespace]*keymanager.Client

maxTransactionAge int64

Expand All @@ -81,8 +80,7 @@ func (c *runtimeClient) tagIndexer(runtimeID common.Namespace) (tagindexer.Query
return rt.TagIndexer(), nil
}

// Implements api.RuntimeClient.
func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) {
func (c *runtimeClient) submitTx(ctx context.Context, request *api.SubmitTxRequest) (<-chan *txResult, error) {
if c.common.p2p == nil {
return nil, fmt.Errorf("client: cannot submit transaction, p2p disabled")
}
Expand All @@ -107,29 +105,22 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
}
}

var watcher *blockWatcher
var submitter *txSubmitter
var ok bool
var err error
c.Lock()
if watcher, ok = c.watchers[request.RuntimeID]; !ok {
watcher, err = newWatcher(c.common, request.RuntimeID, c.common.p2p, c.maxTransactionAge)
if err != nil {
c.Unlock()
return nil, err
}
if err = watcher.Start(); err != nil {
c.Unlock()
return nil, err
}
c.watchers[request.RuntimeID] = watcher
if submitter, ok = c.txSubmitters[request.RuntimeID]; !ok {
submitter = newTxSubmitter(c.common, request.RuntimeID, c.common.p2p, c.maxTransactionAge)
submitter.Start()
c.txSubmitters[request.RuntimeID] = submitter
}
c.Unlock()

// Send a request for watching a new runtime transaction.
respCh := make(chan *watchResult)
req := &watchRequest{
respCh := make(chan *txResult)
req := &txRequest{
ctx: ctx,
respCh: respCh,
req: request,
}
req.id.FromBytes(request.Data)
select {
Expand All @@ -139,12 +130,22 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
case <-c.common.ctx.Done():
// Client is shutting down.
return nil, fmt.Errorf("client: shutting down")
case watcher.newCh <- req:
case submitter.newCh <- req:
}

return respCh, nil
}

// Implements api.RuntimeClient.
func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxRequest) ([]byte, error) {
respCh, err := c.submitTx(ctx, request)
if err != nil {
return nil, err
}

// Wait for response, handling retries if/when needed.
// Wait for result.
for {
var resp *watchResult
var resp *txResult
var ok bool

select {
Expand All @@ -158,29 +159,17 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque
if !ok {
return nil, fmt.Errorf("client: block watch channel closed unexpectedly (unknown error)")
}

if resp.err != nil {
return nil, resp.err
}

// The main event is getting a response from the watcher, handled below. If there is
// no result yet, this means that we need to retry publish.
if resp.result == nil {
break
}

return resp.result, nil
return resp.result, resp.err
}

c.common.p2p.Publish(context.Background(), request.RuntimeID, &p2p.Message{
Tx: &executor.Tx{
Data: request.Data,
},
GroupVersion: resp.groupVersion,
})
}
}

// Implements api.RuntimeClient.
func (c *runtimeClient) SubmitTxNoWait(ctx context.Context, request *api.SubmitTxRequest) error {
_, err := c.submitTx(ctx, request)
return err
}

// Implements api.RuntimeClient.
func (c *runtimeClient) CheckTx(ctx context.Context, request *api.CheckTxRequest) error {
hrt, ok := c.hosts[request.RuntimeID]
Expand Down Expand Up @@ -561,9 +550,11 @@ func (c *runtimeClient) Start() error {
// Implements service.BackgroundService.
func (c *runtimeClient) Stop() {
// Watchers.
for _, watcher := range c.watchers {
watcher.Stop()
c.Lock()
for _, submitter := range c.txSubmitters {
submitter.Stop()
}
c.Unlock()
// Hosts.
for _, host := range c.hosts {
host.Stop()
Expand All @@ -578,9 +569,11 @@ func (c *runtimeClient) Quit() <-chan struct{} {
// Cleanup waits for all block watchers to finish.
func (c *runtimeClient) Cleanup() {
// Watchers.
for _, watcher := range c.watchers {
<-watcher.Quit()
c.Lock()
for _, submitter := range c.txSubmitters {
<-submitter.Quit()
}
c.Unlock()
}

// New returns a new runtime client instance.
Expand All @@ -606,7 +599,7 @@ func New(
},
quitCh: make(chan struct{}),
hosts: make(map[common.Namespace]*clientHost),
watchers: make(map[common.Namespace]*blockWatcher),
txSubmitters: make(map[common.Namespace]*txSubmitter),
kmClients: make(map[common.Namespace]*keymanager.Client),
maxTransactionAge: maxTransactionAge,
logger: logging.GetLogger("runtime/client"),
Expand Down
Loading

0 comments on commit 37ca8ef

Please sign in to comment.