go/worker/executor: cache of last seen runtime transactions
ptrus committed Sep 16, 2020
1 parent 93bf5a3 commit 6a28068
Showing 13 changed files with 116 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changelog/3274.feature.md
@@ -0,0 +1,5 @@
go/worker/executor: Cache last seen runtime transactions

To enable a basic form of runtime transaction replay prevention, the
transaction scheduler maintains an LRU cache of last seen runtime transactions
keyed by transaction hash.
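
A minimal sketch of the mechanism (illustrative only, not the executor's actual
scheduling path; it relies solely on the lru and hash package APIs visible
elsewhere in this commit):

package main

import (
	"fmt"

	"github.com/oasisprotocol/oasis-core/go/common/cache/lru"
	"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
)

func main() {
	// LRU cache bounded to 10000 entries, keyed by transaction hash.
	cache, err := lru.New(lru.Capacity(10000, false))
	if err != nil {
		panic(err)
	}

	queueTx := func(call []byte) error {
		callHash := hash.NewFromBytes(call)
		// Reject calls whose hash is still in the recently-seen window.
		if _, seen := cache.Peek(callHash); seen {
			return fmt.Errorf("duplicate transaction")
		}
		// Otherwise remember the hash; the least recently used entries
		// are evicted once the cache is full.
		return cache.Put(callHash, true)
	}

	fmt.Println(queueTx([]byte("hello"))) // <nil>
	fmt.Println(queueTx([]byte("hello"))) // duplicate transaction
}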
10 changes: 10 additions & 0 deletions go/common/cache/lru/lru.go
@@ -121,6 +121,16 @@ func (c *Cache) Keys() []interface{} {
return vec
}

// Clear empties the cache.
func (c *Cache) Clear() {
c.Lock()
defer c.Unlock()

c.size = 0
c.lru = list.New()
c.entries = make(map[interface{}]*list.Element)
}

// Size returns the current cache size in the units specified by a `Capacity`
// option at creation time.
func (c *Cache) Size() uint64 {
7 changes: 7 additions & 0 deletions go/common/cache/lru/lru_test.go
@@ -81,6 +81,13 @@ func TestLRUCapacityEntries(t *testing.T) {
require.Equal(updateVal, v, "Get - update")

require.Equal(uint64(cacheSize), cache.Size(), "Size")

// Clear cache.
cache.Clear()
_, ok = cache.Peek(entries[0].key)
require.False(ok, "Peek - expected entry to not exist after removal")
require.Empty(cache.Keys(), "Empty keys")
require.EqualValues(0, cache.Size(), "Empty size")
}

func TestLRUCapacityBytes(t *testing.T) {
18 changes: 16 additions & 2 deletions go/oasis-node/cmd/debug/txsource/workload/runtime.go
@@ -143,9 +143,11 @@ func (r *runtime) doInsertRequest(ctx context.Context, rng *rand.Rand, rtc runti
Args: struct {
Key string `json:"key"`
Value string `json:"value"`
Nonce uint64 `json:"nonce"`
}{
Key: key,
Value: value,
Nonce: rng.Uint64(),
},
}
rsp, err := r.submitRuntimeRquest(ctx, rtc, req)
@@ -186,7 +188,13 @@ func (r *runtime) doGetRequest(ctx context.Context, rng *rand.Rand, rtc runtimeC
// Submit request.
req := &runtimeTransaction.TxnCall{
Method: "get",
Args: key,
Args: struct {
Key string `json:"key"`
Nonce uint64 `json:"nonce"`
}{
Key: key,
Nonce: rng.Uint64(),
},
}
rsp, err := r.submitRuntimeRquest(ctx, rtc, req)
if err != nil {
@@ -223,7 +231,13 @@ func (r *runtime) doRemoveRequest(ctx context.Context, rng *rand.Rand, rtc runti
// Submit request.
req := &runtimeTransaction.TxnCall{
Method: "remove",
Args: key,
Args: struct {
Key string `json:"key"`
Nonce uint64 `json:"nonce"`
}{
Key: key,
Nonce: rng.Uint64(),
},
}
rsp, err := r.submitRuntimeRquest(ctx, rtc, req)
if err != nil {
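
The nonce is what keeps this workload compatible with the duplicate-transaction
cache introduced by this commit: the executor deduplicates on the hash of the
raw call bytes, so otherwise identical requests must serialize differently. A
rough sketch of the effect (assuming cbor.Marshal from go/common/cbor, which
this commit already imports elsewhere; the real workload submits through
TxnCall):

package main

import (
	"fmt"

	"github.com/oasisprotocol/oasis-core/go/common/cbor"
	"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
)

// getArgs mirrors the anonymous args struct used by doGetRequest above.
type getArgs struct {
	Key   string `json:"key"`
	Nonce uint64 `json:"nonce"`
}

func main() {
	// The same logical request with different nonces encodes to different
	// bytes, so its hash, and thus its duplicate-cache key, differs too.
	a := hash.NewFromBytes(cbor.Marshal(getArgs{Key: "mykey", Nonce: 1}))
	b := hash.NewFromBytes(cbor.Marshal(getArgs{Key: "mykey", Nonce: 2}))
	fmt.Println(a == b) // false
}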
10 changes: 8 additions & 2 deletions go/oasis-test-runner/scenario/e2e/runtime/keymanager_restart.go
@@ -20,7 +20,10 @@ func newKmRestartImpl() scenario.Scenario {
runtimeImpl: *newRuntimeImpl(
"keymanager-restart",
"simple-keyvalue-enc-client",
[]string{"--key", "key1"},
[]string{
"--key", "key1",
"--seed", "first_seed",
},
),
}
}
@@ -69,7 +72,10 @@ func (sc *kmRestartImpl) Run(childEnv *env.Env) error {
// Run the second client on a different key so that it will require
// a second trip to the keymanager.
sc.Logger.Info("starting a second client to check if key manager works")
sc.runtimeImpl.clientArgs = []string{"--key", "key2"}
sc.runtimeImpl.clientArgs = []string{
"--key", "key2",
"--seed", "second_seed",
}
cmd, err = sc.startClient(childEnv)
if err != nil {
return err
10 changes: 8 additions & 2 deletions go/oasis-test-runner/scenario/e2e/runtime/keymanager_upgrade.go
@@ -31,7 +31,10 @@ func newKmUpgradeImpl() scenario.Scenario {
runtimeImpl: *newRuntimeImpl(
"keymanager-upgrade",
"simple-keyvalue-enc-client",
nil,
[]string{
"--key", "key1",
"--seed", "first_seed",
},
),
}
}
@@ -279,7 +282,10 @@ func (sc *kmUpgradeImpl) Run(childEnv *env.Env) error {

// Run client again.
sc.Logger.Info("starting a second client to check if key manager works")
sc.runtimeImpl.clientArgs = []string{"--key", "key2"}
sc.runtimeImpl.clientArgs = []string{
"--key", "key2",
"--seed", "second_seed",
}
cmd, err = sc.startClient(childEnv)
if err != nil {
return err
8 changes: 7 additions & 1 deletion go/oasis-test-runner/scenario/e2e/runtime/runtime_dynamic.go
@@ -263,7 +263,13 @@ func (sc *runtimeDynamicImpl) Run(childEnv *env.Env) error { // nolint: gocyclo
sc.Logger.Info("checking if genesis state has been initialized")
var rawRsp cbor.RawMessage
var err error
if rawRsp, err = sc.submitRuntimeTx(ctx, runtimeID, "get", runtimeDynamicTestKey); err != nil {
if rawRsp, err = sc.submitRuntimeTx(ctx, runtimeID, "get", struct {
Key string `json:"key"`
Nonce uint64 `json:"nonce"`
}{
Key: runtimeDynamicTestKey,
Nonce: 1234567890,
}); err != nil {
return fmt.Errorf("failed to submit get tx to runtime: %w", err)
}
var rsp string
5 changes: 4 additions & 1 deletion go/oasis-test-runner/scenario/e2e/runtime/runtime_upgrade.go
@@ -258,7 +258,10 @@ func (sc *runtimeUpgradeImpl) Run(childEnv *env.Env) error {

// Run client again.
sc.Logger.Info("starting a second client to check if runtime works")
sc.runtimeImpl.clientArgs = []string{"--key", "key2"}
sc.runtimeImpl.clientArgs = []string{
"--key", "key2",
"--seed", "second_seed",
}
cmd, err = sc.startClient(childEnv)
if err != nil {
return err
9 changes: 5 additions & 4 deletions go/runtime/client/tests/tester.go
@@ -5,6 +5,7 @@ import (
"bytes"
"context"
"sort"
"strings"
"testing"
"time"

@@ -135,8 +136,8 @@ func testQuery(
require.EqualValues(t, 2, tx.Block.Header.Round)
require.EqualValues(t, 0, tx.Index)
// Check for values from TestNode/ExecutorWorker/QueueTx
require.EqualValues(t, []byte("hello world"), tx.Input)
require.EqualValues(t, []byte("hello world"), tx.Output)
require.True(t, strings.HasPrefix(string(tx.Input), "hello world"))
require.True(t, strings.HasPrefix(string(tx.Output), "hello world"))

// Transactions (check the mock worker for content).
txns, err := c.GetTxs(ctx, &api.GetTxsRequest{RuntimeID: runtimeID, Round: blk.Header.Round, IORoot: blk.Header.IORoot})
@@ -163,8 +164,8 @@ func testQuery(
require.EqualValues(t, 2, results[0].Block.Header.Round)
require.EqualValues(t, 0, results[0].Index)
// Check for values from TestNode/ExecutorWorker/QueueTx
require.EqualValues(t, []byte("hello world"), results[0].Input)
require.EqualValues(t, []byte("hello world"), results[0].Output)
require.True(t, strings.HasPrefix(string(results[0].Input), "hello world"))
require.True(t, strings.HasPrefix(string(results[0].Output), "hello world"))

// Query genesis block again.
genBlk2, err := c.GetGenesisBlock(ctx, runtimeID)
30 changes: 30 additions & 0 deletions go/worker/compute/executor/committee/node.go
@@ -11,6 +11,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"

"github.com/oasisprotocol/oasis-core/go/common/cache/lru"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
@@ -52,6 +53,7 @@ var (
errNotReady = fmt.Errorf("executor: runtime not ready")
errCheckTxFailed = fmt.Errorf("executor: CheckTx failed")
errNotTxnScheduler = fmt.Errorf("executor: not transaction scheduler in this round")
errDuplicateTx = fmt.Errorf("executor: duplicate transaction")

// Duration to wait before submitting the propose timeout request.
proposeTimeoutDelay = 2 * time.Second
@@ -132,6 +134,8 @@ var (
type Node struct { // nolint: maligned
*commonWorker.RuntimeHostNode

lastScheduledCache *lru.Cache

scheduleCheckTxEnabled bool
scheduleMaxQueueSize uint64

@@ -432,6 +436,7 @@ func (n *Node) HandleEpochTransitionLocked(epoch *committee.EpochSnapshot) {
if !epoch.IsExecutorWorker() {
// Clear incoming queue if we are not a worker anymore.
n.scheduler.Clear()
n.lastScheduledCache.Clear()
incomingQueueSize.With(n.getMetricLabels()).Set(0)
}

@@ -593,10 +598,28 @@ func (n *Node) QueueTx(call []byte) error {
if n.scheduler == nil || !n.scheduler.IsInitialized() {
return errNotReady
}

callHash := hash.NewFromBytes(call)
if _, b := n.lastScheduledCache.Peek(callHash); b {
if err := n.scheduler.Flush(false); err != nil {
n.logger.Error("failed flushing queue",
"err", err,
)
}
return errDuplicateTx
}

if err := n.scheduler.ScheduleTx(call); err != nil {
return err
}

if err := n.lastScheduledCache.Put(callHash, true); err != nil {
// cache.Put can only fail when the cache capacity is specified in bytes
// and the inserted value is too large. This should never happen here.
n.logger.Error("cache put error",
"err", err,
)
}
incomingQueueSize.With(n.getMetricLabels()).Set(float64(n.scheduler.UnscheduledSize()))

return nil
@@ -1412,6 +1435,7 @@ func NewNode(
roleProvider registration.RoleProvider,
scheduleCheckTxEnabled bool,
scheduleMaxQueueSize uint64,
lastScheduledCacheSize uint64,
) (*Node, error) {
metricsOnce.Do(func() {
prometheus.MustRegister(nodeCollectors...)
@@ -1423,6 +1447,11 @@
return nil, err
}

cache, err := lru.New(lru.Capacity(lastScheduledCacheSize, false))
if err != nil {
return nil, fmt.Errorf("error creating cache: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())

n := &Node{
@@ -1432,6 +1461,7 @@
roleProvider: roleProvider,
scheduleCheckTxEnabled: scheduleCheckTxEnabled,
scheduleMaxQueueSize: scheduleMaxQueueSize,
lastScheduledCache: cache,
ctx: ctx,
cancelCtx: cancel,
stopCh: make(chan struct{}),
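Two lifecycle details are worth noting in node.go above: the duplicate-transaction cache is cleared together with the incoming queue when the node stops being an executor worker, so stale hashes do not outlive committee membership, and a duplicate call also flushes the scheduler before returning errDuplicateTx; a plausible reading is that a retry of a still-queued call signals a waiting client, so flushing gets the pending batch proposed sooner.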
5 changes: 4 additions & 1 deletion go/worker/compute/executor/init.go
@@ -14,7 +14,8 @@ const (
// scheduling it.
CfgScheduleCheckTxEnabled = "worker.executor.schedule_check_tx.enabled"

cfgMaxQueueSize = "worker.executor.schedule_max_queue_size"
cfgMaxQueueSize = "worker.executor.schedule_max_queue_size"
cfgScheduleTxCacheSize = "worker.executor.schedule_tx_cache_size"
)

// Flags has the configuration flags.
@@ -33,12 +34,14 @@ func New(
registration,
viper.GetBool(CfgScheduleCheckTxEnabled),
viper.GetUint64(cfgMaxQueueSize),
viper.GetUint64(cfgScheduleTxCacheSize),
)
}

func init() {
Flags.Bool(CfgScheduleCheckTxEnabled, false, "Enable checking transactions before scheduling them")
Flags.Uint64(cfgMaxQueueSize, 10000, "Maximum size of the scheduling queue")
Flags.Uint64(cfgScheduleTxCacheSize, 10000, "Cache size of recently scheduled transactions to prevent re-scheduling")

_ = viper.BindPFlags(Flags)
}
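
The cache is sized by the new worker.executor.schedule_tx_cache_size flag, defaulting to 10000 entries. A hypothetical invocation such as --worker.executor.schedule_tx_cache_size 50000 would widen the replay-prevention window at the cost of keeping more transaction hashes in memory.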
10 changes: 8 additions & 2 deletions go/worker/compute/executor/tests/tester.go
@@ -98,7 +98,8 @@ func testQueueTx(
}

// Queue a test call.
testCall := []byte("hello world")
// Include a timestamp so each test invocation uses a unique key.
testCall := []byte("hello world at: " + time.Now().String())
err = rtNode.QueueTx(testCall)
require.NoError(t, err, "QueueCall")

@@ -129,7 +130,8 @@ blockLoop:
})
defer tree.Close()

txs, err := tree.GetTransactions(ctx)
var txs []*transaction.Transaction
txs, err = tree.GetTransactions(ctx)
require.NoError(t, err, "GetTransactions")
require.Len(t, txs, 1, "there should be one transaction")
require.EqualValues(t, testCall, txs[0].Input)
@@ -145,4 +147,8 @@ blockLoop:
t.Fatalf("failed to receive block")
}
}

// Requeuing same call should fail.
err = rtNode.QueueTx(testCall)
require.Error(t, err, "QueueCall duplicate transaction")
}
5 changes: 4 additions & 1 deletion go/worker/compute/executor/worker.go
@@ -19,6 +19,7 @@ type Worker struct {

scheduleCheckTxEnabled bool
scheduleMaxQueueSize uint64
scheduleTxCacheSize uint64

commonWorker *workerCommon.Worker
registration *registration.Worker
@@ -148,7 +149,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
}

// Create committee node for the given runtime.
node, err := committee.NewNode(commonNode, w.commonWorker.GetConfig(), rp, w.scheduleCheckTxEnabled, w.scheduleMaxQueueSize)
node, err := committee.NewNode(commonNode, w.commonWorker.GetConfig(), rp, w.scheduleCheckTxEnabled, w.scheduleMaxQueueSize, w.scheduleTxCacheSize)
if err != nil {
return err
}
@@ -170,6 +171,7 @@ func newWorker(
registration *registration.Worker,
scheduleCheckTxEnabled bool,
scheduleMaxQueueSize uint64,
scheduleTxCacheSize uint64,
) (*Worker, error) {
ctx, cancelCtx := context.WithCancel(context.Background())

@@ -178,6 +180,7 @@
commonWorker: commonWorker,
scheduleCheckTxEnabled: scheduleCheckTxEnabled,
scheduleMaxQueueSize: scheduleMaxQueueSize,
scheduleTxCacheSize: scheduleTxCacheSize,
registration: registration,
runtimes: make(map[common.Namespace]*committee.Node),
ctx: ctx,
