diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index c17142a35..7c9bada10 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "sync" "time" @@ -13,12 +14,15 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" + lru "github.com/hashicorp/golang-lru/v2" "github.com/wealdtech/go-merkletree/v2" grpc_metadata "google.golang.org/grpc/metadata" ) const encodingInterval = 2 * time.Second +const operatorStateCacheSize = 32 + var errNoEncodedResults = errors.New("no encoded results") type EncodedSizeNotifier struct { @@ -77,6 +81,8 @@ type EncodingStreamer struct { // Used to keep track of the last evaluated key for fetching metadatas exclusiveStartKey *disperser.BlobStoreExclusiveStartKey + + operatorStateCache *lru.Cache[string, any] } type batch struct { @@ -110,6 +116,10 @@ func NewEncodingStreamer( if config.EncodingQueueLimit <= 0 { return nil, errors.New("EncodingQueueLimit should be greater than 0") } + operatorStateCache, err := lru.New[string, any](operatorStateCacheSize) + if err != nil { + return nil, err + } return &EncodingStreamer{ StreamerConfig: config, EncodedBlobstore: newEncodedBlobStore(logger), @@ -125,6 +135,7 @@ func NewEncodingStreamer( batcherMetrics: batcherMetrics, logger: logger.With("component", "EncodingStreamer"), exclusiveStartKey: nil, + operatorStateCache: operatorStateCache, }, nil } @@ -669,11 +680,20 @@ func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*di i++ } + cacheKey := computeCacheKey(blockNumber, quorumIds) + if val, ok := e.operatorStateCache.Get(cacheKey); ok { + state, ok := val.(*core.IndexedOperatorState) + if !ok { + return nil, errors.New("cached value is not of type *core.IndexedOperatorState") + } + return state, nil + } // GetIndexedOperatorState should return state for valid quorums only state, err := e.chainState.GetIndexedOperatorState(ctx, blockNumber, quorumIds) if err != nil { return nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err) } + e.operatorStateCache.Add(cacheKey, state) return state, nil } @@ -699,3 +719,12 @@ func (e *EncodingStreamer) validateMetadataQuorums(metadatas []*disperser.BlobMe } return validMetadata } + +func computeCacheKey(id uint, slice []uint8) string { + parts := make([]string, len(slice)+1) + parts[0] = strconv.FormatUint(uint64(id), 10) + for i, v := range slice { + parts[i+1] = strconv.FormatUint(uint64(v), 10) + } + return strings.Join(parts, "_") +}