
Chunk store #848

Merged (13 commits, Nov 5, 2024)
62 changes: 62 additions & 0 deletions encoding/rs/frame.go
@@ -2,7 +2,9 @@ package rs

import (
	"bytes"
	"encoding/binary"
	"encoding/gob"
	"fmt"

	"github.com/consensys/gnark-crypto/ecc/bn254/fr"
)
@@ -13,6 +15,7 @@ type Frame struct {
	Coeffs []fr.Element
}

// Encode serializes the frame into a byte slice.
func (f *Frame) Encode() ([]byte, error) {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
@@ -23,6 +26,7 @@ func (f *Frame) Encode() ([]byte, error) {
	return buf.Bytes(), nil
}

// Decode deserializes a byte slice into a frame.
func Decode(b []byte) (Frame, error) {
	var f Frame
	buf := bytes.NewBuffer(b)
@@ -33,3 +37,61 @@ func Decode(b []byte) (Frame, error) {
	}
	return f, nil
}

// EncodeFrames serializes a slice of frames into a byte slice.
func EncodeFrames(frames []*Frame) ([]byte, error) {
Contributor: Why don't we use existing serialization methods like this?

Contributor: From the split encoder's perspective, we want to serialize the proofs and the coefficients separately. From the node's perspective, we could make sure it receives the chunks in the expected serialized format.

Contributor Author: @dmanc requested that the chunk store have the capability of uploading the encoding.Proof objects and rs.Frame objects separately. The code you point to is capable of serializing both at the same time, but does not provide a way to serialize/deserialize them separately.

Contributor: I see. How about this one?


// Serialization format:
Contributor: Should we move this above the function so it shows up in the Go docs?

Contributor Author: Done.

// GnarkEncodeFrames serializes a slice of frames into a byte slice.
//
// Serialization format:
// [number of frames: 4 byte uint32]
// [size of frame 1: 4 byte uint32][frame 1]
// [size of frame 2: 4 byte uint32][frame 2]
// ...
// [size of frame n: 4 byte uint32][frame n]
//
// Where relevant, big endian encoding is used.
func GnarkEncodeFrames(frames []*Frame) ([]byte, error) {

	encodedSize := 4 // 4 bytes for the frame count
	encodedFrames := make([][]byte, len(frames))

	for i, frame := range frames {
		encodedSize += 4 // 4 bytes for this frame's size prefix
		encodedFrame, err := frame.Encode()
		if err != nil {
			return nil, err
		}
		encodedFrames[i] = encodedFrame
		encodedSize += len(encodedFrame)
	}

	serializedBytes := make([]byte, encodedSize)
	binary.BigEndian.PutUint32(serializedBytes, uint32(len(frames)))
	index := 4

	for _, frameBytes := range encodedFrames {
		binary.BigEndian.PutUint32(serializedBytes[index:], uint32(len(frameBytes)))
		index += 4
		copy(serializedBytes[index:], frameBytes)
		index += len(frameBytes)
	}

	return serializedBytes, nil
}

// DecodeFrames deserializes a byte slice into a slice of frames.
func DecodeFrames(serializedFrames []byte) ([]*Frame, error) {
	frameCount := binary.BigEndian.Uint32(serializedFrames)
	index := 4

	frames := make([]*Frame, frameCount)

	for i := 0; i < int(frameCount); i++ {
		frameSize := binary.BigEndian.Uint32(serializedFrames[index:])
		index += 4
		frame, err := Decode(serializedFrames[index : index+int(frameSize)])
		if err != nil {
			return nil, fmt.Errorf("failed to decode frame %d: %w", i, err)
		}
		frames[i] = &frame
		index += int(frameSize)
	}

	return frames, nil
}
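
For illustration (not part of the PR diff), here is a minimal, hypothetical sketch of the byte layout these functions produce, using two stand-in payloads in place of real gob-encoded frames:

package main

import (
	"encoding/binary"
	"fmt"
)

// Demonstrates the length-prefixed layout used above: a 4-byte big-endian
// frame count, then each frame preceded by its own 4-byte big-endian size.
func main() {
	frames := [][]byte{{0xAA, 0xBB}, {0xCC}}

	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(len(frames))) // frame count

	for _, f := range frames {
		size := make([]byte, 4)
		binary.BigEndian.PutUint32(size, uint32(len(f))) // size prefix
		buf = append(buf, size...)
		buf = append(buf, f...)
	}

	// Prints 0000000200000002aabb00000001cc:
	// count=2, then (size=2, aabb), then (size=1, cc).
	fmt.Printf("%x\n", buf)
}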
38 changes: 38 additions & 0 deletions encoding/rs/frame_test.go
@@ -47,3 +47,41 @@ func TestEncodeDecodeFrame_AreInverses(t *testing.T) {

	assert.Equal(t, frame, frames[0])
}

func TestEncodeDecodeFrames_AreInverses(t *testing.T) {
	teardownSuite := setupSuite(t)
	defer teardownSuite(t)

	params := encoding.ParamsFromSysPar(numSys, numPar, uint64(len(GETTYSBURG_ADDRESS_BYTES)))
	enc, _ := rs.NewEncoder(params, true)

	n := uint8(math.Log2(float64(enc.NumEvaluations())))
	if enc.ChunkLength == 1 {
		n = uint8(math.Log2(float64(2 * enc.NumChunks)))
	}
	fs := fft.NewFFTSettings(n)

	RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
		Fs:             fs,
		EncodingParams: params,
	}

	enc.Computer = RsComputeDevice
	require.NotNil(t, enc)

	frames, _, err := enc.EncodeBytes(GETTYSBURG_ADDRESS_BYTES)
	assert.NoError(t, err)

	framesPointers := make([]*rs.Frame, len(frames))
	for i := range frames {
		// Take the address of the slice element, not the loop variable,
		// so each pointer refers to a distinct frame.
		framesPointers[i] = &frames[i]
	}

	encodedFrames, err := rs.EncodeFrames(framesPointers)
	assert.NoError(t, err)

	decodedFrames, err := rs.DecodeFrames(encodedFrames)
	assert.NoError(t, err)

	assert.Equal(t, framesPointers, decodedFrames)
}
2 changes: 0 additions & 2 deletions inabox/deploy/localstack.go
@@ -8,7 +8,6 @@ import (
"net/http"
"path/filepath"
"runtime"
"runtime/debug"
"time"

"github.com/Layr-Labs/eigenda/common/aws"
@@ -21,7 +20,6 @@
)

func StartDockertestWithLocalstackContainer(localStackPort string) (*dockertest.Pool, *dockertest.Resource, error) {
	debug.PrintStack() // TODO do not merge
	fmt.Println("Starting Localstack container")
	pool, err := dockertest.NewPool("")
	if err != nil {
110 changes: 110 additions & 0 deletions relay/chunkstore/chunk_reader.go
@@ -0,0 +1,110 @@
package chunkstore

Check failure on line 1 in relay/chunkstore/chunk_reader.go (GitHub Actions / Linter): # github.com/Layr-Labs/eigenda/relay/chunkstore [github.com/Layr-Labs/eigenda/relay/chunkstore.test]

import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/rs"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
)

// ChunkReader reads chunks written by ChunkWriter.
type ChunkReader interface {
	// GetChunkProofs reads a slice of proofs from the chunk store.
	GetChunkProofs(ctx context.Context, blobKey disperser.BlobKey) ([]*encoding.Proof, error)
	// GetChunkCoefficients reads a slice of frames from the chunk store.
	GetChunkCoefficients(
		ctx context.Context,
		blobKey disperser.BlobKey) ([]*rs.Frame, error)
}

var _ ChunkReader = (*chunkReader)(nil)

type chunkReader struct {
	logger        logging.Logger
	metadataStore *blobstore.BlobMetadataStore
	client        s3.Client
	bucket        string
	shards        []uint32
}

// NewChunkReader creates a new ChunkReader.
//
// This chunk reader will only return data for the shards specified in the shards parameter.
// If empty, it will return data for all shards. (Note: shard feature is not yet implemented.)
func NewChunkReader(
	logger logging.Logger,
	metadataStore *blobstore.BlobMetadataStore,
	s3Client s3.Client,
	bucketName string,
	shards []uint32) ChunkReader {

	return &chunkReader{
		logger:        logger,
		metadataStore: metadataStore,
		client:        s3Client,
		bucket:        bucketName,
		shards:        shards,
	}
}

func (r *chunkReader) GetChunkProofs(
	ctx context.Context,
	blobKey disperser.BlobKey) ([]*encoding.Proof, error) {
Contributor: Seems like this blob key references the v1 blob key. In StoreBlob for V2 we use blobKey.Hex() = string:

func (b *BlobStore) StoreBlob(ctx context.Context, blobKey string, data []byte) error {

V2 blob key:

type BlobKey [32]byte

func (b BlobKey) Hex() string {
	return hex.EncodeToString(b[:])
}

Contributor: Also, when we fetch proofs vs coefficients, don't we need a different S3 key to differentiate them?

Contributor Author (@cody-littley, Nov 5, 2024): I've been assuming we'd use different buckets. Started a Slack conversation to discuss. Will circle back on this prior to merging once we decide how we want to handle buckets and namespacing.

Contributor Author: I've switched over to using v2.BlobKey as recommended by @ian-shim.


	s3Key := blobKey.String()

	bytes, err := r.client.DownloadObject(ctx, r.bucket, s3Key)
	if err != nil {
		r.logger.Error("Failed to download chunks from S3: %v", err)
		return nil, fmt.Errorf("failed to download chunks from S3: %w", err)
	}

	if len(bytes)%bn254.SizeOfG1AffineCompressed != 0 {
		r.logger.Error("Invalid proof size")
		return nil, fmt.Errorf("invalid proof size: %d bytes is not a multiple of %d", len(bytes), bn254.SizeOfG1AffineCompressed)
	}

	proofCount := len(bytes) / bn254.SizeOfG1AffineCompressed
	proofs := make([]*encoding.Proof, proofCount)

	for i := 0; i < proofCount; i++ {
		proof := encoding.Proof{}
		err := proof.Unmarshal(bytes[i*bn254.SizeOfG1AffineCompressed:])
		if err != nil {
			r.logger.Error("Failed to unmarshal proof: %v", err)
			return nil, fmt.Errorf("failed to unmarshal proof: %w", err)
		}
		proofs[i] = &proof
	}

	return proofs, nil
}
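
The writer side is not part of this file; as a hedged sketch (assuming encoding.Proof is gnark-crypto's bn254.G1Affine, which the Unmarshal call above suggests), the inverse packing could look like this. packProofs is a hypothetical name, not the actual ChunkWriter API:

// packProofs concatenates compressed G1 points. Proofs are fixed-size
// (bn254.SizeOfG1AffineCompressed bytes each), so no length prefixes are
// needed, which is why GetChunkProofs can slice the blob by constant stride.
func packProofs(proofs []*encoding.Proof) []byte {
	out := make([]byte, 0, len(proofs)*bn254.SizeOfG1AffineCompressed)
	for _, p := range proofs {
		b := p.Bytes() // compressed serialization
		out = append(out, b[:]...)
	}
	return out
}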

func (r *chunkReader) GetChunkCoefficients(
	ctx context.Context,
	blobKey disperser.BlobKey) ([]*rs.Frame, error) {

	s3Key := blobKey.String()

	bytes, err := r.client.DownloadObject(ctx, r.bucket, s3Key)
	// TODO: Implement fragmented download
	//bytes, err := r.client.FragmentedDownloadObject(ctx, r.bucket, s3Key, metadata.DataSize, metadata.FragmentSize)
	if err != nil {
		r.logger.Error("Failed to download chunks from S3: %v", err)
		return nil, fmt.Errorf("failed to download chunks from S3: %w", err)
	}

	frames, err := rs.DecodeFrames(bytes)
	if err != nil {
		r.logger.Error("Failed to decode frames: %v", err)
		return nil, fmt.Errorf("failed to decode frames: %w", err)
	}

	return frames, nil
}
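
As a usage sketch (hypothetical caller, not part of this diff; the bucket name and all wiring are illustrative assumptions):

// fetchChunks shows how a caller might construct the reader and fetch both
// halves of a blob's chunk data.
func fetchChunks(ctx context.Context, logger logging.Logger,
	store *blobstore.BlobMetadataStore, s3Client s3.Client,
	key disperser.BlobKey) error {

	reader := NewChunkReader(logger, store, s3Client, "chunk-bucket", nil)

	proofs, err := reader.GetChunkProofs(ctx, key)
	if err != nil {
		return fmt.Errorf("get proofs: %w", err)
	}

	frames, err := reader.GetChunkCoefficients(ctx, key)
	if err != nil {
		return fmt.Errorf("get coefficients: %w", err)
	}

	logger.Info("fetched chunk data", "proofs", len(proofs), "frames", len(frames))
	return nil
}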