From 6f0bade28bd6268e0e7029e51685d4134ae379cd Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Thu, 30 May 2024 13:18:40 +0200 Subject: [PATCH 01/23] Use redis streams in bold --- staker/block_validator.go | 85 +++++++++--------- staker/stateless_block_validator.go | 14 ++- system_tests/validation_mock_test.go | 2 +- validator/client/redis/boldproducer.go | 106 +++++++++++++++++++++++ validator/client/validation_client.go | 35 ++++++-- validator/server_api/json.go | 9 ++ validator/server_arb/redis/consumer.go | 115 +++++++++++++++++++++++++ validator/valnode/validation_api.go | 14 ++- validator/valnode/valnode.go | 44 +++++++--- 9 files changed, 361 insertions(+), 63 deletions(-) create mode 100644 validator/client/redis/boldproducer.go create mode 100644 validator/server_arb/redis/consumer.go diff --git a/staker/block_validator.go b/staker/block_validator.go index ddf658d810..1083fd3b26 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -91,21 +91,22 @@ type BlockValidator struct { } type BlockValidatorConfig struct { - Enable bool `koanf:"enable"` - Evil bool `koanf:"evil"` - EvilInterceptDepositGwei uint64 `koanf:"evil-intercept-deposit-gwei"` - RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` - ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` - ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` - ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` - PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` - ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` - CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload - PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload - FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` - Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` - MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` - ValidationServerConfigsList string `koanf:"validation-server-configs-list"` + Enable bool `koanf:"enable"` + Evil bool `koanf:"evil"` + EvilInterceptDepositGwei uint64 `koanf:"evil-intercept-deposit-gwei"` + RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` + RedisBoldValidationClientConfig redis.BoldValidationClientConfig `koanf:"redis-bold-validation-client-config"` + ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` + ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` + ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` + PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` + ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` + CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload + PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload + FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` + Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` + MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` + ValidationServerConfigsList string `koanf:"validation-server-configs-list"` memoryFreeLimit int } @@ -167,35 +168,37 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) { } var DefaultBlockValidatorConfig = BlockValidatorConfig{ - Enable: false, - ValidationServerConfigsList: "default", - ValidationServer: rpcclient.DefaultClientConfig, - RedisValidationClientConfig: redis.DefaultValidationClientConfig, - ValidationPoll: time.Second, - ForwardBlocks: 1024, - PrerecordedBlocks: uint64(2 * runtime.NumCPU()), - CurrentModuleRoot: "current", - PendingUpgradeModuleRoot: "latest", - FailureIsFatal: true, - Dangerous: DefaultBlockValidatorDangerousConfig, - MemoryFreeLimit: "default", - EvilInterceptDepositGwei: 1_000_000, // 1M gwei or 0.001 ETH + Enable: false, + ValidationServerConfigsList: "default", + ValidationServer: rpcclient.DefaultClientConfig, + RedisValidationClientConfig: redis.DefaultValidationClientConfig, + RedisBoldValidationClientConfig: redis.DefaultBoldValidationClientConfig, + ValidationPoll: time.Second, + ForwardBlocks: 1024, + PrerecordedBlocks: uint64(2 * runtime.NumCPU()), + CurrentModuleRoot: "current", + PendingUpgradeModuleRoot: "latest", + FailureIsFatal: true, + Dangerous: DefaultBlockValidatorDangerousConfig, + MemoryFreeLimit: "default", + EvilInterceptDepositGwei: 1_000_000, // 1M gwei or 0.001 ETH } var TestBlockValidatorConfig = BlockValidatorConfig{ - Enable: false, - EvilInterceptDepositGwei: 1_000_000, // 1M gwei or 0.001 ETH - ValidationServer: rpcclient.TestClientConfig, - ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, - RedisValidationClientConfig: redis.TestValidationClientConfig, - ValidationPoll: 100 * time.Millisecond, - ForwardBlocks: 128, - PrerecordedBlocks: uint64(2 * runtime.NumCPU()), - CurrentModuleRoot: "latest", - PendingUpgradeModuleRoot: "latest", - FailureIsFatal: true, - Dangerous: DefaultBlockValidatorDangerousConfig, - MemoryFreeLimit: "default", + Enable: false, + EvilInterceptDepositGwei: 1_000_000, // 1M gwei or 0.001 ETH + ValidationServer: rpcclient.TestClientConfig, + ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, + RedisValidationClientConfig: redis.TestValidationClientConfig, + RedisBoldValidationClientConfig: redis.TestBoldValidationClientConfig, + ValidationPoll: 100 * time.Millisecond, + ForwardBlocks: 128, + PrerecordedBlocks: uint64(2 * runtime.NumCPU()), + CurrentModuleRoot: "latest", + PendingUpgradeModuleRoot: "latest", + FailureIsFatal: true, + Dangerous: DefaultBlockValidatorDangerousConfig, + MemoryFreeLimit: "default", } var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{ diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 96bd618eb6..cc463cc448 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -231,8 +231,11 @@ func NewStatelessBlockValidator( config func() *BlockValidatorConfig, stack *node.Node, ) (*StatelessBlockValidator, error) { - var executionSpawners []validator.ExecutionSpawner - var redisValClient *redis.ValidationClient + var ( + executionSpawners []validator.ExecutionSpawner + redisValClient *redis.ValidationClient + redisBoldValClient *redis.BoldValidationClient + ) if config().RedisValidationClientConfig.Enabled() { var err error @@ -241,6 +244,13 @@ func NewStatelessBlockValidator( return nil, fmt.Errorf("creating new redis validation client: %w", err) } } + if config().RedisBoldValidationClientConfig.Enabled() { + var err error + redisBoldValClient, err = redis.NewBoldValidationClient(&config().RedisBoldValidationClientConfig) + if err != nil { + return nil, fmt.Errorf("creating new redis bold validation client: %w", err) + } + } configs := config().ValidationServerConfigs for i := range configs { i := i diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index 47b80d6f8a..a07a73219c 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -174,7 +174,7 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_ } configFetcher := func() *server_arb.ArbitratorSpawnerConfig { return config } spawner := &mockSpawner{} - serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, configFetcher) + serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, nil, configFetcher) valAPIs := []rpc.API{{ Namespace: server_api.Namespace, diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go new file mode 100644 index 0000000000..48da2e3b30 --- /dev/null +++ b/validator/client/redis/boldproducer.go @@ -0,0 +1,106 @@ +package redis + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/go-redis/redis/v8" + "github.com/offchainlabs/nitro/pubsub" + "github.com/offchainlabs/nitro/util/containers" + "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/validator/server_api" + "github.com/spf13/pflag" +) + +type BoldValidationClientConfig struct { + RedisURL string `koanf:"redis-url"` +} + +func (c BoldValidationClientConfig) Enabled() bool { + return c.RedisURL != "" +} + +var DefaultBoldValidationClientConfig = BoldValidationClientConfig{ + RedisURL: "", +} + +var TestBoldValidationClientConfig = BoldValidationClientConfig{ + RedisURL: "", +} + +func BoldValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { + pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) +} + +// BoldValidationClient implements bold validation client through redis streams. +type BoldValidationClient struct { + stopwaiter.StopWaiter + // producers stores moduleRoot to producer mapping. + producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + producerConfig pubsub.ProducerConfig + redisClient redis.UniversalClient + moduleRoots []common.Hash +} + +func NewBoldValidationClient(cfg *BoldValidationClientConfig) (*BoldValidationClient, error) { + if cfg.RedisURL == "" { + return nil, fmt.Errorf("redis url cannot be empty") + } + redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL) + if err != nil { + return nil, err + } + return &BoldValidationClient{ + producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), + redisClient: redisClient, + }, nil +} + +func (c *BoldValidationClient) Initialize(moduleRoots []common.Hash) error { + for _, mr := range moduleRoots { + if _, exists := c.producers[mr]; exists { + log.Warn("Producer already existsw for module root", "hash", mr) + continue + } + p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( + c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig) + if err != nil { + log.Warn("failed init redis for %v: %w", mr, err) + continue + } + p.Start(c.GetContext()) + c.producers[mr] = p + c.moduleRoots = append(c.moduleRoots, mr) + } + return nil +} + +func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] { + producer, found := c.producers[req.ModuleRoot] + if !found { + return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot)) + } + promise, err := producer.Produce(c.GetContext(), req) + if err != nil { + return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err)) + } + return promise +} + +func (c *BoldValidationClient) Start(ctx_in context.Context) error { + for _, p := range c.producers { + p.Start(ctx_in) + } + c.StopWaiter.Start(ctx_in, c) + return nil +} + +func (c *BoldValidationClient) Stop() { + for _, p := range c.producers { + p.StopAndWait() + } + c.StopWaiter.StopAndWait() +} diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 4d15edd5e4..ae25e70414 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -17,6 +17,7 @@ import ( "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/validator/client/redis" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_common" @@ -115,11 +116,21 @@ func (c *ValidationClient) Room() int { type ExecutionClient struct { ValidationClient + boldValClient *redis.BoldValidationClient } -func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient { +func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig redis.BoldValidationClientConfig, stack *node.Node) *ExecutionClient { + var boldClient *redis.BoldValidationClient + if redisBoldValidationClientConfig.Enabled() { + var err error + boldClient, err = redis.NewBoldValidationClient(&redisBoldValidationClientConfig) + if err != nil { + log.Error("Creating new redis bold validation client", "error", err) + } + } return &ExecutionClient{ ValidationClient: *NewValidationClient(config, stack), + boldValClient: boldClient, } } @@ -131,8 +142,10 @@ func (c *ExecutionClient) CreateBoldExecutionRun(wasmModuleRoot common.Hash, ste return nil, err } run := &ExecutionClientRun{ - client: c, - id: res, + client: c, + id: res, + wasmModuleRoot: wasmModuleRoot, + input: input, } run.Start(c.GetContext()) // note: not this temporary thread's context! return run, nil @@ -157,8 +170,10 @@ func (c *ExecutionClient) CreateExecutionRun(wasmModuleRoot common.Hash, input * type ExecutionClientRun struct { stopwaiter.StopWaiter - client *ExecutionClient - id uint64 + client *ExecutionClient + id uint64 + wasmModuleRoot common.Hash + input *validator.ValidationInput } func (c *ExecutionClient) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] { @@ -213,6 +228,16 @@ func (r *ExecutionClientRun) GetStepAt(pos uint64) containers.PromiseInterface[* } func (r *ExecutionClientRun) GetLeavesWithStepSize(fromBatch, machineStartIndex, stepSize, numDesiredLeaves uint64) containers.PromiseInterface[[]common.Hash] { + if r.client.boldValClient != nil { + return r.client.boldValClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{ + ModuleRoot: r.wasmModuleRoot, + FromBatch: fromBatch, + MachineStartIndex: machineStartIndex, + StepSize: stepSize, + NumDesiredLeaves: numDesiredLeaves, + ValidationInput: r.input, + }) + } return stopwaiter.LaunchPromiseThread[[]common.Hash](r, func(ctx context.Context) ([]common.Hash, error) { var resJson []common.Hash err := r.client.client.CallContext(ctx, &resJson, server_api.Namespace+"_getLeavesWithStepSize", r.id, fromBatch, machineStartIndex, stepSize, numDesiredLeaves) diff --git a/validator/server_api/json.go b/validator/server_api/json.go index e30a4c72f7..98712e33be 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -150,3 +150,12 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro } return valInput, nil } + +type GetLeavesWithStepSizeInput struct { + ModuleRoot common.Hash + FromBatch uint64 + MachineStartIndex uint64 + StepSize uint64 + NumDesiredLeaves uint64 + ValidationInput *validator.ValidationInput +} diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go new file mode 100644 index 0000000000..5b72b93b24 --- /dev/null +++ b/validator/server_arb/redis/consumer.go @@ -0,0 +1,115 @@ +package redis + +import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/pubsub" + "github.com/offchainlabs/nitro/util/redisutil" + "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/validator" + "github.com/offchainlabs/nitro/validator/server_api" + "github.com/spf13/pflag" +) + +type ExecutionSpawnerConfig struct { + RedisURL string `koanf:"redis-url"` + ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"` + // Supported wasm module roots. + ModuleRoots []string `koanf:"module-roots"` +} + +var DefaultExecutionSpawnerConfig = ExecutionSpawnerConfig{ + RedisURL: "", + ConsumerConfig: pubsub.DefaultConsumerConfig, + ModuleRoots: []string{}, +} + +var TestExecutionSpawnerConfig = ExecutionSpawnerConfig{ + RedisURL: "", + ConsumerConfig: pubsub.TestConsumerConfig, + ModuleRoots: []string{}, +} + +func ExecutionSpawnerConfigAddOptions(prefix string, f *pflag.FlagSet) { + pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) + f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") +} + +func (cfg *ExecutionSpawnerConfig) Enabled() bool { + return cfg.RedisURL != "" +} + +type ExecutionSpawner struct { + stopwaiter.StopWaiter + spawner validator.ExecutionSpawner + + // consumers stores moduleRoot to consumer mapping. + consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] +} + +func NewExecutionSpawner(cfg *ExecutionSpawnerConfig, spawner validator.ExecutionSpawner) (*ExecutionSpawner, error) { + if cfg.RedisURL == "" { + return nil, fmt.Errorf("redis url cannot be empty") + } + redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL) + if err != nil { + return nil, err + } + consumers := make(map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]) + for _, hash := range cfg.ModuleRoots { + mr := common.HexToHash(hash) + c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisStreamForRoot(mr), &cfg.ConsumerConfig) + if err != nil { + return nil, fmt.Errorf("creating consumer for validation: %w", err) + } + consumers[mr] = c + } + return &ExecutionSpawner{ + consumers: consumers, + spawner: spawner, + }, nil +} + +func (s *ExecutionSpawner) Start(ctx_in context.Context) { + s.StopWaiter.Start(ctx_in, s) + s.spawner.Start(ctx_in) + for moduleRoot, c := range s.consumers { + c := c + c.Start(ctx_in) + s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { + req, err := c.Consume(ctx) + if err != nil { + log.Error("Consuming request", "error", err) + return 0 + } + if req == nil { + // There's nothing in the queue. + return time.Second + } + run, err := s.spawner.CreateBoldExecutionRun(moduleRoot, req.Value.StepSize, + req.Value.ValidationInput).Await(ctx) + if err != nil { + log.Error("Creationg BOLD execution", "error", err) + return 0 + } + hashes, err := run.GetLeavesWithStepSize( + req.Value.FromBatch, + req.Value.MachineStartIndex, + req.Value.StepSize, + req.Value.NumDesiredLeaves).Await(ctx) + if err != nil { + log.Error("Getting leave hashes", "error", err) + return 0 + } + if err := c.SetResult(ctx, req.ID, hashes); err != nil { + log.Error("Error setting result for request", "id", req.ID, "result", hashes, "error", err) + return 0 + } + return time.Second + }) + } +} diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index 835641f9c3..4ec16a228e 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -17,6 +17,7 @@ import ( "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" + arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" ) type ValidationServerAPI struct { @@ -56,7 +57,8 @@ type execRunEntry struct { type ExecServerAPI struct { stopwaiter.StopWaiter ValidationServerAPI - execSpawner validator.ExecutionSpawner + execSpawner validator.ExecutionSpawner + redisExecSpawner *arbredis.ExecutionSpawner config server_arb.ArbitratorSpawnerConfigFecher @@ -65,10 +67,15 @@ type ExecServerAPI struct { runs map[uint64]*execRunEntry } -func NewExecutionServerAPI(valSpawner validator.ValidationSpawner, execution validator.ExecutionSpawner, config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { +func NewExecutionServerAPI( + valSpawner validator.ValidationSpawner, + execution validator.ExecutionSpawner, + redisExecSpawner *arbredis.ExecutionSpawner, + config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { return &ExecServerAPI{ ValidationServerAPI: *NewValidationServerAPI(valSpawner), execSpawner: execution, + redisExecSpawner: redisExecSpawner, nextId: rand.Uint64(), // good-enough to aver reusing ids after reboot runs: make(map[uint64]*execRunEntry), config: config, @@ -128,6 +135,9 @@ func (a *ExecServerAPI) removeOldRuns(ctx context.Context) time.Duration { func (a *ExecServerAPI) Start(ctx_in context.Context) { a.StopWaiter.Start(ctx_in, a) a.CallIteratively(a.removeOldRuns) + if a.redisExecSpawner != nil { + a.redisExecSpawner.Start(ctx_in) + } } func (a *ExecServerAPI) WriteToFile(ctx context.Context, jsonInput *server_api.InputJSON, expOut validator.GoGlobalState, moduleRoot common.Hash) error { diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 972e11189d..230c918dd7 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -14,6 +14,8 @@ import ( "github.com/offchainlabs/nitro/validator/server_jit" "github.com/offchainlabs/nitro/validator/valnode/redis" "github.com/spf13/pflag" + + arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" ) type WasmConfig struct { @@ -35,12 +37,13 @@ var DefaultWasmConfig = WasmConfig{ } type Config struct { - UseJit bool `koanf:"use-jit"` - ApiAuth bool `koanf:"api-auth"` - ApiPublic bool `koanf:"api-public"` - Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` - Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` - Wasm WasmConfig `koanf:"wasm"` + UseJit bool `koanf:"use-jit"` + ApiAuth bool `koanf:"api-auth"` + ApiPublic bool `koanf:"api-public"` + Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` + RedisExecRunner arbredis.ExecutionSpawnerConfig `koanf:"redis-exec-runnner"` + Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` + Wasm WasmConfig `koanf:"wasm"` } type ValidationConfigFetcher func() *Config @@ -77,7 +80,8 @@ type ValidationNode struct { arbSpawner *server_arb.ArbitratorSpawner jitSpawner *server_jit.JitSpawner - redisConsumer *redis.ValidationServer + redisConsumer *redis.ValidationServer + redisExecSpawner *arbredis.ExecutionSpawner } func EnsureValidationExposedViaAuthRPC(stackConf *node.Config) { @@ -106,8 +110,18 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod if err != nil { return nil, err } - var serverAPI *ExecServerAPI - var jitSpawner *server_jit.JitSpawner + var ( + serverAPI *ExecServerAPI + jitSpawner *server_jit.JitSpawner + redisSpawner *arbredis.ExecutionSpawner + ) + if config.RedisExecRunner.Enabled() { + es, err := arbredis.NewExecutionSpawner(&config.RedisExecRunner, arbSpawner) + if err != nil { + log.Error("creating redis execution spawner", "error", err) + } + redisSpawner = es + } if config.UseJit { jitConfigFetcher := func() *server_jit.JitSpawnerConfig { return &configFetcher().Jit } var err error @@ -115,9 +129,9 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod if err != nil { return nil, err } - serverAPI = NewExecutionServerAPI(jitSpawner, arbSpawner, arbConfigFetcher) + serverAPI = NewExecutionServerAPI(jitSpawner, arbSpawner, redisSpawner, arbConfigFetcher) } else { - serverAPI = NewExecutionServerAPI(arbSpawner, arbSpawner, arbConfigFetcher) + serverAPI = NewExecutionServerAPI(arbSpawner, arbSpawner, redisSpawner, arbConfigFetcher) } var redisConsumer *redis.ValidationServer redisValidationConfig := arbConfigFetcher().RedisValidationServerConfig @@ -136,7 +150,13 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod }} stack.RegisterAPIs(valAPIs) - return &ValidationNode{configFetcher, arbSpawner, jitSpawner, redisConsumer}, nil + return &ValidationNode{ + config: configFetcher, + arbSpawner: arbSpawner, + jitSpawner: jitSpawner, + redisConsumer: redisConsumer, + redisExecSpawner: redisSpawner, + }, nil } func (v *ValidationNode) Start(ctx context.Context) error { From 2851ef3368a7f85114fcebe44387d9f0a4c8a864 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Fri, 31 May 2024 16:15:16 +0200 Subject: [PATCH 02/23] Use different streams for bold --- validator/client/redis/boldproducer.go | 2 +- validator/server_api/json.go | 4 ++++ validator/server_arb/redis/consumer.go | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go index 48da2e3b30..93bf5866a4 100644 --- a/validator/client/redis/boldproducer.go +++ b/validator/client/redis/boldproducer.go @@ -66,7 +66,7 @@ func (c *BoldValidationClient) Initialize(moduleRoots []common.Hash) error { continue } p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( - c.redisClient, server_api.RedisStreamForRoot(mr), &c.producerConfig) + c.redisClient, server_api.RedisBoldStreamForRoot(mr), &c.producerConfig) if err != nil { log.Warn("failed init redis for %v: %w", mr, err) continue diff --git a/validator/server_api/json.go b/validator/server_api/json.go index 98712e33be..ca2b69a241 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -47,6 +47,10 @@ func RedisStreamForRoot(moduleRoot common.Hash) string { return fmt.Sprintf("stream:%s", moduleRoot.Hex()) } +func RedisBoldStreamForRoot(moduleRoot common.Hash) string { + return fmt.Sprintf("stream-bold:%s", moduleRoot.Hex()) +} + type Request struct { Input *InputJSON ModuleRoot common.Hash diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index 5b72b93b24..63c7ab24ae 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -62,7 +62,7 @@ func NewExecutionSpawner(cfg *ExecutionSpawnerConfig, spawner validator.Executio consumers := make(map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]) for _, hash := range cfg.ModuleRoots { mr := common.HexToHash(hash) - c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisStreamForRoot(mr), &cfg.ConsumerConfig) + c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisBoldStreamForRoot(mr), &cfg.ConsumerConfig) if err != nil { return nil, fmt.Errorf("creating consumer for validation: %w", err) } From d15ef6acac0edb1b15857f4887697ef90b4c7935 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Wed, 5 Jun 2024 10:30:42 +0200 Subject: [PATCH 03/23] Initialize bold execution runner --- staker/stateless_block_validator.go | 18 +++++++----------- system_tests/validation_mock_test.go | 8 ++++---- validator/client/validation_client.go | 6 +++--- validator/server_arb/redis/consumer.go | 5 ++++- validator/valnode/valnode.go | 2 +- 5 files changed, 19 insertions(+), 20 deletions(-) diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index cc463cc448..a2f88830f9 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -232,9 +232,8 @@ func NewStatelessBlockValidator( stack *node.Node, ) (*StatelessBlockValidator, error) { var ( - executionSpawners []validator.ExecutionSpawner - redisValClient *redis.ValidationClient - redisBoldValClient *redis.BoldValidationClient + executionSpawners []validator.ExecutionSpawner + redisValClient *redis.ValidationClient ) if config().RedisValidationClientConfig.Enabled() { @@ -244,18 +243,15 @@ func NewStatelessBlockValidator( return nil, fmt.Errorf("creating new redis validation client: %w", err) } } - if config().RedisBoldValidationClientConfig.Enabled() { - var err error - redisBoldValClient, err = redis.NewBoldValidationClient(&config().RedisBoldValidationClientConfig) - if err != nil { - return nil, fmt.Errorf("creating new redis bold validation client: %w", err) - } - } + configs := config().ValidationServerConfigs for i := range configs { i := i confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] } - executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack)) + executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack)) + if i == 0 { + executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisBoldValidationClientConfig, stack)) + } } if len(executionSpawners) == 0 { diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index a07a73219c..90c41de138 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -205,7 +205,7 @@ func TestValidationServerAPI(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() _, validationDefault := createMockValidationNode(t, ctx, nil) - client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), validationDefault) + client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, validationDefault) err := client.Start(ctx) Require(t, err) @@ -282,7 +282,7 @@ func TestValidationClientRoom(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil) - client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack) + client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack) err := client.Start(ctx) Require(t, err) @@ -369,10 +369,10 @@ func TestExecutionKeepAlive(t *testing.T) { _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig) configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig) - clientDefault := validatorclient.NewExecutionClient(configFetcher, validationDefault) + clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault) err := clientDefault.Start(ctx) Require(t, err) - clientShortTO := validatorclient.NewExecutionClient(configFetcher, validationShortTO) + clientShortTO := validatorclient.NewExecutionClient(configFetcher, nil, validationShortTO) err = clientShortTO.Start(ctx) Require(t, err) diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index ae25e70414..0c9a627576 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -119,11 +119,11 @@ type ExecutionClient struct { boldValClient *redis.BoldValidationClient } -func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig redis.BoldValidationClientConfig, stack *node.Node) *ExecutionClient { +func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.BoldValidationClientConfig, stack *node.Node) *ExecutionClient { var boldClient *redis.BoldValidationClient - if redisBoldValidationClientConfig.Enabled() { + if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() { var err error - boldClient, err = redis.NewBoldValidationClient(&redisBoldValidationClientConfig) + boldClient, err = redis.NewBoldValidationClient(redisBoldValidationClientConfig) if err != nil { log.Error("Creating new redis bold validation client", "error", err) } diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index 63c7ab24ae..49c7b5f757 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -76,7 +76,10 @@ func NewExecutionSpawner(cfg *ExecutionSpawnerConfig, spawner validator.Executio func (s *ExecutionSpawner) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) - s.spawner.Start(ctx_in) + if err := s.spawner.Start(ctx_in); err != nil { + log.Error("Starting spawner", "error", err) + return + } for moduleRoot, c := range s.consumers { c := c c.Start(ctx_in) diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 230c918dd7..21103754d5 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -41,7 +41,7 @@ type Config struct { ApiAuth bool `koanf:"api-auth"` ApiPublic bool `koanf:"api-public"` Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` - RedisExecRunner arbredis.ExecutionSpawnerConfig `koanf:"redis-exec-runnner"` + RedisExecRunner arbredis.ExecutionSpawnerConfig `koanf:"redis-exec-runner"` Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` Wasm WasmConfig `koanf:"wasm"` } From e2c20ce54535ac0eb90d77337ebdb5c2b914ae25 Mon Sep 17 00:00:00 2001 From: Nodar Ambroladze Date: Wed, 5 Jun 2024 10:38:10 +0200 Subject: [PATCH 04/23] Add stream connection and timeout logic --- validator/client/redis/boldproducer.go | 23 +++-- validator/server_arb/redis/consumer.go | 116 +++++++++++++++++-------- 2 files changed, 98 insertions(+), 41 deletions(-) diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go index 93bf5866a4..9b4dcc8e9a 100644 --- a/validator/client/redis/boldproducer.go +++ b/validator/client/redis/boldproducer.go @@ -16,7 +16,8 @@ import ( ) type BoldValidationClientConfig struct { - RedisURL string `koanf:"redis-url"` + RedisURL string `koanf:"redis-url"` + CreateStreams bool `koanf:"create-streams"` } func (c BoldValidationClientConfig) Enabled() bool { @@ -24,15 +25,18 @@ func (c BoldValidationClientConfig) Enabled() bool { } var DefaultBoldValidationClientConfig = BoldValidationClientConfig{ - RedisURL: "", + RedisURL: "", + CreateStreams: true, } var TestBoldValidationClientConfig = BoldValidationClientConfig{ - RedisURL: "", + RedisURL: "", + CreateStreams: false, } func BoldValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) + f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") } // BoldValidationClient implements bold validation client through redis streams. @@ -43,6 +47,7 @@ type BoldValidationClient struct { producerConfig pubsub.ProducerConfig redisClient redis.UniversalClient moduleRoots []common.Hash + createStreams bool } func NewBoldValidationClient(cfg *BoldValidationClientConfig) (*BoldValidationClient, error) { @@ -54,13 +59,19 @@ func NewBoldValidationClient(cfg *BoldValidationClientConfig) (*BoldValidationCl return nil, err } return &BoldValidationClient{ - producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), - redisClient: redisClient, + producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), + redisClient: redisClient, + createStreams: cfg.CreateStreams, }, nil } -func (c *BoldValidationClient) Initialize(moduleRoots []common.Hash) error { +func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { for _, mr := range moduleRoots { + if c.createStreams { + if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { + return fmt.Errorf("creating redis stream: %w", err) + } + } if _, exists := c.producers[mr]; exists { log.Warn("Producer already existsw for module root", "hash", mr) continue diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index 49c7b5f757..54a2fea3d9 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -20,23 +20,28 @@ type ExecutionSpawnerConfig struct { ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"` // Supported wasm module roots. ModuleRoots []string `koanf:"module-roots"` + // Timeout on polling for existence of each redis stream. + StreamTimeout time.Duration `koanf:"stream-timeout"` } var DefaultExecutionSpawnerConfig = ExecutionSpawnerConfig{ RedisURL: "", ConsumerConfig: pubsub.DefaultConsumerConfig, ModuleRoots: []string{}, + StreamTimeout: 10 * time.Minute, } var TestExecutionSpawnerConfig = ExecutionSpawnerConfig{ RedisURL: "", ConsumerConfig: pubsub.TestConsumerConfig, ModuleRoots: []string{}, + StreamTimeout: time.Minute, } func ExecutionSpawnerConfigAddOptions(prefix string, f *pflag.FlagSet) { pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") + f.Duration(prefix+".stream-timeout", DefaultExecutionSpawnerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") } func (cfg *ExecutionSpawnerConfig) Enabled() bool { @@ -48,7 +53,8 @@ type ExecutionSpawner struct { spawner validator.ExecutionSpawner // consumers stores moduleRoot to consumer mapping. - consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + streamTimeout time.Duration } func NewExecutionSpawner(cfg *ExecutionSpawnerConfig, spawner validator.ExecutionSpawner) (*ExecutionSpawner, error) { @@ -69,50 +75,90 @@ func NewExecutionSpawner(cfg *ExecutionSpawnerConfig, spawner validator.Executio consumers[mr] = c } return &ExecutionSpawner{ - consumers: consumers, - spawner: spawner, + consumers: consumers, + spawner: spawner, + streamTimeout: cfg.StreamTimeout, }, nil } func (s *ExecutionSpawner) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) - if err := s.spawner.Start(ctx_in); err != nil { - log.Error("Starting spawner", "error", err) - return - } + // Channel that all consumers use to indicate their readiness. + readyStreams := make(chan struct{}, len(s.consumers)) for moduleRoot, c := range s.consumers { c := c + moduleRoot := moduleRoot c.Start(ctx_in) - s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { - req, err := c.Consume(ctx) - if err != nil { - log.Error("Consuming request", "error", err) - return 0 - } - if req == nil { - // There's nothing in the queue. - return time.Second - } - run, err := s.spawner.CreateBoldExecutionRun(moduleRoot, req.Value.StepSize, - req.Value.ValidationInput).Await(ctx) - if err != nil { - log.Error("Creationg BOLD execution", "error", err) - return 0 - } - hashes, err := run.GetLeavesWithStepSize( - req.Value.FromBatch, - req.Value.MachineStartIndex, - req.Value.StepSize, - req.Value.NumDesiredLeaves).Await(ctx) - if err != nil { - log.Error("Getting leave hashes", "error", err) - return 0 + // Channel for single consumer, once readiness is indicated in this, + // consumer will start consuming iteratively. + ready := make(chan struct{}, 1) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + if pubsub.StreamExists(ctx, c.StreamName(), c.RedisClient()) { + ready <- struct{}{} + readyStreams <- struct{}{} + return + } + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-time.After(time.Millisecond * 100): + } } - if err := c.SetResult(ctx, req.ID, hashes); err != nil { - log.Error("Error setting result for request", "id", req.ID, "result", hashes, "error", err) - return 0 + }) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-ready: // Wait until the stream exists and start consuming iteratively. } - return time.Second + s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { + req, err := c.Consume(ctx) + if err != nil { + log.Error("Consuming request", "error", err) + return 0 + } + if req == nil { + // There's nothing in the queue. + return time.Second + } + run, err := s.spawner.CreateBoldExecutionRun(moduleRoot, req.Value.StepSize, + req.Value.ValidationInput).Await(ctx) + if err != nil { + log.Error("Creationg BOLD execution", "error", err) + return 0 + } + hashes, err := run.GetLeavesWithStepSize( + req.Value.FromBatch, + req.Value.MachineStartIndex, + req.Value.StepSize, + req.Value.NumDesiredLeaves).Await(ctx) + if err != nil { + log.Error("Getting leave hashes", "error", err) + return 0 + } + if err := c.SetResult(ctx, req.ID, hashes); err != nil { + log.Error("Error setting result for request", "id", req.ID, "result", hashes, "error", err) + return 0 + } + return time.Second + }) }) } + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + select { + case <-readyStreams: + log.Trace("At least one stream is ready") + return // Don't block Start if at least one of the stream is ready. + case <-time.After(s.streamTimeout): + log.Error("Waiting for redis streams timed out") + case <-ctx.Done(): + log.Info(("Context expired, failed to start")) + return + } + } + }) } From 301ea7279c37ebb5ece9dc2e47b094aec3fe04b8 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 27 Aug 2024 11:29:12 +0530 Subject: [PATCH 05/23] fix config --- staker/block_validator.go | 32 ++++++++--------- validator/client/redis/boldproducer.go | 48 ++++++-------------------- validator/client/validation_client.go | 2 +- validator/valnode/valnode.go | 1 + 4 files changed, 29 insertions(+), 54 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index a92ee9ddfc..24757ad7ed 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -100,20 +100,20 @@ type BlockValidator struct { } type BlockValidatorConfig struct { - Enable bool `koanf:"enable"` - RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` - RedisBoldValidationClientConfig redis.BoldValidationClientConfig `koanf:"redis-bold-validation-client-config"` - ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` - ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` - ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` - PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` - ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` - CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload - PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload - FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` - Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` - MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` - ValidationServerConfigsList string `koanf:"validation-server-configs-list"` + Enable bool `koanf:"enable"` + RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` + RedisBoldValidationClientConfig redis.ValidationClientConfig `koanf:"redis-bold-validation-client-config"` + ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` + ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` + ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` + PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` + ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` + CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload + PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload + FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` + Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` + MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` + ValidationServerConfigsList string `koanf:"validation-server-configs-list"` memoryFreeLimit int } @@ -190,7 +190,7 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{ ValidationServerConfigsList: "default", ValidationServer: rpcclient.DefaultClientConfig, RedisValidationClientConfig: redis.DefaultValidationClientConfig, - RedisBoldValidationClientConfig: redis.DefaultBoldValidationClientConfig, + RedisBoldValidationClientConfig: redis.DefaultValidationClientConfig, ValidationPoll: time.Second, ForwardBlocks: 1024, PrerecordedBlocks: uint64(2 * runtime.NumCPU()), @@ -206,7 +206,7 @@ var TestBlockValidatorConfig = BlockValidatorConfig{ ValidationServer: rpcclient.TestClientConfig, ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, RedisValidationClientConfig: redis.TestValidationClientConfig, - RedisBoldValidationClientConfig: redis.TestBoldValidationClientConfig, + RedisBoldValidationClientConfig: redis.TestValidationClientConfig, ValidationPoll: 100 * time.Millisecond, ForwardBlocks: 128, PrerecordedBlocks: uint64(2 * runtime.NumCPU()), diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go index 9b4dcc8e9a..2329404031 100644 --- a/validator/client/redis/boldproducer.go +++ b/validator/client/redis/boldproducer.go @@ -12,45 +12,19 @@ import ( "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" "github.com/offchainlabs/nitro/validator/server_api" - "github.com/spf13/pflag" ) -type BoldValidationClientConfig struct { - RedisURL string `koanf:"redis-url"` - CreateStreams bool `koanf:"create-streams"` -} - -func (c BoldValidationClientConfig) Enabled() bool { - return c.RedisURL != "" -} - -var DefaultBoldValidationClientConfig = BoldValidationClientConfig{ - RedisURL: "", - CreateStreams: true, -} - -var TestBoldValidationClientConfig = BoldValidationClientConfig{ - RedisURL: "", - CreateStreams: false, -} - -func BoldValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) { - pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f) - f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist") -} - // BoldValidationClient implements bold validation client through redis streams. type BoldValidationClient struct { stopwaiter.StopWaiter // producers stores moduleRoot to producer mapping. - producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] - producerConfig pubsub.ProducerConfig - redisClient redis.UniversalClient - moduleRoots []common.Hash - createStreams bool + producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + redisClient redis.UniversalClient + moduleRoots []common.Hash + config *ValidationClientConfig } -func NewBoldValidationClient(cfg *BoldValidationClientConfig) (*BoldValidationClient, error) { +func NewBoldValidationClient(cfg *ValidationClientConfig) (*BoldValidationClient, error) { if cfg.RedisURL == "" { return nil, fmt.Errorf("redis url cannot be empty") } @@ -59,16 +33,16 @@ func NewBoldValidationClient(cfg *BoldValidationClientConfig) (*BoldValidationCl return nil, err } return &BoldValidationClient{ - producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), - redisClient: redisClient, - createStreams: cfg.CreateStreams, + producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), + redisClient: redisClient, + config: cfg, }, nil } func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { for _, mr := range moduleRoots { - if c.createStreams { - if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(mr), c.redisClient); err != nil { + if c.config.CreateStreams { + if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), c.redisClient); err != nil { return fmt.Errorf("creating redis stream: %w", err) } } @@ -77,7 +51,7 @@ func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []com continue } p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( - c.redisClient, server_api.RedisBoldStreamForRoot(mr), &c.producerConfig) + c.redisClient, server_api.RedisBoldStreamForRoot(mr), &c.config.ProducerConfig) if err != nil { log.Warn("failed init redis for %v: %w", mr, err) continue diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 1daa07c424..de6c8a1072 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -150,7 +150,7 @@ type ExecutionClient struct { boldValClient *redis.BoldValidationClient } -func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.BoldValidationClientConfig, stack *node.Node) *ExecutionClient { +func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *ExecutionClient { var boldClient *redis.BoldValidationClient if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() { var err error diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 21103754d5..e2b63c1d4a 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -73,6 +73,7 @@ func ValidationConfigAddOptions(prefix string, f *pflag.FlagSet) { server_arb.ArbitratorSpawnerConfigAddOptions(prefix+".arbitrator", f) server_jit.JitSpawnerConfigAddOptions(prefix+".jit", f) WasmConfigAddOptions(prefix+".wasm", f) + arbredis.ExecutionSpawnerConfigAddOptions(prefix+".redis-exec-runner", f) } type ValidationNode struct { From 0bbc41e5712204477fa7eca43043f4e690f2e32b Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 27 Aug 2024 11:56:06 +0530 Subject: [PATCH 06/23] clean up --- staker/bold_state_provider.go | 1 - validator/client/redis/boldproducer.go | 31 +++++++-------- validator/server_api/json.go | 4 +- validator/server_arb/redis/consumer.go | 53 +++++--------------------- validator/valnode/valnode.go | 4 +- 5 files changed, 27 insertions(+), 66 deletions(-) diff --git a/staker/bold_state_provider.go b/staker/bold_state_provider.go index 0d2ecf5301..f7d2b7f797 100644 --- a/staker/bold_state_provider.go +++ b/staker/bold_state_provider.go @@ -375,7 +375,6 @@ func (s *BOLDStateProvider) CollectMachineHashes( if err != nil { return nil, err } - // TODO: Enable Redis streams. execRun, err := s.statelessValidator.execSpawners[0].CreateExecutionRun(cfg.WasmModuleRoot, input).Await(ctx) if err != nil { return nil, err diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go index 2329404031..e0d83a3271 100644 --- a/validator/client/redis/boldproducer.go +++ b/validator/client/redis/boldproducer.go @@ -6,7 +6,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - "github.com/go-redis/redis/v8" "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/redisutil" @@ -18,31 +17,28 @@ import ( type BoldValidationClient struct { stopwaiter.StopWaiter // producers stores moduleRoot to producer mapping. - producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] - redisClient redis.UniversalClient - moduleRoots []common.Hash - config *ValidationClientConfig + producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + config *ValidationClientConfig } func NewBoldValidationClient(cfg *ValidationClientConfig) (*BoldValidationClient, error) { - if cfg.RedisURL == "" { - return nil, fmt.Errorf("redis url cannot be empty") - } - redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL) - if err != nil { - return nil, err - } return &BoldValidationClient{ - producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), - redisClient: redisClient, - config: cfg, + producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), + config: cfg, }, nil } func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { + if c.config.RedisURL == "" { + return fmt.Errorf("redis url cannot be empty") + } + redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL) + if err != nil { + return err + } for _, mr := range moduleRoots { if c.config.CreateStreams { - if err := pubsub.CreateStream(ctx, server_api.RedisStreamForRoot(c.config.StreamPrefix, mr), c.redisClient); err != nil { + if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil { return fmt.Errorf("creating redis stream: %w", err) } } @@ -51,14 +47,13 @@ func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []com continue } p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( - c.redisClient, server_api.RedisBoldStreamForRoot(mr), &c.config.ProducerConfig) + redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig) if err != nil { log.Warn("failed init redis for %v: %w", mr, err) continue } p.Start(c.GetContext()) c.producers[mr] = p - c.moduleRoots = append(c.moduleRoots, mr) } return nil } diff --git a/validator/server_api/json.go b/validator/server_api/json.go index 63ba66c750..010119696e 100644 --- a/validator/server_api/json.go +++ b/validator/server_api/json.go @@ -51,8 +51,8 @@ func RedisStreamForRoot(prefix string, moduleRoot common.Hash) string { return fmt.Sprintf("%sstream:%s", prefix, moduleRoot.Hex()) } -func RedisBoldStreamForRoot(moduleRoot common.Hash) string { - return fmt.Sprintf("stream-bold:%s", moduleRoot.Hex()) +func RedisBoldStreamForRoot(prefix string, moduleRoot common.Hash) string { + return fmt.Sprintf("%sstream-bold:%s", prefix, moduleRoot.Hex()) } type Request struct { diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index b1753fa203..d53069ecc9 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -12,52 +12,19 @@ import ( "github.com/offchainlabs/nitro/util/stopwaiter" "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_api" - "github.com/spf13/pflag" + "github.com/offchainlabs/nitro/validator/valnode/redis" ) -type ExecutionSpawnerConfig struct { - RedisURL string `koanf:"redis-url"` - ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"` - // Supported wasm module roots. - ModuleRoots []string `koanf:"module-roots"` - // Timeout on polling for existence of each redis stream. - StreamTimeout time.Duration `koanf:"stream-timeout"` -} - -var DefaultExecutionSpawnerConfig = ExecutionSpawnerConfig{ - RedisURL: "", - ConsumerConfig: pubsub.DefaultConsumerConfig, - ModuleRoots: []string{}, - StreamTimeout: 10 * time.Minute, -} - -var TestExecutionSpawnerConfig = ExecutionSpawnerConfig{ - RedisURL: "", - ConsumerConfig: pubsub.TestConsumerConfig, - ModuleRoots: []string{}, - StreamTimeout: time.Minute, -} - -func ExecutionSpawnerConfigAddOptions(prefix string, f *pflag.FlagSet) { - pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f) - f.StringSlice(prefix+".module-roots", nil, "Supported module root hashes") - f.Duration(prefix+".stream-timeout", DefaultExecutionSpawnerConfig.StreamTimeout, "Timeout on polling for existence of redis streams") -} - -func (cfg *ExecutionSpawnerConfig) Enabled() bool { - return cfg.RedisURL != "" -} - type ExecutionSpawner struct { stopwaiter.StopWaiter spawner validator.ExecutionSpawner // consumers stores moduleRoot to consumer mapping. - consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] - streamTimeout time.Duration + consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + config *redis.ValidationServerConfig } -func NewExecutionSpawner(cfg *ExecutionSpawnerConfig, spawner validator.ExecutionSpawner) (*ExecutionSpawner, error) { +func NewExecutionSpawner(cfg *redis.ValidationServerConfig, spawner validator.ExecutionSpawner) (*ExecutionSpawner, error) { if cfg.RedisURL == "" { return nil, fmt.Errorf("redis url cannot be empty") } @@ -68,16 +35,16 @@ func NewExecutionSpawner(cfg *ExecutionSpawnerConfig, spawner validator.Executio consumers := make(map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]) for _, hash := range cfg.ModuleRoots { mr := common.HexToHash(hash) - c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisBoldStreamForRoot(mr), &cfg.ConsumerConfig) + c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisBoldStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig) if err != nil { return nil, fmt.Errorf("creating consumer for validation: %w", err) } consumers[mr] = c } return &ExecutionSpawner{ - consumers: consumers, - spawner: spawner, - streamTimeout: cfg.StreamTimeout, + consumers: consumers, + spawner: spawner, + config: cfg, }, nil } @@ -152,10 +119,10 @@ func (s *ExecutionSpawner) Start(ctx_in context.Context) { case <-readyStreams: log.Trace("At least one stream is ready") return // Don't block Start if at least one of the stream is ready. - case <-time.After(s.streamTimeout): + case <-time.After(s.config.StreamTimeout): log.Error("Waiting for redis streams timed out") case <-ctx.Done(): - log.Info(("Context expired, failed to start")) + log.Info("Context expired, failed to start") return } } diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index e2b63c1d4a..297c05c203 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -41,7 +41,7 @@ type Config struct { ApiAuth bool `koanf:"api-auth"` ApiPublic bool `koanf:"api-public"` Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` - RedisExecRunner arbredis.ExecutionSpawnerConfig `koanf:"redis-exec-runner"` + RedisExecRunner redis.ValidationServerConfig `koanf:"redis-exec-runner"` Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` Wasm WasmConfig `koanf:"wasm"` } @@ -73,7 +73,7 @@ func ValidationConfigAddOptions(prefix string, f *pflag.FlagSet) { server_arb.ArbitratorSpawnerConfigAddOptions(prefix+".arbitrator", f) server_jit.JitSpawnerConfigAddOptions(prefix+".jit", f) WasmConfigAddOptions(prefix+".wasm", f) - arbredis.ExecutionSpawnerConfigAddOptions(prefix+".redis-exec-runner", f) + redis.ValidationServerConfigAddOptions(prefix+".redis-exec-runner", f) } type ValidationNode struct { From b9c89000cf54b1db93a3c76409d6b5cd4b8e73a0 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 27 Aug 2024 12:10:49 +0530 Subject: [PATCH 07/23] remove redudtant changes --- staker/stateless_block_validator.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 4b7a29e3d0..353e979e8b 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -214,10 +214,8 @@ func NewStatelessBlockValidator( config func() *BlockValidatorConfig, stack *node.Node, ) (*StatelessBlockValidator, error) { - var ( - executionSpawners []validator.ExecutionSpawner - redisValClient *redis.ValidationClient - ) + var executionSpawners []validator.ExecutionSpawner + var redisValClient *redis.ValidationClient if config().RedisValidationClientConfig.Enabled() { var err error @@ -226,7 +224,6 @@ func NewStatelessBlockValidator( return nil, fmt.Errorf("creating new redis validation client: %w", err) } } - configs := config().ValidationServerConfigs for i := range configs { i := i From 8fcd8b7c7d65fa5815ebcd5e7f2a222871c13db9 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Wed, 28 Aug 2024 19:06:55 +0530 Subject: [PATCH 08/23] fix config setup --- staker/block_validator.go | 1 + validator/valnode/valnode.go | 26 ++++++++++++++------------ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index 24757ad7ed..658b9ea099 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -170,6 +170,7 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation") rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer) redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f) + redis.ValidationClientConfigAddOptions(prefix+".redis-bold-validation-client-config", f) f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds") f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations") f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (small footprint)") diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 297c05c203..f3e1639436 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -49,21 +49,23 @@ type Config struct { type ValidationConfigFetcher func() *Config var DefaultValidationConfig = Config{ - UseJit: true, - Jit: server_jit.DefaultJitSpawnerConfig, - ApiAuth: true, - ApiPublic: false, - Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, - Wasm: DefaultWasmConfig, + UseJit: true, + Jit: server_jit.DefaultJitSpawnerConfig, + ApiAuth: true, + ApiPublic: false, + Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, + RedisExecRunner: redis.DefaultValidationServerConfig, + Wasm: DefaultWasmConfig, } var TestValidationConfig = Config{ - UseJit: true, - Jit: server_jit.DefaultJitSpawnerConfig, - ApiAuth: false, - ApiPublic: true, - Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, - Wasm: DefaultWasmConfig, + UseJit: true, + Jit: server_jit.DefaultJitSpawnerConfig, + ApiAuth: false, + ApiPublic: true, + Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, + RedisExecRunner: redis.TestValidationServerConfig, + Wasm: DefaultWasmConfig, } func ValidationConfigAddOptions(prefix string, f *pflag.FlagSet) { From f0bc4d41dbc2b9982520e2ad739bc3f87db4571f Mon Sep 17 00:00:00 2001 From: amsanghi <102982411+amsanghi@users.noreply.github.com> Date: Wed, 28 Aug 2024 20:17:16 +0530 Subject: [PATCH 09/23] Update validator/server_arb/redis/consumer.go Co-authored-by: Raul Jordan --- validator/server_arb/redis/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index d53069ecc9..fa6e75a1b2 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -102,7 +102,7 @@ func (s *ExecutionSpawner) Start(ctx_in context.Context) { req.Value.StepSize, req.Value.NumDesiredLeaves).Await(ctx) if err != nil { - log.Error("Getting leave hashes", "error", err) + log.Error("Getting machine hashes", "error", err) return 0 } if err := c.SetResult(ctx, req.ID, hashes); err != nil { From b421fb0dbc4a54d0480ca4dada9f53777baf73d2 Mon Sep 17 00:00:00 2001 From: amsanghi <102982411+amsanghi@users.noreply.github.com> Date: Wed, 28 Aug 2024 20:17:24 +0530 Subject: [PATCH 10/23] Update validator/server_arb/redis/consumer.go Co-authored-by: Raul Jordan --- validator/server_arb/redis/consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index fa6e75a1b2..769416e509 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -94,7 +94,7 @@ func (s *ExecutionSpawner) Start(ctx_in context.Context) { run, err := s.spawner.CreateExecutionRun(moduleRoot, req.Value.ValidationInput).Await(ctx) if err != nil { - log.Error("Creationg BOLD execution", "error", err) + log.Error("Creating BOLD execution", "error", err) return 0 } hashes, err := run.GetMachineHashesWithStepSize( From 4fbd706e51df555d45ddd21c3d365eb896b36388 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Thu, 29 Aug 2024 22:13:16 +0530 Subject: [PATCH 11/23] Add tests and fix some bugs --- system_tests/full_challenge_impl_test.go | 2 +- system_tests/validation_mock_test.go | 53 ++++++++++++++++++++---- validator/client/redis/boldproducer.go | 1 - validator/client/validation_client.go | 21 ++++++++-- validator/valnode/valnode.go | 2 +- 5 files changed, 66 insertions(+), 13 deletions(-) diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index ddc229074c..7149dbc377 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -261,7 +261,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall var mockSpawn *mockSpawner builder.valnodeConfig.Wasm.RootPath = wasmRootDir if useStubs { - mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator) + mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator, "") } else { // For now validation only works with HashScheme set builder.execConfig.Caching.StateScheme = rawdb.HashScheme diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index c2fbbd9c97..52ddcaeed0 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -20,11 +20,15 @@ import ( "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/containers" + "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/validator" + clientredis "github.com/offchainlabs/nitro/validator/client/redis" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" + arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" "github.com/offchainlabs/nitro/validator/valnode" + "github.com/offchainlabs/nitro/validator/valnode/redis" validatorclient "github.com/offchainlabs/nitro/validator/client" ) @@ -166,7 +170,7 @@ func (r *mockExecRun) CheckAlive(ctx context.Context) error { func (r *mockExecRun) Close() {} -func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig) (*mockSpawner, *node.Node) { +func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig, redisURL string) (*mockSpawner, *node.Node) { stackConf := node.DefaultConfig stackConf.HTTPPort = 0 stackConf.DataDir = "" @@ -184,7 +188,18 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_ } configFetcher := func() *server_arb.ArbitratorSpawnerConfig { return config } spawner := &mockSpawner{} - serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, nil, configFetcher) + var redisExecSpawner *arbredis.ExecutionSpawner + if redisURL != "" { + redisValidationServerConfig := redis.TestValidationServerConfig + redisValidationServerConfig.RedisURL = redisURL + redisValidationServerConfig.ModuleRoots = make([]string, len(mockWasmModuleRoots)) + for i := range redisValidationServerConfig.ModuleRoots { + redisValidationServerConfig.ModuleRoots[i] = mockWasmModuleRoots[i].Hex() + } + redisExecSpawner, err = arbredis.NewExecutionSpawner(&redisValidationServerConfig, spawner) + Require(t, err) + } + serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, redisExecSpawner, configFetcher) valAPIs := []rpc.API{{ Namespace: server_api.Namespace, @@ -211,11 +226,28 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_ // mostly tests translation to/from json and running over network func TestValidationServerAPI(t *testing.T) { + testValidationServerAPI(t, false) +} + +// mostly tests translation to/from json and running over network with bold validation redis consumer/producer +func TestValidationServerAPIWithBoldValidationConsumerProducer(t *testing.T) { + testValidationServerAPI(t, true) +} +func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bool) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, validationDefault := createMockValidationNode(t, ctx, nil) - client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, validationDefault) + var redisURL string + var redisBoldValidationClientConfig *clientredis.ValidationClientConfig + if withBoldValidationConsumerProducer { + redisURL = redisutil.CreateTestRedis(ctx, t) + redisBoldValidationClientConfig = &clientredis.TestValidationClientConfig + redisBoldValidationClientConfig.RedisURL = redisURL + redisBoldValidationClientConfig.CreateStreams = true + } + + _, validationDefault := createMockValidationNode(t, ctx, nil, redisURL) + client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault) err := client.Start(ctx) Require(t, err) @@ -285,13 +317,20 @@ func TestValidationServerAPI(t *testing.T) { if !bytes.Equal(proof, mockProof) { t.Error("mock proof not expected") } + + hashes := execRun.GetMachineHashesWithStepSize(0, 1, 5) + hashesRes, err := hashes.Await(ctx) + Require(t, err) + if len(hashesRes) != 5 { + t.Error("unexpected number of hashes") + } } func TestValidationClientRoom(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil) + mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil, "") client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack) err := client.Start(ctx) Require(t, err) @@ -373,10 +412,10 @@ func TestExecutionKeepAlive(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, validationDefault := createMockValidationNode(t, ctx, nil) + _, validationDefault := createMockValidationNode(t, ctx, nil, "") shortTimeoutConfig := server_arb.DefaultArbitratorSpawnerConfig shortTimeoutConfig.ExecutionRunTimeout = time.Second - _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig) + _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig, "") configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig) clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault) diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go index e0d83a3271..58662ad72c 100644 --- a/validator/client/redis/boldproducer.go +++ b/validator/client/redis/boldproducer.go @@ -52,7 +52,6 @@ func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []com log.Warn("failed init redis for %v: %w", mr, err) continue } - p.Start(c.GetContext()) c.producers[mr] = p } return nil diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index de6c8a1072..816b44e73f 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -164,7 +164,20 @@ func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidatio boldValClient: boldClient, } } - +func (c *ExecutionClient) Start(ctx context.Context) error { + if err := c.ValidationClient.Start(ctx); err != nil { + return err + } + if c.boldValClient != nil { + if err := c.boldValClient.Initialize(ctx, c.wasmModuleRoots); err != nil { + return err + } + if err := c.boldValClient.Start(ctx); err != nil { + return err + } + } + return nil +} func (c *ExecutionClient) CreateExecutionRun(wasmModuleRoot common.Hash, input *validator.ValidationInput) containers.PromiseInterface[validator.ExecutionRun] { return stopwaiter.LaunchPromiseThread[validator.ExecutionRun](c, func(ctx context.Context) (validator.ExecutionRun, error) { var res uint64 @@ -173,8 +186,10 @@ func (c *ExecutionClient) CreateExecutionRun(wasmModuleRoot common.Hash, input * return nil, err } run := &ExecutionClientRun{ - client: c, - id: res, + wasmModuleRoot: wasmModuleRoot, + client: c, + id: res, + input: input, } run.Start(c.GetContext()) // note: not this temporary thread's context! return run, nil diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index f3e1639436..c5a12b9142 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -64,7 +64,7 @@ var TestValidationConfig = Config{ ApiAuth: false, ApiPublic: true, Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, - RedisExecRunner: redis.TestValidationServerConfig, + RedisExecRunner: redis.DefaultValidationServerConfig, Wasm: DefaultWasmConfig, } From 5170a7904f5ca2f5277650e2fe375606c8c13db0 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 29 Aug 2024 12:53:19 -0500 Subject: [PATCH 12/23] Update validator/client/redis/boldproducer.go --- validator/client/redis/boldproducer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go index 58662ad72c..d441e38f45 100644 --- a/validator/client/redis/boldproducer.go +++ b/validator/client/redis/boldproducer.go @@ -43,7 +43,7 @@ func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []com } } if _, exists := c.producers[mr]; exists { - log.Warn("Producer already existsw for module root", "hash", mr) + log.Warn("Producer already exists for module root", "hash", mr) continue } p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( From 545f9d2c25a7e0f7b8e6ccc4835a053e9561f5ea Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Fri, 30 Aug 2024 18:16:45 +0530 Subject: [PATCH 13/23] add metrics --- pubsub/consumer.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pubsub/consumer.go b/pubsub/consumer.go index df3695606d..7c3bfbe85a 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/ethereum/go-ethereum/metrics" "time" "github.com/ethereum/go-ethereum/log" @@ -72,6 +73,7 @@ func (c *Consumer[Request, Response]) Start(ctx context.Context) { c.StopWaiter.CallIteratively( func(ctx context.Context) time.Duration { c.heartBeat(ctx) + c.metrics(ctx) return c.cfg.KeepAliveTimeout / 10 }, ) @@ -109,6 +111,18 @@ func (c *Consumer[Request, Response]) deleteHeartBeat(ctx context.Context) { } } +func (c *Consumer[Request, Response]) metrics(ctx context.Context) { + res, err := c.client.XPending(ctx, c.redisStream, c.redisGroup).Result() + if errors.Is(err, redis.Nil) { + return + } + if err != nil { + log.Error("Getting pending messages", "error", err) + return + } + metrics.GetOrRegisterGauge(c.redisStream+"/consumer/pending", nil).Update(res.Count) +} + // heartBeat updates the heartBeat key indicating aliveness. func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) { if err := c.client.Set(ctx, c.heartBeatKey(), time.Now().UnixMilli(), 2*c.cfg.KeepAliveTimeout).Err(); err != nil { From d895bc46738dfa97a655304f20583d393f744855 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 29 Oct 2024 20:26:03 +0530 Subject: [PATCH 14/23] fix lint --- staker/multi_protocol/multi_protocol_staker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/staker/multi_protocol/multi_protocol_staker.go b/staker/multi_protocol/multi_protocol_staker.go index d4d4e1b54f..f8bc46fa2b 100644 --- a/staker/multi_protocol/multi_protocol_staker.go +++ b/staker/multi_protocol/multi_protocol_staker.go @@ -5,12 +5,12 @@ import ( "github.com/offchainlabs/nitro/staker" "time" - "github.com/offchainlabs/bold/solgen/go/bridgegen" - boldrollup "github.com/offchainlabs/bold/solgen/go/rollupgen" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/bold/solgen/go/bridgegen" + boldrollup "github.com/offchainlabs/bold/solgen/go/rollupgen" boldstaker "github.com/offchainlabs/nitro/staker/bold" legacystaker "github.com/offchainlabs/nitro/staker/legacy" From f765a89c7831b03914bb905e3703c22052d8e894 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 10 Dec 2024 20:16:49 +0530 Subject: [PATCH 15/23] fix lint --- staker/multi_protocol/multi_protocol_staker.go | 1 - validator/client/redis/boldproducer.go | 1 + validator/client/validation_client.go | 14 +++++++------- validator/server_arb/redis/consumer.go | 3 ++- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/staker/multi_protocol/multi_protocol_staker.go b/staker/multi_protocol/multi_protocol_staker.go index f4adcac53d..0c104094ed 100644 --- a/staker/multi_protocol/multi_protocol_staker.go +++ b/staker/multi_protocol/multi_protocol_staker.go @@ -12,7 +12,6 @@ import ( "github.com/offchainlabs/bold/solgen/go/bridgegen" boldrollup "github.com/offchainlabs/bold/solgen/go/rollupgen" - "github.com/offchainlabs/nitro/staker" boldstaker "github.com/offchainlabs/nitro/staker/bold" legacystaker "github.com/offchainlabs/nitro/staker/legacy" diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go index d441e38f45..d834b0fefe 100644 --- a/validator/client/redis/boldproducer.go +++ b/validator/client/redis/boldproducer.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/redisutil" diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index 9bb7da8d04..ba8c399f03 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -11,6 +11,13 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rpc" + "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -18,13 +25,6 @@ import ( "github.com/offchainlabs/nitro/validator/client/redis" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_common" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/rpc" ) type ValidationClient struct { diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index 769416e509..15d9ea2e46 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -92,7 +93,7 @@ func (s *ExecutionSpawner) Start(ctx_in context.Context) { return time.Second } run, err := s.spawner.CreateExecutionRun(moduleRoot, - req.Value.ValidationInput).Await(ctx) + req.Value.ValidationInput, true).Await(ctx) if err != nil { log.Error("Creating BOLD execution", "error", err) return 0 From a2cb2615fd6747cbb38562f3d96ba594042f4f8b Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 10 Dec 2024 20:17:27 +0530 Subject: [PATCH 16/23] fix lint --- validator/server_arb/redis/consumer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index 15d9ea2e46..f67cffa3de 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" From bc70dfbee14603f9acfea43e614543c626f50260 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Tue, 10 Dec 2024 20:18:08 +0530 Subject: [PATCH 17/23] fix lint --- validator/server_arb/redis/consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go index f67cffa3de..15d9ea2e46 100644 --- a/validator/server_arb/redis/consumer.go +++ b/validator/server_arb/redis/consumer.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/stopwaiter" From 3da52e35bc6859ac96fdfbba3b983e9f7cf1520c Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Fri, 3 Jan 2025 18:43:45 +0530 Subject: [PATCH 18/23] Changes based on PR comments --- system_tests/full_challenge_impl_test.go | 2 +- system_tests/validation_mock_test.go | 29 ++++++------------------ validator/valnode/validation_api.go | 9 +------- validator/valnode/valnode.go | 7 ++++-- 4 files changed, 14 insertions(+), 33 deletions(-) diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index 549a608afd..4d902f87ba 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -261,7 +261,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall var mockSpawn *mockSpawner builder.valnodeConfig.Wasm.RootPath = wasmRootDir if useStubs { - mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator, "") + mockSpawn, valStack = createMockValidationNode(t, ctx, &builder.valnodeConfig.Arbitrator) } else { // For now validation only works with HashScheme set builder.execConfig.Caching.StateScheme = rawdb.HashScheme diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index 199ebb07a8..2eedf32123 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -27,9 +27,7 @@ import ( clientredis "github.com/offchainlabs/nitro/validator/client/redis" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" - arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" "github.com/offchainlabs/nitro/validator/valnode" - "github.com/offchainlabs/nitro/validator/valnode/redis" ) type mockSpawner struct { @@ -165,7 +163,7 @@ func (r *mockExecRun) CheckAlive(ctx context.Context) error { func (r *mockExecRun) Close() {} -func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig, redisURL string) (*mockSpawner, *node.Node) { +func createMockValidationNode(t *testing.T, ctx context.Context, config *server_arb.ArbitratorSpawnerConfig) (*mockSpawner, *node.Node) { stackConf := node.DefaultConfig stackConf.HTTPPort = 0 stackConf.DataDir = "" @@ -183,18 +181,7 @@ func createMockValidationNode(t *testing.T, ctx context.Context, config *server_ } configFetcher := func() *server_arb.ArbitratorSpawnerConfig { return config } spawner := &mockSpawner{} - var redisExecSpawner *arbredis.ExecutionSpawner - if redisURL != "" { - redisValidationServerConfig := redis.TestValidationServerConfig - redisValidationServerConfig.RedisURL = redisURL - redisValidationServerConfig.ModuleRoots = make([]string, len(mockWasmModuleRoots)) - for i := range redisValidationServerConfig.ModuleRoots { - redisValidationServerConfig.ModuleRoots[i] = mockWasmModuleRoots[i].Hex() - } - redisExecSpawner, err = arbredis.NewExecutionSpawner(&redisValidationServerConfig, spawner) - Require(t, err) - } - serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, redisExecSpawner, configFetcher) + serverAPI := valnode.NewExecutionServerAPI(spawner, spawner, configFetcher) valAPIs := []rpc.API{{ Namespace: server_api.Namespace, @@ -232,16 +219,14 @@ func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bo t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var redisURL string var redisBoldValidationClientConfig *clientredis.ValidationClientConfig if withBoldValidationConsumerProducer { - redisURL = redisutil.CreateTestRedis(ctx, t) redisBoldValidationClientConfig = &clientredis.TestValidationClientConfig - redisBoldValidationClientConfig.RedisURL = redisURL + redisBoldValidationClientConfig.RedisURL = redisutil.CreateTestRedis(ctx, t) redisBoldValidationClientConfig.CreateStreams = true } - _, validationDefault := createMockValidationNode(t, ctx, nil, redisURL) + _, validationDefault := createMockValidationNode(t, ctx, nil) client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault) err := client.Start(ctx) Require(t, err) @@ -325,7 +310,7 @@ func TestValidationClientRoom(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil, "") + mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil) client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack) err := client.Start(ctx) Require(t, err) @@ -407,10 +392,10 @@ func TestExecutionKeepAlive(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, validationDefault := createMockValidationNode(t, ctx, nil, "") + _, validationDefault := createMockValidationNode(t, ctx, nil) shortTimeoutConfig := server_arb.DefaultArbitratorSpawnerConfig shortTimeoutConfig.ExecutionRunTimeout = time.Second - _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig, "") + _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig) configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig) clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault) diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index 58393d0844..83e691f8c4 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -18,7 +18,6 @@ import ( "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" - arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" ) type ValidationServerAPI struct { @@ -62,8 +61,7 @@ type execRunEntry struct { type ExecServerAPI struct { stopwaiter.StopWaiter ValidationServerAPI - execSpawner validator.ExecutionSpawner - redisExecSpawner *arbredis.ExecutionSpawner + execSpawner validator.ExecutionSpawner config server_arb.ArbitratorSpawnerConfigFecher @@ -75,12 +73,10 @@ type ExecServerAPI struct { func NewExecutionServerAPI( valSpawner validator.ValidationSpawner, execution validator.ExecutionSpawner, - redisExecSpawner *arbredis.ExecutionSpawner, config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { return &ExecServerAPI{ ValidationServerAPI: *NewValidationServerAPI(valSpawner), execSpawner: execution, - redisExecSpawner: redisExecSpawner, nextId: rand.Uint64(), // good-enough to aver reusing ids after reboot runs: make(map[uint64]*execRunEntry), config: config, @@ -127,9 +123,6 @@ func (a *ExecServerAPI) removeOldRuns(ctx context.Context) time.Duration { func (a *ExecServerAPI) Start(ctx_in context.Context) { a.StopWaiter.Start(ctx_in, a) a.CallIteratively(a.removeOldRuns) - if a.redisExecSpawner != nil { - a.redisExecSpawner.Start(ctx_in) - } } var errRunNotFound error = errors.New("run not found") diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index f60f963712..3cdc985843 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -132,9 +132,9 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod if err != nil { return nil, err } - serverAPI = NewExecutionServerAPI(jitSpawner, arbSpawner, redisSpawner, arbConfigFetcher) + serverAPI = NewExecutionServerAPI(jitSpawner, arbSpawner, arbConfigFetcher) } else { - serverAPI = NewExecutionServerAPI(arbSpawner, arbSpawner, redisSpawner, arbConfigFetcher) + serverAPI = NewExecutionServerAPI(arbSpawner, arbSpawner, arbConfigFetcher) } var redisConsumer *redis.ValidationServer redisValidationConfig := arbConfigFetcher().RedisValidationServerConfig @@ -174,6 +174,9 @@ func (v *ValidationNode) Start(ctx context.Context) error { if v.redisConsumer != nil { v.redisConsumer.Start(ctx) } + if v.redisExecSpawner != nil { + v.redisExecSpawner.Start(ctx) + } return nil } From 1b1fc9f40bdee51f455f5e9e0aa1761ac916a8a1 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Fri, 3 Jan 2025 18:48:00 +0530 Subject: [PATCH 19/23] Changes based on PR comments --- validator/valnode/valnode.go | 44 ++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 3cdc985843..40a208a966 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -37,35 +37,32 @@ var DefaultWasmConfig = WasmConfig{ } type Config struct { - UseJit bool `koanf:"use-jit"` - ApiAuth bool `koanf:"api-auth"` - ApiPublic bool `koanf:"api-public"` - Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` - RedisExecRunner redis.ValidationServerConfig `koanf:"redis-exec-runner"` - Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` - Wasm WasmConfig `koanf:"wasm"` + UseJit bool `koanf:"use-jit"` + ApiAuth bool `koanf:"api-auth"` + ApiPublic bool `koanf:"api-public"` + Arbitrator server_arb.ArbitratorSpawnerConfig `koanf:"arbitrator" reload:"hot"` + Jit server_jit.JitSpawnerConfig `koanf:"jit" reload:"hot"` + Wasm WasmConfig `koanf:"wasm"` } type ValidationConfigFetcher func() *Config var DefaultValidationConfig = Config{ - UseJit: true, - Jit: server_jit.DefaultJitSpawnerConfig, - ApiAuth: true, - ApiPublic: false, - Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, - RedisExecRunner: redis.DefaultValidationServerConfig, - Wasm: DefaultWasmConfig, + UseJit: true, + Jit: server_jit.DefaultJitSpawnerConfig, + ApiAuth: true, + ApiPublic: false, + Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, + Wasm: DefaultWasmConfig, } var TestValidationConfig = Config{ - UseJit: true, - Jit: server_jit.DefaultJitSpawnerConfig, - ApiAuth: false, - ApiPublic: true, - Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, - RedisExecRunner: redis.DefaultValidationServerConfig, - Wasm: DefaultWasmConfig, + UseJit: true, + Jit: server_jit.DefaultJitSpawnerConfig, + ApiAuth: false, + ApiPublic: true, + Arbitrator: server_arb.DefaultArbitratorSpawnerConfig, + Wasm: DefaultWasmConfig, } func ValidationConfigAddOptions(prefix string, f *pflag.FlagSet) { @@ -75,7 +72,6 @@ func ValidationConfigAddOptions(prefix string, f *pflag.FlagSet) { server_arb.ArbitratorSpawnerConfigAddOptions(prefix+".arbitrator", f) server_jit.JitSpawnerConfigAddOptions(prefix+".jit", f) WasmConfigAddOptions(prefix+".wasm", f) - redis.ValidationServerConfigAddOptions(prefix+".redis-exec-runner", f) } type ValidationNode struct { @@ -118,8 +114,8 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod jitSpawner *server_jit.JitSpawner redisSpawner *arbredis.ExecutionSpawner ) - if config.RedisExecRunner.Enabled() { - es, err := arbredis.NewExecutionSpawner(&config.RedisExecRunner, arbSpawner) + if config.Arbitrator.RedisValidationServerConfig.Enabled() { + es, err := arbredis.NewExecutionSpawner(&config.Arbitrator.RedisValidationServerConfig, arbSpawner) if err != nil { log.Error("creating redis execution spawner", "error", err) } From fa808562fc4616ebf4543c54f566d15a2626a0bf Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Fri, 3 Jan 2025 18:50:44 +0530 Subject: [PATCH 20/23] Changes based on PR comments --- staker/block_validator.go | 94 ++++++++++++++--------------- staker/stateless_block_validator.go | 2 +- 2 files changed, 46 insertions(+), 50 deletions(-) diff --git a/staker/block_validator.go b/staker/block_validator.go index fae899d4cc..43e5c7d28f 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -107,22 +107,21 @@ type BlockValidator struct { } type BlockValidatorConfig struct { - Enable bool `koanf:"enable"` - RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` - RedisBoldValidationClientConfig redis.ValidationClientConfig `koanf:"redis-bold-validation-client-config"` - ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` - ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` - ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` - PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` - RecordingIterLimit uint64 `koanf:"recording-iter-limit"` - ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` - BatchCacheLimit uint32 `koanf:"batch-cache-limit"` - CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload - PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload - FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` - Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` - MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` - ValidationServerConfigsList string `koanf:"validation-server-configs-list"` + Enable bool `koanf:"enable"` + RedisValidationClientConfig redis.ValidationClientConfig `koanf:"redis-validation-client-config"` + ValidationServer rpcclient.ClientConfig `koanf:"validation-server" reload:"hot"` + ValidationServerConfigs []rpcclient.ClientConfig `koanf:"validation-server-configs"` + ValidationPoll time.Duration `koanf:"validation-poll" reload:"hot"` + PrerecordedBlocks uint64 `koanf:"prerecorded-blocks" reload:"hot"` + RecordingIterLimit uint64 `koanf:"recording-iter-limit"` + ForwardBlocks uint64 `koanf:"forward-blocks" reload:"hot"` + BatchCacheLimit uint32 `koanf:"batch-cache-limit"` + CurrentModuleRoot string `koanf:"current-module-root"` // TODO(magic) requires reinitialization on hot reload + PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload + FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` + Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` + MemoryFreeLimit string `koanf:"memory-free-limit" reload:"hot"` + ValidationServerConfigsList string `koanf:"validation-server-configs-list"` // The directory to which the BlockValidator will write the // block_inputs_.json files when WriteToFile() is called. BlockInputsFilePath string `koanf:"block-inputs-file-path"` @@ -182,7 +181,6 @@ func BlockValidatorConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".enable", DefaultBlockValidatorConfig.Enable, "enable block-by-block validation") rpcclient.RPCClientAddOptions(prefix+".validation-server", f, &DefaultBlockValidatorConfig.ValidationServer) redis.ValidationClientConfigAddOptions(prefix+".redis-validation-client-config", f) - redis.ValidationClientConfigAddOptions(prefix+".redis-bold-validation-client-config", f) f.String(prefix+".validation-server-configs-list", DefaultBlockValidatorConfig.ValidationServerConfigsList, "array of execution rpc configs given as a json string. time duration should be supplied in number indicating nanoseconds") f.Duration(prefix+".validation-poll", DefaultBlockValidatorConfig.ValidationPoll, "poll time to check validations") f.Uint64(prefix+".forward-blocks", DefaultBlockValidatorConfig.ForwardBlocks, "prepare entries for up to that many blocks ahead of validation (stores batch-copy per block)") @@ -202,41 +200,39 @@ func BlockValidatorDangerousConfigAddOptions(prefix string, f *pflag.FlagSet) { } var DefaultBlockValidatorConfig = BlockValidatorConfig{ - Enable: false, - ValidationServerConfigsList: "default", - ValidationServer: rpcclient.DefaultClientConfig, - RedisValidationClientConfig: redis.DefaultValidationClientConfig, - RedisBoldValidationClientConfig: redis.DefaultValidationClientConfig, - ValidationPoll: time.Second, - ForwardBlocks: 128, - PrerecordedBlocks: uint64(2 * runtime.NumCPU()), - BatchCacheLimit: 20, - CurrentModuleRoot: "current", - PendingUpgradeModuleRoot: "latest", - FailureIsFatal: true, - Dangerous: DefaultBlockValidatorDangerousConfig, - BlockInputsFilePath: "./target/validation_inputs", - MemoryFreeLimit: "default", - RecordingIterLimit: 20, + Enable: false, + ValidationServerConfigsList: "default", + ValidationServer: rpcclient.DefaultClientConfig, + RedisValidationClientConfig: redis.DefaultValidationClientConfig, + ValidationPoll: time.Second, + ForwardBlocks: 128, + PrerecordedBlocks: uint64(2 * runtime.NumCPU()), + BatchCacheLimit: 20, + CurrentModuleRoot: "current", + PendingUpgradeModuleRoot: "latest", + FailureIsFatal: true, + Dangerous: DefaultBlockValidatorDangerousConfig, + BlockInputsFilePath: "./target/validation_inputs", + MemoryFreeLimit: "default", + RecordingIterLimit: 20, } var TestBlockValidatorConfig = BlockValidatorConfig{ - Enable: false, - ValidationServer: rpcclient.TestClientConfig, - ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, - RedisValidationClientConfig: redis.TestValidationClientConfig, - RedisBoldValidationClientConfig: redis.TestValidationClientConfig, - ValidationPoll: 100 * time.Millisecond, - ForwardBlocks: 128, - BatchCacheLimit: 20, - PrerecordedBlocks: uint64(2 * runtime.NumCPU()), - RecordingIterLimit: 20, - CurrentModuleRoot: "latest", - PendingUpgradeModuleRoot: "latest", - FailureIsFatal: true, - Dangerous: DefaultBlockValidatorDangerousConfig, - BlockInputsFilePath: "./target/validation_inputs", - MemoryFreeLimit: "default", + Enable: false, + ValidationServer: rpcclient.TestClientConfig, + ValidationServerConfigs: []rpcclient.ClientConfig{rpcclient.TestClientConfig}, + RedisValidationClientConfig: redis.TestValidationClientConfig, + ValidationPoll: 100 * time.Millisecond, + ForwardBlocks: 128, + BatchCacheLimit: 20, + PrerecordedBlocks: uint64(2 * runtime.NumCPU()), + RecordingIterLimit: 20, + CurrentModuleRoot: "latest", + PendingUpgradeModuleRoot: "latest", + FailureIsFatal: true, + Dangerous: DefaultBlockValidatorDangerousConfig, + BlockInputsFilePath: "./target/validation_inputs", + MemoryFreeLimit: "default", } var DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{ diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index 16b0077430..cfa4dc995f 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -251,7 +251,7 @@ func NewStatelessBlockValidator( confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] } executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack)) if i == 0 { - executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisBoldValidationClientConfig, stack)) + executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisValidationClientConfig, stack)) } } From 580db8d69d929c3002c25aae373378f779c179a7 Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Fri, 3 Jan 2025 19:29:54 +0530 Subject: [PATCH 21/23] Changes based on PR comments --- validator/server_arb/redis/consumer.go | 131 ------------------------- validator/valnode/redis/consumer.go | 127 +++++++++++++++++++++++- validator/valnode/valnode.go | 28 ++---- 3 files changed, 132 insertions(+), 154 deletions(-) delete mode 100644 validator/server_arb/redis/consumer.go diff --git a/validator/server_arb/redis/consumer.go b/validator/server_arb/redis/consumer.go deleted file mode 100644 index 15d9ea2e46..0000000000 --- a/validator/server_arb/redis/consumer.go +++ /dev/null @@ -1,131 +0,0 @@ -package redis - -import ( - "context" - "fmt" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/offchainlabs/nitro/pubsub" - "github.com/offchainlabs/nitro/util/redisutil" - "github.com/offchainlabs/nitro/util/stopwaiter" - "github.com/offchainlabs/nitro/validator" - "github.com/offchainlabs/nitro/validator/server_api" - "github.com/offchainlabs/nitro/validator/valnode/redis" -) - -type ExecutionSpawner struct { - stopwaiter.StopWaiter - spawner validator.ExecutionSpawner - - // consumers stores moduleRoot to consumer mapping. - consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] - config *redis.ValidationServerConfig -} - -func NewExecutionSpawner(cfg *redis.ValidationServerConfig, spawner validator.ExecutionSpawner) (*ExecutionSpawner, error) { - if cfg.RedisURL == "" { - return nil, fmt.Errorf("redis url cannot be empty") - } - redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL) - if err != nil { - return nil, err - } - consumers := make(map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]) - for _, hash := range cfg.ModuleRoots { - mr := common.HexToHash(hash) - c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisBoldStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig) - if err != nil { - return nil, fmt.Errorf("creating consumer for validation: %w", err) - } - consumers[mr] = c - } - return &ExecutionSpawner{ - consumers: consumers, - spawner: spawner, - config: cfg, - }, nil -} - -func (s *ExecutionSpawner) Start(ctx_in context.Context) { - s.StopWaiter.Start(ctx_in, s) - // Channel that all consumers use to indicate their readiness. - readyStreams := make(chan struct{}, len(s.consumers)) - for moduleRoot, c := range s.consumers { - c := c - moduleRoot := moduleRoot - c.Start(ctx_in) - // Channel for single consumer, once readiness is indicated in this, - // consumer will start consuming iteratively. - ready := make(chan struct{}, 1) - s.StopWaiter.LaunchThread(func(ctx context.Context) { - for { - if pubsub.StreamExists(ctx, c.StreamName(), c.RedisClient()) { - ready <- struct{}{} - readyStreams <- struct{}{} - return - } - select { - case <-ctx.Done(): - log.Info("Context done", "error", ctx.Err().Error()) - return - case <-time.After(time.Millisecond * 100): - } - } - }) - s.StopWaiter.LaunchThread(func(ctx context.Context) { - select { - case <-ctx.Done(): - log.Info("Context done", "error", ctx.Err().Error()) - return - case <-ready: // Wait until the stream exists and start consuming iteratively. - } - s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { - req, err := c.Consume(ctx) - if err != nil { - log.Error("Consuming request", "error", err) - return 0 - } - if req == nil { - // There's nothing in the queue. - return time.Second - } - run, err := s.spawner.CreateExecutionRun(moduleRoot, - req.Value.ValidationInput, true).Await(ctx) - if err != nil { - log.Error("Creating BOLD execution", "error", err) - return 0 - } - hashes, err := run.GetMachineHashesWithStepSize( - req.Value.MachineStartIndex, - req.Value.StepSize, - req.Value.NumDesiredLeaves).Await(ctx) - if err != nil { - log.Error("Getting machine hashes", "error", err) - return 0 - } - if err := c.SetResult(ctx, req.ID, hashes); err != nil { - log.Error("Error setting result for request", "id", req.ID, "result", hashes, "error", err) - return 0 - } - return time.Second - }) - }) - } - s.StopWaiter.LaunchThread(func(ctx context.Context) { - for { - select { - case <-readyStreams: - log.Trace("At least one stream is ready") - return // Don't block Start if at least one of the stream is ready. - case <-time.After(s.config.StreamTimeout): - log.Error("Waiting for redis streams timed out") - case <-ctx.Done(): - log.Info("Context expired, failed to start") - return - } - } - }) -} diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 93b3eddd3f..fdee6a3f6b 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -22,7 +22,7 @@ import ( // RedisValidationClient producers. type ValidationServer struct { stopwaiter.StopWaiter - spawner validator.ValidationSpawner + spawner validator.ExecutionSpawner // consumers stores moduleRoot to consumer mapping. consumers map[common.Hash]*pubsub.Consumer[*validator.ValidationInput, validator.GoGlobalState] @@ -30,7 +30,7 @@ type ValidationServer struct { config *ValidationServerConfig } -func NewValidationServer(cfg *ValidationServerConfig, spawner validator.ValidationSpawner) (*ValidationServer, error) { +func NewValidationServer(cfg *ValidationServerConfig, spawner validator.ExecutionSpawner) (*ValidationServer, error) { if cfg.RedisURL == "" { return nil, fmt.Errorf("redis url cannot be empty") } @@ -56,6 +56,7 @@ func NewValidationServer(cfg *ValidationServerConfig, spawner validator.Validati func (s *ValidationServer) Start(ctx_in context.Context) { s.StopWaiter.Start(ctx_in, s) + s.StartBoldSpawner(ctx_in) // Channel that all consumers use to indicate their readiness. readyStreams := make(chan struct{}, len(s.consumers)) type workUnit struct { @@ -184,6 +185,128 @@ func (s *ValidationServer) Start(ctx_in context.Context) { } } +func (s *ValidationServer) StartBoldSpawner(ctx context.Context) { + boldSpawner, err := NewExecutionSpawner(s.config, s.spawner) + if err != nil { + log.Error("creating redis execution spawner", "error", err) + } + boldSpawner.Start(ctx) +} + +type ExecutionSpawner struct { + stopwaiter.StopWaiter + spawner validator.ExecutionSpawner + + // consumers stores moduleRoot to consumer mapping. + consumers map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + config *ValidationServerConfig +} + +func NewExecutionSpawner(cfg *ValidationServerConfig, spawner validator.ExecutionSpawner) (*ExecutionSpawner, error) { + if cfg.RedisURL == "" { + return nil, fmt.Errorf("redis url cannot be empty") + } + redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL) + if err != nil { + return nil, err + } + consumers := make(map[common.Hash]*pubsub.Consumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]) + for _, hash := range cfg.ModuleRoots { + mr := common.HexToHash(hash) + c, err := pubsub.NewConsumer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](redisClient, server_api.RedisBoldStreamForRoot(cfg.StreamPrefix, mr), &cfg.ConsumerConfig) + if err != nil { + return nil, fmt.Errorf("creating consumer for validation: %w", err) + } + consumers[mr] = c + } + return &ExecutionSpawner{ + consumers: consumers, + spawner: spawner, + config: cfg, + }, nil +} + +func (s *ExecutionSpawner) Start(ctx_in context.Context) { + s.StopWaiter.Start(ctx_in, s) + // Channel that all consumers use to indicate their readiness. + readyStreams := make(chan struct{}, len(s.consumers)) + for moduleRoot, c := range s.consumers { + c := c + moduleRoot := moduleRoot + c.Start(ctx_in) + // Channel for single consumer, once readiness is indicated in this, + // consumer will start consuming iteratively. + ready := make(chan struct{}, 1) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + if pubsub.StreamExists(ctx, c.StreamName(), c.RedisClient()) { + ready <- struct{}{} + readyStreams <- struct{}{} + return + } + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-time.After(time.Millisecond * 100): + } + } + }) + s.StopWaiter.LaunchThread(func(ctx context.Context) { + select { + case <-ctx.Done(): + log.Info("Context done", "error", ctx.Err().Error()) + return + case <-ready: // Wait until the stream exists and start consuming iteratively. + } + s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration { + req, err := c.Consume(ctx) + if err != nil { + log.Error("Consuming request", "error", err) + return 0 + } + if req == nil { + // There's nothing in the queue. + return time.Second + } + run, err := s.spawner.CreateExecutionRun(moduleRoot, + req.Value.ValidationInput, true).Await(ctx) + if err != nil { + log.Error("Creating BOLD execution", "error", err) + return 0 + } + hashes, err := run.GetMachineHashesWithStepSize( + req.Value.MachineStartIndex, + req.Value.StepSize, + req.Value.NumDesiredLeaves).Await(ctx) + if err != nil { + log.Error("Getting machine hashes", "error", err) + return 0 + } + if err := c.SetResult(ctx, req.ID, hashes); err != nil { + log.Error("Error setting result for request", "id", req.ID, "result", hashes, "error", err) + return 0 + } + return time.Second + }) + }) + } + s.StopWaiter.LaunchThread(func(ctx context.Context) { + for { + select { + case <-readyStreams: + log.Trace("At least one stream is ready") + return // Don't block Start if at least one of the stream is ready. + case <-time.After(s.config.StreamTimeout): + log.Error("Waiting for redis streams timed out") + case <-ctx.Done(): + log.Info("Context expired, failed to start") + return + } + } + }) +} + type ValidationServerConfig struct { RedisURL string `koanf:"redis-url"` ConsumerConfig pubsub.ConsumerConfig `koanf:"consumer-config"` diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 40a208a966..5839aeb16a 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -12,7 +12,6 @@ import ( "github.com/offchainlabs/nitro/validator" "github.com/offchainlabs/nitro/validator/server_api" "github.com/offchainlabs/nitro/validator/server_arb" - arbredis "github.com/offchainlabs/nitro/validator/server_arb/redis" "github.com/offchainlabs/nitro/validator/server_common" "github.com/offchainlabs/nitro/validator/server_jit" "github.com/offchainlabs/nitro/validator/valnode/redis" @@ -79,8 +78,7 @@ type ValidationNode struct { arbSpawner *server_arb.ArbitratorSpawner jitSpawner *server_jit.JitSpawner - redisConsumer *redis.ValidationServer - redisExecSpawner *arbredis.ExecutionSpawner + redisConsumer *redis.ValidationServer } func EnsureValidationExposedViaAuthRPC(stackConf *node.Config) { @@ -110,17 +108,9 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod return nil, err } var ( - serverAPI *ExecServerAPI - jitSpawner *server_jit.JitSpawner - redisSpawner *arbredis.ExecutionSpawner + serverAPI *ExecServerAPI + jitSpawner *server_jit.JitSpawner ) - if config.Arbitrator.RedisValidationServerConfig.Enabled() { - es, err := arbredis.NewExecutionSpawner(&config.Arbitrator.RedisValidationServerConfig, arbSpawner) - if err != nil { - log.Error("creating redis execution spawner", "error", err) - } - redisSpawner = es - } if config.UseJit { jitConfigFetcher := func() *server_jit.JitSpawnerConfig { return &configFetcher().Jit } var err error @@ -150,11 +140,10 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod stack.RegisterAPIs(valAPIs) return &ValidationNode{ - config: configFetcher, - arbSpawner: arbSpawner, - jitSpawner: jitSpawner, - redisConsumer: redisConsumer, - redisExecSpawner: redisSpawner, + config: configFetcher, + arbSpawner: arbSpawner, + jitSpawner: jitSpawner, + redisConsumer: redisConsumer, }, nil } @@ -170,9 +159,6 @@ func (v *ValidationNode) Start(ctx context.Context) error { if v.redisConsumer != nil { v.redisConsumer.Start(ctx) } - if v.redisExecSpawner != nil { - v.redisExecSpawner.Start(ctx) - } return nil } From 87b68d5349dfb1aa7cdc8cb12644ec859926184b Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Fri, 3 Jan 2025 19:35:16 +0530 Subject: [PATCH 22/23] remove --- validator/valnode/validation_api.go | 5 +---- validator/valnode/valnode.go | 13 +++---------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/validator/valnode/validation_api.go b/validator/valnode/validation_api.go index 83e691f8c4..dab74f6e29 100644 --- a/validator/valnode/validation_api.go +++ b/validator/valnode/validation_api.go @@ -70,10 +70,7 @@ type ExecServerAPI struct { runs map[uint64]*execRunEntry } -func NewExecutionServerAPI( - valSpawner validator.ValidationSpawner, - execution validator.ExecutionSpawner, - config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { +func NewExecutionServerAPI(valSpawner validator.ValidationSpawner, execution validator.ExecutionSpawner, config server_arb.ArbitratorSpawnerConfigFecher) *ExecServerAPI { return &ExecServerAPI{ ValidationServerAPI: *NewValidationServerAPI(valSpawner), execSpawner: execution, diff --git a/validator/valnode/valnode.go b/validator/valnode/valnode.go index 5839aeb16a..e3bf662aaa 100644 --- a/validator/valnode/valnode.go +++ b/validator/valnode/valnode.go @@ -107,10 +107,8 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod if err != nil { return nil, err } - var ( - serverAPI *ExecServerAPI - jitSpawner *server_jit.JitSpawner - ) + var serverAPI *ExecServerAPI + var jitSpawner *server_jit.JitSpawner if config.UseJit { jitConfigFetcher := func() *server_jit.JitSpawnerConfig { return &configFetcher().Jit } var err error @@ -139,12 +137,7 @@ func CreateValidationNode(configFetcher ValidationConfigFetcher, stack *node.Nod }} stack.RegisterAPIs(valAPIs) - return &ValidationNode{ - config: configFetcher, - arbSpawner: arbSpawner, - jitSpawner: jitSpawner, - redisConsumer: redisConsumer, - }, nil + return &ValidationNode{configFetcher, arbSpawner, jitSpawner, redisConsumer}, nil } func (v *ValidationNode) Start(ctx context.Context) error { From a9ab22ea0bba8f295c82d6f5eec97483a997441f Mon Sep 17 00:00:00 2001 From: Aman Sanghi Date: Fri, 3 Jan 2025 20:50:56 +0530 Subject: [PATCH 23/23] Changes based on PR comments --- staker/stateless_block_validator.go | 6 +- system_tests/validation_mock_test.go | 8 +- validator/client/redis/boldproducer.go | 86 ------------- validator/client/validation_client.go | 159 +++++++++++++++++++++---- 4 files changed, 144 insertions(+), 115 deletions(-) delete mode 100644 validator/client/redis/boldproducer.go diff --git a/staker/stateless_block_validator.go b/staker/stateless_block_validator.go index cfa4dc995f..ab021ec85d 100644 --- a/staker/stateless_block_validator.go +++ b/staker/stateless_block_validator.go @@ -249,9 +249,11 @@ func NewStatelessBlockValidator( for i := range configs { i := i confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] } - executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack)) + executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack)) if i == 0 { - executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisValidationClientConfig, stack)) + if config().RedisValidationClientConfig.Enabled() { + executionSpawners = append(executionSpawners, validatorclient.NewBoldExecutionClient(confFetcher, &config().RedisValidationClientConfig, stack)) + } } } diff --git a/system_tests/validation_mock_test.go b/system_tests/validation_mock_test.go index 2eedf32123..43475c57bf 100644 --- a/system_tests/validation_mock_test.go +++ b/system_tests/validation_mock_test.go @@ -227,7 +227,7 @@ func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bo } _, validationDefault := createMockValidationNode(t, ctx, nil) - client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault) + client := validatorclient.NewBoldExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault) err := client.Start(ctx) Require(t, err) @@ -311,7 +311,7 @@ func TestValidationClientRoom(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil) - client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack) + client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack) err := client.Start(ctx) Require(t, err) @@ -398,10 +398,10 @@ func TestExecutionKeepAlive(t *testing.T) { _, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig) configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig) - clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault) + clientDefault := validatorclient.NewExecutionClient(configFetcher, validationDefault) err := clientDefault.Start(ctx) Require(t, err) - clientShortTO := validatorclient.NewExecutionClient(configFetcher, nil, validationShortTO) + clientShortTO := validatorclient.NewExecutionClient(configFetcher, validationShortTO) err = clientShortTO.Start(ctx) Require(t, err) diff --git a/validator/client/redis/boldproducer.go b/validator/client/redis/boldproducer.go deleted file mode 100644 index d834b0fefe..0000000000 --- a/validator/client/redis/boldproducer.go +++ /dev/null @@ -1,86 +0,0 @@ -package redis - -import ( - "context" - "fmt" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/offchainlabs/nitro/pubsub" - "github.com/offchainlabs/nitro/util/containers" - "github.com/offchainlabs/nitro/util/redisutil" - "github.com/offchainlabs/nitro/util/stopwaiter" - "github.com/offchainlabs/nitro/validator/server_api" -) - -// BoldValidationClient implements bold validation client through redis streams. -type BoldValidationClient struct { - stopwaiter.StopWaiter - // producers stores moduleRoot to producer mapping. - producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] - config *ValidationClientConfig -} - -func NewBoldValidationClient(cfg *ValidationClientConfig) (*BoldValidationClient, error) { - return &BoldValidationClient{ - producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), - config: cfg, - }, nil -} - -func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { - if c.config.RedisURL == "" { - return fmt.Errorf("redis url cannot be empty") - } - redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL) - if err != nil { - return err - } - for _, mr := range moduleRoots { - if c.config.CreateStreams { - if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil { - return fmt.Errorf("creating redis stream: %w", err) - } - } - if _, exists := c.producers[mr]; exists { - log.Warn("Producer already exists for module root", "hash", mr) - continue - } - p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( - redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig) - if err != nil { - log.Warn("failed init redis for %v: %w", mr, err) - continue - } - c.producers[mr] = p - } - return nil -} - -func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] { - producer, found := c.producers[req.ModuleRoot] - if !found { - return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot)) - } - promise, err := producer.Produce(c.GetContext(), req) - if err != nil { - return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err)) - } - return promise -} - -func (c *BoldValidationClient) Start(ctx_in context.Context) error { - for _, p := range c.producers { - p.Start(ctx_in) - } - c.StopWaiter.Start(ctx_in, c) - return nil -} - -func (c *BoldValidationClient) Stop() { - for _, p := range c.producers { - p.StopAndWait() - } - c.StopWaiter.StopAndWait() -} diff --git a/validator/client/validation_client.go b/validator/client/validation_client.go index ba8c399f03..3e64e33282 100644 --- a/validator/client/validation_client.go +++ b/validator/client/validation_client.go @@ -18,7 +18,9 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" + "github.com/offchainlabs/nitro/pubsub" "github.com/offchainlabs/nitro/util/containers" + "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/rpcclient" "github.com/offchainlabs/nitro/util/stopwaiter" "github.com/offchainlabs/nitro/validator" @@ -145,36 +147,18 @@ func (c *ValidationClient) Room() int { } type ExecutionClient struct { - ValidationClient - boldValClient *redis.BoldValidationClient + *ValidationClient } -func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *ExecutionClient { - var boldClient *redis.BoldValidationClient - if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() { - var err error - boldClient, err = redis.NewBoldValidationClient(redisBoldValidationClientConfig) - if err != nil { - log.Error("Creating new redis bold validation client", "error", err) - } - } +func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient { return &ExecutionClient{ - ValidationClient: *NewValidationClient(config, stack), - boldValClient: boldClient, + ValidationClient: NewValidationClient(config, stack), } } func (c *ExecutionClient) Start(ctx context.Context) error { if err := c.ValidationClient.Start(ctx); err != nil { return err } - if c.boldValClient != nil { - if err := c.boldValClient.Initialize(ctx, c.wasmModuleRoots); err != nil { - return err - } - if err := c.boldValClient.Start(ctx); err != nil { - return err - } - } return nil } @@ -203,6 +187,7 @@ func (c *ExecutionClient) CreateExecutionRun( type ExecutionClientRun struct { stopwaiter.StopWaiter client *ExecutionClient + boldClient *BoldExecutionClient id uint64 wasmModuleRoot common.Hash input *validator.ValidationInput @@ -252,8 +237,8 @@ func (r *ExecutionClientRun) GetStepAt(pos uint64) containers.PromiseInterface[* } func (r *ExecutionClientRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] { - if r.client.boldValClient != nil { - return r.client.boldValClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{ + if r.boldClient != nil { + return r.boldClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{ ModuleRoot: r.wasmModuleRoot, MachineStartIndex: machineStartIndex, StepSize: stepSize, @@ -305,3 +290,131 @@ func (r *ExecutionClientRun) Close() { } }) } + +// BoldValidationClient implements bold validation client through redis streams. +type BoldValidationClient struct { + stopwaiter.StopWaiter + *ValidationClient + // producers stores moduleRoot to producer mapping. + producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash] + config *redis.ValidationClientConfig +} + +func NewBoldValidationClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) (*BoldValidationClient, error) { + return &BoldValidationClient{ + ValidationClient: NewValidationClient(config, stack), + producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]), + config: redisBoldValidationClientConfig, + }, nil +} + +func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error { + if c.config.RedisURL == "" { + return fmt.Errorf("redis url cannot be empty") + } + redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL) + if err != nil { + return err + } + for _, mr := range moduleRoots { + if c.config.CreateStreams { + if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil { + return fmt.Errorf("creating redis stream: %w", err) + } + } + if _, exists := c.producers[mr]; exists { + log.Warn("Producer already exists for module root", "hash", mr) + continue + } + p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]( + redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig) + if err != nil { + log.Warn("failed init redis for %v: %w", mr, err) + continue + } + c.producers[mr] = p + } + return nil +} + +func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] { + producer, found := c.producers[req.ModuleRoot] + if !found { + return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot)) + } + promise, err := producer.Produce(c.GetContext(), req) + if err != nil { + return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err)) + } + return promise +} + +func (c *BoldValidationClient) Start(ctx_in context.Context) error { + if err := c.Initialize(ctx_in, c.wasmModuleRoots); err != nil { + return err + } + for _, p := range c.producers { + p.Start(ctx_in) + } + c.StopWaiter.Start(ctx_in, c) + return nil +} + +func (c *BoldValidationClient) Stop() { + for _, p := range c.producers { + p.StopAndWait() + } + c.StopWaiter.StopAndWait() +} + +type BoldExecutionClient struct { + *BoldValidationClient +} + +func NewBoldExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *BoldExecutionClient { + var validationClient *BoldValidationClient + if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() { + var err error + validationClient, err = NewBoldValidationClient(config, redisBoldValidationClientConfig, stack) + if err != nil { + log.Error("Creating new redis bold validation client", "error", err) + } + } + return &BoldExecutionClient{ + BoldValidationClient: validationClient, + } +} + +func (c *BoldExecutionClient) CreateExecutionRun( + wasmModuleRoot common.Hash, + input *validator.ValidationInput, + useBoldMachine bool, +) containers.PromiseInterface[validator.ExecutionRun] { + return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (validator.ExecutionRun, error) { + var res uint64 + err := c.client.CallContext(ctx, &res, server_api.Namespace+"_createExecutionRun", wasmModuleRoot, server_api.ValidationInputToJson(input), useBoldMachine) + if err != nil { + return nil, err + } + run := &ExecutionClientRun{ + wasmModuleRoot: wasmModuleRoot, + boldClient: c, + client: &ExecutionClient{ValidationClient: c.BoldValidationClient.ValidationClient}, + id: res, + input: input, + } + run.Start(c.GetContext()) // note: not this temporary thread's context! + return run, nil + }) +} + +func (c *BoldExecutionClient) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] { + return stopwaiter.LaunchPromiseThread[common.Hash](c, func(ctx context.Context) (common.Hash, error) { + var res common.Hash + err := c.client.CallContext(ctx, &res, server_api.Namespace+"_latestWasmModuleRoot") + if err != nil { + return common.Hash{}, err + } + return res, nil + }) +}