Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/ingest: Use buffered storage backend for reingest command #5374

Merged
merged 27 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ff28599
#4911: added new config interface for precomputed ledger backend
sreuland Jul 3, 2024
54dfcc3
#4911: fixed syntax error on backend type enum
sreuland Jul 5, 2024
5a020f3
#4911: added buffered storage ledger backend to NewSystem factory method
sreuland Jul 5, 2024
5d8d64b
services/horizon: Reingest from precomputed TxMeta (#5375)
urvisavla Jul 5, 2024
b37be69
#4911: add unit test coverage on buffered backend ingest system creat…
sreuland Jul 9, 2024
2053cdf
Merge remote-tracking branch 'upstream/master' into reingest_precomputed
sreuland Jul 9, 2024
7a267a6
#4911: fixed lexie integration tests to use datastore factory function
sreuland Jul 9, 2024
a873570
services/horizon: Skip captive-core configuration when reingesting fr…
urvisavla Jul 9, 2024
4491049
#4911: added --parallel-job-size=100 default when datastore backend u…
sreuland Jul 10, 2024
7234d6e
services/horizon: Add unit test for new --ledgerbackend flag (#5382)
urvisavla Jul 11, 2024
7dc9ffb
#4911: added test for reingest with --ledgerbackend datastore (#5383)
sreuland Jul 12, 2024
01fbf9c
#4911: updated changelog/readme (#5389)
sreuland Jul 15, 2024
efc8da9
Merge remote-tracking branch 'upstream/master' into reingest_precomputed
sreuland Jul 15, 2024
96284d0
4911: review feedback, remove nil check
sreuland Jul 16, 2024
8d543ba
#4911: use ledger backend enum names for command help, review feedback
sreuland Jul 16, 2024
d8110f9
#4911: use fakestoreage initial objects instead of seed path for fake…
sreuland Jul 16, 2024
bf28af6
#4911: fix go mod tidy
sreuland Jul 16, 2024
7f1fe64
Addressing review comment on --datastore-config parameter validation
urvisavla Jul 16, 2024
0f77ac6
Fix static check errors
urvisavla Jul 16, 2024
8c1adae
Remove unused parameter from datastore::GetSchema
urvisavla Jul 16, 2024
e8261f8
Remove unused context parameter
urvisavla Jul 16, 2024
08f3ab2
#4911: removed NewSystem unit test and related factory creation metho…
sreuland Jul 17, 2024
5793896
Merge remote-tracking branch 'upstream/master' into reingest_precomputed
sreuland Jul 17, 2024
026d41e
#4911: try refactor integration.NewTest to allow only web process and…
sreuland Jul 17, 2024
1642a1a
#4911: fix integeation test StartHorizon usage when intentional cmd e…
sreuland Jul 18, 2024
38f2b6d
#4911: fixed environment integration test of tests, needed to shutdow…
sreuland Jul 18, 2024
dee1424
#4911: revert some of prior change, need to keep the time.sleep on so…
sreuland Jul 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ledgerexporter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ jobs:
CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.1.0-1921.b3aeb14cc.focal
LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true"
LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
# this pins to a version of quickstart:testing that has the same version as LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN
# this pins to a version of quickstart:testing that has the same version of core
# as specified on LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN
# this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing'
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:5c8186f53cc98571749054dd782dce33b0aca2d1a622a7610362f7c15b79b1bf
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false"
steps:
- name: Install captive core
Expand Down
17 changes: 11 additions & 6 deletions exp/services/ledgerexporter/internal/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
// tests then refer to ledger sequences only up to this, therefore
// don't have to do complex waiting within test for a sequence to exist.
waitForCoreLedgerSequence = 16
configTemplate = "test/integration_config_template.toml"
)

func TestLedgerExporterTestSuite(t *testing.T) {
Expand All @@ -54,6 +55,7 @@ type LedgerExporterTestSuite struct {
dockerCli *client.Client
gcsServer *fakestorage.Server
finishedSetup bool
config Config
}

func (s *LedgerExporterTestSuite) TestScanAndFill() {
Expand All @@ -74,7 +76,7 @@ func (s *LedgerExporterTestSuite) TestScanAndFill() {
s.T().Log(output)
s.T().Log(errOutput)

datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone")
datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig)
require.NoError(err)

_, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr.zstd")
Expand Down Expand Up @@ -104,7 +106,7 @@ func (s *LedgerExporterTestSuite) TestAppend() {
s.T().Log(output)
s.T().Log(errOutput)

datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone")
datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig)
require.NoError(err)

_, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF6--9.xdr.zstd")
Expand Down Expand Up @@ -134,7 +136,7 @@ func (s *LedgerExporterTestSuite) TestAppendUnbounded() {
s.T().Log(errOutput)
}()

datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone")
datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig)
require.NoError(err)

require.EventuallyWithT(func(c *assert.CollectT) {
Expand All @@ -158,9 +160,9 @@ func (s *LedgerExporterTestSuite) SetupSuite() {
}()
testTempDir := t.TempDir()

ledgerExporterConfigTemplate, err := toml.LoadFile("test/integration_config_template.toml")
ledgerExporterConfigTemplate, err := toml.LoadFile(configTemplate)
if err != nil {
t.Fatalf("unable to load config template file %v", err)
t.Fatalf("unable to load config template file %v, %v", configTemplate, err)
}

// if LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN not specified,
Expand All @@ -172,7 +174,10 @@ func (s *LedgerExporterTestSuite) SetupSuite() {

tomlBytes, err := toml.Marshal(ledgerExporterConfigTemplate)
if err != nil {
t.Fatalf("unable to load config file %v", err)
t.Fatalf("unable to parse config file toml %v, %v", configTemplate, err)
}
if err = toml.Unmarshal(tomlBytes, &s.config); err != nil {
t.Fatalf("unable to marshal config file toml into struct, %v", err)
}

tempSeedDataPath := filepath.Join(testTempDir, "data")
Expand Down
22 changes: 9 additions & 13 deletions ingest/ledgerbackend/buffered_storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
// Ensure BufferedStorageBackend implements LedgerBackend
var _ LedgerBackend = (*BufferedStorageBackend)(nil)

type BufferedStorageBackendFactory func(ctx context.Context, config BufferedStorageBackendConfig, dataStore datastore.DataStore) (*BufferedStorageBackend, error)
sreuland marked this conversation as resolved.
Show resolved Hide resolved

type BufferedStorageBackendConfig struct {
LedgerBatchConfig datastore.DataStoreSchema
DataStore datastore.DataStore
BufferSize uint32
NumWorkers uint32
RetryLimit uint32
RetryWait time.Duration
BufferSize uint32 `toml:"buffer_size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
}

// BufferedStorageBackend is a ledger backend that reads from a storage service.
Expand All @@ -45,7 +45,7 @@ type BufferedStorageBackend struct {
}

// NewBufferedStorageBackend returns a new BufferedStorageBackend instance.
func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) {
func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig, dataStore datastore.DataStore) (*BufferedStorageBackend, error) {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
if config.BufferSize == 0 {
return nil, errors.New("buffer size must be > 0")
}
Expand All @@ -54,17 +54,13 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken
return nil, errors.New("number of workers must be <= BufferSize")
}

if config.DataStore == nil {
return nil, errors.New("no DataStore provided")
}

if config.LedgerBatchConfig.LedgersPerFile <= 0 {
if dataStore.GetSchema().LedgersPerFile <= 0 {
return nil, errors.New("ledgersPerFile must be > 0")
}

bsBackend := &BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}

return bsBackend, nil
Expand Down
63 changes: 39 additions & 24 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,21 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
param := make(map[string]string)
param["destination_bucket_path"] = "testURL"

ledgerBatchConfig := datastore.DataStoreSchema{
LedgersPerFile: 1,
FilesPerPartition: 64000,
}

dataStore := new(datastore.MockDataStore)

return BufferedStorageBackendConfig{
LedgerBatchConfig: ledgerBatchConfig,
DataStore: dataStore,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
}
}

func createBufferedStorageBackendForTesting() BufferedStorageBackend {
config := createBufferedStorageBackendConfigForTesting()

dataStore := new(datastore.MockDataStore)
return BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}
}

Expand All @@ -86,6 +78,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
}
mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
}
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
})

t.Cleanup(func() {
mockDataStore.AssertExpectations(t)
Expand Down Expand Up @@ -128,13 +124,17 @@ func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
func TestNewBufferedStorageBackend(t *testing.T) {
ctx := context.Background()
config := createBufferedStorageBackendConfigForTesting()

bsb, err := NewBufferedStorageBackend(ctx, config)
mockDataStore := new(datastore.MockDataStore)
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(1),
FilesPerPartition: partitionSize,
})
bsb, err := NewBufferedStorageBackend(ctx, config, mockDataStore)
assert.NoError(t, err)

assert.Equal(t, bsb.dataStore, config.DataStore)
assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
assert.Equal(t, bsb.dataStore, mockDataStore)
assert.Equal(t, uint32(1), bsb.dataStore.GetSchema().LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.dataStore.GetSchema().FilesPerPartition)
assert.Equal(t, uint32(100), bsb.config.BufferSize)
assert.Equal(t, uint32(5), bsb.config.NumWorkers)
assert.Equal(t, uint32(3), bsb.config.RetryLimit)
Expand Down Expand Up @@ -210,12 +210,14 @@ func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
lcmArray := createLCMForTesting(startLedger, endLedger)
bsb := createBufferedStorageBackendForTesting()
ctx := context.Background()
bsb.config.LedgerBatchConfig.LedgersPerFile = uint32(2)
ledgerRange := BoundedRange(startLedger, endLedger)

mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(2),
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)

Expand Down Expand Up @@ -451,6 +453,10 @@ func TestLedgerBufferClose(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})

objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
afterPrepareRange := make(chan struct{})
Expand Down Expand Up @@ -483,7 +489,10 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
t.Cleanup(func() {
Expand All @@ -509,7 +518,10 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
iteration := &atomic.Int32{}
cancelAfter := int32(bsb.config.RetryLimit) + 2
Expand Down Expand Up @@ -551,7 +563,10 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
})

bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(context.Background(), ledgerRange))

bsb.ledgerBuffer.wg.Wait()
Expand Down
16 changes: 8 additions & 8 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,19 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
// but for easier conceptualization, len(taskQueue) can be interpreted as both pending and in-flight tasks
// where we assume the workers are empty and not processing any tasks.
for i := 0; i <= int(bsb.config.BufferSize); i++ {
ledgerBuffer.pushTaskQueue()
ledgerBuffer.pushTaskQueue(ctx)
}

return ledgerBuffer, nil
}

func (lb *ledgerBuffer) pushTaskQueue() {
func (lb *ledgerBuffer) pushTaskQueue(ctx context.Context) {
urvisavla marked this conversation as resolved.
Show resolved Hide resolved
// In bounded mode, don't queue past the end ledger
if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded {
return
}
lb.taskQueue <- lb.nextTaskLedger
lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.nextTaskLedger += lb.dataStore.GetSchema().LedgersPerFile
}

// sleepWithContext returns true upon sleeping without interruption from the context
Expand Down Expand Up @@ -155,15 +155,15 @@ func (lb *ledgerBuffer) worker(ctx context.Context) {
// Thus, the number of tasks decreases by 1 and the priority queue length increases by 1.
// This keeps the overall total the same (<= BufferSize). As long as the the ledger buffer invariant
// was maintained in the previous state, it is still maintained during this state transition.
lb.storeObject(ledgerObject, sequence)
lb.storeObject(ctx, ledgerObject, sequence)
break
}
}
}
}

func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) {
objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)
objectKey := lb.dataStore.GetSchema().GetObjectKeyFromSequenceNumber(sequence)

reader, err := lb.dataStore.GetFile(ctx, objectKey)
if err != nil {
Expand All @@ -180,7 +180,7 @@ func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint3
return objectBytes, nil
}

func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
func (lb *ledgerBuffer) storeObject(ctx context.Context, ledgerObject []byte, sequence uint32) {
urvisavla marked this conversation as resolved.
Show resolved Hide resolved
lb.priorityQueueLock.Lock()
defer lb.priorityQueueLock.Unlock()

Expand All @@ -198,7 +198,7 @@ func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) {
item := lb.ledgerPriorityQueue.Pop()
lb.ledgerQueue <- item.payload
lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.currentLedger += lb.dataStore.GetSchema().LedgersPerFile
}
}

Expand All @@ -215,7 +215,7 @@ func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) (xdr.LedgerClose
// Thus len(ledgerQueue) decreases by 1 and the number of tasks increases by 1.
// The overall sum below remains the same:
// len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize
lb.pushTaskQueue()
lb.pushTaskQueue(ctx)

lcmBatch := xdr.LedgerCloseMetaBatch{}
decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &lcmBatch)
Expand Down
11 changes: 11 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## Pending

### Added

- Reingest from pre-computed tx meta on remote cloud storage. ([4911](https://github.com/stellar/go/issues/4911)), ([5374](https://github.com/stellar/go/pull/5374))
- Configure horizon reingestion to obtain ledger tx meta in pre-computed files from a Google Cloud Storage(GCS) location.
- Using this option will no longer require a captive core binary be present and it no longer runs a captive core sub-process, instead obtaining the tx meta from the GCS backend.
- Horizon supports this new feature with two new parameters `ledgerbackend` and `datastore-config` on the `reingest` command. Refer to [Reingestion README](./internal/ingest/README.md#reingestion).



## 2.31.0

### Breaking Changes
Expand Down
Loading
Loading