Skip to content

Commit

Permalink
Perf: cache the operator state
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Oct 15, 2024
1 parent 841e0db commit 751e0dc
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -13,12 +14,15 @@ import (
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/wealdtech/go-merkletree/v2"
grpc_metadata "google.golang.org/grpc/metadata"
)

const encodingInterval = 2 * time.Second

const operatorStateCacheSize = 32

var errNoEncodedResults = errors.New("no encoded results")

type EncodedSizeNotifier struct {
Expand Down Expand Up @@ -77,6 +81,8 @@ type EncodingStreamer struct {

// Used to keep track of the last evaluated key for fetching metadatas
exclusiveStartKey *disperser.BlobStoreExclusiveStartKey

operatorStateCache *lru.Cache[string, any]
}

type batch struct {
Expand Down Expand Up @@ -110,6 +116,10 @@ func NewEncodingStreamer(
if config.EncodingQueueLimit <= 0 {
return nil, errors.New("EncodingQueueLimit should be greater than 0")
}
operatorStateCache, err := lru.New[string, any](operatorStateCacheSize)
if err != nil {
return nil, err
}
return &EncodingStreamer{
StreamerConfig: config,
EncodedBlobstore: newEncodedBlobStore(logger),
Expand All @@ -125,6 +135,7 @@ func NewEncodingStreamer(
batcherMetrics: batcherMetrics,
logger: logger.With("component", "EncodingStreamer"),
exclusiveStartKey: nil,
operatorStateCache: operatorStateCache,
}, nil
}

Expand Down Expand Up @@ -669,11 +680,20 @@ func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*di
i++
}

cacheKey := computeCacheKey(blockNumber, quorumIds)
if val, ok := e.operatorStateCache.Get(cacheKey); ok {
state, ok := val.(*core.IndexedOperatorState)
if !ok {
return nil, errors.New("cached value is not of type *core.IndexedOperatorState")
}
return state, nil
}
// GetIndexedOperatorState should return state for valid quorums only
state, err := e.chainState.GetIndexedOperatorState(ctx, blockNumber, quorumIds)
if err != nil {
return nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err)
}
e.operatorStateCache.Add(cacheKey, state)
return state, nil
}

Expand All @@ -699,3 +719,12 @@ func (e *EncodingStreamer) validateMetadataQuorums(metadatas []*disperser.BlobMe
}
return validMetadata
}

func computeCacheKey(id uint, slice []uint8) string {
parts := make([]string, len(slice)+1)
parts[0] = strconv.FormatUint(uint64(id), 10)
for i, v := range slice {
parts[i+1] = strconv.FormatUint(uint64(v), 10)
}
return strings.Join(parts, "_")
}

0 comments on commit 751e0dc

Please sign in to comment.