-
Notifications
You must be signed in to change notification settings - Fork 502
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Actually add & use parallel downloads, preparing checkpoint chunks
- Loading branch information
Showing
7 changed files
with
253 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
package ingester | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/stellar/go/historyarchive" | ||
"github.com/stellar/go/metaarchive" | ||
"github.com/stellar/go/support/collections/set" | ||
"github.com/stellar/go/support/log" | ||
"github.com/stellar/go/support/storage" | ||
"github.com/stellar/go/xdr" | ||
) | ||
|
||
type parallelIngester struct { | ||
liteIngester | ||
workerCount uint | ||
|
||
ledgerFeedLock sync.RWMutex | ||
ledgerFeed map[uint32]downloadState | ||
ledgerQueue set.Set[uint32] | ||
|
||
workQueue chan uint32 | ||
signalChan chan error | ||
} | ||
|
||
type downloadState struct { | ||
ledger xdr.SerializedLedgerCloseMeta | ||
err error | ||
} | ||
|
||
// NewParallelIngester creates an ingester on the given `ledgerSource` using the | ||
// given `networkPassphrase` that can download ledgers in parallel via | ||
// `workerCount` workers. | ||
func NewParallelIngester(ledgerSource storage.Storage, networkPassphrase string, workerCount uint) *parallelIngester { | ||
self := ¶llelIngester{ | ||
liteIngester: liteIngester{ | ||
MetaArchive: metaarchive.NewMetaArchive(ledgerSource), | ||
networkPassphrase: networkPassphrase, | ||
}, | ||
ledgerFeedLock: sync.RWMutex{}, | ||
ledgerFeed: make(map[uint32]downloadState, 64), | ||
ledgerQueue: set.NewSet[uint32](64), | ||
workerCount: workerCount, | ||
workQueue: make(chan uint32, workerCount), | ||
signalChan: make(chan error), | ||
} | ||
|
||
// These are the workers that download & store ledgers in memory. | ||
for j := uint(0); j < workerCount; j++ { | ||
go func() { | ||
for ledgerSeq := range self.workQueue { | ||
start := time.Now() | ||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
txmeta, err := self.liteIngester.GetLedger(ctx, ledgerSeq) | ||
cancel() | ||
|
||
log.WithField("duration", time.Since(start)).WithError(err). | ||
Debugf("Downloaded ledger %d", ledgerSeq) | ||
|
||
self.ledgerFeedLock.Lock() | ||
self.ledgerFeed[ledgerSeq] = downloadState{txmeta, err} | ||
self.ledgerFeedLock.Unlock() | ||
self.signalChan <- err | ||
} | ||
}() | ||
} | ||
|
||
return self | ||
} | ||
|
||
// PrepareRange will create a set of parallel worker routines that feed ledgers | ||
// to a channel in the order they're downloaded and store the results in an | ||
// array. You can use this to download ledgers in parallel to fetching them | ||
// individually via `GetLedger()`. | ||
// | ||
// Note: The passed in range `r` is inclusive of the boundaries. | ||
func (i *parallelIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { | ||
// The taskmaster adds ledger sequence numbers to the work queue. | ||
go func() { | ||
start := time.Now() | ||
defer func() { | ||
log.WithField("duration", time.Since(start)). | ||
WithField("workers", i.workerCount). | ||
WithError(ctx.Err()). | ||
Infof("Download of ledger range: [%d, %d] (%d ledgers) complete", | ||
r.Low, r.High, r.Size()) | ||
}() | ||
|
||
for seq := r.Low; seq <= r.High; seq++ { | ||
if ctx.Err() != nil { | ||
log.Warnf("Cancelling remaining downloads ([%d, %d]): %v", | ||
seq, r.High, ctx.Err()) | ||
break | ||
} | ||
|
||
// Adding this to the "set of ledgers being downloaded in parallel" | ||
// means that if a GetLedger() request happens in this range but | ||
// outside of the realm of processing, it can be prioritized by the | ||
// normal, direct download. | ||
i.ledgerQueue.Add(seq) | ||
|
||
i.workQueue <- seq // blocks until there's an available worker | ||
|
||
// Similarly to before, removing it immediately after a worker takes | ||
// it means that if someone requests the ledger again, it can be | ||
// direct-downloaded (or pulled from the cache) without waiting on | ||
// the busy workers. | ||
i.ledgerQueue.Remove(seq) | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
func (i *parallelIngester) GetLedger( | ||
ctx context.Context, ledgerSeq uint32, | ||
) (xdr.SerializedLedgerCloseMeta, error) { | ||
// If the requested ledger is out of the queued up ranges, we can fall back | ||
// to the default non-parallel download method. | ||
if !i.ledgerQueue.Contains(ledgerSeq) { | ||
return i.liteIngester.GetLedger(ctx, ledgerSeq) | ||
} | ||
|
||
// If the ledger isn't available yet, wait for the download worker. | ||
var err error | ||
for err == nil { | ||
i.ledgerFeedLock.RLock() | ||
if state, ok := i.ledgerFeed[ledgerSeq]; ok { | ||
i.ledgerFeedLock.RUnlock() // re-lock as a writer | ||
i.ledgerFeedLock.Lock() | ||
delete(i.ledgerFeed, ledgerSeq) | ||
i.ledgerFeedLock.Unlock() | ||
|
||
return state.ledger, state.err | ||
} | ||
i.ledgerFeedLock.RUnlock() | ||
|
||
select { | ||
case err = <-i.signalChan: // blocks until another ledger downloads | ||
case <-ctx.Done(): | ||
err = ctx.Err() | ||
} | ||
} | ||
|
||
return xdr.SerializedLedgerCloseMeta{}, err | ||
} | ||
|
||
var _ Ingester = (*parallelIngester)(nil) // ensure conformity to the interface |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.