Merge pull request #2502 from oasislabs/andrej/feature/txsched-checktx
Use CheckTx in transaction scheduler
abukosek authored Jan 14, 2020
2 parents 51ed3fa + 25e1f50 commit d5df3bd
Showing 9 changed files with 176 additions and 20 deletions.
6 changes: 6 additions & 0 deletions .changelog/2502.feature.md
@@ -0,0 +1,6 @@
+go/worker/txnscheduler: Check transactions before queuing them
+
+The transaction scheduler can now optionally run runtimes and
+check transactions before scheduling them (see issue #1963).
+This functionality is disabled by default; enable it with
+`worker.txn_scheduler.check_tx.enabled`.
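
As an aside, the flag follows the usual pflag/viper pattern used throughout oasis-node. A minimal, self-contained sketch of that pattern (hypothetical example program, not part of this commit; only the flag name is taken from the changelog above):

package main

import (
	"fmt"

	flag "github.com/spf13/pflag"
	"github.com/spf13/viper"
)

// CfgCheckTxEnabled mirrors worker.txn_scheduler.check_tx.enabled from this commit.
const CfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled"

func main() {
	// Registered with a false default, so the feature stays off unless enabled.
	flags := flag.NewFlagSet("example", flag.ContinueOnError)
	flags.Bool(CfgCheckTxEnabled, false, "Enable checking transactions before scheduling them")
	_ = viper.BindPFlags(flags)

	// Simulate passing --worker.txn_scheduler.check_tx.enabled on the command line.
	_ = flags.Parse([]string{"--" + CfgCheckTxEnabled})

	fmt.Println("CheckTx enabled:", viper.GetBool(CfgCheckTxEnabled))
}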
1 change: 1 addition & 0 deletions go/oasis-node/node_test.go
@@ -73,6 +73,7 @@ var (
{workerCommon.CfgClientPort, workerClientPort},
{storageWorker.CfgWorkerEnabled, true},
{txnscheduler.CfgWorkerEnabled, true},
+{txnscheduler.CfgCheckTxEnabled, false},
{mergeWorker.CfgWorkerEnabled, true},
{supplementarysanity.CfgEnabled, true},
{supplementarysanity.CfgInterval, 1},
5 changes: 5 additions & 0 deletions go/oasis-test-runner/oasis/args.go
@@ -273,6 +273,11 @@ func (args *argBuilder) workerTxnschedulerEnabled() *argBuilder {
return args
}

+func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder {
+args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled)
+return args
+}
+
func (args *argBuilder) iasUseGenesis() *argBuilder {
args.vec = append(args.vec, "--ias.use_genesis")
return args
1 change: 1 addition & 0 deletions go/oasis-test-runner/oasis/compute.go
@@ -77,6 +77,7 @@ func (worker *Compute) startNode() error {
workerRuntimeLoader(worker.net.cfg.RuntimeLoaderBinary).
workerMergeEnabled().
workerTxnschedulerEnabled().
+workerTxnschedulerCheckTxEnabled().
appendNetwork(worker.net).
appendEntity(worker.entity)
for _, v := range worker.net.runtimes {
11 changes: 6 additions & 5 deletions go/registry/api/api.go
@@ -143,7 +143,8 @@ var (

// RuntimesRequiredRoles are the Node roles that require runtimes.
RuntimesRequiredRoles = node.RoleComputeWorker |
-node.RoleKeyManager
+node.RoleKeyManager |
+node.RoleTransactionScheduler

// ConsensusAddressRequiredRoles are the Node roles that require Consensus Address.
ConsensusAddressRequiredRoles = node.RoleValidator
@@ -1228,12 +1229,12 @@ func SanityCheckNodes(nodes []*node.SignedNode, seenEntities map[signature.Publi
return fmt.Errorf("registry: sanity check failed: key manager node must have runtime(s)")
}

-if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
+if n.HasRoles(node.RoleTransactionScheduler) && len(n.Runtimes) == 0 {
+return fmt.Errorf("registry: sanity check failed: transaction scheduler node must have runtime(s)")
}

-if n.HasRoles(node.RoleTransactionScheduler) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-return fmt.Errorf("registry: sanity check failed: transaction scheduler node shouldn't have any runtimes")
+if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
+return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
}

if n.HasRoles(node.RoleMergeWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
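
The reordering above also inverts the rule for transaction schedulers: they must now declare runtimes (they join RuntimesRequiredRoles alongside compute workers and key managers), while storage-only nodes still must not. A toy sketch of the role-mask semantics these checks rely on (hypothetical types; the real ones live in go/common/node):

package main

import "fmt"

// RolesMask is a stand-in for oasis-core's node role bitmask.
type RolesMask uint32

const (
	RoleComputeWorker RolesMask = 1 << iota
	RoleStorageWorker
	RoleKeyManager
	RoleTransactionScheduler
)

// HasRoles reports whether all roles in r are set, matching the semantics
// the sanity checks above depend on.
func (m RolesMask) HasRoles(r RolesMask) bool { return m&r == r }

func main() {
	roles := RoleTransactionScheduler
	numRuntimes := 0

	// After this commit, a transaction scheduler with no runtimes is rejected.
	if roles.HasRoles(RoleTransactionScheduler) && numRuntimes == 0 {
		fmt.Println("sanity check failed: transaction scheduler node must have runtime(s)")
	}
}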
3 changes: 3 additions & 0 deletions go/worker/txnscheduler/api/api.go
@@ -23,6 +23,9 @@ var (
// ErrNotReady is the error returned when the transaction scheduler is not
// yet ready to process transactions.
ErrNotReady = errors.New(ModuleName, 3, "txnscheduler: not ready")
+
+// ErrCheckTxFailed is the error returned when CheckTx fails.
+ErrCheckTxFailed = errors.New(ModuleName, 4, "txnscheduler: CheckTx failed")
)

// TransactionScheduler is the transaction scheduler API interface.
95 changes: 94 additions & 1 deletion go/worker/txnscheduler/committee/node.go
@@ -10,6 +10,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"

+"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/common/crash"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
@@ -20,7 +21,10 @@ import (
"github.com/oasislabs/oasis-core/go/roothash/api/block"
"github.com/oasislabs/oasis-core/go/runtime/transaction"
storage "github.com/oasislabs/oasis-core/go/storage/api"
+commonWorker "github.com/oasislabs/oasis-core/go/worker/common"
"github.com/oasislabs/oasis-core/go/worker/common/committee"
+"github.com/oasislabs/oasis-core/go/worker/common/host"
+"github.com/oasislabs/oasis-core/go/worker/common/host/protocol"
"github.com/oasislabs/oasis-core/go/worker/common/p2p"
computeCommittee "github.com/oasislabs/oasis-core/go/worker/compute/committee"
txnSchedulerAlgorithm "github.com/oasislabs/oasis-core/go/worker/txnscheduler/algorithm"
@@ -49,7 +53,11 @@
)

// Node is a committee node.
-type Node struct {
+type Node struct { // nolint: maligned
+*commonWorker.RuntimeHostNode
+
+checkTxEnabled bool
+
commonNode *committee.Node
computeNode *computeCommittee.Node

@@ -128,6 +136,68 @@ func (n *Node) HandlePeerMessage(ctx context.Context, message *p2p.Message) (boo
return false, nil
}

+// CheckTx checks the given call in the node's runtime.
+func (n *Node) CheckTx(ctx context.Context, call []byte) error {
+n.commonNode.CrossNode.Lock()
+currentBlock := n.commonNode.CurrentBlock
+n.commonNode.CrossNode.Unlock()
+
+if currentBlock == nil {
+return api.ErrNotReady
+}
+
+checkRq := &protocol.Body{
+WorkerCheckTxBatchRequest: &protocol.WorkerCheckTxBatchRequest{
+Inputs: transaction.RawBatch{call},
+Block: *currentBlock,
+},
+}
+workerHost := n.GetWorkerHost()
+if workerHost == nil {
+n.logger.Error("worker host not initialized")
+return api.ErrNotReady
+}
+resp, err := workerHost.Call(ctx, checkRq)
+if err != nil {
+n.logger.Error("worker host CheckTx call error",
+"err", err,
+)
+return err
+}
+if resp == nil {
+n.logger.Error("worker host CheckTx response is nil")
+return api.ErrCheckTxFailed
+}
+if resp.WorkerCheckTxBatchResponse.Results == nil {
+n.logger.Error("worker host CheckTx response contains no results")
+return api.ErrCheckTxFailed
+}
+if len(resp.WorkerCheckTxBatchResponse.Results) != 1 {
+n.logger.Error("worker host CheckTx response doesn't contain exactly one result",
+"num_results", len(resp.WorkerCheckTxBatchResponse.Results),
+)
+return api.ErrCheckTxFailed
+}
+
+// Interpret CheckTx result.
+resultRaw := resp.WorkerCheckTxBatchResponse.Results[0]
+var result transaction.TxnOutput
+if err = cbor.Unmarshal(resultRaw, &result); err != nil {
+n.logger.Error("worker host CheckTx response failed to deserialize",
+"err", err,
+)
+return api.ErrCheckTxFailed
+}
+if result.Error != nil {
+n.logger.Error("worker CheckTx failed with error",
+"err", result.Error,
+)
+return fmt.Errorf("%w: %s", api.ErrCheckTxFailed, *result.Error)
+}
+
+return nil
+}
// QueueCall queues a call for processing by this node.
func (n *Node) QueueCall(ctx context.Context, call []byte) error {
// Check if we are a leader. Note that we may be in the middle of a
@@ -136,6 +206,14 @@ func (n *Node) QueueCall(ctx context.Context, call []byte) error {
return api.ErrNotLeader
}

+if n.checkTxEnabled {
+// Check transaction before queuing it.
+if err := n.CheckTx(ctx, call); err != nil {
+return err
+}
+n.logger.Debug("worker CheckTx successful, queuing transaction")
+}
+
n.algorithmMutex.RLock()
defer n.algorithmMutex.RUnlock()

@@ -385,6 +463,17 @@ func (n *Node) worker() {

n.logger.Info("starting committee node")

+if n.checkTxEnabled {
+// Initialize worker host for the new runtime.
+if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
+n.logger.Error("failed to initialize worker host",
+"err", err,
+)
+return
+}
+defer n.StopRuntimeWorkerHost()
+}
+
// Initialize transaction scheduler's algorithm.
runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
if err != nil {
@@ -437,6 +526,8 @@
func NewNode(
commonNode *committee.Node,
computeNode *computeCommittee.Node,
+workerHostFactory host.Factory,
+checkTxEnabled bool,
) (*Node, error) {
metricsOnce.Do(func() {
prometheus.MustRegister(nodeCollectors...)
@@ -445,6 +536,8 @@
ctx, cancel := context.WithCancel(context.Background())

n := &Node{
+RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory),
+checkTxEnabled: checkTxEnabled,
commonNode: commonNode,
computeNode: computeNode,
ctx: ctx,
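
Note that CheckTx wraps api.ErrCheckTxFailed with %w before returning, so callers can still match the sentinel after the runtime's error text is attached. A stand-alone sketch of that contract (the sentinel below is a local stub, not the real api package):

package main

import (
	"errors"
	"fmt"
)

// errCheckTxFailed stands in for api.ErrCheckTxFailed.
var errCheckTxFailed = errors.New("txnscheduler: CheckTx failed")

// reject mimics the wrapping in CheckTx above: fmt.Errorf with %w keeps the
// sentinel matchable while attaching the runtime's error message.
func reject(reason string) error {
	return fmt.Errorf("%w: %s", errCheckTxFailed, reason)
}

func main() {
	err := reject("invalid nonce")

	// Callers can distinguish a transaction rejected by CheckTx from other
	// failures (transport errors, scheduler not ready, and so on).
	if errors.Is(err, errCheckTxFailed) {
		fmt.Println("rejected by CheckTx:", err)
	}
}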
10 changes: 9 additions & 1 deletion go/worker/txnscheduler/init.go
@@ -13,6 +13,8 @@ import (
const (
// CfgWorkerEnabled enables the tx scheduler worker.
CfgWorkerEnabled = "worker.txn_scheduler.enabled"
+// CfgCheckTxEnabled enables checking each transaction before scheduling it.
+CfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled"
)

// Flags has the configuration flags.
@@ -23,17 +25,23 @@ func Enabled() bool {
return viper.GetBool(CfgWorkerEnabled)
}

+// CheckTxEnabled reads our CheckTx enabled flag from viper.
+func CheckTxEnabled() bool {
+return viper.GetBool(CfgCheckTxEnabled)
+}
+
// New creates a new worker.
func New(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
) (*Worker, error) {
-return newWorker(Enabled(), commonWorker, compute, registration)
+return newWorker(Enabled(), commonWorker, compute, registration, CheckTxEnabled())
}

func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable transaction scheduler process")
+Flags.Bool(CfgCheckTxEnabled, false, "Enable checking transactions before scheduling them")

_ = viper.BindPFlags(Flags)

64 changes: 51 additions & 13 deletions go/worker/txnscheduler/worker.go
@@ -1,6 +1,8 @@
package txnscheduler

import (
+"context"
+
"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/node"
@@ -14,14 +16,18 @@

// Worker is a transaction scheduler handling many runtimes.
type Worker struct {
-enabled bool
+*workerCommon.RuntimeHostWorker
+
+enabled bool
+checkTxEnabled bool

commonWorker *workerCommon.Worker
registration *registration.Worker
compute *compute.Worker

runtimes map[common.Namespace]*committee.Node

+ctx context.Context
quitCh chan struct{}
initCh chan struct{}

@@ -139,7 +145,14 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
// Get other nodes from this runtime.
computeNode := w.compute.GetRuntime(id)

-node, err := committee.NewNode(commonNode, computeNode)
+// Create worker host for the given runtime.
+workerHostFactory, err := w.NewRuntimeWorkerHostFactory(node.RoleTransactionScheduler, id)
+if err != nil {
+return err
+}
+
+// Create committee node for the given runtime.
+node, err := committee.NewNode(commonNode, computeNode, workerHostFactory, w.checkTxEnabled)
if err != nil {
return err
}
@@ -159,36 +172,61 @@ func newWorker(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
+checkTxEnabled bool,
) (*Worker, error) {
+ctx := context.Background()
+
w := &Worker{
-enabled: enabled,
-commonWorker: commonWorker,
-registration: registration,
-compute: compute,
-runtimes: make(map[common.Namespace]*committee.Node),
-quitCh: make(chan struct{}),
-initCh: make(chan struct{}),
-logger: logging.GetLogger("worker/txnscheduler"),
+enabled: enabled,
+checkTxEnabled: checkTxEnabled,
+commonWorker: commonWorker,
+registration: registration,
+compute: compute,
+runtimes: make(map[common.Namespace]*committee.Node),
+ctx: ctx,
+quitCh: make(chan struct{}),
+initCh: make(chan struct{}),
+logger: logging.GetLogger("worker/txnscheduler"),
}

if enabled {
if !w.commonWorker.Enabled() {
panic("common worker should have been enabled for transaction scheduler")
}

+var err error
+
+// Create the runtime host worker.
+w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker)
+if err != nil {
+return nil, err
+}
+
// Use existing gRPC server passed from the node.
api.RegisterService(commonWorker.Grpc.Server(), w)

// Register all configured runtimes.
for _, rt := range commonWorker.GetRuntimes() {
-if err := w.registerRuntime(rt); err != nil {
+if err = w.registerRuntime(rt); err != nil {
return nil, err
}
}

// Register transaction scheduler worker role.
-if err := w.registration.RegisterRole(node.RoleTransactionScheduler,
-func(n *node.Node) error { return nil }); err != nil {
+if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error {
+if w.checkTxEnabled {
+// Wait until all the runtimes are initialized.
+for _, rt := range w.runtimes {
+select {
+case <-rt.Initialized():
+case <-w.ctx.Done():
+return w.ctx.Err()
+}
+}
+}
+
+return nil
+}); err != nil {
return nil, err
}
}
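
When CheckTx is enabled, the new RegisterRole hook above delays node registration until every runtime's worker host reports ready, so the node never advertises RoleTransactionScheduler while unable to check transactions. A self-contained sketch of the same wait-until-initialized pattern (hypothetical helper, not oasis-core code):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForRuntimes blocks until every runtime signals readiness, or aborts
// when the worker context is canceled, mirroring the loop above.
func waitForRuntimes(ctx context.Context, inits []<-chan struct{}) error {
	for _, initCh := range inits {
		select {
		case <-initCh:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	rt := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(rt) // runtime's worker host is up
	}()

	if err := waitForRuntimes(context.Background(), []<-chan struct{}{rt}); err != nil {
		fmt.Println("registration aborted:", err)
		return
	}
	fmt.Println("all runtimes initialized; safe to register RoleTransactionScheduler")
}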
