From f18ad890c6c8024dfbf4a947e807c72e9dad2d1a Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Thu, 11 Jul 2024 18:00:25 -0700 Subject: [PATCH] minibatch store interface --- disperser/batcher/inmem/minibatch_store.go | 102 ++++++++++++++++++ .../batcher/inmem/minibatch_store_test.go | 88 +++++++++++++++ disperser/batcher/minibatch_store.go | 55 ++++++++++ 3 files changed, 245 insertions(+) create mode 100644 disperser/batcher/inmem/minibatch_store.go create mode 100644 disperser/batcher/inmem/minibatch_store_test.go create mode 100644 disperser/batcher/minibatch_store.go diff --git a/disperser/batcher/inmem/minibatch_store.go b/disperser/batcher/inmem/minibatch_store.go new file mode 100644 index 0000000000..37bf3a3324 --- /dev/null +++ b/disperser/batcher/inmem/minibatch_store.go @@ -0,0 +1,102 @@ +package inmem + +import ( + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigensdk-go/logging" +) + +type minibatchStore struct { + // BatchRecords maps date string to a map from batch IDs to batch records + BatchRecords map[string]map[batcher.BatchID]*batcher.BatchRecord + // MinibatchRecords maps batch IDs to a map from minibatch indices to minibatch records + MinibatchRecords map[batcher.BatchID]map[uint]*batcher.MinibatchRecord + // DispersalRequests maps batch IDs to a map from minibatch indices to dispersal requests + DispersalRequests map[batcher.BatchID]map[uint]*batcher.DispersalRequest + // DispersalResponses maps batch IDs to a map from minibatch indices to dispersal responses + DispersalResponses map[batcher.BatchID]map[uint]*batcher.DispersalResponse + + logger logging.Logger +} + +var _ batcher.MinibatchStore = (*minibatchStore)(nil) + +func NewMinibatchStore(logger logging.Logger) batcher.MinibatchStore { + return &minibatchStore{ + BatchRecords: make(map[string]map[batcher.BatchID]*batcher.BatchRecord), + MinibatchRecords: make(map[batcher.BatchID]map[uint]*batcher.MinibatchRecord), + DispersalRequests: make(map[batcher.BatchID]map[uint]*batcher.DispersalRequest), + DispersalResponses: make(map[batcher.BatchID]map[uint]*batcher.DispersalResponse), + + logger: logger, + } +} + +func (m *minibatchStore) PutBatch(batch *batcher.BatchRecord) error { + if _, ok := m.BatchRecords[batch.Date]; !ok { + m.BatchRecords[batch.Date] = make(map[batcher.BatchID]*batcher.BatchRecord) + } + m.BatchRecords[batch.Date][batch.BatchID] = batch + + return nil +} + +func (m *minibatchStore) GetBatch(date string, batchID batcher.BatchID) (*batcher.BatchRecord, error) { + if _, ok := m.BatchRecords[date]; !ok { + return nil, nil + } + return m.BatchRecords[date][batchID], nil +} + +func (m *minibatchStore) PutMiniBatch(minibatch *batcher.MinibatchRecord) error { + if _, ok := m.MinibatchRecords[minibatch.BatchID]; !ok { + m.MinibatchRecords[minibatch.BatchID] = make(map[uint]*batcher.MinibatchRecord) + } + m.MinibatchRecords[minibatch.BatchID][minibatch.MinibatchIndex] = minibatch + + return nil +} + +func (m *minibatchStore) GetMiniBatch(batchID batcher.BatchID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { + if _, ok := m.MinibatchRecords[batchID]; !ok { + return nil, nil + } + return m.MinibatchRecords[batchID][minibatchIndex], nil +} + +func (m *minibatchStore) PutDispersalRequest(request *batcher.DispersalRequest) error { + if _, ok := m.DispersalRequests[request.BatchID]; !ok { + m.DispersalRequests[request.BatchID] = make(map[uint]*batcher.DispersalRequest) + } + m.DispersalRequests[request.BatchID][request.MinibatchIndex] = request + + return nil +} + +func (m *minibatchStore) GetDispersalRequest(batchID batcher.BatchID, minibatchIndex uint) (*batcher.DispersalRequest, error) { + if _, ok := m.DispersalRequests[batchID]; !ok { + return nil, nil + } + + return m.DispersalRequests[batchID][minibatchIndex], nil +} + +func (m *minibatchStore) PutDispersalResponse(response *batcher.DispersalResponse) error { + if _, ok := m.DispersalResponses[response.BatchID]; !ok { + m.DispersalResponses[response.BatchID] = make(map[uint]*batcher.DispersalResponse) + } + m.DispersalResponses[response.BatchID][response.MinibatchIndex] = response + + return nil +} + +func (m *minibatchStore) GetDispersalResponse(batchID batcher.BatchID, minibatchIndex uint) (*batcher.DispersalResponse, error) { + if _, ok := m.DispersalResponses[batchID]; !ok { + return nil, nil + } + + return m.DispersalResponses[batchID][minibatchIndex], nil +} + +func (m *minibatchStore) GetPendingBatch() (*batcher.BatchRecord, error) { + return nil, nil +} diff --git a/disperser/batcher/inmem/minibatch_store_test.go b/disperser/batcher/inmem/minibatch_store_test.go new file mode 100644 index 0000000000..97e94c2ae8 --- /dev/null +++ b/disperser/batcher/inmem/minibatch_store_test.go @@ -0,0 +1,88 @@ +package inmem_test + +import ( + "testing" + "time" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser/batcher" + "github.com/Layr-Labs/eigenda/disperser/batcher/inmem" + gcommon "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" +) + +func newMinibatchStore() batcher.MinibatchStore { + return inmem.NewMinibatchStore(nil) +} + +func TestPutBatch(t *testing.T) { + s := newMinibatchStore() + + batch := &batcher.BatchRecord{ + BatchID: batcher.BatchID(time.Now().UTC()), + Date: "2021-01-01", + ReferenceBlockNumber: 1, + HeaderHash: [32]byte{1}, + AggregatePubKey: nil, + AggregateSignature: nil, + } + s.PutBatch(batch) + b, err := s.GetBatch("2021-01-01", batch.BatchID) + assert.NoError(t, err) + assert.Equal(t, batch, b) +} + +func TestPutMiniBatch(t *testing.T) { + s := newMinibatchStore() + + minibatch := &batcher.MinibatchRecord{ + BatchID: batcher.BatchID(time.Now().UTC()), + MinibatchIndex: 12, + BlobHeaderHashes: [][32]byte{{1}}, + BatchSize: 1, + ReferenceBlockNumber: 1, + } + s.PutMiniBatch(minibatch) + m, err := s.GetMiniBatch(minibatch.BatchID, minibatch.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, minibatch, m) +} + +func TestPutDispersalRequest(t *testing.T) { + s := newMinibatchStore() + + request := &batcher.DispersalRequest{ + BatchID: batcher.BatchID(time.Now().UTC()), + MinibatchIndex: 0, + OperatorID: core.OperatorID([32]byte{1}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: time.Now().UTC(), + } + s.PutDispersalRequest(request) + r, err := s.GetDispersalRequest(request.BatchID, request.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, request, r) +} + +func TestPutDispersalResponse(t *testing.T) { + s := newMinibatchStore() + + response := &batcher.DispersalResponse{ + DispersalRequest: batcher.DispersalRequest{ + BatchID: batcher.BatchID(time.Now().UTC()), + MinibatchIndex: 0, + OperatorID: core.OperatorID([32]byte{1}), + OperatorAddress: gcommon.HexToAddress("0x0"), + NumBlobs: 1, + RequestedAt: time.Now().UTC(), + }, + Signatures: nil, + RespondedAt: time.Now().UTC(), + Error: nil, + } + s.PutDispersalResponse(response) + r, err := s.GetDispersalResponse(response.BatchID, response.MinibatchIndex) + assert.NoError(t, err) + assert.Equal(t, response, r) +} diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go new file mode 100644 index 0000000000..000f9887bf --- /dev/null +++ b/disperser/batcher/minibatch_store.go @@ -0,0 +1,55 @@ +package batcher + +import ( + "time" + + "github.com/Layr-Labs/eigenda/core" + gcommon "github.com/ethereum/go-ethereum/common" +) + +type BatchID time.Time + +type BatchRecord struct { + BatchID + Date string + ReferenceBlockNumber uint + HeaderHash [32]byte + AggregatePubKey *core.G2Point + AggregateSignature *core.Signature +} + +type MinibatchRecord struct { + BatchID + MinibatchIndex uint + BlobHeaderHashes [][32]byte + BatchSize uint64 // in bytes + ReferenceBlockNumber uint +} + +type DispersalRequest struct { + BatchID + MinibatchIndex uint + core.OperatorID + OperatorAddress gcommon.Address + NumBlobs uint + RequestedAt time.Time +} + +type DispersalResponse struct { + DispersalRequest + Signatures []*core.Signature + RespondedAt time.Time + Error error +} + +type MinibatchStore interface { + PutBatch(batch *BatchRecord) error + GetBatch(date string, batchID BatchID) (*BatchRecord, error) + PutMiniBatch(minibatch *MinibatchRecord) error + GetMiniBatch(batchID BatchID, minibatchIndex uint) (*MinibatchRecord, error) + PutDispersalRequest(request *DispersalRequest) error + GetDispersalRequest(batchID BatchID, minibatchIndex uint) (*DispersalRequest, error) + PutDispersalResponse(response *DispersalResponse) error + GetDispersalResponse(batchID BatchID, minibatchIndex uint) (*DispersalResponse, error) + GetPendingBatch() (*BatchRecord, error) +}