exp/lighthorizon: Refactor archive interface and support parallel ledger downloads. #4548

Merged (9 commits) on Sep 15, 2022
47 changes: 2 additions & 45 deletions exp/lighthorizon/ingester/ingester.go
@@ -2,14 +2,9 @@ package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
@@ -30,43 +25,8 @@ type liteIngester struct {
networkPassphrase string
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error {
return nil
}

func (i *liteIngester) NewLedgerTransactionReader(
@@ -75,9 +35,6 @@ func (i *liteIngester) NewLedgerTransactionReader(
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
i.networkPassphrase,
ledgerCloseMeta.MustV0())
if err != nil {
return nil, err
}

return &liteLedgerTransactionReader{reader}, err
}
56 changes: 56 additions & 0 deletions exp/lighthorizon/ingester/main.go
@@ -1,8 +1,16 @@
package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

@@ -15,6 +23,7 @@ import (
type Ingester interface {
metaarchive.MetaArchive

PrepareRange(ctx context.Context, r historyarchive.Range) error
NewLedgerTransactionReader(
ledgerCloseMeta xdr.SerializedLedgerCloseMeta,
) (LedgerTransactionReader, error)
@@ -29,3 +38,50 @@ type LedgerTransaction struct {
type LedgerTransactionReader interface {
Read() (LedgerTransaction, error)
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

if config.ParallelDownloads > 1 {
log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads)
return NewParallelIngester(
source,
config.NetworkPassphrase,
config.ParallelDownloads), nil
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
}
151 changes: 151 additions & 0 deletions exp/lighthorizon/ingester/parallel_ingester.go
@@ -0,0 +1,151 @@
package ingester

import (
"context"
"sync"
"time"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

type parallelIngester struct {
liteIngester

ledgerFeedLock sync.RWMutex
ledgerFeed map[uint32]downloadState
ledgerQueue set.Set[uint32]

workQueue chan uint32
signalChan chan error
}

type downloadState struct {
ledger xdr.SerializedLedgerCloseMeta
err error
}

// NewParallelIngester creates an ingester on the given `ledgerSource` using the
// given `networkPassphrase` that can download ledgers in parallel via
// `workerCount` workers via `PrepareRange()`.
func NewParallelIngester(
ledgerSource storage.Storage,
networkPassphrase string,
workerCount uint,
) *parallelIngester {
self := &parallelIngester{
liteIngester: liteIngester{
MetaArchive: metaarchive.NewMetaArchive(ledgerSource),
networkPassphrase: networkPassphrase,
},
ledgerFeedLock: sync.RWMutex{},
ledgerFeed: make(map[uint32]downloadState, 64),
ledgerQueue: set.NewSet[uint32](64),
workQueue: make(chan uint32, workerCount),
signalChan: make(chan error),
}

// These are the workers that download & store ledgers in memory.
for j := uint(0); j < workerCount; j++ {
go func(jj uint) {
for ledgerSeq := range self.workQueue {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
txmeta, err := self.liteIngester.GetLedger(ctx, ledgerSeq)
cancel()

log.WithField("duration", time.Since(start)).
WithField("worker", jj).WithError(err).
Debugf("Downloaded ledger %d", ledgerSeq)

self.ledgerFeedLock.Lock()
self.ledgerFeed[ledgerSeq] = downloadState{txmeta, err}
self.ledgerFeedLock.Unlock()
self.signalChan <- err
}
}(j)
}

return self
}

// PrepareRange will create a set of parallel worker routines that feed ledgers
// to a channel in the order they're downloaded and store the results in an
// array. You can use this to download ledgers in parallel to fetching them
// individually via `GetLedger()`.
//
// Note: The passed in range `r` is inclusive of the boundaries.
func (i *parallelIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error {
[Review thread attached here]

Contributor: How's the test coverage on this? I can't tell whether it's used in other tests or mocked out. Maybe at some point a parallel_ingester_test.go will be worthwhile to assert unit testing of an instance and of PrepareRange/GetLedger.

Contributor Author: The test coverage is pretty non-existent besides my manual testing, unfortunately. I definitely want to mock out a parallel ingestion simulation, but I'm worried about the sprint time crunch.

Contributor: OK, we can circle back to it as tech debt. We should also try to include the effort of writing tests in our point estimates during poker, since it's an integral part of the feature. It's fine if the story doesn't close by end of sprint due to test coverage; that just reflects the accurate velocity of feature work.

Contributor Author: Yeah, that's on me for sure. I didn't factor in the challenge of testing the implementation during poker (unfortunately the work was also already "in progress" when we pointed it, so I had to point it myself).

// The taskmaster adds ledger sequence numbers to the work queue.
go func() {
start := time.Now()
defer func() {
log.WithField("duration", time.Since(start)).
WithError(ctx.Err()).
Infof("Download of ledger range: [%d, %d] (%d ledgers) complete",
r.Low, r.High, r.Size())
}()

for seq := r.Low; seq <= r.High; seq++ {
if ctx.Err() != nil {
log.Warnf("Cancelling remaining downloads ([%d, %d]): %v",
seq, r.High, ctx.Err())
break
}

// Adding this to the "set of ledgers being downloaded in parallel"
// means that if a GetLedger() request happens in this range but
// outside of the realm of processing, it can be prioritized by the
// normal, direct download.
i.ledgerQueue.Add(seq)

i.workQueue <- seq // blocks until there's an available worker

// We don't remove from the queue here, preferring to remove when
// it's actually pulled from the worker. Removing here would mean
// you could have multiple instances of a ledger download happening.
}
}()

return nil
}

func (i *parallelIngester) GetLedger(
ctx context.Context, ledgerSeq uint32,
) (xdr.SerializedLedgerCloseMeta, error) {
// If the requested ledger is out of the queued up ranges, we can fall back
// to the default non-parallel download method.
if !i.ledgerQueue.Contains(ledgerSeq) {
return i.liteIngester.GetLedger(ctx, ledgerSeq)
}

// If the ledger isn't available yet, wait for the download worker.
var err error
for err == nil {
i.ledgerFeedLock.RLock()
if state, ok := i.ledgerFeed[ledgerSeq]; ok {
i.ledgerFeedLock.RUnlock() // re-lock as a writer
[Review thread attached to the lock escalation above]

Contributor (@sreuland, Sep 13, 2022): Is it necessary to prune the feed entries per sequence number? For simplification, what if we dropped the write-lock escalation from the read lock and just returned the found feed state? The state would then be GC'd all at once when the caller is done and its parallelIngester falls out of scope, and the queue could be re-initialized to empty at the top of PrepareRange() to support a caller running multiple ranges on the same instance. This would also prevent a re-entrant GetLedger call with the same ledgerSeq from falling into the non-parallel path (line 123).

Contributor Author: There already is support for multiple ranges! They all get added to the queue. The only caveat is that each ledger is only accessible one time (noteworthy in the case of overlapping ranges).

As for GC, isn't it the case that the parallelIngester exists essentially for the duration of the program? It gets created once and passed around by reference everywhere. That would mean the ledgerFeed keeps growing, essentially acting as an in-memory cache on top of the on-disk cache on top of the S3 ledger source. I'm not necessarily opposed to that (maybe we could actually use an LRU cache of uint32 -> downloadState instead of a map[uint32]downloadState), but I want to make sure such a design doesn't eventually OOM, because I don't think the GC will ever kick in.

Contributor: Yes, I'm wondering whether it's worthwhile, for simplification, to scope the parallelIngester instance and its queue/feed state to each invocation of PrepareRange() (for example as a closure) rather than keeping a singleton for the lifetime of the program. Ideally that would avoid the extra locking around shared state, like the read-to-write escalation here used to purge entries, since the queue instance would be GC'd as a whole once the PrepareRange() invocation ends and the variables in its closure lose their references. Would that also safely allow parallel invocations of PrepareRange(), since each would isolate its own queue/feed, and they would all converge on the single LRU cache, which already handles concurrency?

Contributor Author: I think I see what you're getting at. The model right now is something like:

  • one parallelIngester is created on launch
  • a request comes in
  • the request gets processed with PrepareRange / GetLedger calls until it's fulfilled
  • another request comes in
  • it uses the same instance to do PrepareRange / GetLedger calls
  • and so on for every request, in parallel

You note that this will probably cause high contention on the workers and the ledger feed (a sync.Map still has to handle concurrency, after all). You propose that each request get its own dedicated worker pool, ledger feed, etc. in order to minimize contention. I think that's a great point, but I have some follow-up concerns:

  1. Do you think each request getting --parallel-downloads workers dedicated to it will lead to too much saturation and churn on the network? This is my biggest concern. Imagine workerCount = 4 and we serve 100 requests per second: that's 400 concurrent downloads, open ports, etc., all to the same destination. Or should we keep the download worker pool global while the feeds/queues stay local, as you suggest?
  2. What about requests that share ledgers? Actually, answering this myself: the on-disk cache should kick in as soon as one of the workers downloads the ledger in full.

Contributor: Yes, the concurrent web requests end up time-slicing over x workers, so there may be an inflection point where response times are fast under low traffic but degrade under heavier load. But I would avoid any further performance tuning here and instead ship the functionality as-is, then tune around it later if needed. It's already a good algorithm with some nice locking optimizations, so it provides a solid starting point. Thanks for considering the idea and for the discussion; I don't want to hold up the merge.

i.ledgerFeedLock.Lock()
delete(i.ledgerFeed, ledgerSeq)
i.ledgerQueue.Remove(ledgerSeq)
i.ledgerFeedLock.Unlock()

return state.ledger, state.err
}
i.ledgerFeedLock.RUnlock()

select {
case err = <-i.signalChan: // blocks until another ledger downloads
case <-ctx.Done():
err = ctx.Err()
}
}

return xdr.SerializedLedgerCloseMeta{}, err
}

var _ Ingester = (*parallelIngester)(nil) // ensure conformity to the interface
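
To tie the new pieces together, here is a minimal usage sketch. It is not part of the PR: the import path follows the package location in this diff, the IngesterConfig field names are the ones referenced above, and the source URL, cache path, and ledger range are placeholder values.

package main

import (
	"context"

	"github.com/stellar/go/exp/lighthorizon/ingester"
	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/support/log"
)

func main() {
	// ParallelDownloads > 1 selects the parallel ingester; 1 keeps the plain
	// liteIngester behavior (see NewIngester above).
	ing, err := ingester.NewIngester(ingester.IngesterConfig{
		SourceUrl:         "s3://example-history-archive", // placeholder
		NetworkPassphrase: "Test SDF Network ; September 2015",
		CacheDir:          "/tmp/lighthorizon-ledger-cache", // placeholder
		CacheSize:         1000,
		ParallelDownloads: 4,
	})
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	r := historyarchive.Range{Low: 1586112, High: 1586175} // placeholder range

	// Queue background downloads for the whole (inclusive) range. This
	// returns immediately; the workers fill the internal ledger feed.
	if err := ing.PrepareRange(ctx, r); err != nil {
		log.Fatal(err)
	}

	// GetLedger blocks until a worker has delivered the requested sequence,
	// or falls back to a direct download if the sequence was never queued.
	for seq := r.Low; seq <= r.High; seq++ {
		ledger, err := ing.GetLedger(ctx, seq)
		if err != nil {
			log.Fatal(err)
		}
		_ = ledger // e.g. feed into ing.NewLedgerTransactionReader(ledger)
	}
}

Note that PrepareRange only queues work; ordering comes from the blocking behavior of GetLedger, which waits on the worker feed for any sequence that was queued.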
4 changes: 4 additions & 0 deletions exp/lighthorizon/main.go
@@ -62,6 +62,7 @@ break down accounts by active ledgers.`,
cacheDir, _ := cmd.Flags().GetString("ledger-cache")
cacheSize, _ := cmd.Flags().GetUint("ledger-cache-size")
logLevelParam, _ := cmd.Flags().GetString("log-level")
downloadCount, _ := cmd.Flags().GetUint("parallel-downloads")

L := log.WithField("service", "horizon-lite")
logLevel, err := logrus.ParseLevel(logLevelParam)
@@ -88,6 +89,7 @@ break down accounts by active ledgers.`,
NetworkPassphrase: networkPassphrase,
CacheDir: cacheDir,
CacheSize: int(cacheSize),
ParallelDownloads: downloadCount,
})
if err != nil {
log.Fatal(err)
@@ -129,6 +131,8 @@ break down accounts by active ledgers.`,
"if left empty, uses a temporary directory")
serve.Flags().Uint("ledger-cache-size", defaultCacheSize,
"number of ledgers to store in the cache")
serve.Flags().Uint("parallel-downloads", 1,
"how many workers should download ledgers in parallel?")

cmd.AddCommand(serve)
tools.AddCacheCommands(cmd)
30 changes: 23 additions & 7 deletions exp/lighthorizon/services/main.go
@@ -116,13 +116,28 @@ func searchAccountTransactions(ctx context.Context,
Infof("Fulfilled request for account %s at cursor %d", accountId, cursor)
}()

checkpointMgr := historyarchive.NewCheckpointManager(0)

for {
if checkpointMgr.IsCheckpoint(nextLedger) {
r := historyarchive.Range{
Low: nextLedger,
High: checkpointMgr.NextCheckpoint(nextLedger + 1),
}
log.Infof("prepare range %d, %d", r.Low, r.High)
if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil {
log.Errorf("failed to prepare ledger range [%d, %d]: %v",
r.Low, r.High, innerErr)
}
}

start := time.Now()
ledger, ledgerErr := config.Ingester.GetLedger(ctx, nextLedger)
if ledgerErr != nil {
return errors.Wrapf(ledgerErr,
"ledger export state is out of sync at ledger %d", nextLedger)
ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to retrieve ledger %d from archive", nextLedger)
}
count++
thisFetchDuration := time.Since(start)
if thisFetchDuration > slowFetchDurationThreshold {
log.WithField("duration", thisFetchDuration).
@@ -131,9 +146,10 @@ func searchAccountTransactions(ctx context.Context,
fetchDuration += thisFetchDuration

start = time.Now()
reader, readerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if readerErr != nil {
return readerErr
reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to read ledger %d", nextLedger)
}

for {
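
For context on the prefetch window introduced in services/main.go above, here is a small hedged sketch of the checkpoint arithmetic. It mirrors the calls the diff makes (NewCheckpointManager(0), IsCheckpoint, NextCheckpoint); the particular ledger sequence is a placeholder, and the assumption that passing 0 selects the default 64-ledger checkpoint frequency is inferred from how the diff uses it.

package main

import (
	"fmt"

	"github.com/stellar/go/historyarchive"
)

func main() {
	// Passing 0 appears to select the default checkpoint frequency
	// (64 ledgers), mirroring the call in searchAccountTransactions above.
	mgr := historyarchive.NewCheckpointManager(0)

	nextLedger := uint32(1586111) // placeholder cursor position
	if mgr.IsCheckpoint(nextLedger) {
		// Prefetch from the current checkpoint ledger up to the following
		// checkpoint boundary; PrepareRange downloads this window in
		// parallel while the loop above consumes it one ledger at a time.
		r := historyarchive.Range{
			Low:  nextLedger,
			High: mgr.NextCheckpoint(nextLedger + 1),
		}
		fmt.Printf("prefetch window: [%d, %d] (%d ledgers)\n", r.Low, r.High, r.Size())
	}
}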
6 changes: 5 additions & 1 deletion support/collections/set/set_test.go
@@ -7,10 +7,14 @@ import (
)

func TestSet(t *testing.T) {
s := Set[string]{}
s := NewSet[string](10)
s.Add("sanity")
require.True(t, s.Contains("sanity"))
require.False(t, s.Contains("check"))

s.AddSlice([]string{"a", "b", "c"})
require.True(t, s.Contains("b"))
require.Equal(t, []string{"sanity", "a", "b", "c"}, s.Slice())
}

func TestSafeSet(t *testing.T) {