Skip to content

Commit

Permalink
exp/services/ledgerexporter: refactoring LedgerMetaArchive (stellar#5311
Browse files Browse the repository at this point in the history
)
  • Loading branch information
urvisavla authored May 15, 2024
1 parent 8f8237a commit ae1019c
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 238 deletions.
10 changes: 5 additions & 5 deletions exp/services/ledgerexporter/internal/exportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type ExportManager struct {
config datastore.LedgerBatchConfig
ledgerBackend ledgerbackend.LedgerBackend
currentMetaArchive *datastore.LedgerMetaArchive
currentMetaArchive *LedgerMetaArchive
queue UploadQueue
latestLedgerMetric *prometheus.GaugeVec
}
Expand Down Expand Up @@ -47,7 +47,7 @@ func (e *ExportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta
// Determine the object key for the given ledger sequence
objectKey := e.config.GetObjectKeyFromSequenceNumber(ledgerSeq)

if e.currentMetaArchive != nil && e.currentMetaArchive.GetObjectKey() != objectKey {
if e.currentMetaArchive != nil && e.currentMetaArchive.ObjectKey != objectKey {
return errors.New("Current meta archive object key mismatch")
}
if e.currentMetaArchive == nil {
Expand All @@ -61,14 +61,14 @@ func (e *ExportManager) AddLedgerCloseMeta(ctx context.Context, ledgerCloseMeta
}

// Create a new LedgerMetaArchive and add it to the map.
e.currentMetaArchive = datastore.NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq)
e.currentMetaArchive = NewLedgerMetaArchive(objectKey, ledgerSeq, endSeq)
}

if err := e.currentMetaArchive.AddLedger(ledgerCloseMeta); err != nil {
if err := e.currentMetaArchive.Data.AddLedger(ledgerCloseMeta); err != nil {
return errors.Wrapf(err, "failed to add ledger %d", ledgerSeq)
}

if ledgerSeq >= e.currentMetaArchive.GetEndLedgerSequence() {
if ledgerSeq >= uint32(e.currentMetaArchive.Data.EndSequence) {
// Current archive is full, send it for upload
if err := e.queue.Enqueue(ctx, e.currentMetaArchive); err != nil {
return err
Expand Down
33 changes: 26 additions & 7 deletions exp/services/ledgerexporter/internal/exportmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,27 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/xdr"
)

func createLedgerCloseMeta(ledgerSeq uint32) xdr.LedgerCloseMeta {
return xdr.LedgerCloseMeta{
V: int32(0),
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(ledgerSeq),
},
},
TxSet: xdr.TransactionSet{},
TxProcessing: nil,
UpgradesProcessing: nil,
ScpInfo: nil,
},
V1: nil,
}
}

func TestExporterSuite(t *testing.T) {
suite.Run(t, new(ExportManagerSuite))
}
Expand Down Expand Up @@ -57,7 +76,7 @@ func (s *ExportManagerSuite) TestRun() {
expectedKeys := set.NewSet[string](10)
for i := start; i <= end; i++ {
s.mockBackend.On("GetLedger", s.ctx, i).
Return(datastore.CreateLedgerCloseMeta(i), nil)
Return(createLedgerCloseMeta(i), nil)
key := config.GetObjectKeyFromSequenceNumber(i)
expectedKeys.Add(key)
}
Expand Down Expand Up @@ -104,7 +123,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() {
ctx, cancel := context.WithCancel(context.Background())

s.mockBackend.On("GetLedger", mock.Anything, mock.Anything).
Return(datastore.CreateLedgerCloseMeta(1), nil)
Return(createLedgerCloseMeta(1), nil)

go func() {
<-time.After(time.Second * 1)
Expand Down Expand Up @@ -164,7 +183,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
start := uint32(0)
end := uint32(255)
for i := start; i <= end; i++ {
require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), datastore.CreateLedgerCloseMeta(i)))
require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(i)))

key := config.GetObjectKeyFromSequenceNumber(i)
expectedKeys.Add(key)
Expand All @@ -188,8 +207,8 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
cancel()
}()

