Use CheckTx in transaction scheduler #2502

Merged (2 commits, Jan 14, 2020)
6 changes: 6 additions & 0 deletions .changelog/2502.feature.md
@@ -0,0 +1,6 @@
go/worker/txnscheduler: Check transactions before queuing them

The transaction scheduler can now optionally run runtimes and
check transactions before scheduling them (see issue #1963).
This functionality is disabled by default; enable it with
`worker.txn_scheduler.check_tx.enabled`.
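
As a minimal sketch of the behaviour the changelog describes (illustrative only; `queueCall`, `enqueue`, and `errCheckTxFailed` below are stand-ins, not oasis-core identifiers): when the flag is on, a call must pass the runtime's check before it is accepted into the scheduling queue.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-in for the scheduler's "CheckTx failed" sentinel error.
var errCheckTxFailed = errors.New("txnscheduler: CheckTx failed")

// queueCall sketches the gating: check first (if enabled), then enqueue.
func queueCall(
	ctx context.Context,
	checkTxEnabled bool,
	checkTx func(context.Context, []byte) error,
	enqueue func(context.Context, []byte) error,
	call []byte,
) error {
	if checkTxEnabled {
		// Reject invalid transactions before they ever enter the queue.
		if err := checkTx(ctx, call); err != nil {
			return fmt.Errorf("%w: %s", errCheckTxFailed, err)
		}
	}
	return enqueue(ctx, call)
}

func main() {
	accept := func(context.Context, []byte) error { return nil }
	reject := func(context.Context, []byte) error { return errors.New("invalid nonce") }

	fmt.Println(queueCall(context.Background(), true, accept, accept, []byte("tx"))) // <nil>
	fmt.Println(queueCall(context.Background(), true, reject, accept, []byte("tx"))) // txnscheduler: CheckTx failed: invalid nonce
}
```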
1 change: 1 addition & 0 deletions go/oasis-node/node_test.go
@@ -73,6 +73,7 @@ var (
{workerCommon.CfgClientPort, workerClientPort},
{storageWorker.CfgWorkerEnabled, true},
{txnscheduler.CfgWorkerEnabled, true},
{txnscheduler.CfgCheckTxEnabled, false},
{mergeWorker.CfgWorkerEnabled, true},
{supplementarysanity.CfgEnabled, true},
{supplementarysanity.CfgInterval, 1},
5 changes: 5 additions & 0 deletions go/oasis-test-runner/oasis/args.go
@@ -273,6 +273,11 @@ func (args *argBuilder) workerTxnschedulerEnabled() *argBuilder {
return args
}

func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder {
args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled)
return args
}

func (args *argBuilder) iasUseGenesis() *argBuilder {
args.vec = append(args.vec, "--ias.use_genesis")
return args
1 change: 1 addition & 0 deletions go/oasis-test-runner/oasis/compute.go
@@ -77,6 +77,7 @@ func (worker *Compute) startNode() error {
workerRuntimeLoader(worker.net.cfg.RuntimeLoaderBinary).
workerMergeEnabled().
workerTxnschedulerEnabled().
workerTxnschedulerCheckTxEnabled().
appendNetwork(worker.net).
appendEntity(worker.entity)
for _, v := range worker.net.runtimes {
11 changes: 6 additions & 5 deletions go/registry/api/api.go
@@ -143,7 +143,8 @@ var (

// RuntimesRequiredRoles are the Node roles that require runtimes.
RuntimesRequiredRoles = node.RoleComputeWorker |
-node.RoleKeyManager
+node.RoleKeyManager |
+node.RoleTransactionScheduler

// ConsensusAddressRequiredRoles are the Node roles that require Consensus Address.
ConsensusAddressRequiredRoles = node.RoleValidator
@@ -1228,12 +1229,12 @@ func SanityCheckNodes(nodes []*node.SignedNode, seenEntities map[signature.Publi
return fmt.Errorf("registry: sanity check failed: key manager node must have runtime(s)")
}

-if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
+if n.HasRoles(node.RoleTransactionScheduler) && len(n.Runtimes) == 0 {
+return fmt.Errorf("registry: sanity check failed: transaction scheduler node must have runtime(s)")
}

-if n.HasRoles(node.RoleTransactionScheduler) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
-return fmt.Errorf("registry: sanity check failed: transaction scheduler node shouldn't have any runtimes")
+if n.HasRoles(node.RoleStorageWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
+return fmt.Errorf("registry: sanity check failed: storage worker node shouldn't have any runtimes")
}

if n.HasRoles(node.RoleMergeWorker) && !n.HasRoles(node.RoleComputeWorker) && !n.HasRoles(node.RoleKeyManager) && len(n.Runtimes) > 0 {
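For readers skimming the hunk above, a small self-contained illustration of the rule it adds: a node advertising the transaction scheduler role must now declare at least one runtime. The bitmask type, constants, and the all-bits-set reading of `hasRoles` are simplified assumptions, not the real `node.RolesMask` definitions.

```go
package main

import "fmt"

// rolesMask is a simplified stand-in for node.RolesMask.
type rolesMask uint32

const (
	roleComputeWorker rolesMask = 1 << iota
	roleKeyManager
	roleTransactionScheduler
)

// hasRoles reports whether all bits of r are set in m (assumed semantics).
func (m rolesMask) hasRoles(r rolesMask) bool { return m&r == r }

// checkTxnSchedulerRuntimes mirrors the new sanity rule from the diff:
// a transaction scheduler node with no runtimes is rejected.
func checkTxnSchedulerRuntimes(roles rolesMask, numRuntimes int) error {
	if roles.hasRoles(roleTransactionScheduler) && numRuntimes == 0 {
		return fmt.Errorf("sanity check failed: transaction scheduler node must have runtime(s)")
	}
	return nil
}

func main() {
	fmt.Println(checkTxnSchedulerRuntimes(roleTransactionScheduler, 0))                   // rejected
	fmt.Println(checkTxnSchedulerRuntimes(roleComputeWorker|roleTransactionScheduler, 1)) // <nil>
}
```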
3 changes: 3 additions & 0 deletions go/worker/txnscheduler/api/api.go
@@ -23,6 +23,9 @@ var (
// ErrNotReady is the error returned when the transaction scheduler is not
// yet ready to process transactions.
ErrNotReady = errors.New(ModuleName, 3, "txnscheduler: not ready")

// ErrCheckTxFailed is the error returned when CheckTx fails.
ErrCheckTxFailed = errors.New(ModuleName, 4, "txnscheduler: CheckTx failed")
)

// TransactionScheduler is the transaction scheduler API interface.
95 changes: 94 additions & 1 deletion go/worker/txnscheduler/committee/node.go
@@ -10,6 +10,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"

"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/common/crash"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
@@ -20,7 +21,10 @@ import (
"github.com/oasislabs/oasis-core/go/roothash/api/block"
"github.com/oasislabs/oasis-core/go/runtime/transaction"
storage "github.com/oasislabs/oasis-core/go/storage/api"
commonWorker "github.com/oasislabs/oasis-core/go/worker/common"
"github.com/oasislabs/oasis-core/go/worker/common/committee"
"github.com/oasislabs/oasis-core/go/worker/common/host"
"github.com/oasislabs/oasis-core/go/worker/common/host/protocol"
"github.com/oasislabs/oasis-core/go/worker/common/p2p"
computeCommittee "github.com/oasislabs/oasis-core/go/worker/compute/committee"
txnSchedulerAlgorithm "github.com/oasislabs/oasis-core/go/worker/txnscheduler/algorithm"
@@ -49,7 +53,11 @@ var (
)

// Node is a committee node.
-type Node struct {
+type Node struct { // nolint: maligned
*commonWorker.RuntimeHostNode

checkTxEnabled bool

commonNode *committee.Node
computeNode *computeCommittee.Node

@@ -128,6 +136,68 @@ func (n *Node) HandlePeerMessage(ctx context.Context, message *p2p.Message) (boo
return false, nil
}

// CheckTx checks the given call in the node's runtime.
func (n *Node) CheckTx(ctx context.Context, call []byte) error {
n.commonNode.CrossNode.Lock()
currentBlock := n.commonNode.CurrentBlock
n.commonNode.CrossNode.Unlock()

if currentBlock == nil {
return api.ErrNotReady
}

checkRq := &protocol.Body{
WorkerCheckTxBatchRequest: &protocol.WorkerCheckTxBatchRequest{
Inputs: transaction.RawBatch{call},
Block: *currentBlock,
},
}
workerHost := n.GetWorkerHost()
if workerHost == nil {
n.logger.Error("worker host not initialized")
return api.ErrNotReady
}
resp, err := workerHost.Call(ctx, checkRq)
if err != nil {
n.logger.Error("worker host CheckTx call error",
"err", err,
)
return err
}
if resp == nil {
n.logger.Error("worker host CheckTx reponse is nil")
return api.ErrCheckTxFailed
}
if resp.WorkerCheckTxBatchResponse.Results == nil {
n.logger.Error("worker host CheckTx response contains no results")
return api.ErrCheckTxFailed
}
if len(resp.WorkerCheckTxBatchResponse.Results) != 1 {
n.logger.Error("worker host CheckTx response doesn't contain exactly one result",
"num_results", len(resp.WorkerCheckTxBatchResponse.Results),
)
return api.ErrCheckTxFailed
}

// Interpret CheckTx result.
resultRaw := resp.WorkerCheckTxBatchResponse.Results[0]
var result transaction.TxnOutput
if err = cbor.Unmarshal(resultRaw, &result); err != nil {
n.logger.Error("worker host CheckTx response failed to deserialize",
"err", err,
)
return api.ErrCheckTxFailed
}
if result.Error != nil {
n.logger.Error("worker CheckTx failed with error",
"err", result.Error,
)
return fmt.Errorf("%w: %s", api.ErrCheckTxFailed, *result.Error)
}

return nil
}
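
Not part of the diff: the result-interpretation tail of `CheckTx` above boils down to decoding one CBOR output and treating a non-nil error field as a rejection. A self-contained sketch of that step, using a stand-in output type and the fxamacker/cbor library instead of oasis-core's cbor wrapper (the real `transaction.TxnOutput` layout may differ):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/fxamacker/cbor/v2"
)

// txnOutput is a stand-in shaped like the decoded CheckTx result:
// a nil Error means the transaction passed the check.
type txnOutput struct {
	Error *string `cbor:"error"`
}

var errCheckTxFailed = errors.New("txnscheduler: CheckTx failed")

// interpretResult deserializes one raw result and converts a runtime-reported
// error into the CheckTx failure sentinel, mirroring the logic above.
func interpretResult(raw []byte) error {
	var out txnOutput
	if err := cbor.Unmarshal(raw, &out); err != nil {
		return errCheckTxFailed
	}
	if out.Error != nil {
		return fmt.Errorf("%w: %s", errCheckTxFailed, *out.Error)
	}
	return nil
}

func main() {
	msg := "invalid nonce"
	raw, _ := cbor.Marshal(txnOutput{Error: &msg})
	fmt.Println(interpretResult(raw)) // txnscheduler: CheckTx failed: invalid nonce
}
```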

// QueueCall queues a call for processing by this node.
func (n *Node) QueueCall(ctx context.Context, call []byte) error {
// Check if we are a leader. Note that we may be in the middle of a
@@ -136,6 +206,14 @@ func (n *Node) QueueCall(ctx context.Context, call []byte) error {
return api.ErrNotLeader
}

if n.checkTxEnabled {
// Check transaction before queuing it.
if err := n.CheckTx(ctx, call); err != nil {
return err
}
n.logger.Debug("worker CheckTx successful, queuing transaction")
}

n.algorithmMutex.RLock()
defer n.algorithmMutex.RUnlock()

@@ -385,6 +463,17 @@ func (n *Node) worker() {

n.logger.Info("starting committee node")

if n.checkTxEnabled {
// Initialize worker host for the new runtime.
if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
n.logger.Error("failed to initialize worker host",
"err", err,
)
return
}
defer n.StopRuntimeWorkerHost()
}

// Initialize transaction scheduler's algorithm.
runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
if err != nil {
@@ -437,6 +526,8 @@ func (n *Node) worker() {
func NewNode(
commonNode *committee.Node,
computeNode *computeCommittee.Node,
workerHostFactory host.Factory,
checkTxEnabled bool,
) (*Node, error) {
metricsOnce.Do(func() {
prometheus.MustRegister(nodeCollectors...)
@@ -445,6 +536,8 @@
ctx, cancel := context.WithCancel(context.Background())

n := &Node{
RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory),
checkTxEnabled: checkTxEnabled,
commonNode: commonNode,
computeNode: computeNode,
ctx: ctx,
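A usage illustration for the committee node changes above (not part of this diff): because the failure path wraps the CheckTx sentinel with `%w`, a caller of `QueueCall` can distinguish a runtime rejection from a not-yet-ready scheduler with `errors.Is`. The sentinels below are local stand-ins for `api.ErrCheckTxFailed` and `api.ErrNotReady`.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the txnscheduler API sentinels.
var (
	errCheckTxFailed = errors.New("txnscheduler: CheckTx failed")
	errNotReady      = errors.New("txnscheduler: not ready")
)

// submit maps scheduler errors to caller-facing behaviour.
func submit(queueCall func([]byte) error, tx []byte) error {
	err := queueCall(tx)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, errCheckTxFailed):
		// The runtime rejected the transaction; retrying the same bytes is pointless.
		return fmt.Errorf("transaction rejected by CheckTx: %w", err)
	case errors.Is(err, errNotReady):
		// No current block yet; the caller may retry later.
		return fmt.Errorf("scheduler not ready, retry later: %w", err)
	default:
		return err
	}
}

func main() {
	reject := func([]byte) error { return fmt.Errorf("%w: invalid nonce", errCheckTxFailed) }
	fmt.Println(submit(reject, []byte("tx")))
}
```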
10 changes: 9 additions & 1 deletion go/worker/txnscheduler/init.go
@@ -13,6 +13,8 @@ import (
const (
// CfgWorkerEnabled enables the tx scheduler worker.
CfgWorkerEnabled = "worker.txn_scheduler.enabled"
// CfgCheckTxEnabled enables checking each transaction before scheduling it.
CfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled"
)

// Flags has the configuration flags.
@@ -23,17 +25,23 @@ func Enabled() bool {
return viper.GetBool(CfgWorkerEnabled)
}

// CheckTxEnabled reads our CheckTx enabled flag from viper.
func CheckTxEnabled() bool {
return viper.GetBool(CfgCheckTxEnabled)
}

// New creates a new worker.
func New(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
) (*Worker, error) {
-return newWorker(Enabled(), commonWorker, compute, registration)
+return newWorker(Enabled(), commonWorker, compute, registration, CheckTxEnabled())
}

func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable transaction scheduler process")
Flags.Bool(CfgCheckTxEnabled, false, "Enable checking transactions before scheduling them")

_ = viper.BindPFlags(Flags)

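A hedged sketch of the flag wiring used in `init.go` above: register the boolean flag, bind the flag set to viper, and read the value back. The flag name is the one added by this PR; the standalone program around it is illustrative only.

```go
package main

import (
	"fmt"

	flag "github.com/spf13/pflag"
	"github.com/spf13/viper"
)

const cfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled"

func main() {
	// Register the flag and bind it to viper, as init() does in init.go.
	flags := flag.NewFlagSet("example", flag.ContinueOnError)
	flags.Bool(cfgCheckTxEnabled, false, "Enable checking transactions before scheduling them")
	_ = viper.BindPFlags(flags)

	// Simulate enabling the flag on the command line.
	_ = flags.Parse([]string{"--" + cfgCheckTxEnabled})

	// Equivalent of the CheckTxEnabled() helper.
	fmt.Println("check_tx enabled:", viper.GetBool(cfgCheckTxEnabled))
}
```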
64 changes: 51 additions & 13 deletions go/worker/txnscheduler/worker.go
@@ -1,6 +1,8 @@
package txnscheduler

import (
"context"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/node"
@@ -14,14 +16,18 @@ import (

// Worker is a transaction scheduler handling many runtimes.
type Worker struct {
-enabled bool
+*workerCommon.RuntimeHostWorker
+
+enabled bool
+checkTxEnabled bool

commonWorker *workerCommon.Worker
registration *registration.Worker
compute *compute.Worker

runtimes map[common.Namespace]*committee.Node

ctx context.Context
quitCh chan struct{}
initCh chan struct{}

@@ -139,7 +145,14 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
// Get other nodes from this runtime.
computeNode := w.compute.GetRuntime(id)

-node, err := committee.NewNode(commonNode, computeNode)
+// Create worker host for the given runtime.
+workerHostFactory, err := w.NewRuntimeWorkerHostFactory(node.RoleTransactionScheduler, id)
+if err != nil {
+return err
+}
+
+// Create committee node for the given runtime.
+node, err := committee.NewNode(commonNode, computeNode, workerHostFactory, w.checkTxEnabled)
if err != nil {
return err
}
@@ -159,36 +172,61 @@ func newWorker(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
checkTxEnabled bool,
) (*Worker, error) {
ctx := context.Background()

w := &Worker{
-enabled: enabled,
-commonWorker: commonWorker,
-registration: registration,
-compute: compute,
-runtimes: make(map[common.Namespace]*committee.Node),
-quitCh: make(chan struct{}),
-initCh: make(chan struct{}),
-logger: logging.GetLogger("worker/txnscheduler"),
+enabled: enabled,
+checkTxEnabled: checkTxEnabled,
+commonWorker: commonWorker,
+registration: registration,
+compute: compute,
+runtimes: make(map[common.Namespace]*committee.Node),
+ctx: ctx,
+quitCh: make(chan struct{}),
+initCh: make(chan struct{}),
+logger: logging.GetLogger("worker/txnscheduler"),
}

if enabled {
if !w.commonWorker.Enabled() {
panic("common worker should have been enabled for transaction scheduler")
}

var err error

// Create the runtime host worker.
w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker)
if err != nil {
return nil, err
}

// Use existing gRPC server passed from the node.
api.RegisterService(commonWorker.Grpc.Server(), w)

// Register all configured runtimes.
for _, rt := range commonWorker.GetRuntimes() {
-if err := w.registerRuntime(rt); err != nil {
+if err = w.registerRuntime(rt); err != nil {
return nil, err
}
}

// Register transaction scheduler worker role.
-if err := w.registration.RegisterRole(node.RoleTransactionScheduler,
-func(n *node.Node) error { return nil }); err != nil {
+if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error {
if w.checkTxEnabled {
// Wait until all the runtimes are initialized.
for _, rt := range w.runtimes {
select {
case <-rt.Initialized():
case <-w.ctx.Done():
return w.ctx.Err()
}
}
}

return nil
}); err != nil {
return nil, err
}
}