Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NIT-2740] Horizontal Scaling of Validation Node #2354

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6f0bade
Use redis streams in bold
anodar May 30, 2024
2851ef3
Use different streams for bold
anodar May 31, 2024
d15ef6a
Initialize bold execution runner
anodar Jun 5, 2024
9100a8d
Merge branch 'master' into sepolia-tooling-merge-redis
anodar Jun 5, 2024
e2c20ce
Add stream connection and timeout logic
anodar Jun 5, 2024
40b6483
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Jul 1, 2024
5d30cc9
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
301ea72
fix config
amsanghi Aug 27, 2024
0bbc41e
clean up
amsanghi Aug 27, 2024
3cb9241
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 27, 2024
b9c8900
remove redudtant changes
amsanghi Aug 27, 2024
8fcd8b7
fix config setup
amsanghi Aug 28, 2024
f3fc6f3
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
ca909c8
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
f0bc4d4
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
b421fb0
Update validator/server_arb/redis/consumer.go
amsanghi Aug 28, 2024
aeb0e24
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 28, 2024
596ff81
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Aug 29, 2024
4fbd706
Add tests and fix some bugs
amsanghi Aug 29, 2024
5170a79
Update validator/client/redis/boldproducer.go
rauljordan Aug 29, 2024
545f9d2
add metrics
amsanghi Aug 30, 2024
e9e0224
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
9be8a33
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 9, 2024
161fcca
Merge branch 'bold-review' into sepolia-tooling-merge-redis
amsanghi Oct 28, 2024
d895bc4
fix lint
amsanghi Oct 29, 2024
8edfbda
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Dec 10, 2024
f765a89
fix lint
amsanghi Dec 10, 2024
a2cb261
fix lint
amsanghi Dec 10, 2024
bc70dfb
fix lint
amsanghi Dec 10, 2024
70f64a1
Merge branch 'master' into sepolia-tooling-merge-redis
eljobe Dec 17, 2024
b0adab0
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Jan 3, 2025
3da52e3
Changes based on PR comments
amsanghi Jan 3, 2025
1b1fc9f
Changes based on PR comments
amsanghi Jan 3, 2025
fa80856
Changes based on PR comments
amsanghi Jan 3, 2025
580db8d
Changes based on PR comments
amsanghi Jan 3, 2025
87b68d5
remove
amsanghi Jan 3, 2025
a9ab22e
Changes based on PR comments
amsanghi Jan 3, 2025
7999540
Merge branch 'master' into sepolia-tooling-merge-redis
amsanghi Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 49 additions & 45 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,22 @@ type BlockValidator struct {
}

type BlockValidatorConfig struct {
Enable bool `koanf:"enable"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
RecordingIterLimit uint64 `koanf:"recording-iter-limit"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
BatchCacheLimit uint32 `koanf:"batch-cache-limit"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`
Enable bool `koanf:"enable"`
RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"`
RedisBoldValidationClientConfig redis.ValidationClientConfig `koanf:"redis-bold-validation-client-config"`
ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"`
ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"`
ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"`
PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"`
RecordingIterLimit uint64 `koanf:"recording-iter-limit"`
ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"`
BatchCacheLimit uint32 `koanf:"batch-cache-limit"`
CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload
PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload
FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"`
Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"`
MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"`
ValidationServerConfigsList string `koanf:"validation-server-configs-list"`
// The directory to which the BlockValidator will write the
// block_inputs_<id>.json files when WriteToFile() is called.
BlockInputsFilePath string `koanf:"block-inputs-file-path"`
Expand Down Expand Up @@ -181,6 +182,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation")
rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer)
redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f)
redis.ValidationClientConfigAddOptions(prefix+".redis-bold-validation-client-config", f)
f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds")
f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations")
f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (stores batch-copy per block)")
Expand All @@ -200,39 +202,41 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) {
}

var DefaultBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
ValidationPoll: time.Second,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
BatchCacheLimit: 20,
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
BlockInputsFilePath: "./target/validation_inputs",
MemoryFreeLimit: "default",
RecordingIterLimit: 20,
Enable: false,
ValidationServerConfigsList: "default",
ValidationServer: rpcclient.DefaultClientConfig,
RedisValidationClientConfig: redis.DefaultValidationClientConfig,
RedisBoldValidationClientConfig: redis.DefaultValidationClientConfig,
amsanghi marked this conversation as resolved.
Show resolved Hide resolved
ValidationPoll: time.Second,
ForwardBlocks: 128,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
BatchCacheLimit: 20,
CurrentModuleRoot: "current",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
BlockInputsFilePath: "./target/validation_inputs",
MemoryFreeLimit: "default",
RecordingIterLimit: 20,
}

