From 8a38e350327a6137ec2bb2a6b1d308009152b632 Mon Sep 17 00:00:00 2001 From: ptrus Date: Fri, 23 Oct 2020 13:53:04 +0200 Subject: [PATCH] go/runtime/client: Recheck failed blocks and implement max tx age --- .changelog/3412.bugfix.md | 1 + .changelog/3443.bugfix.md | 5 + .changelog/3443.cfg.md | 5 + go/oasis-node/cmd/node/node.go | 1 + go/oasis-test-runner/oasis/args.go | 8 + go/oasis-test-runner/oasis/client.go | 13 +- go/oasis-test-runner/oasis/fixture.go | 4 + .../scenario/e2e/runtime/client_expire.go | 70 +++++++++ .../scenario/e2e/runtime/runtime.go | 2 + go/runtime/client/api/api.go | 2 + go/runtime/client/client.go | 41 ++++- go/runtime/client/watcher.go | 143 ++++++++++++++---- 12 files changed, 256 insertions(+), 39 deletions(-) create mode 100644 .changelog/3412.bugfix.md create mode 100644 .changelog/3443.bugfix.md create mode 100644 .changelog/3443.cfg.md create mode 100644 go/oasis-test-runner/scenario/e2e/runtime/client_expire.go diff --git a/.changelog/3412.bugfix.md b/.changelog/3412.bugfix.md new file mode 100644 index 00000000000..7b169c8bc0c --- /dev/null +++ b/.changelog/3412.bugfix.md @@ -0,0 +1 @@ +go/runtime/client: Runtime client should retry processing any failed blocks diff --git a/.changelog/3443.bugfix.md b/.changelog/3443.bugfix.md new file mode 100644 index 00000000000..032a7f8ed6e --- /dev/null +++ b/.changelog/3443.bugfix.md @@ -0,0 +1,5 @@ +go/runtime/client: Wait for initial consensus block and group version + +Before, the runtime client would publish invalid messages before obtaining the +initial group version. The messages were correctly retired upon receiving the +group version, but this resulted in needless messages. diff --git a/.changelog/3443.cfg.md b/.changelog/3443.cfg.md new file mode 100644 index 00000000000..db60f98f4c0 --- /dev/null +++ b/.changelog/3443.cfg.md @@ -0,0 +1,5 @@ +go/runtime/client: Add max transaction age + +Added `runtime.client.max_transaction_age` flag to configure number of +consensus blocks after which a submitted runtime transaction is considered +expired. Expired transactions are dropped by the client. diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go index 313648c1659..9efd627a074 100644 --- a/go/oasis-node/cmd/node/node.go +++ b/go/oasis-node/cmd/node/node.go @@ -774,6 +774,7 @@ func init() { compute.Flags, p2p.Flags, registration.Flags, + runtimeClient.Flags, executor.Flags, workerCommon.Flags, workerStorage.Flags, diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index 2d24bbd0c2c..ea87bed5a8f 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -25,6 +25,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/grpc" "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/metrics" "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/debug/byzantine" + runtimeClient "github.com/oasisprotocol/oasis-core/go/runtime/client" runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" @@ -227,6 +228,13 @@ func (args *argBuilder) runtimeTagIndexerBackend(backend string) *argBuilder { return args } +func (args *argBuilder) runtimeClientMaxTransactionAge(maxTxAge int64) *argBuilder { + args.vec = append(args.vec, []string{ + "--" + runtimeClient.CfgMaxTransactionAge, strconv.Itoa(int(maxTxAge)), + }...) + return args +} + func (args *argBuilder) workerClientPort(port uint16) *argBuilder { args.vec = append(args.vec, []string{ "--" + workerCommon.CfgClientPort, strconv.Itoa(int(port)), diff --git a/go/oasis-test-runner/oasis/client.go b/go/oasis-test-runner/oasis/client.go index 2a7302bee0b..eb0a00afea9 100644 --- a/go/oasis-test-runner/oasis/client.go +++ b/go/oasis-test-runner/oasis/client.go @@ -10,6 +10,8 @@ import ( type Client struct { Node + maxTransactionAge int64 + consensusPort uint16 p2pPort uint16 } @@ -17,6 +19,8 @@ type Client struct { // ClientCfg is the Oasis client node provisioning configuration. type ClientCfg struct { NodeCfg + + MaxTransactionAge int64 } func (client *Client) startNode() error { @@ -33,6 +37,10 @@ func (client *Client) startNode() error { workerP2pEnabled(). runtimeTagIndexerBackend("bleve") + if client.maxTransactionAge != 0 { + args = args.runtimeClientMaxTransactionAge(client.maxTransactionAge) + } + for _, v := range client.net.runtimes { if v.kind != registry.KindCompute { continue @@ -73,8 +81,9 @@ func (net *Network) NewClient(cfg *ClientCfg) (*Client, error) { dir: clientDir, consensus: cfg.Consensus, }, - consensusPort: net.nextNodePort, - p2pPort: net.nextNodePort + 1, + maxTransactionAge: cfg.MaxTransactionAge, + consensusPort: net.nextNodePort, + p2pPort: net.nextNodePort + 1, } client.doStartNode = client.startNode diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index c5e6e6cc516..9eff989126d 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -458,6 +458,9 @@ func (f *SentryFixture) Create(net *Network) (*Sentry, error) { type ClientFixture struct { // Consensus contains configuration for the consensus backend. Consensus ConsensusFixture `json:"consensus"` + + // MaxTransactionAge configures the MaxTransactionAge configuration of the client. + MaxTransactionAge int64 `json:"max_transaction_age"` } // Create instantiates the client node described by the fixture. @@ -466,6 +469,7 @@ func (f *ClientFixture) Create(net *Network) (*Client, error) { NodeCfg: NodeCfg{ Consensus: f.Consensus, }, + MaxTransactionAge: f.MaxTransactionAge, }) } diff --git a/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go b/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go new file mode 100644 index 00000000000..5ffa6af5168 --- /dev/null +++ b/go/oasis-test-runner/scenario/e2e/runtime/client_expire.go @@ -0,0 +1,70 @@ +package runtime + +import ( + "context" + "errors" + "fmt" + + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" + "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario" + "github.com/oasisprotocol/oasis-core/go/runtime/client/api" +) + +// ClientExpire is the ClientExpire node scenario. +var ClientExpire scenario.Scenario = newClientExpireImpl("client-expire", "simple-keyvalue-client", nil) + +type clientExpireImpl struct { + runtimeImpl +} + +func newClientExpireImpl(name, clientBinary string, clientArgs []string) scenario.Scenario { + return &clientExpireImpl{ + runtimeImpl: *newRuntimeImpl(name, clientBinary, clientArgs), + } +} + +func (sc *clientExpireImpl) Clone() scenario.Scenario { + return &clientExpireImpl{ + runtimeImpl: *sc.runtimeImpl.Clone().(*runtimeImpl), + } +} + +func (sc *clientExpireImpl) Fixture() (*oasis.NetworkFixture, error) { + f, err := sc.runtimeImpl.Fixture() + if err != nil { + return nil, err + } + + // Make client expire all transactions instantly. + f.Clients[0].MaxTransactionAge = 1 + + return f, nil +} + +func (sc *clientExpireImpl) Run(childEnv *env.Env) error { + ctx := context.Background() + + // Start the network. + var err error + if err = sc.Net.Start(); err != nil { + return err + } + + // Wait for client to be ready. + client := sc.Net.Clients()[0] + nodeCtrl, err := oasis.NewController(client.SocketPath()) + if err != nil { + return err + } + if err = nodeCtrl.WaitReady(ctx); err != nil { + return err + } + + err = sc.submitKeyValueRuntimeInsertTx(ctx, runtimeID, "hello", "test") + if !errors.Is(err, api.ErrTransactionExpired) { + return fmt.Errorf("expected error: %v, got: %v", api.ErrTransactionExpired, err) + } + + return nil +} diff --git a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go index 35b0d452fb0..0f60454f60d 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/runtime.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/runtime.go @@ -532,6 +532,8 @@ func RegisterScenarios() error { RuntimeDynamic, // Transaction source test. TxSourceMultiShort, + // ClientExpire test. + ClientExpire, // Late start test. LateStart, // KeymanagerUpgrade test. diff --git a/go/runtime/client/api/api.go b/go/runtime/client/api/api.go index ebf691f1cfc..d4302e8be33 100644 --- a/go/runtime/client/api/api.go +++ b/go/runtime/client/api/api.go @@ -26,6 +26,8 @@ var ( ErrNotFound = errors.New(ModuleName, 1, "client: not found") // ErrInternal is an error returned when an unspecified internal error occurs. ErrInternal = errors.New(ModuleName, 2, "client: internal error") + // ErrTransactionExpired is an error returned when transaction expired. + ErrTransactionExpired = errors.New(ModuleName, 3, "client: transaction expired") ) // RuntimeClient is the runtime client interface. diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go index 316ef3bd10a..a313f9abeb1 100644 --- a/go/runtime/client/client.go +++ b/go/runtime/client/client.go @@ -6,6 +6,9 @@ import ( "fmt" "sync" + flag "github.com/spf13/pflag" + "github.com/spf13/viper" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/logging" @@ -13,6 +16,7 @@ import ( consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" keymanagerAPI "github.com/oasisprotocol/oasis-core/go/keymanager/api" keymanager "github.com/oasisprotocol/oasis-core/go/keymanager/client" + cmdFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/runtime/client/api" @@ -25,9 +29,20 @@ import ( executor "github.com/oasisprotocol/oasis-core/go/worker/compute/executor/api" ) +const ( + // CfgMaxTransactionAge is the number of consensus blocks after which + // submitted transactions will be considered expired. + CfgMaxTransactionAge = "runtime.client.max_transaction_age" + + minMaxTransactionAge = 30 +) + var ( _ api.RuntimeClient = (*runtimeClient)(nil) _ enclaverpc.Transport = (*runtimeClient)(nil) + + // Flags has the flags used by the runtime client. + Flags = flag.NewFlagSet("", flag.ContinueOnError) ) type clientCommon struct { @@ -48,6 +63,8 @@ type runtimeClient struct { watchers map[common.Namespace]*blockWatcher kmClients map[common.Namespace]*keymanager.Client + maxTransactionAge int64 + logger *logging.Logger } @@ -71,7 +88,7 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque var err error c.Lock() if watcher, ok = c.watchers[request.RuntimeID]; !ok { - watcher, err = newWatcher(c.common, request.RuntimeID, c.common.p2p) + watcher, err = newWatcher(c.common, request.RuntimeID, c.common.p2p, c.maxTransactionAge) if err != nil { c.Unlock() return nil, err @@ -118,6 +135,10 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque return nil, fmt.Errorf("client: block watch channel closed unexpectedly (unknown error)") } + if resp.err != nil { + return nil, resp.err + } + // The main event is getting a response from the watcher, handled below. If there is // no result yet, this means that we need to retry publish. if resp.result == nil { @@ -428,6 +449,11 @@ func New( runtimeRegistry runtimeRegistry.Registry, p2p *p2p.P2P, ) (api.RuntimeClient, error) { + maxTransactionAge := viper.GetInt64(CfgMaxTransactionAge) + if maxTransactionAge < minMaxTransactionAge && !cmdFlags.DebugDontBlameOasis() { + return nil, fmt.Errorf("max transaction age too low: %d, minimum: %d", maxTransactionAge, minMaxTransactionAge) + } + c := &runtimeClient{ common: &clientCommon{ storage: runtimeRegistry.StorageRouter(), @@ -436,9 +462,16 @@ func New( ctx: ctx, p2p: p2p, }, - watchers: make(map[common.Namespace]*blockWatcher), - kmClients: make(map[common.Namespace]*keymanager.Client), - logger: logging.GetLogger("runtime/client"), + watchers: make(map[common.Namespace]*blockWatcher), + kmClients: make(map[common.Namespace]*keymanager.Client), + maxTransactionAge: maxTransactionAge, + logger: logging.GetLogger("runtime/client"), } return c, nil } + +func init() { + Flags.Int64(CfgMaxTransactionAge, 1500, "number of consensus blocks after which submitted transactions will be considered expired") + + _ = viper.BindPFlags(Flags) +} diff --git a/go/runtime/client/watcher.go b/go/runtime/client/watcher.go index def3aa48a9d..125a8a0bcc3 100644 --- a/go/runtime/client/watcher.go +++ b/go/runtime/client/watcher.go @@ -2,12 +2,13 @@ package client import ( "context" + "fmt" "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/service" - "github.com/oasisprotocol/oasis-core/go/epochtime/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" + "github.com/oasisprotocol/oasis-core/go/runtime/client/api" "github.com/oasisprotocol/oasis-core/go/runtime/transaction" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/common/p2p" @@ -17,9 +18,12 @@ type watchRequest struct { id hash.Hash ctx context.Context respCh chan *watchResult + height int64 } -func (w *watchRequest) send(res *watchResult) error { +func (w *watchRequest) send(res *watchResult, height int64) error { + w.height = height + select { case <-w.ctx.Done(): return w.ctx.Err() @@ -29,6 +33,7 @@ func (w *watchRequest) send(res *watchResult) error { } type watchResult struct { + err error result []byte groupVersion int64 } @@ -42,12 +47,21 @@ type blockWatcher struct { watched map[hash.Hash]*watchRequest newCh chan *watchRequest + maxTransactionAge int64 + + toBeChecked []*block.Block + stopCh chan struct{} } -func (w *blockWatcher) checkBlock(blk *block.Block) { +func (w *blockWatcher) checkBlock(blk *block.Block) error { if blk.Header.IORoot.IsEmpty() { - return + return nil + } + + // If there's no pending transactions, we can skip the check. + if len(w.watched) == 0 { + return nil } ctx := w.common.ctx @@ -68,8 +82,7 @@ func (w *blockWatcher) checkBlock(blk *block.Block) { matches, err := tree.GetTransactionMultiple(ctx, txHashes) if err != nil { - w.Logger.Error("can't get block I/O from storage", "err", err) - return + return fmt.Errorf("error getting block I/O from storage: %w", err) } for txHash, tx := range matches { @@ -79,10 +92,20 @@ func (w *blockWatcher) checkBlock(blk *block.Block) { } // Ignore errors, the watch is getting deleted anyway. - _ = watch.send(res) + _ = watch.send(res, 0) close(watch.respCh) delete(w.watched, txHash) } + + return nil +} + +func (w *blockWatcher) getGroupVersion(height int64) (int64, error) { + epoch, err := w.common.consensus.EpochTime().GetEpoch(w.common.ctx, height) + if err != nil { + return 0, fmt.Errorf("failed querying for epoch: %w", err) + } + return w.common.consensus.EpochTime().GetEpochBlock(w.common.ctx, epoch) } func (w *blockWatcher) watch() { @@ -103,44 +126,74 @@ func (w *blockWatcher) watch() { } defer blocksSub.Close() - // If we were just started, refresh the committee information from any - // block, otherwise just from epoch transition blocks. - var gotInitialCommittee bool + // Start watching consensus blocks. + consensusBlocks, consensusBlocksSub, err := w.common.consensus.WatchBlocks(w.common.ctx) + if err != nil { + w.Logger.Error("failed to subscribe to consensus blocks", + "err", err, + ) + return + } + defer consensusBlocksSub.Close() + + // latestHeight contains the latest known consensus block height. + var latestHeight int64 // latestGroupVersion contains the latest known committee group version. var latestGroupVersion int64 + // Wait for first consensus block before proceeding. + select { + case <-w.stopCh: + return + case <-w.common.ctx.Done(): + return + case blk := <-consensusBlocks: + latestHeight = blk.Height + latestGroupVersion, err = w.getGroupVersion(blk.Height) + if err != nil { + w.Logger.Error("failed querying for latest group version", + "err", err, + ) + return + } + } + for { // Wait for stuff to happen. select { case blk := <-blocks: - // Check block. - w.checkBlock(blk.Block) + w.toBeChecked = append(w.toBeChecked, blk.Block) - // If this is the initial block or an epoch transition block, - // update latest known group version and resend all transactions. - if gotInitialCommittee && blk.Block.Header.HeaderType != block.EpochTransition { - continue + var failedBlocks []*block.Block + for _, b := range w.toBeChecked { + if err = w.checkBlock(b); err != nil { + w.Logger.Error("error checking block", + "err", err, + "round", b.Header.Round, + ) + failedBlocks = append(failedBlocks, b) + } } - - // Get group version. - var ce api.EpochTime - ce, err = w.common.consensus.EpochTime().GetEpoch(w.common.ctx, blk.Height) - if err != nil { - w.Logger.Error("error getting epoch block", - "err", err, - "height", blk.Height, + if len(failedBlocks) > 0 { + w.Logger.Warn("failed roothash blocks", + "num_failed_blocks", len(failedBlocks), ) + } + w.toBeChecked = failedBlocks + + // If this is an epoch transition block, update latest known group + // version and resend all transactions. + if blk.Block.Header.HeaderType != block.EpochTransition { continue } - var ch int64 - ch, err = w.common.consensus.EpochTime().GetEpochBlock(w.common.ctx, ce) + + // Get group version. + latestGroupVersion, err = w.getGroupVersion(blk.Height) if err != nil { - w.Logger.Error("error getting epoch number", + w.Logger.Error("failed querying for latest group version", "err", err, - "height", blk.Height, ) continue } - latestGroupVersion = ch // Tell every client to resubmit as messages with old groupVersion // will be discarded. @@ -148,19 +201,42 @@ func (w *blockWatcher) watch() { res := &watchResult{ groupVersion: latestGroupVersion, } - if watch.send(res) != nil { + if watch.send(res, latestHeight) != nil { delete(w.watched, key) } } - gotInitialCommittee = true + case blk := <-consensusBlocks: + if blk == nil { + break + } + latestHeight = blk.Height + // Check if any transaction is considered expired. + for key, watch := range w.watched { + if (latestHeight - w.maxTransactionAge) < watch.height { + continue + } + w.Logger.Debug("expired transaction", + "key", key, + "latest_height", latestHeight, + "max_transaction_age", w.maxTransactionAge, + "watch_height", watch.height, + ) + res := &watchResult{ + err: api.ErrTransactionExpired, + } + // Ignore errors, the watch is getting deleted anyway. + _ = watch.send(res, 0) + close(watch.respCh) + delete(w.watched, key) + } case newWatch := <-w.newCh: w.watched[newWatch.id] = newWatch res := &watchResult{ groupVersion: latestGroupVersion, } - if newWatch.send(res) != nil { + if newWatch.send(res, latestHeight) != nil { delete(w.watched, newWatch.id) } @@ -185,7 +261,7 @@ func (w *blockWatcher) Stop() { close(w.stopCh) } -func newWatcher(common *clientCommon, id common.Namespace, p2pSvc *p2p.P2P) (*blockWatcher, error) { +func newWatcher(common *clientCommon, id common.Namespace, p2pSvc *p2p.P2P, maxTransactionAge int64) (*blockWatcher, error) { // Register handler. p2pSvc.RegisterHandler(id, &p2p.BaseHandler{}) @@ -194,6 +270,7 @@ func newWatcher(common *clientCommon, id common.Namespace, p2pSvc *p2p.P2P) (*bl BaseBackgroundService: *svc, common: common, id: id, + maxTransactionAge: maxTransactionAge, watched: make(map[hash.Hash]*watchRequest), newCh: make(chan *watchRequest), stopCh: make(chan struct{}),