require.NoError(s.T(), exporter.AddLedgerCloseMeta(ctx, datastore.CreateLedgerCloseMeta(1)))
err = exporter.AddLedgerCloseMeta(ctx, datastore.CreateLedgerCloseMeta(2))
require.NoError(s.T(), exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(1)))
err = exporter.AddLedgerCloseMeta(ctx, createLedgerCloseMeta(2))
require.EqualError(s.T(), err, "context canceled")
}

Expand All @@ -200,7 +219,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() {
exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
require.NoError(s.T(), err)

require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), datastore.CreateLedgerCloseMeta(16)))
require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), datastore.CreateLedgerCloseMeta(21)),
require.NoError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(16)))
require.EqualError(s.T(), exporter.AddLedgerCloseMeta(context.Background(), createLedgerCloseMeta(21)),
"Current meta archive object key mismatch")
}
24 changes: 24 additions & 0 deletions exp/services/ledgerexporter/internal/ledger_meta_archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ledgerexporter

import (
"github.com/stellar/go/xdr"
)

// LedgerMetaArchive represents a file with metadata and binary data.
type LedgerMetaArchive struct {
// file name
ObjectKey string
// Actual binary data
Data xdr.LedgerCloseMetaBatch
}

// NewLedgerMetaArchive creates a new LedgerMetaArchive instance.
func NewLedgerMetaArchive(key string, startSeq uint32, endSeq uint32) *LedgerMetaArchive {
return &LedgerMetaArchive{
ObjectKey: key,
Data: xdr.LedgerCloseMetaBatch{
StartSequence: xdr.Uint32(startSeq),
EndSequence: xdr.Uint32(endSeq),
},
}
}
9 changes: 4 additions & 5 deletions exp/services/ledgerexporter/internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/support/datastore"
)

// UploadQueue is a queue of LedgerMetaArchive objects which are scheduled for upload
type UploadQueue struct {
metaArchiveCh chan *datastore.LedgerMetaArchive
metaArchiveCh chan *LedgerMetaArchive
queueLengthMetric prometheus.Gauge
}

Expand All @@ -23,13 +22,13 @@ func NewUploadQueue(size int, prometheusRegistry *prometheus.Registry) UploadQue
})
prometheusRegistry.MustRegister(queueLengthMetric)
return UploadQueue{
metaArchiveCh: make(chan *datastore.LedgerMetaArchive, size),
metaArchiveCh: make(chan *LedgerMetaArchive, size),
queueLengthMetric: queueLengthMetric,
}
}

// Enqueue will add an upload task to the queue. Enqueue may block if the queue is full.
func (u UploadQueue) Enqueue(ctx context.Context, archive *datastore.LedgerMetaArchive) error {
func (u UploadQueue) Enqueue(ctx context.Context, archive *LedgerMetaArchive) error {
u.queueLengthMetric.Inc()
select {
case u.metaArchiveCh <- archive:
Expand All @@ -40,7 +39,7 @@ func (u UploadQueue) Enqueue(ctx context.Context, archive *datastore.LedgerMetaA
}

// Dequeue will pop a task off the queue. Dequeue may block if the queue is empty.
func (u UploadQueue) Dequeue(ctx context.Context) (*datastore.LedgerMetaArchive, bool, error) {
func (u UploadQueue) Dequeue(ctx context.Context) (*LedgerMetaArchive, bool, error) {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
Expand Down
13 changes: 6 additions & 7 deletions exp/services/ledgerexporter/internal/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stellar/go/support/datastore"
"github.com/stretchr/testify/require"
)

Expand All @@ -32,9 +31,9 @@ func getMetricValue(metric prometheus.Metric) *dto.Metric {
func TestQueue(t *testing.T) {
queue := NewUploadQueue(3, prometheus.NewRegistry())

require.NoError(t, queue.Enqueue(context.Background(), datastore.NewLedgerMetaArchive("test", 1, 1)))
require.NoError(t, queue.Enqueue(context.Background(), datastore.NewLedgerMetaArchive("test", 2, 2)))
require.NoError(t, queue.Enqueue(context.Background(), datastore.NewLedgerMetaArchive("test", 3, 3)))
require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 1, 1)))
require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 2, 2)))
require.NoError(t, queue.Enqueue(context.Background(), NewLedgerMetaArchive("test", 3, 3)))

