Skip to content

Commit

Permalink
go/worker/txnscheduler: Check txns before queuing them
Browse files Browse the repository at this point in the history
  • Loading branch information
abukosek committed Dec 30, 2019
1 parent b7c3b55 commit de8651c
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 65 deletions.
1 change: 1 addition & 0 deletions go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var (
{workerCommon.CfgClientPort, workerClientPort},
{storageWorker.CfgWorkerEnabled, true},
{txnscheduler.CfgWorkerEnabled, true},
{txnscheduler.CfgCheckTxEnabled, false},
{mergeWorker.CfgWorkerEnabled, true},
{supplementarysanity.CfgEnabled, true},
{supplementarysanity.CfgInterval, 1},
Expand Down
3 changes: 3 additions & 0 deletions go/worker/txnscheduler/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ var (
// ErrNotReady is the error returned when the transaction scheduler is not
// yet ready to process transactions.
ErrNotReady = errors.New(ModuleName, 3, "txnscheduler: not ready")

// ErrCheckTxFailed is the error returned when CheckTx fails.
ErrCheckTxFailed = errors.New(ModuleName, 4, "txnscheduler: CheckTx failed")
)

// TransactionScheduler is the transaction scheduler API interface.
Expand Down
22 changes: 14 additions & 8 deletions go/worker/txnscheduler/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ var (
)

// Node is a committee node.
type Node struct {
type Node struct { // nolint: maligned
*commonWorker.RuntimeHostNode

checkTxEnabled bool

commonNode *committee.Node
computeNode *computeCommittee.Node

Expand Down Expand Up @@ -389,14 +391,16 @@ func (n *Node) worker() {

n.logger.Info("starting committee node")

// Initialize worker host for the new runtime.
if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
n.logger.Error("failed to initialize worker host",
"err", err,
)
return
if n.checkTxEnabled {
// Initialize worker host for the new runtime.
if err := n.InitializeRuntimeWorkerHost(n.ctx); err != nil {
n.logger.Error("failed to initialize worker host",
"err", err,
)
return
}
defer n.StopRuntimeWorkerHost()
}
defer n.StopRuntimeWorkerHost()

// Initialize transaction scheduler's algorithm.
runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
Expand Down Expand Up @@ -451,6 +455,7 @@ func NewNode(
commonNode *committee.Node,
computeNode *computeCommittee.Node,
workerHostFactory host.Factory,
checkTxEnabled bool,
) (*Node, error) {
metricsOnce.Do(func() {
prometheus.MustRegister(nodeCollectors...)
Expand All @@ -460,6 +465,7 @@ func NewNode(

n := &Node{
RuntimeHostNode: commonWorker.NewRuntimeHostNode(commonNode, workerHostFactory),
checkTxEnabled: checkTxEnabled,
commonNode: commonNode,
computeNode: computeNode,
ctx: ctx,
Expand Down
10 changes: 9 additions & 1 deletion go/worker/txnscheduler/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
const (
// CfgWorkerEnabled enables the tx scheduler worker.
CfgWorkerEnabled = "worker.txn_scheduler.enabled"
// CfgCheckTxEnabled enables checking each transaction before scheduling it.
CfgCheckTxEnabled = "worker.txn_scheduler.check_tx.enabled"
)

// Flags has the configuration flags.
Expand All @@ -23,17 +25,23 @@ func Enabled() bool {
return viper.GetBool(CfgWorkerEnabled)
}

// CheckTxEnabled reads our CheckTx enabled flag from viper.
func CheckTxEnabled() bool {
return viper.GetBool(CfgCheckTxEnabled)
}

// New creates a new worker.
func New(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
) (*Worker, error) {
return newWorker(Enabled(), commonWorker, compute, registration)
return newWorker(Enabled(), commonWorker, compute, registration, CheckTxEnabled())
}

func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable transaction scheduler process")
Flags.Bool(CfgCheckTxEnabled, true, "Enable checking transactions before scheduling them")

_ = viper.BindPFlags(Flags)

Expand Down
43 changes: 43 additions & 0 deletions go/worker/txnscheduler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package txnscheduler
import (
"context"

"github.com/pkg/errors"

"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/runtime/transaction"
"github.com/oasislabs/oasis-core/go/worker/common/host/protocol"
"github.com/oasislabs/oasis-core/go/worker/txnscheduler/api"
)

Expand All @@ -15,6 +20,44 @@ func (w *Worker) SubmitTx(ctx context.Context, rq *api.SubmitTxRequest) (*api.Su
return nil, api.ErrUnknownRuntime
}

if w.checkTxEnabled {
// Check transaction before queuing it.
checkRq := &protocol.Body{
WorkerCheckTxBatchRequest: &protocol.WorkerCheckTxBatchRequest{
Inputs: transaction.RawBatch{rq.Data},
},
}
workerHost := runtime.GetWorkerHost()
if workerHost == nil {
w.logger.Error("worker host not initialized")
return nil, api.ErrNotReady
}
resp, err := workerHost.Call(ctx, checkRq)
if err != nil {
return nil, err
}
if resp == nil {
return nil, api.ErrCheckTxFailed
}
if resp.WorkerCheckTxBatchResponse.Results == nil {
return nil, api.ErrCheckTxFailed
}
if len(resp.WorkerCheckTxBatchResponse.Results) != 1 {
return nil, api.ErrCheckTxFailed
}

// Interpret CheckTx result.
resultRaw := resp.WorkerCheckTxBatchResponse.Results[0]
var result transaction.TxnOutput
cbor.MustUnmarshal(resultRaw, &result)
if result.Error != nil {
return nil, errors.Wrap(api.ErrCheckTxFailed, *result.Error)
}
if result.Success == nil {
return nil, api.ErrCheckTxFailed
}
}

if err := runtime.QueueCall(ctx, rq.Data); err != nil {
return nil, err
}
Expand Down
120 changes: 64 additions & 56 deletions go/worker/txnscheduler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
type Worker struct {
*workerCommon.RuntimeHostWorker

enabled bool
enabled bool
checkTxEnabled bool

commonWorker *workerCommon.Worker
registration *registration.Worker
Expand Down Expand Up @@ -152,7 +153,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
}

// Create committee node for the given runtime.
node, err := committee.NewNode(commonNode, computeNode, workerHostFactory)
node, err := committee.NewNode(commonNode, computeNode, workerHostFactory, w.checkTxEnabled)
if err != nil {
return err
}
Expand All @@ -172,32 +173,37 @@ func newWorker(
commonWorker *workerCommon.Worker,
compute *compute.Worker,
registration *registration.Worker,
checkTxEnabled bool,
) (*Worker, error) {
ctx, cancelCtx := context.WithCancel(context.Background())

w := &Worker{
enabled: enabled,
commonWorker: commonWorker,
registration: registration,
compute: compute,
runtimes: make(map[signature.PublicKey]*committee.Node),
ctx: ctx,
cancelCtx: cancelCtx,
quitCh: make(chan struct{}),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/txnscheduler"),
enabled: enabled,
checkTxEnabled: checkTxEnabled,
commonWorker: commonWorker,
registration: registration,
compute: compute,
runtimes: make(map[signature.PublicKey]*committee.Node),
ctx: ctx,
cancelCtx: cancelCtx,
quitCh: make(chan struct{}),
initCh: make(chan struct{}),
logger: logging.GetLogger("worker/txnscheduler"),
}

if enabled {
if !w.commonWorker.Enabled() {
panic("common worker should have been enabled for transaction scheduler")
}

// Create the runtime host worker.
var err error
w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker)
if err != nil {
return nil, err

if w.checkTxEnabled {
// Create the runtime host worker.
w.RuntimeHostWorker, err = workerCommon.NewRuntimeHostWorker(commonWorker)
if err != nil {
return nil, err
}
}

// Use existing gRPC server passed from the node.
Expand All @@ -212,48 +218,50 @@ func newWorker(

// Register transaction scheduler worker role.
if err = w.registration.RegisterRole(node.RoleTransactionScheduler, func(n *node.Node) error {
// Wait until all the runtimes are initialized.
for _, rt := range w.runtimes {
select {
case <-rt.Initialized():
case <-w.ctx.Done():
return w.ctx.Err()
}
}

for _, rt := range n.Runtimes {
var grr error

workerRT := w.runtimes[rt.ID]
if workerRT == nil {
continue
}

workerHost := workerRT.GetWorkerHost()
if workerHost == nil {
w.logger.Debug("runtime has shut down",
"runtime", rt.ID,
)
continue
}
if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil {
w.logger.Error("failed to obtain CapabilityTEE",
"err", grr,
"runtime", rt.ID,
)
continue
if w.checkTxEnabled {
// Wait until all the runtimes are initialized.
for _, rt := range w.runtimes {
select {
case <-rt.Initialized():
case <-w.ctx.Done():
return w.ctx.Err()
}
}

runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx)
if grr == nil && runtimeVersion != nil {
rt.Version = *runtimeVersion
} else {
w.logger.Error("failed to obtain RuntimeVersion",
"err", grr,
"runtime", rt.ID,
"runtime_version", runtimeVersion,
)
continue
for _, rt := range n.Runtimes {
var grr error

workerRT := w.runtimes[rt.ID]
if workerRT == nil {
continue
}

workerHost := workerRT.GetWorkerHost()
if workerHost == nil {
w.logger.Debug("runtime has shut down",
"runtime", rt.ID,
)
continue
}
if rt.Capabilities.TEE, grr = workerHost.WaitForCapabilityTEE(w.ctx); grr != nil {
w.logger.Error("failed to obtain CapabilityTEE",
"err", grr,
"runtime", rt.ID,
)
continue
}

runtimeVersion, grr := workerHost.WaitForRuntimeVersion(w.ctx)
if grr == nil && runtimeVersion != nil {
rt.Version = *runtimeVersion
} else {
w.logger.Error("failed to obtain RuntimeVersion",
"err", grr,
"runtime", rt.ID,
"runtime_version", runtimeVersion,
)
continue
}
}
}

Expand Down

0 comments on commit de8651c

Please sign in to comment.