diff --git a/tools/traffic/table/blob_metadata.go b/tools/traffic/table/blob_metadata.go new file mode 100644 index 0000000000..6307718b7a --- /dev/null +++ b/tools/traffic/table/blob_metadata.go @@ -0,0 +1,44 @@ +package table + +import "errors" + +// BlobMetadata encapsulates various information about a blob written by the traffic generator. +type BlobMetadata struct { + // Key of the blob, set when the blob is initially uploaded. + Key []byte + + // BlobIndex of the blob. + BlobIndex uint + + // Checksum of the blob. + Checksum [16]byte + + // Size of the blob, in bytes. + Size uint + + // RemainingReadPermits describes the maximum number of remaining reads permitted against this blob. + // If -1 then an unlimited number of reads are permitted. + RemainingReadPermits int +} + +// NewBlobMetadata creates a new BlobMetadata instance. The readPermits parameter describes the maximum number of +// remaining reads permitted against this blob. If -1 then an unlimited number of reads are permitted. +func NewBlobMetadata( + key []byte, + checksum [16]byte, + size uint, + blobIndex uint, + readPermits int) (*BlobMetadata, error) { + + if readPermits == 0 { + return nil, errors.New("read permits must not be zero") + } + + return &BlobMetadata{ + Key: key, + Checksum: checksum, + Size: size, + BlobIndex: blobIndex, + RemainingReadPermits: readPermits, + }, nil +} diff --git a/tools/traffic/table/blob_store.go b/tools/traffic/table/blob_store.go new file mode 100644 index 0000000000..fa28e03fcb --- /dev/null +++ b/tools/traffic/table/blob_store.go @@ -0,0 +1,62 @@ +package table + +import "sync" + +// BlobStore is a thread safe data structure that tracks blobs written by the traffic generator. +type BlobStore struct { + + // blobs contains all blobs currently tracked by the store. + blobs map[uint64]*BlobMetadata + + // nextKey describes the next key to used for the blobs map. + nextKey uint64 + + lock sync.Mutex +} + +// NewBlobStore creates a new BlobStore instance. +func NewBlobStore() *BlobStore { + return &BlobStore{ + blobs: make(map[uint64]*BlobMetadata), + nextKey: 0, + } +} + +// Add a blob to the store. +func (store *BlobStore) Add(blob *BlobMetadata) { + store.lock.Lock() + defer store.lock.Unlock() + + store.blobs[store.nextKey] = blob + store.nextKey++ +} + +// GetNext returns the next blob in the store. Decrements the blob's read permits, removing it +// from the store if the permits are exhausted. This method makes no guarantees about the order +// in which blobs are returned. Returns nil if no blobs are available. +func (store *BlobStore) GetNext() *BlobMetadata { + store.lock.Lock() + defer store.lock.Unlock() + + for key, blob := range store.blobs { + // Always return the first blob found. + + if blob.RemainingReadPermits > 0 { + blob.RemainingReadPermits-- + if blob.RemainingReadPermits == 0 { + delete(store.blobs, key) + } + } + + return blob + } + return nil +} + +// Size returns the number of blobs currently stored. +func (store *BlobStore) Size() uint { + store.lock.Lock() + defer store.lock.Unlock() + + return uint(len(store.blobs)) +} diff --git a/tools/traffic/table/blob_store_test.go b/tools/traffic/table/blob_store_test.go new file mode 100644 index 0000000000..f56e54b606 --- /dev/null +++ b/tools/traffic/table/blob_store_test.go @@ -0,0 +1,114 @@ +package table + +import ( + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/rand" + "testing" +) + +// randomMetadata generates a random BlobMetadata instance. +func randomMetadata(t *testing.T, permits int) *BlobMetadata { + key := make([]byte, 32) + checksum := [16]byte{} + _, _ = rand.Read(key) + _, _ = rand.Read(checksum[:]) + metadata, err := NewBlobMetadata(key, checksum, 1024, 0, permits) + assert.Nil(t, err) + + return metadata +} + +// TestBasicOperation tests basic operations of the BlobTable. Adds blobs and iterates over them. +func TestBasicOperation(t *testing.T) { + tu.InitializeRandom() + + store := NewBlobStore() + assert.Equal(t, uint(0), store.Size()) + + size := 1024 + expectedMetadata := make([]*BlobMetadata, 0) + for i := 0; i < size; i++ { + metadata := randomMetadata(t, 1) + store.Add(metadata) + expectedMetadata = append(expectedMetadata, metadata) + assert.Equal(t, uint(i+1), store.Size()) + } + + for i := 0; i < size; i++ { + assert.Equal(t, expectedMetadata[i], store.blobs[uint64(i)]) + } +} + +// TestGetRandomWithRemoval tests getting a random blob data, but where the number of permits per blob is unlimited. +func TestGetRandomNoRemoval(t *testing.T) { + tu.InitializeRandom() + + table := NewBlobStore() + assert.Equal(t, uint(0), table.Size()) + + // Requesting a random element from an empty table should return nil. + element := table.GetNext() + assert.Nil(t, element) + + expectedMetadata := make(map[string]*BlobMetadata) + size := 128 + for i := 0; i < size; i++ { + metadata := randomMetadata(t, -1) // -1 == unlimited permits + table.Add(metadata) + expectedMetadata[string(metadata.Key)] = metadata + assert.Equal(t, uint(i+1), table.Size()) + } + + randomIndices := make(map[string]bool) + + // Query more times than the number of blobs to ensure that blobs are not removed. + for i := 0; i < size*8; i++ { + metadata := table.GetNext() + assert.NotNil(t, metadata) + assert.Equal(t, expectedMetadata[string(metadata.Key)], metadata) + randomIndices[string(metadata.Key)] = true + } + + // Sanity check: ensure that at least 10 different blobs were returned. This check is attempting to verify + // that we are actually getting random blobs. The probability of this check failing is extremely low if + // the random number generator is working correctly. + assert.GreaterOrEqual(t, len(randomIndices), 10) +} + +// TestGetRandomWithRemoval tests getting a random blob data, where the number of permits per blob is limited. +func TestGetRandomWithRemoval(t *testing.T) { + tu.InitializeRandom() + + table := NewBlobStore() + assert.Equal(t, uint(0), table.Size()) + + // Requesting a random element from an empty table should return nil. + element := table.GetNext() + assert.Nil(t, element) + + permitCount := 2 + + size := 1024 + expectedMetadata := make(map[string]uint) + for i := 0; i < size; i++ { + metadata := randomMetadata(t, permitCount) + table.Add(metadata) + expectedMetadata[string(metadata.Key)] = 0 + assert.Equal(t, uint(i+1), table.Size()) + } + + // Requesting elements a number of times equal to the size times the number of permits should completely + // drain the table and return all elements a number of times equal to the number of permits. + for i := 0; i < size*permitCount; i++ { + metadata := table.GetNext() + assert.NotNil(t, metadata) + + k := string(metadata.Key) + permitsUsed := expectedMetadata[k] + 1 + expectedMetadata[k] = permitsUsed + assert.LessOrEqual(t, permitsUsed, uint(permitCount)) + } + + assert.Equal(t, uint(0), table.Size()) +}