require.Equal(t, float64(3), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue())
queue.Close()
Expand All @@ -43,19 +42,19 @@ func TestQueue(t *testing.T) {
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, float64(2), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue())
require.Equal(t, uint32(1), l.GetStartLedgerSequence())
require.Equal(t, uint32(1), uint32(l.Data.StartSequence))

l, ok, err = queue.Dequeue(context.Background())
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, float64(1), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue())
require.Equal(t, uint32(2), l.GetStartLedgerSequence())
require.Equal(t, uint32(2), uint32(l.Data.StartSequence))

l, ok, err = queue.Dequeue(context.Background())
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, float64(0), getMetricValue(queue.queueLengthMetric).GetGauge().GetValue())
require.Equal(t, uint32(3), l.GetStartLedgerSequence())
require.Equal(t, uint32(3), uint32(l.Data.StartSequence))

l, ok, err = queue.Dequeue(context.Background())
require.NoError(t, err)
Expand Down
11 changes: 6 additions & 5 deletions exp/services/ledgerexporter/internal/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/support/compressxdr"
"github.com/stellar/go/support/datastore"
)
Expand Down Expand Up @@ -78,19 +79,19 @@ func (r *writerToRecorder) WriteTo(w io.Writer) (int64, error) {
}

// Upload uploads the serialized binary data of ledger TxMeta to the specified destination.
func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaArchive) error {
logger.Infof("Uploading: %s", metaArchive.GetObjectKey())
func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error {
logger.Infof("Uploading: %s", metaArchive.ObjectKey)
startTime := time.Now()
numLedgers := strconv.FormatUint(uint64(metaArchive.GetLedgerCount()), 10)
numLedgers := strconv.FormatUint(uint64(len(metaArchive.Data.LedgerCloseMetas)), 10)

xdrEncoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &metaArchive.Data)

writerTo := &writerToRecorder{
WriterTo: xdrEncoder,
}
ok, err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.GetObjectKey(), writerTo)
ok, err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.ObjectKey, writerTo)
if err != nil {
return errors.Wrapf(err, "error uploading %s", metaArchive.GetObjectKey())
return errors.Wrapf(err, "error uploading %s", metaArchive.ObjectKey)
}
alreadyExists := strconv.FormatBool(!ok)

Expand Down
14 changes: 7 additions & 7 deletions exp/services/ledgerexporter/internal/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func (s *UploaderSuite) TestUpload() {

func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
key, start, end := "test-1-100", uint32(1), uint32(100)
archive := datastore.NewLedgerMetaArchive(key, start, end)
archive := NewLedgerMetaArchive(key, start, end)
for i := start; i <= end; i++ {
_ = archive.AddLedger(datastore.CreateLedgerCloseMeta(i))
_ = archive.Data.AddLedger(createLedgerCloseMeta(i))
}

var capturedBuf bytes.Buffer
Expand All @@ -61,7 +61,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
require.NoError(s.T(), dataUploader.Upload(context.Background(), archive))

expectedCompressedLength := capturedBuf.Len()
var decodedArchive datastore.LedgerMetaArchive
var decodedArchive LedgerMetaArchive
xdrDecoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &decodedArchive.Data)

decoder := xdrDecoder
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s *UploaderSuite) TestUploadPutError() {

func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) {
key, start, end := "test-1-100", uint32(1), uint32(100)
archive := datastore.NewLedgerMetaArchive(key, start, end)
archive := NewLedgerMetaArchive(key, start, end)

s.mockDataStore.On("PutFileIfNotExists", context.Background(), key,
mock.Anything).Return(putOkReturnVal, errors.New("error in PutFileIfNotExists"))
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *UploaderSuite) TestRunChannelClose() {
go func() {
key, start, end := "test", uint32(1), uint32(100)
for i := start; i <= end; i++ {
s.Assert().NoError(queue.Enqueue(s.ctx, datastore.NewLedgerMetaArchive(key, i, i)))
s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive(key, i, i)))
}
<-time.After(time.Second * 2)
queue.Close()
Expand All @@ -225,7 +225,7 @@ func (s *UploaderSuite) TestRunContextCancel() {
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)

