Skip to content

Commit

Permalink
fix encoding queue limit test
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 22, 2023
1 parent 3e2da27 commit ea90857
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 60 deletions.
52 changes: 52 additions & 0 deletions common/mock/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/common"
"github.com/stretchr/testify/mock"
)

type MockWorkerpool struct {
mock.Mock
}

var _ common.WorkerPool = (*MockWorkerpool)(nil)

func (mock *MockWorkerpool) Size() int {
args := mock.Called()
result := args.Get(0)
return result.(int)
}

func (mock *MockWorkerpool) Stop() {
mock.Called()
}

func (mock *MockWorkerpool) StopWait() {
mock.Called()
}

func (mock *MockWorkerpool) Stopped() bool {
args := mock.Called()
result := args.Get(0)
return result.(bool)
}

func (mock *MockWorkerpool) Submit(task func()) {
mock.Called(task)
}

func (mock *MockWorkerpool) SubmitWait(task func()) {
mock.Called(task)
}

func (mock *MockWorkerpool) WaitingQueueSize() int {
args := mock.Called()
result := args.Get(0)
return result.(int)
}

func (mock *MockWorkerpool) Pause(ctx context.Context) {
mock.Called(ctx)
}
14 changes: 14 additions & 0 deletions common/workerpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package common

import "context"

type WorkerPool interface {
Size() int
Stop()
StopWait()
Stopped() bool
Submit(task func())
SubmitWait(task func())
WaitingQueueSize() int
Pause(ctx context.Context)
}
5 changes: 3 additions & 2 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/core/types"
"github.com/gammazero/workerpool"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/wealdtech/go-merkletree"
Expand Down Expand Up @@ -97,9 +98,9 @@ func NewBatcher(
SRSOrder: config.SRSOrder,
EncodingRequestTimeout: config.PullInterval,
EncodingQueueLimit: config.EncodingRequestQueueSize,
PoolSize: config.NumConnections,
}
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, logger)
encodingWorkerPool := workerpool.New(config.NumConnections)
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, logger)
if err != nil {
return nil, err
}
Expand Down
9 changes: 3 additions & 6 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/gammazero/workerpool"
"github.com/wealdtech/go-merkletree"
)

Expand Down Expand Up @@ -38,9 +37,6 @@ type StreamerConfig struct {

// EncodingQueueLimit is the maximum number of encoding requests that can be queued
EncodingQueueLimit int

// PoolSize is the number of workers in the worker pool
PoolSize int
}

type EncodingStreamer struct {
Expand All @@ -50,7 +46,7 @@ type EncodingStreamer struct {

EncodedBlobstore *encodedBlobStore
ReferenceBlockNumber uint
Pool *workerpool.WorkerPool
Pool common.WorkerPool
EncodedSizeNotifier *EncodedSizeNotifier

blobStore disperser.BlobStore
Expand Down Expand Up @@ -92,6 +88,7 @@ func NewEncodingStreamer(
encoderClient disperser.EncoderClient,
assignmentCoordinator core.AssignmentCoordinator,
encodedSizeNotifier *EncodedSizeNotifier,
workerPool common.WorkerPool,
logger common.Logger) (*EncodingStreamer, error) {
if config.EncodingQueueLimit <= 0 {
return nil, fmt.Errorf("EncodingQueueLimit should be greater than 0")
Expand All @@ -100,7 +97,7 @@ func NewEncodingStreamer(
StreamerConfig: config,
EncodedBlobstore: newEncodedBlobStore(logger),
ReferenceBlockNumber: uint(0),
Pool: workerpool.New(config.PoolSize),
Pool: workerPool,
EncodedSizeNotifier: encodedSizeNotifier,
blobStore: blobStore,
chainState: chainState,
Expand Down
91 changes: 39 additions & 52 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/common/inmem"
"github.com/Layr-Labs/eigenda/disperser/mock"
"github.com/gammazero/workerpool"
"github.com/stretchr/testify/assert"
tmock "github.com/stretchr/testify/mock"
)
Expand All @@ -23,7 +24,6 @@ var (
SRSOrder: 300000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
PoolSize: 5,
}
)

Expand All @@ -45,8 +45,8 @@ func createEncodingStreamer(t *testing.T, initialBlockNumber uint, batchThreshol
encoderClient := disperser.NewLocalEncoderClient(enc)
asgn := &core.StdAssignmentCoordinator{}
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), batchThreshold)

encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, logger)
workerpool := workerpool.New(5)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, logger)
assert.Nil(t, err)
encodingStreamer.ReferenceBlockNumber = initialBlockNumber

Expand All @@ -63,12 +63,11 @@ func TestEncodingQueueLimit(t *testing.T) {
cst, err := coremock.NewChainDataMock(numOperators)
assert.Nil(t, err)
encoderClient := mock.NewMockEncoderClient()
wait := make(chan time.Time)
encoderClient.On("EncodeBlob", tmock.Anything, tmock.Anything, tmock.Anything).WaitUntil(wait).Return(nil, nil, nil)
encoderClient.On("EncodeBlob", tmock.Anything, tmock.Anything, tmock.Anything).Return(nil, nil, nil)
asgn := &core.StdAssignmentCoordinator{}
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 100000)

encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, logger)
pool := &cmock.MockWorkerpool{}
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, pool, logger)
assert.Nil(t, err)
encodingStreamer.ReferenceBlockNumber = 10

Expand All @@ -77,59 +76,54 @@ func TestEncodingQueueLimit(t *testing.T) {
AdversaryThreshold: 80,
QuorumThreshold: 100,
}}
blob1Data := []byte{1, 2, 3, 4, 5}
blob1 := core.Blob{
blobData := []byte{1, 2, 3, 4, 5}
blob := core.Blob{
RequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
},
Data: blob1Data,
Data: blobData,
}

pool.On("Submit", tmock.Anything).Run(func(args tmock.Arguments) {
args.Get(0).(func())()
})

// assume that encoding queue is already full
pool.On("WaitingQueueSize").Return(streamerConfig.EncodingQueueLimit).Once()

ctx := context.Background()
key1, err := blobStore.StoreBlob(ctx, &blob1, uint64(time.Now().UnixNano()))
key, err := blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano()))
assert.Nil(t, err)
out := make(chan batcher.EncodingResultOrStatus)
out := make(chan batcher.EncodingResultOrStatus, 1)
// This should return without making a request since encoding queue was already full
err = encodingStreamer.RequestEncoding(context.Background(), out)
assert.Nil(t, err)

blob2Data := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}
blob2 := core.Blob{
RequestHeader: core.BlobRequestHeader{
SecurityParams: securityParams,
},
Data: blob2Data,
encoderClient.AssertNotCalled(t, "EncodeBlob")
select {
case <-out:
t.Fatal("did not expect any encoding results")
default:
}
key2, err := blobStore.StoreBlob(ctx, &blob2, uint64(time.Now().UnixNano()))
assert.Nil(t, err)
// EncodeBlob still running, so this should return without making a request
err = encodingStreamer.RequestEncoding(context.Background(), out)
assert.Nil(t, err)

// EncodeBlob call returns
wait <- time.Now()
// second blob should not have been encoded
encoderClient.AssertNumberOfCalls(t, "EncodeBlob", 1)
encoderClient.AssertCalled(t, "EncodeBlob", tmock.Anything, blob1Data, tmock.Anything)
encoderClient.AssertNotCalled(t, "EncodeBlob", tmock.Anything, blob2Data, tmock.Anything)
err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
assert.Nil(t, err)
res, err := encodingStreamer.EncodedBlobstore.GetEncodingResult(key1, 0)
assert.Nil(t, err)
assert.NotNil(t, res)
res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(key2, 0)
assert.NotNil(t, err)
assert.Nil(t, res)
// assume that encoding queue opens up
pool.On("WaitingQueueSize").Return(0).Once()

// retry
err = encodingStreamer.RequestEncoding(context.Background(), out)
assert.Nil(t, err)
wait <- time.Now()

encoderClient.AssertNumberOfCalls(t, "EncodeBlob", 2)
encoderClient.AssertCalled(t, "EncodeBlob", tmock.Anything, blob2Data, tmock.Anything)
err = encodingStreamer.ProcessEncodedBlobs(context.Background(), <-out)
encoderClient.AssertNumberOfCalls(t, "EncodeBlob", 1)
encoderClient.AssertCalled(t, "EncodeBlob", tmock.Anything, blobData, tmock.Anything)
var encodingResult batcher.EncodingResultOrStatus
select {
case encodingResult = <-out:
default:
t.Fatal("did not expect any encoding results")
}

err = encodingStreamer.ProcessEncodedBlobs(context.Background(), encodingResult)
assert.Nil(t, err)
res, err = encodingStreamer.EncodedBlobstore.GetEncodingResult(key2, 0)
res, err := encodingStreamer.EncodedBlobstore.GetEncodingResult(key, 0)
assert.Nil(t, err)
assert.NotNil(t, res)
}
Expand Down Expand Up @@ -290,14 +284,8 @@ func TestEncodingFailure(t *testing.T) {
encoderClient := mock.NewMockEncoderClient()
asgn := &core.StdAssignmentCoordinator{}
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 1e12)
streamerConfig := batcher.StreamerConfig{
SRSOrder: 300000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
PoolSize: 5,
}

encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, logger)
workerpool := workerpool.New(5)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, logger)
assert.Nil(t, err)
encodingStreamer.ReferenceBlockNumber = 10

Expand Down Expand Up @@ -474,7 +462,6 @@ func TestIncorrectParameters(t *testing.T) {
SRSOrder: 3000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
PoolSize: 5,
}

encodingStreamer, c := createEncodingStreamer(t, 0, 1e12, streamerConfig)
Expand Down

0 comments on commit ea90857

Please sign in to comment.