Skip to content

Commit

Permalink
go/worker/txnscheduler: Check txns before queuing them
Browse files Browse the repository at this point in the history
  • Loading branch information
abukosek committed Jan 14, 2020
1 parent 5fb89e1 commit 84f9f98
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 69 deletions.
6 changes: 6 additions & 0 deletions .changelog/2502.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go/worker/txnscheduler: Check transactions before queuing them

The transaction scheduler can now optionally run runtimes and
check transactions before scheduling them (see issue #1963).
This functionality is disabled by default, enable it with
`worker.txn_scheduler.check_tx.enabled`.
1 change: 1 addition & 0 deletions go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
{workerCommon.CfgClientPort, workerClientPort},
{storageWorker.CfgWorkerEnabled, true},
{txnscheduler.CfgWorkerEnabled, true},
{txnscheduler.CfgCheckTxEnabled, false},
{mergeWorker.CfgWorkerEnabled, true},
{supplementarysanity.CfgEnabled, true},
{supplementarysanity.CfgInterval, 1},
Expand Down
5 changes: 5 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (args *argBuilder) workerTxnschedulerEnabled() *argBuilder {
return args
}

func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder {
args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled)
return args
}

func (args *argBuilder) iasUseGenesis() *argBuilder {
args.vec = append(args.vec, "--ias.use_genesis")
return args
Expand Down
1 change: 1 addition & 0 deletions go/oasis-test-runner/oasis/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (worker *Compute) startNode() error {
workerRuntimeLoader(worker.net.cfg.RuntimeLoaderBinary).
workerMergeEnabled().
workerTxnschedulerEnabled().
workerTxnschedulerCheckTxEnabled().
appendNetwork(worker.net).
appendEntity(worker.entity)
for _, v := range worker.net.runtimes {
Expand Down
3 changes: 3 additions & 0 deletions go/worker/txnscheduler/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ var (
// ErrNotReady is the error returned when the transaction scheduler is not
// yet ready to process transactions.
ErrNotReady = errors.New(ModuleName, 3, "txnscheduler: not ready")

// ErrCheckTxFailed is the error returned when CheckTx fails.
ErrCheckTxFailed = errors.New(ModuleName, 4, "txnscheduler: CheckTx failed")
)

// TransactionScheduler is the transaction scheduler API interface.
Expand Down
94 changes: 86 additions & 8 deletions go/worker/txnscheduler/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"

"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/common/crash"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
Expand All @@ -23,6 +24,7 @@ import (
commonWorker "github.com/oasislabs/oasis-core/go/worker/common"
"github.com/oasislabs/oasis-core/go/worker/common/committee"
"github.com/oasislabs/oasis-core/go/worker/common/host"
"github.com/oasislabs/oasis-core/go/worker/common/host/protocol"
"github.com/oasislabs/oasis-core/go/worker/common/p2p"
computeCommittee "github.com/oasislabs/oasis-core/go/worker/compute/committee"
txnSchedulerAlgorithm "github.com/oasislabs/oasis-core/go/worker/txnscheduler/algorithm"
Expand Down Expand Up @@ -51,9 +53,11 @@ var (
)

// Node is a committee node.
type Node struct {
type Node struct { // nolint: maligned
*commonWorker.RuntimeHostNode

checkTxEnabled bool

commonNode *committee.Node
computeNode *computeCommittee.Node

Expand Down Expand Up @@ -132,6 +136,68 @@ func (n *Node) HandlePeerMessage(ctx context.Context, message *p2p.Message) (boo
return false, nil
}

// CheckTx checks the given call in the node's runtime.
func (n *Node) CheckTx(ctx context.Context, call []byte) error {
n.commonNode.CrossNode.Lock()
currentBlock := n.commonNode.CurrentBlock
n.commonNode.CrossNode.Unlock()

if currentBlock == nil {
return api.ErrNotReady
}

checkRq := &protocol.Body{
WorkerCheckTxBatchRequest: &protocol.WorkerCheckTxBatchRequest{
Inputs: transaction.RawBatch{call},
Block: *currentBlock,
},
}
workerHost := n.GetWorkerHost()
if workerHost == nil {
n.logger.Error("worker host not initialized")
return api.ErrNotReady
}
resp, err := workerHost.Call(ctx, checkRq)
if err != nil {
n.logger.Error("worker host CheckTx call error",
"err", err,
)
return err
}
if resp == nil {
n.logger.Error("worker host CheckTx reponse is nil")
return api.ErrCheckTxFailed
}
if resp.WorkerCheckTxBatchResponse.Results == nil {
n.logger.Error("worker host CheckTx response contains no results")
return api.ErrCheckTxFailed
}
if len(resp.WorkerCheckTxBatchResponse.Results) != 1 {
n.logger.Error("worker host CheckTx response doesn't contain exactly one result",
"num_results", len(resp.WorkerCheckTxBatchResponse.Results),
)
return api.ErrCheckTxFailed
}

// Interpret CheckTx result.
resultRaw := resp.WorkerCheckTxBatchResponse.Results[0]
var result transaction.TxnOutput
if err = cbor.Unmarshal(resultRaw, &result); err != nil {
n.logger.Error("worker host CheckTx response failed to deserialize",
"err", err,
)
return api.ErrCheckTxFailed
}
if result.Error != nil {
n.logger.Error("worker CheckTx failed with error",
"err", result.Error,
)
return fmt.Errorf("%w: %s", api.ErrCheckTxFailed, *result.Error)
}

return nil
}

// QueueCall queues a call for processing by this node.
func (n *Node) QueueCall(ctx context.Context, call []byte) error {
// Check if we are a leader. Note that we may be in the middle of a
Expand All @@ -140,6 +206,14 @@ func (n *Node) QueueCall(ctx context.Context, call []byte) error {
return api.ErrNotLeader
}

if n.checkTxEnabled {
// Check transaction before queuing it.
if err := n.CheckTx(ctx, call); err != nil {
return err
}
n.logger.Debug("worker CheckTx successful, queuing transaction")
}

n.algorithmMutex.RLock()
defer n.algorithmMutex.RUnlock()

Expand Down Expand Up @@ -389,14 +463,16 @@ func (n *Node) worker() {

n.logger.Info("starting committee node")

// Initialize worker host for the new runtime.
if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
n.logger.Error("failed to initialize worker host",
"err", err,
)
return
if n.checkTxEnabled {
// Initialize worker host for the new runtime.
if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
n.logger.Error("failed to initialize worker host",
"err", err,
)
return
}
defer n.StopRuntimeWorkerHost()
}
defer n.StopRuntimeWorkerHost()

// Initialize transaction scheduler's algorithm.
runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
Expand Down Expand Up @@ -451,6 +527,7 @@ func NewNode(
commonNode *committee.Node,
computeNode *computeCommittee.Node,
workerHostFactory host.Factory,
checkTxEnabled bool,
) (*Node, error) {
metricsOnce.Do(func() {
prometheus.MustRegister(nodeCollectors...)
Expand All @@ -460,6 +537,7 @@ func NewNode(

n := &Node{
RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory),
checkTxEnabled: checkTxEnabled,
commonNode: commonNode,
computeNode: computeNode,
ctx: ctx,
Expand Down
10 changes: 9 additions & 1 deletion go/worker/txnscheduler/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
const (
// CfgWorkerEnabled enables the tx scheduler worker.
CfgWorkerEnabled = "worker.txn_scheduler.enabled"
// CfgCheckTxEnabled enables checking each transaction before scheduling it.
CfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled"
)

// Flags has the configuration flags.
Expand All @@ -23,17 +25,23 @@ func Enabled() bool {
return viper.GetBool(CfgWorkerEnabled)
}

// CheckTxEnabled reads our CheckTx enabled flag from viper.
func CheckTxEnabled() bool {
return viper.GetBool(CfgCheckTxEnabled)
}

// New creates a new worker.
func New(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
) (*Worker, error) {
return newWorker(Enabled(), commonWorker, compute, registration)
return newWorker(Enabled(), commonWorker, compute, registration, CheckTxEnabled())
}

func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable transaction scheduler process")
Flags.Bool(CfgCheckTxEnabled, false, "Enable checking transactions before scheduling them")

_ = viper.BindPFlags(Flags)

Expand Down
88 changes: 28 additions & 60 deletions go/worker/txnscheduler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
type Worker struct {
*workerCommon.RuntimeHostWorker

enabled bool
enabled bool
checkTxEnabled bool

commonWorker *workerCommon.Worker
registration *registration.Worker
compute *compute.Worker

runtimes map[common.Namespace]*committee.Node

ctx context.Context
cancelCtx context.CancelFunc
quitCh chan struct{}
initCh chan struct{}
ctx context.Context
quitCh chan struct{}
initCh chan struct{}

logger *logging.Logger
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
}

// Create committee node for the given runtime.
node, err := committee.NewNode(commonNode, computeNode, workerHostFactory)
node, err := committee.NewNode(commonNode, computeNode, workerHostFactory, w.checkTxEnabled)
if err != nil {
return err
}
Expand All @@ -172,29 +172,31 @@ func newWorker(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
checkTxEnabled bool,
) (*Worker, error) {
ctx, cancelCtx := context.WithCancel(context.Background())
ctx := context.Background()

w := &Worker{
enabled: enabled,
commonWorker: commonWorker,
registration: registration,
compute: compute,
runtimes: make(map[common.Namespace]*committee.Node),
ctx: ctx,
cancelCtx: cancelCtx,
quitCh: make(chan struct{}),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/txnscheduler"),
enabled: enabled,
checkTxEnabled: checkTxEnabled,
commonWorker: commonWorker,
registration: registration,
compute: compute,
runtimes: make(map[common.Namespace]*committee.Node),
ctx: ctx,
quitCh: make(chan struct{}),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/txnscheduler"),
}

if enabled {
if !w.commonWorker.Enabled() {
panic("common worker should have been enabled for transaction scheduler")
}

// Create the runtime host worker.
var err error

// Create the runtime host worker.
w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker)
if err != nil {
return nil, err
Expand All @@ -212,48 +214,14 @@ func newWorker(

// Register transaction scheduler worker role.
if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error {
// Wait until all the runtimes are initialized.
for _, rt := range w.runtimes {
select {
case <-rt.Initialized():
case <-w.ctx.Done():
return w.ctx.Err()
}
}

for _, rt := range n.Runtimes {
var grr error

workerRT := w.runtimes[rt.ID]
if workerRT == nil {
continue
}

workerHost := workerRT.GetWorkerHost()
if workerHost == nil {
w.logger.Debug("runtime has shut down",
"runtime", rt.ID,
)
continue
}
if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil {
w.logger.Error("failed to obtain CapabilityTEE",
"err", grr,
"runtime", rt.ID,
)
continue
}

runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx)
if grr == nil && runtimeVersion != nil {
rt.Version = *runtimeVersion
} else {
w.logger.Error("failed to obtain RuntimeVersion",
"err", grr,
"runtime", rt.ID,
"runtime_version", runtimeVersion,
)
continue
if w.checkTxEnabled {
// Wait until all the runtimes are initialized.
for _, rt := range w.runtimes {
select {
case <-rt.Initialized():
case <-w.ctx.Done():
return w.ctx.Err()
}
}
}

Expand Down

0 comments on commit 84f9f98

Please sign in to comment.