Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

historyarchive: Add round-robin, error-resilience, and back-off to the ArchivePool #5224

Merged
merged 21 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ require (
gopkg.in/tylerb/graceful.v1 v1.2.15
)

require golang.org/x/sync v0.6.0
require (
github.com/cenkalti/backoff/v4 v4.2.1
golang.org/x/sync v0.6.0
)

require (
cloud.google.com/go/compute v1.23.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENU
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/buger/goreplay v1.3.2 h1:MFAStZZCsHMPeN5xJ11rhUtV4ZctFRgzSHTfWSWOJsg=
github.com/buger/goreplay v1.3.2/go.mod h1:EyAKHxJR6K6phd0NaoPETSDbJRB/ogIw3Y15UlSbVBM=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
Expand Down
225 changes: 170 additions & 55 deletions historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,72 @@
package historyarchive

import (
"context"
"math/rand"
"time"

"github.com/stellar/go/support/errors"
"github.com/pkg/errors"
log "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

backoff "github.com/cenkalti/backoff/v4"
)

// A ArchivePool is just a collection of `ArchiveInterface`s so that we can
// An ArchivePool is just a collection of `ArchiveInterface`s so that we can
// distribute requests fairly throughout the pool.
type ArchivePool []ArchiveInterface
type ArchivePool struct {
// backoff is the retry strategy that runRoundRobin hands to backoff.Retry.
backoff backoff.BackOff
// pool holds the archives that connected successfully at construction time.
pool []ArchiveInterface
// curr is the index into pool that getNextArchive advances from; the
// constructor seeds it with a random index.
curr int
}

// NewArchivePool tries connecting to each of the provided history archive URLs,
// returning a pool of valid archives.
//
// If none of the archives work, this returns the error message of the last
// failed archive. Note that the errors for each individual archive are hard to
// track if there's success overall.
func NewArchivePool(archiveURLs []string, opts ArchiveOptions) (ArchivePool, error) {
func NewArchivePool(archiveURLs []string, opts ArchiveOptions) (ArchiveInterface, error) {
	// Default retry policy: a constant 250ms wait between attempts, capped at
	// 3 retries.
	defaultStrategy := backoff.WithMaxRetries(
		backoff.NewConstantBackOff(250*time.Millisecond),
		3,
	)
	return NewArchivePoolWithBackoff(archiveURLs, opts, defaultStrategy)
}

func NewArchivePoolWithBackoff(archiveURLs []string, opts ArchiveOptions, strategy backoff.BackOff) (ArchiveInterface, error) {
if len(archiveURLs) <= 0 {
return nil, errors.New("No history archives provided")
}

var lastErr error = nil
ap := ArchivePool{
pool: make([]ArchiveInterface, 0, len(archiveURLs)),
backoff: strategy,
}
var lastErr error

// Try connecting to all of the listed archives, but only store valid ones.
var validArchives ArchivePool
for _, url := range archiveURLs {
archive, err := Connect(
url,
opts,
)

archive, err := Connect(url, opts)
if err != nil {
lastErr = errors.Wrapf(err, "Error connecting to history archive (%s)", url)
continue
}

validArchives = append(validArchives, archive)
ap.pool = append(ap.pool, archive)
}

if len(validArchives) == 0 {
if len(ap.pool) == 0 {
return nil, lastErr
}

return validArchives, nil
ap.curr = rand.Intn(len(ap.pool)) // don't necessarily start at zero
return &ap, nil
}

func (pa ArchivePool) GetStats() []ArchiveStats {
func (pa *ArchivePool) GetStats() []ArchiveStats {
stats := []ArchiveStats{}
for _, archive := range pa {
for _, archive := range pa.pool {
stats = append(stats, archive.GetStats()...)
}
return stats
Expand All @@ -62,80 +79,178 @@ func (pa ArchivePool) GetStats() []ArchiveStats {
// Ensure the pool conforms to the ArchiveInterface
var _ ArchiveInterface = &ArchivePool{}

// Below are the ArchiveInterface method implementations.
//
// These are helpers to round-robin calls through archives.
//

func (pa ArchivePool) GetAnyArchive() ArchiveInterface {
return pa[rand.Intn(len(pa))]
// getNextArchive advances the pool's cursor by one position, wrapping around
// at the end of the pool, and returns the archive at the new position.
func (pa *ArchivePool) getNextArchive() ArchiveInterface {
	next := (pa.curr + 1) % len(pa.pool)
	pa.curr = next
	return pa.pool[next]
}

func (pa ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetPathHAS(path)
// runRoundRobin invokes runner against successive archives from the pool,
// moving on to the next archive (round-robin) for every attempt. Attempts are
// driven by backoff.Retry with the pool's configured strategy (pa.backoff), so
// the number of attempts and the wait between them are decided by that
// strategy rather than by the size of the pool.
//
// A successful attempt ends the retries. Context cancellation and deadline
// errors are wrapped with backoff.Permanent so that they are not retried; any
// other error is logged (when the archive reports stats) and handed back to
// backoff.Retry.
func (pa *ArchivePool) runRoundRobin(runner func(ai ArchiveInterface) error) error {
	attempt := func() error {
		archive := pa.getNextArchive()
		err := runner(archive)
		switch {
		case err == nil:
			return nil
		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
			// Context errors are never retried and are intentionally not logged.
			return backoff.Permanent(err)
		}

		stats := archive.GetStats()
		if len(stats) > 0 {
			log.WithField("error", err).Warnf(
				"Encountered an error with archive '%s'",
				stats[0].GetBackendName())
		}
		return err
	}
	return backoff.Retry(attempt, pa.backoff)
}

//
// Below are the ArchiveInterface method implementations.
//

// GetPathHAS fetches the HistoryArchiveState stored at path, asking archives
// from the pool through runRoundRobin. The returned state is only meaningful
// when the error is nil.
func (pa *ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) {
	state := HistoryArchiveState{}
	fetch := func(archive ArchiveInterface) error {
		var fetchErr error
		state, fetchErr = archive.GetPathHAS(path)
		return fetchErr
	}
	err := pa.runRoundRobin(fetch)
	return state, err
}

func (pa ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutPathHAS(path, has, opts)
// PutPathHAS forwards the PutPathHAS call to archives from the pool through
// runRoundRobin and returns its result.
func (pa *ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
	put := func(archive ArchiveInterface) error {
		return archive.PutPathHAS(path, has, opts)
	}
	return pa.runRoundRobin(put)
}

func (pa ArchivePool) BucketExists(bucket Hash) (bool, error) {
return pa.GetAnyArchive().BucketExists(bucket)
// BucketExists asks archives from the pool, through runRoundRobin, whether the
// given bucket exists. The boolean is only meaningful when the returned error
// is nil.
func (pa *ArchivePool) BucketExists(bucket Hash) (bool, error) {
	exists := false
	// Finish the call before reading exists: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return exists, pa.runRoundRobin(...)`
	// may yield the stale zero value.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		exists, innerErr = ai.BucketExists(bucket)
		return innerErr
	})
	return exists, err
}

func (pa ArchivePool) BucketSize(bucket Hash) (int64, error) {
return pa.GetAnyArchive().BucketSize(bucket)
// BucketSize asks archives from the pool, through runRoundRobin, for the size
// of the given bucket. The size is only meaningful when the returned error is
// nil.
func (pa *ArchivePool) BucketSize(bucket Hash) (int64, error) {
	var size int64
	// Finish the call before reading size: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return size, pa.runRoundRobin(...)`
	// may yield the stale zero value.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		size, innerErr = ai.BucketSize(bucket)
		return innerErr
	})
	return size, err
}

func (pa ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk)
func (pa *ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
var ok bool
return ok, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
ok, err = ai.CategoryCheckpointExists(cat, chk)
return err
})
}

func (pa ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) {
return pa.GetAnyArchive().GetLedgerHeader(chk)
// GetLedgerHeader fetches the ledger header history entry for chk from
// archives in the pool through runRoundRobin. The entry is only meaningful
// when the returned error is nil.
func (pa *ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) {
	var entry xdr.LedgerHeaderHistoryEntry
	// Finish the call before reading entry: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return entry, pa.runRoundRobin(...)`
	// may yield the stale zero value.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		entry, innerErr = ai.GetLedgerHeader(chk)
		return innerErr
	})
	return entry, err
}

func (pa ArchivePool) GetRootHAS() (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetRootHAS()
// GetRootHAS fetches the root HistoryArchiveState from archives in the pool
// through runRoundRobin. The state is only meaningful when the returned error
// is nil.
func (pa *ArchivePool) GetRootHAS() (HistoryArchiveState, error) {
	var state HistoryArchiveState
	// Finish the call before reading state: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return state, pa.runRoundRobin(...)`
	// may yield the stale zero value.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		state, innerErr = ai.GetRootHAS()
		return innerErr
	})
	return state, err
}

func (pa ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) {
return pa.GetAnyArchive().GetLedgers(start, end)
// GetLedgers forwards the GetLedgers(start, end) request to archives from the
// pool through runRoundRobin. The map is only meaningful when the returned
// error is nil.
func (pa *ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) {
	var ledgers map[uint32]*Ledger
	// Finish the call before reading ledgers: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return ledgers, pa.runRoundRobin(...)`
	// may yield the stale nil map.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		ledgers, innerErr = ai.GetLedgers(start, end)
		return innerErr
	})
	return ledgers, err
}

func (pa ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetCheckpointHAS(chk)
// GetCheckpointHAS fetches the HistoryArchiveState for chk from archives in
// the pool through runRoundRobin. The state is only meaningful when the
// returned error is nil.
func (pa *ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) {
	var state HistoryArchiveState
	// Finish the call before reading state: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return state, pa.runRoundRobin(...)`
	// may yield the stale zero value.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		state, innerErr = ai.GetCheckpointHAS(chk)
		return innerErr
	})
	return state, err
}

func (pa ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts)
// PutCheckpointHAS forwards the PutCheckpointHAS call to archives from the
// pool through runRoundRobin and returns its result.
func (pa *ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
	put := func(archive ArchiveInterface) error {
		return archive.PutCheckpointHAS(chk, has, opts)
	}
	return pa.runRoundRobin(put)
}

func (pa ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutRootHAS(has, opts)
// PutRootHAS forwards the PutRootHAS call to archives from the pool through
// runRoundRobin and returns its result.
func (pa *ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error {
	put := func(archive ArchiveInterface) error {
		return archive.PutRootHAS(has, opts)
	}
	return pa.runRoundRobin(put)
}

func (pa ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) {
return pa.GetAnyArchive().ListBucket(dp)
// GetXdrStreamForHash requests the XDR stream for hash from archives in the
// pool through runRoundRobin. The stream is only meaningful when the returned
// error is nil.
func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
	var stream *XdrStream
	// Finish the call before reading stream: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return stream, pa.runRoundRobin(...)`
	// may yield the stale nil pointer.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		stream, innerErr = ai.GetXdrStreamForHash(hash)
		return innerErr
	})
	return stream, err
}

func (pa ArchivePool) ListAllBuckets() (chan string, chan error) {
return pa.GetAnyArchive().ListAllBuckets()
// GetXdrStream requests the XDR stream at pth from archives in the pool
// through runRoundRobin. The stream is only meaningful when the returned
// error is nil.
func (pa *ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
	var stream *XdrStream
	// Finish the call before reading stream: the Go spec leaves the order
	// between reading a plain variable and calling a function in the same
	// return statement unspecified, so `return stream, pa.runRoundRobin(...)`
	// may yield the stale nil pointer.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		stream, innerErr = ai.GetXdrStream(pth)
		return innerErr
	})
	return stream, err
}

func (pa ArchivePool) ListAllBucketHashes() (chan Hash, chan error) {
return pa.GetAnyArchive().ListAllBucketHashes()
// GetCheckpointManager returns the CheckpointManager of the next archive in
// the round-robin rotation.
func (pa *ArchivePool) GetCheckpointManager() CheckpointManager {
	archive := pa.getNextArchive()
	return archive.GetCheckpointManager()
}

func (pa ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) {
return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth)
//
// The channel-based methods do not have automatic retries.
//

// ListBucket delegates to the next archive in the round-robin rotation. The
// call is made once, without the retry wrapper.
func (pa *ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) {
	archive := pa.getNextArchive()
	return archive.ListBucket(dp)
}

func (pa ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStreamForHash(hash)
// ListAllBuckets delegates to the next archive in the round-robin rotation.
// The call is made once, without the retry wrapper.
func (pa *ArchivePool) ListAllBuckets() (chan string, chan error) {
	archive := pa.getNextArchive()
	return archive.ListAllBuckets()
}

func (pa ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStream(pth)
// ListAllBucketHashes delegates to the next archive in the round-robin
// rotation. The call is made once, without the retry wrapper.
func (pa *ArchivePool) ListAllBucketHashes() (chan Hash, chan error) {
	archive := pa.getNextArchive()
	return archive.ListAllBucketHashes()
}

func (pa ArchivePool) GetCheckpointManager() CheckpointManager {
return pa.GetAnyArchive().GetCheckpointManager()
// ListCategoryCheckpoints delegates to the next archive in the round-robin
// rotation. The call is made once, without the retry wrapper.
func (pa *ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) {
	archive := pa.getNextArchive()
	return archive.ListCategoryCheckpoints(cat, pth)
}
Loading
Loading