Skip to content

Commit

Permalink
Changes based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi committed Jan 3, 2025
1 parent 87b68d5 commit a9ab22e
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 115 deletions.
6 changes: 4 additions & 2 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,11 @@ func NewStatelessBlockValidator(
for i := range configs {
i := i
confFetcher := func() *rpcclient.ClientConfig { return &config().ValidationServerConfigs[i] }
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, nil, stack))
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, stack))
if i == 0 {
executionSpawners = append(executionSpawners, validatorclient.NewExecutionClient(confFetcher, &config().RedisValidationClientConfig, stack))
if config().RedisValidationClientConfig.Enabled() {
executionSpawners = append(executionSpawners, validatorclient.NewBoldExecutionClient(confFetcher, &config().RedisValidationClientConfig, stack))
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func testValidationServerAPI(t *testing.T, withBoldValidationConsumerProducer bo
}

_, validationDefault := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault)
client := validatorclient.NewBoldExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), redisBoldValidationClientConfig, validationDefault)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -311,7 +311,7 @@ func TestValidationClientRoom(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockSpawner, spawnerStack := createMockValidationNode(t, ctx, nil)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), nil, spawnerStack)
client := validatorclient.NewExecutionClient(StaticFetcherFrom(t, &rpcclient.TestClientConfig), spawnerStack)
err := client.Start(ctx)
Require(t, err)

Expand Down Expand Up @@ -398,10 +398,10 @@ func TestExecutionKeepAlive(t *testing.T) {
_, validationShortTO := createMockValidationNode(t, ctx, &shortTimeoutConfig)
configFetcher := StaticFetcherFrom(t, &rpcclient.TestClientConfig)

clientDefault := validatorclient.NewExecutionClient(configFetcher, nil, validationDefault)
clientDefault := validatorclient.NewExecutionClient(configFetcher, validationDefault)
err := clientDefault.Start(ctx)
Require(t, err)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, nil, validationShortTO)
clientShortTO := validatorclient.NewExecutionClient(configFetcher, validationShortTO)
err = clientShortTO.Start(ctx)
Require(t, err)

Expand Down
86 changes: 0 additions & 86 deletions validator/client/redis/boldproducer.go

This file was deleted.

159 changes: 136 additions & 23 deletions validator/client/validation_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/pubsub"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/rpcclient"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/offchainlabs/nitro/validator"
Expand Down Expand Up @@ -145,36 +147,18 @@ func (c *ValidationClient) Room() int {
}

type ExecutionClient struct {
ValidationClient
boldValClient *redis.BoldValidationClient
*ValidationClient
}

func NewExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *ExecutionClient {
var boldClient *redis.BoldValidationClient
if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() {
var err error
boldClient, err = redis.NewBoldValidationClient(redisBoldValidationClientConfig)
if err != nil {
log.Error("Creating new redis bold validation client", "error", err)
}
}
func NewExecutionClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ExecutionClient {
return &ExecutionClient{
ValidationClient: *NewValidationClient(config, stack),
boldValClient: boldClient,
ValidationClient: NewValidationClient(config, stack),
}
}
func (c *ExecutionClient) Start(ctx context.Context) error {
if err := c.ValidationClient.Start(ctx); err != nil {
return err
}
if c.boldValClient != nil {
if err := c.boldValClient.Initialize(ctx, c.wasmModuleRoots); err != nil {
return err
}
if err := c.boldValClient.Start(ctx); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -203,6 +187,7 @@ func (c *ExecutionClient) CreateExecutionRun(
type ExecutionClientRun struct {
stopwaiter.StopWaiter
client *ExecutionClient
boldClient *BoldExecutionClient
id uint64
wasmModuleRoot common.Hash
input *validator.ValidationInput
Expand Down Expand Up @@ -252,8 +237,8 @@ func (r *ExecutionClientRun) GetStepAt(pos uint64) containers.PromiseInterface[*
}

func (r *ExecutionClientRun) GetMachineHashesWithStepSize(machineStartIndex, stepSize, maxIterations uint64) containers.PromiseInterface[[]common.Hash] {
if r.client.boldValClient != nil {
return r.client.boldValClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{
if r.boldClient != nil {
return r.boldClient.GetLeavesWithStepSize(&server_api.GetLeavesWithStepSizeInput{
ModuleRoot: r.wasmModuleRoot,
MachineStartIndex: machineStartIndex,
StepSize: stepSize,
Expand Down Expand Up @@ -305,3 +290,131 @@ func (r *ExecutionClientRun) Close() {
}
})
}

// BoldValidationClient implements bold validation client through redis streams.
type BoldValidationClient struct {
stopwaiter.StopWaiter
*ValidationClient
// producers stores moduleRoot to producer mapping.
producers map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]
config *redis.ValidationClientConfig
}

func NewBoldValidationClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) (*BoldValidationClient, error) {
return &BoldValidationClient{
ValidationClient: NewValidationClient(config, stack),
producers: make(map[common.Hash]*pubsub.Producer[*server_api.GetLeavesWithStepSizeInput, []common.Hash]),
config: redisBoldValidationClientConfig,
}, nil
}

func (c *BoldValidationClient) Initialize(ctx context.Context, moduleRoots []common.Hash) error {
if c.config.RedisURL == "" {
return fmt.Errorf("redis url cannot be empty")
}
redisClient, err := redisutil.RedisClientFromURL(c.config.RedisURL)
if err != nil {
return err
}
for _, mr := range moduleRoots {
if c.config.CreateStreams {
if err := pubsub.CreateStream(ctx, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), redisClient); err != nil {
return fmt.Errorf("creating redis stream: %w", err)
}
}
if _, exists := c.producers[mr]; exists {
log.Warn("Producer already exists for module root", "hash", mr)
continue
}
p, err := pubsub.NewProducer[*server_api.GetLeavesWithStepSizeInput, []common.Hash](
redisClient, server_api.RedisBoldStreamForRoot(c.config.StreamPrefix, mr), &c.config.ProducerConfig)
if err != nil {
log.Warn("failed init redis for %v: %w", mr, err)
continue
}
c.producers[mr] = p
}
return nil
}

func (c *BoldValidationClient) GetLeavesWithStepSize(req *server_api.GetLeavesWithStepSizeInput) containers.PromiseInterface[[]common.Hash] {
producer, found := c.producers[req.ModuleRoot]
if !found {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("no validation is configured for wasm root %v", req.ModuleRoot))
}
promise, err := producer.Produce(c.GetContext(), req)
if err != nil {
return containers.NewReadyPromise([]common.Hash{}, fmt.Errorf("error producing input: %w", err))
}
return promise
}

func (c *BoldValidationClient) Start(ctx_in context.Context) error {
if err := c.Initialize(ctx_in, c.wasmModuleRoots); err != nil {
return err
}
for _, p := range c.producers {
p.Start(ctx_in)
}
c.StopWaiter.Start(ctx_in, c)
return nil
}

func (c *BoldValidationClient) Stop() {
for _, p := range c.producers {
p.StopAndWait()
}
c.StopWaiter.StopAndWait()
}

type BoldExecutionClient struct {
*BoldValidationClient
}

func NewBoldExecutionClient(config rpcclient.ClientConfigFetcher, redisBoldValidationClientConfig *redis.ValidationClientConfig, stack *node.Node) *BoldExecutionClient {
var validationClient *BoldValidationClient
if redisBoldValidationClientConfig != nil && redisBoldValidationClientConfig.Enabled() {
var err error
validationClient, err = NewBoldValidationClient(config, redisBoldValidationClientConfig, stack)
if err != nil {
log.Error("Creating new redis bold validation client", "error", err)
}
}
return &BoldExecutionClient{
BoldValidationClient: validationClient,
}
}

func (c *BoldExecutionClient) CreateExecutionRun(
wasmModuleRoot common.Hash,
input *validator.ValidationInput,
useBoldMachine bool,
) containers.PromiseInterface[validator.ExecutionRun] {
return stopwaiter.LaunchPromiseThread(c, func(ctx context.Context) (validator.ExecutionRun, error) {
var res uint64
err := c.client.CallContext(ctx, &res, server_api.Namespace+"_createExecutionRun", wasmModuleRoot, server_api.ValidationInputToJson(input), useBoldMachine)
if err != nil {
return nil, err
}
run := &ExecutionClientRun{
wasmModuleRoot: wasmModuleRoot,
boldClient: c,
client: &ExecutionClient{ValidationClient: c.BoldValidationClient.ValidationClient},
id: res,
input: input,
}
run.Start(c.GetContext()) // note: not this temporary thread's context!
return run, nil
})
}

func (c *BoldExecutionClient) LatestWasmModuleRoot() containers.PromiseInterface[common.Hash] {
return stopwaiter.LaunchPromiseThread[common.Hash](c, func(ctx context.Context) (common.Hash, error) {
var res common.Hash
err := c.client.CallContext(ctx, &res, server_api.Namespace+"_latestWasmModuleRoot")
if err != nil {
return common.Hash{}, err
}
return res, nil
})
}

0 comments on commit a9ab22e

Please sign in to comment.