Skip to content

Commit

Permalink
Merge pull request #3222 from oasisprotocol/ptrus/feature/scheduler-u…
Browse files Browse the repository at this point in the history
…pdate-parameters

go/worker/executor: update scheduling parameters on runtime updates
  • Loading branch information
ptrus authored Aug 28, 2020
2 parents 1b7ede7 + 45ca039 commit e732593
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 27 deletions.
4 changes: 4 additions & 0 deletions .changelog/3203.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Executor should refresh runtime scheduling parameters

Fixes the executor node to watch for runtime scheduling parameter changes and
update its scheduling configuration when needed.
7 changes: 7 additions & 0 deletions go/runtime/scheduling/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package api

import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
)

// Scheduler defines an algorithm for scheduling incoming transactions.
type Scheduler interface {
// Name is the scheduler algorithm name.
Name() string

// Initialize initializes the internal scheduler state.
// Scheduler should use the provided transaction dispatcher to dispatch
// transactions.
Expand Down Expand Up @@ -41,6 +45,9 @@ type Scheduler interface {
// IsQueued returns if a transaction is queued.
IsQueued(hash.Hash) bool

// UpdateParameters updates the scheduling parameters.
UpdateParameters(registry.TxnSchedulerParameters) error

// Clear clears the transaction queue.
Clear()
}
Expand Down
9 changes: 5 additions & 4 deletions go/runtime/scheduling/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package scheduling
import (
"fmt"

registry "github.com/oasisprotocol/oasis-core/go/registry/api"
"github.com/oasisprotocol/oasis-core/go/runtime/scheduling/api"
"github.com/oasisprotocol/oasis-core/go/runtime/scheduling/simple"
)

// New creates a new scheduler.
func New(name string, maxQueueSize, maxBatchSize, maxBatchSizeBytes uint64) (api.Scheduler, error) {
switch name {
func New(maxQueueSize uint64, params registry.TxnSchedulerParameters) (api.Scheduler, error) {
switch params.Algorithm {
case simple.Name:
return simple.New(maxQueueSize, maxBatchSize, maxBatchSizeBytes)
return simple.New(maxQueueSize, params)
default:
return nil, fmt.Errorf("invalid transaction scheduler algorithm: %s", name)
return nil, fmt.Errorf("invalid transaction scheduler algorithm: %s", params.Algorithm)
}
}
7 changes: 7 additions & 0 deletions go/runtime/scheduling/simple/incoming_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ func (q *incomingQueue) Take(force bool) (transaction.RawBatch, error) {
return batch, nil
}

// updateConfig replaces the queue's batch limits with the given maximum batch
// size (maxBatchSize) and maximum batch size in bytes (maxBatchSizeBytes).
//
// The queue lock is held for the duration of the update.
func (q *incomingQueue) updateConfig(maxBatchSize, maxBatchSizeBytes uint64) {
	q.Lock()
	defer q.Unlock()

	q.maxBatchSize, q.maxBatchSizeBytes = maxBatchSize, maxBatchSizeBytes
}

func newIncomingQueue(maxQueueSize, maxBatchSize, maxBatchSizeBytes uint64) *incomingQueue {
return &incomingQueue{
callHashes: make(map[hash.Hash]bool),
Expand Down
32 changes: 18 additions & 14 deletions go/runtime/scheduling/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package simple

import (
"fmt"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/logging"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
Expand All @@ -14,20 +16,13 @@ const (
)

type scheduler struct {
cfg config
incomingQueue *incomingQueue

dispatcher api.TransactionDispatcher

logger *logging.Logger
}

type config struct {
maxQueueSize uint64
maxBatchSize uint64
maxBatchSizeBytes uint64
}

func (s *scheduler) scheduleBatch(force bool) error {
batch, err := s.incomingQueue.Take(force)
if err != nil && err != errNoBatchAvailable {
Expand Down Expand Up @@ -132,16 +127,25 @@ func (s *scheduler) IsInitialized() bool {
return s.dispatcher != nil
}

// UpdateParameters updates the scheduling parameters of the simple scheduler.
//
// An error is returned, and nothing is changed, when params.Algorithm is not
// this scheduler's Name. Otherwise the maximum batch size and the maximum
// batch size in bytes are forwarded to the incoming queue.
func (s *scheduler) UpdateParameters(params registry.TxnSchedulerParameters) error {
	// Only parameters meant for this algorithm are accepted.
	if algo := params.Algorithm; algo != Name {
		return fmt.Errorf("unexpected transaction scheduling algorithm: %s", algo)
	}

	batchSize, batchSizeBytes := params.MaxBatchSize, params.MaxBatchSizeBytes
	s.incomingQueue.updateConfig(batchSize, batchSizeBytes)

	return nil
}

// Name returns the scheduler algorithm name, i.e. the package-level Name.
func (s *scheduler) Name() string {
return Name
}

// New creates a new simple scheduler.
func New(maxQueueSize, maxBatchSize, maxBatchSizeBytes uint64) (api.Scheduler, error) {
cfg := config{
maxQueueSize: maxQueueSize,
maxBatchSize: maxBatchSize,
maxBatchSizeBytes: maxBatchSizeBytes,
func New(maxQueueSize uint64, params registry.TxnSchedulerParameters) (api.Scheduler, error) {
if params.Algorithm != Name {
return nil, fmt.Errorf("unexpected transaction scheduling algorithm: %s", params.Algorithm)
}
scheduler := &scheduler{
cfg: cfg,
incomingQueue: newIncomingQueue(cfg.maxQueueSize, cfg.maxBatchSize, cfg.maxBatchSizeBytes),
incomingQueue: newIncomingQueue(maxQueueSize, params.MaxBatchSize, params.MaxBatchSizeBytes),
logger: logging.GetLogger("runtime/scheduling").With("scheduler", "simple"),
}

Expand Down
8 changes: 7 additions & 1 deletion go/runtime/scheduling/simple/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ import (

"github.com/stretchr/testify/require"

registry "github.com/oasisprotocol/oasis-core/go/registry/api"
"github.com/oasisprotocol/oasis-core/go/runtime/scheduling/tests"
)

func TestSimpleScheduler(t *testing.T) {
algo, err := New(100, 10, 16*1024*1024)
params := registry.TxnSchedulerParameters{
Algorithm: Name,
MaxBatchSize: 10,
MaxBatchSizeBytes: 16 * 1024 * 1024,
}
algo, err := New(100, params)
require.NoError(t, err, "New()")

tests.SchedulerImplementationTests(t, algo)
Expand Down
38 changes: 36 additions & 2 deletions go/runtime/scheduling/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
"github.com/oasisprotocol/oasis-core/go/runtime/scheduling/api"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
)
Expand Down Expand Up @@ -59,7 +60,7 @@ func testScheduleTransactions(t *testing.T, td *testDispatcher, scheduler api.Sc
// Test FlushTx.
err = scheduler.Flush(false)
require.NoError(t, err, "Flush(force=false)")
require.Equal(t, 1, scheduler.UnscheduledSize(), "transaction should remain scheduled after a non-forced flush")
require.Equal(t, 1, scheduler.UnscheduledSize(), "transaction should remain unscheduled after a non-forced flush")
require.True(t, scheduler.IsQueued(txBytes), "IsQueued(tx)")
err = scheduler.Flush(true)
require.NoError(t, err, "Flush(force=true)")
Expand Down Expand Up @@ -91,6 +92,7 @@ func testScheduleTransactions(t *testing.T, td *testDispatcher, scheduler api.Sc
require.False(t, scheduler.IsQueued(tx2Bytes), "IsQueued(tx)")
require.Equal(t, 1, len(td.DispatchedBatches), "one batch should be dispatched")
require.EqualValues(t, transaction.RawBatch{testTx2}, td.DispatchedBatches[0], "transaction should be dispatched")
td.Clear()

// Test schedule batch.
testBatch := [][]byte{
Expand All @@ -104,6 +106,38 @@ func testScheduleTransactions(t *testing.T, td *testDispatcher, scheduler api.Sc
for _, tx := range testBatch {
require.True(t, scheduler.IsQueued(hash.NewFromBytes(tx)), fmt.Sprintf("IsQueued(%s)", tx))
}
// Clear the queue.
err = scheduler.Flush(true)
require.NoError(t, err, "Flush(force=true)")
require.Equal(t, 0, scheduler.UnscheduledSize(), "no transactions after flushing")

td.Clear()
// Test Update configuration.
// First insert a transaction.
err = scheduler.ScheduleTx(testTx)
require.NoError(t, err, "ScheduleTx(testTx)")
// Make sure transaction doesn't get scheduled.
err = scheduler.Flush(false)
require.NoError(t, err, "Flush(force=false)")
require.Equal(t, 1, scheduler.UnscheduledSize(), "transaction should remain unscheduled after a non-forced flush")
// Update configuration to BatchSize=1.
err = scheduler.UpdateParameters(
registry.TxnSchedulerParameters{
Algorithm: scheduler.Name(),
MaxBatchSize: 1,
MaxBatchSizeBytes: 10000,
},
)
require.NoError(t, err, "UpdateParameters")
// Make sure transaction gets scheduled now.
err = scheduler.Flush(false)
require.NoError(t, err, "Flush(force=false)")
require.Equal(t, 0, scheduler.UnscheduledSize(), "transaction should get scheduled after a non-forced flush")

// Test invalid update.
err = scheduler.UpdateParameters(
registry.TxnSchedulerParameters{
Algorithm: "invalid",
},
)
require.Error(t, err, "UpdateParameters invalid udpate")
}
31 changes: 25 additions & 6 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,8 +1258,6 @@ func (n *Node) worker() {
defer hrtNotifier.Stop()

// Initialize transaction scheduling algorithm.
// TODO: watch for updates.
// https://github.com/oasisprotocol/oasis-core/issues/3203
runtime, err := n.commonNode.Runtime.RegistryDescriptor(n.ctx)
if err != nil {
n.logger.Error("failed to fetch runtime registry descriptor",
Expand All @@ -1268,18 +1266,16 @@ func (n *Node) worker() {
return
}
scheduler, err := scheduling.New(
runtime.TxnScheduler.Algorithm,
n.scheduleMaxQueueSize,
runtime.TxnScheduler.MaxBatchSize,
runtime.TxnScheduler.MaxBatchSizeBytes,
runtime.TxnScheduler,
)
if err != nil {
n.logger.Error("failed to create new transaction scheduler algorithm",
"err", err,
)
return
}
if err := scheduler.Initialize(n); err != nil {
if err = scheduler.Initialize(n); err != nil {
n.logger.Error("failed initializing transaction scheduler algorithm",
"err", err,
)
Expand All @@ -1293,6 +1289,16 @@ func (n *Node) worker() {
txnScheduleTicker := time.NewTicker(runtime.TxnScheduler.BatchFlushTimeout)
defer txnScheduleTicker.Stop()

// Watch runtime descriptor updates.
rtCh, rtSub, err := n.commonNode.Runtime.WatchRegistryDescriptor()
if err != nil {
n.logger.Error("failed to watch runtimes",
"err", err,
)
return
}
defer rtSub.Close()

// We are initialized.
close(n.initCh)

Expand Down Expand Up @@ -1373,6 +1379,19 @@ func (n *Node) worker() {
}
n.proposeBatchLocked(batch)
}()
case runtime := <-rtCh:
// XXX: Once there is more than one scheduling algorithm available
// this might need to recreate the scheduler and reinsert
// the transactions.
// At that point also update the schedulerMutex usage across the
// code, as it will no longer be true that the scheduler
// variable never gets updated.
if err = n.scheduler.UpdateParameters(runtime.TxnScheduler); err != nil {
n.logger.Error("error updating scheduler parameters",
"err", err,
)
return
}
case <-txnScheduleTicker.C:
// Flush a batch from algorithm.
n.scheduler.Flush(true)
Expand Down

0 comments on commit e732593

Please sign in to comment.