var TestBlockValidatorConfig = BlockValidatorConfig{
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
BatchCacheLimit: 20,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
RecordingIterLimit: 20,
CurrentModuleRoot: "latest",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
BlockInputsFilePath: "./target/validation_inputs",
MemoryFreeLimit: "default",
Enable: false,
ValidationServer: rpcclient.TestClientConfig,
ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig},
RedisValidationClientConfig: redis.TestValidationClientConfig,
RedisBoldValidationClientConfig: redis.TestValidationClientConfig,
ValidationPoll: 100 * time.Millisecond,
ForwardBlocks: 128,
BatchCacheLimit: 20,
PrerecordedBlocks: uint64(2 * runtime.NumCPU()),
RecordingIterLimit: 20,
CurrentModuleRoot: "latest",
PendingUpgradeModuleRoot: "latest",
FailureIsFatal: true,
Dangerous: DefaultBlockValidatorDangerousConfig,
BlockInputsFilePath: "./target/validation_inputs",
MemoryFreeLimit: "default",
}

var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{
Expand Down
1 change: 0 additions & 1 deletion staker/bold/bold_state_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ func (s *BOLDStateProvider) CollectMachineHashes(
if err != nil {
return nil, err
}
// TODO: Enable Redis streams.
wasmModRoot := cfg.AssertionMetadata.WasmModuleRoot
execRun, err := s.statelessValidator.ExecutionSpawners()[0].CreateExecutionRun(wasmModRoot, input, true).Await(ctx)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,10 @@ func NewStatelessBlockValidator(
for i := range configs {
i := i
confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] }
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack))
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack))
if i == 0 {
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisBoldValidationClientConfig, stack))
}
}

if len(executionSpawners) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion system_tests/full_challenge_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall
var mockSpawn *mockSpawner
builder.valnodeConfig.Wasm.RootPath = wasmRootDir
if useStubs {
mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator)
mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator, "")
} else {
// For now validation only works with HashScheme set
builder.execConfig.Caching.StateScheme = rawdb.HashScheme
Expand Down
59 changes: 49 additions & 10 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ import (
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/validator"
validatorclient "github.com/offchainlabs/nitro/validator/client"
clientredis "github.com/offchainlabs/nitro/validator/client/redis"
"github.com/offchainlabs/nitro/validator/server_api"
"github.com/offchainlabs/nitro/validator/server_arb"
arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis"
"github.com/offchainlabs/nitro/validator/valnode"
"github.com/offchainlabs/nitro/validator/valnode/redis"
)

type mockSpawner struct {
Expand Down Expand Up @@ -161,7 +165,7 @@ func (r *mockExecRun) CheckAlive(ctx context.Context) error {

func (r *mockExecRun) Close() {}

func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig) (*mockSpawner, *node.Node) {
func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig, redisURL string) (*mockSpawner, *node.Node) {
stackConf := node.DefaultConfig
stackConf.HTTPPort = 0
stackConf.DataDir = ""
Expand All @@ -179,7 +183,18 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_
}
configFetcher := func() *server_arb.ArbitratorSpawnerConfig { return config }
spawner := &mockSpawner{}
serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, configFetcher)
var redisExecSpawner *arbredis.ExecutionSpawner
if redisURL != "" {
redisValidationServerConfig := redis.TestValidationServerConfig
redisValidationServerConfig.RedisURL = redisURL
redisValidationServerConfig.ModuleRoots = make([]string, len(mockWasmModuleRoots))
for i := range redisValidationServerConfig.ModuleRoots {
redisValidationServerConfig.ModuleRoots[i] = mockWasmModuleRoots[i].Hex()
}
redisExecSpawner, err = arbredis.NewExecutionSpawner(&redisValidationServerConfig, spawner)
Require(t, err)
}
serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, redisExecSpawner, configFetcher)

valAPIs := []rpc.API{{
Namespace: server_api.Namespace,
Expand All @@ -206,11 +221,28 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_

// mostly tests translation to/from json and running over network
func TestValidationServerAPI(t *testing.T) {
testValidationServerAPI(t, false)
}

// mostly tests translation to/from json and running over network with bold validation redis consumer/producer
func TestValidationServerAPIWithBoldValidationConsumerProducer(t *testing.T) {
testValidationServerAPI(t, true)
}
func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bool) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, validationDefault := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault)
var redisURL string
var redisBoldValidationClientConfig *clientredis.ValidationClientConfig
if withBoldValidationConsumerProducer {
redisURL = redisutil.CreateTestRedis(ctx, t)
redisBoldValidationClientConfig = &clientredis.TestValidationClientConfig
redisBoldValidationClientConfig.RedisURL = redisURL
redisBoldValidationClientConfig.CreateStreams = true
}