s.Assert().NoError(queue.Enqueue(s.ctx, datastore.NewLedgerMetaArchive("test", 1, 1)))
s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive("test", 1, 1)))

go func() {
<-time.After(time.Second * 2)
Expand All @@ -240,7 +240,7 @@ func (s *UploaderSuite) TestRunUploadError() {
registry := prometheus.NewRegistry()
queue := NewUploadQueue(1, registry)

s.Assert().NoError(queue.Enqueue(s.ctx, datastore.NewLedgerMetaArchive("test", 1, 1)))
s.Assert().NoError(queue.Enqueue(s.ctx, NewLedgerMetaArchive("test", 1, 1)))
s.mockDataStore.On("PutFileIfNotExists", mock.Anything, "test",
mock.Anything).Return(false, errors.New("Put error"))

Expand Down
29 changes: 13 additions & 16 deletions ingest/ledgerbackend/buffered_storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ type BufferedStorageBackend struct {
// ledgerBuffer is the buffer for LedgerCloseMeta data read in parallel.
ledgerBuffer *ledgerBuffer

dataStore datastore.DataStore
prepared *Range // Non-nil if any range is prepared
closed bool // False until the core is closed
ledgerMetaArchive *datastore.LedgerMetaArchive
nextLedger uint32
lastLedger uint32
dataStore datastore.DataStore
prepared *Range // Non-nil if any range is prepared
closed bool // False until the core is closed
lcmBatch xdr.LedgerCloseMetaBatch
nextLedger uint32
lastLedger uint32
}

// NewBufferedStorageBackend returns a new BufferedStorageBackend instance.
Expand All @@ -62,12 +62,9 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken
return nil, errors.New("ledgersPerFile must be > 0")
}

ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)

bsBackend := &BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
ledgerMetaArchive: ledgerMetaArchive,
config: config,
dataStore: config.DataStore,
}

return bsBackend, nil
Expand Down Expand Up @@ -98,19 +95,19 @@ func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context)
// Otherwise will continuously load in the next LedgerCloseMetaBatch until found.
func (bsb *BufferedStorageBackend) getBatchForSequence(ctx context.Context, sequence uint32) error {
// Sequence inside the current cached LedgerCloseMetaBatch
if sequence >= bsb.ledgerMetaArchive.GetStartLedgerSequence() && sequence <= bsb.ledgerMetaArchive.GetEndLedgerSequence() {
if sequence >= uint32(bsb.lcmBatch.StartSequence) && sequence <= uint32(bsb.lcmBatch.EndSequence) {
return nil
}

// Sequence is before the current LedgerCloseMetaBatch
// Does not support retrieving LedgerCloseMeta before the current cached batch
if sequence < bsb.ledgerMetaArchive.GetStartLedgerSequence() {
return errors.New("requested sequence preceeds current LedgerCloseMetaBatch")
if sequence < uint32(bsb.lcmBatch.StartSequence) {
return errors.New("requested sequence precedes current LedgerCloseMetaBatch")
}

// Sequence is beyond the current LedgerCloseMetaBatch
var err error
bsb.ledgerMetaArchive.Data, err = bsb.ledgerBuffer.getFromLedgerQueue(ctx)
bsb.lcmBatch, err = bsb.ledgerBuffer.getFromLedgerQueue(ctx)
if err != nil {
return errors.Wrap(err, "failed getting next ledger batch from queue")
}
Expand Down Expand Up @@ -165,7 +162,7 @@ func (bsb *BufferedStorageBackend) GetLedger(ctx context.Context, sequence uint3
return xdr.LedgerCloseMeta{}, err
}

ledgerCloseMeta, err := bsb.ledgerMetaArchive.GetLedger(sequence)
ledgerCloseMeta, err := bsb.lcmBatch.GetLedger(sequence)
if err != nil {
return xdr.LedgerCloseMeta{}, err
}
Expand Down
Loading

0 comments on commit ae1019c

Please sign in to comment.