_, validationDefault := createMockValidationNode(t, ctx, nil, redisURL)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -280,14 +312,21 @@ func TestValidationServerAPI(t *testing.T) {
if !bytes.Equal(proof, mockProof) {
t.Error("mock proof not expected")
}

hashes := execRun.GetMachineHashesWithStepSize(0, 1, 5)
hashesRes, err := hashes.Await(ctx)
Require(t, err)
if len(hashesRes) != 5 {
t.Error("unexpected number of hashes")
}
}

func TestValidationClientRoom(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack)
mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil, "")
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -368,16 +407,16 @@ func TestExecutionKeepAlive(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, validationDefault := createMockValidationNode(t, ctx, nil)
_, validationDefault := createMockValidationNode(t, ctx, nil, "")
shortTimeoutConfig := server_arb.DefaultArbitratorSpawnerConfig
shortTimeoutConfig.ExecutionRunTimeout = time.Second
_, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig)
_, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig, "")
configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig)

clientDefault := validatorclient.NewExecutionClient(configFetcher, validationDefault)
clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault)
err := clientDefault.Start(ctx)
Require(t, err)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, validationShortTO)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, nil, validationShortTO)
err = clientShortTO.Start(ctx)
Require(t, err)

Expand Down
86 changes: 86 additions & 0 deletions validator/client/redis/boldproducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package redis

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator/server_api"
)

// BoldValidationClient implements bold validation client through redis streams.
//
// It embeds stopwaiter.StopWaiter and holds one pubsub producer per wasm
// module root. Each producer takes *server_api.GetLeavesWithStepSizeInput
// requests and resolves to a []common.Hash response.
type BoldValidationClient struct {
stopwaiter.StopWaiter
// producers stores moduleRoot to producer mapping.
// Entries are added by Initialize and looked up by GetLeavesWithStepSize.
producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]
// config supplies the settings Initialize reads: RedisURL, StreamPrefix,
// CreateStreams and ProducerConfig.
config *ValidationClientConfig
}

// NewBoldValidationClient builds a client around cfg with an empty producer
// table. It does not touch redis; producers are registered later by
// Initialize. The returned error is always nil.
func NewBoldValidationClient(cfg *ValidationClientConfig) (*BoldValidationClient, error) {
	client := &BoldValidationClient{config: cfg}
	client.producers = map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]{}
	return client, nil
}

// Initialize connects to the configured redis instance and registers one
// producer for every entry in moduleRoots.
//
// For each module root it optionally creates the bold stream (when
// config.CreateStreams is set) and then builds a producer bound to that
// stream. A module root that already has a producer is skipped with a warning.
//
// It returns an error when the redis URL is empty, when the redis client
// cannot be built from the URL, or when stream creation fails. A failure to
// construct an individual producer is only logged: that module root is left
// without a producer and the remaining roots are still processed.
func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
	if c.config.RedisURL == "" {
		return fmt.Errorf("redis url cannot be empty")
	}
	redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL)
	if err != nil {
		return err
	}
	for _, mr := range moduleRoots {
		// The same stream name is used for creation and for the producer.
		stream := server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr)
		if c.config.CreateStreams {
			if err := pubsub.CreateStream(ctx, stream, redisClient); err != nil {
				return fmt.Errorf("creating redis stream: %w", err)
			}
		}
		if _, exists := c.producers[mr]; exists {
			log.Warn("Producer already exists for module root", "hash", mr)
			continue
		}
		p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](
			redisClient, stream, &c.config.ProducerConfig)
		if err != nil {
			// log.Warn takes a message followed by key/value pairs, not a
			// printf-style format string, so pass the details as structured fields.
			log.Warn("Failed to initialize redis producer for module root", "hash", mr, "err", err)
			continue
		}
		c.producers[mr] = p
	}
	return nil
}

// GetLeavesWithStepSize submits req to the producer registered for
// req.ModuleRoot and returns the promise for its result.
//
// If no producer exists for that module root, or if producing the request
// fails, an already-resolved promise carrying an empty hash slice and the
// error is returned instead.
func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] {
	if producer, ok := c.producers[req.ModuleRoot]; ok {
		// Requests are produced under the client's own context.
		result, err := producer.Produce(c.GetContext(), req)
		if err == nil {
			return result
		}
		return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err))
	}
	return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot))
}

// Start starts every registered producer with ctx_in and then starts the
// client's embedded StopWaiter with the same context. It always returns nil.
func (c *BoldValidationClient) Start(ctx_in context.Context) error {
	for root := range c.producers {
		c.producers[root].Start(ctx_in)
	}
	c.StopWaiter.Start(ctx_in, c)
	return nil
}

// Stop stops each registered producer, waiting for it to finish, and then
// stops and waits on the client's embedded StopWaiter.
func (c *BoldValidationClient) Stop() {
	for root := range c.producers {
		c.producers[root].StopAndWait()
	}
	c.StopWaiter.StopAndWait()
}
Loading
Loading