Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

historyarchive: Add round-robin, error-resilience, and back-off to the ArchivePool #5224

Merged
merged 21 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ require (
gopkg.in/tylerb/graceful.v1 v1.2.15
)

require golang.org/x/sync v0.6.0
require (
github.com/cenkalti/backoff/v4 v4.2.1
golang.org/x/sync v0.6.0
)

require (
cloud.google.com/go/compute v1.23.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENU
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/buger/goreplay v1.3.2 h1:MFAStZZCsHMPeN5xJ11rhUtV4ZctFRgzSHTfWSWOJsg=
github.com/buger/goreplay v1.3.2/go.mod h1:EyAKHxJR6K6phd0NaoPETSDbJRB/ogIw3Y15UlSbVBM=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
Expand Down
236 changes: 181 additions & 55 deletions historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,72 @@
package historyarchive

import (
"context"
"math/rand"
"time"

"github.com/stellar/go/support/errors"
"github.com/pkg/errors"
log "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

backoff "github.com/cenkalti/backoff/v4"
)

// A ArchivePool is just a collection of `ArchiveInterface`s so that we can
// An ArchivePool is just a collection of `ArchiveInterface`s so that we can
// distribute requests fairly throughout the pool.
type ArchivePool []ArchiveInterface
type ArchivePool struct {
// backoff is the retry strategy that runRoundRobin hands to backoff.Retry.
backoff backoff.BackOff
// pool holds the archives that connected successfully at construction time.
pool []ArchiveInterface
// curr is the index into pool of the most recently selected archive;
// getNextArchive advances it (wrapping around) before each selection.
curr int
}

// NewArchivePool tries connecting to each of the provided history archive URLs,
// returning a pool of valid archives.
//
// If none of the archives work, this returns the error message of the last
// failed archive. Note that the errors for each individual archive are hard to
// track if there's success overall.
func NewArchivePool(archiveURLs []string, opts ArchiveOptions) (ArchivePool, error) {
func NewArchivePool(archiveURLs []string, opts ArchiveOptions) (ArchiveInterface, error) {
	// Default retry policy: a constant 250ms pause between attempts, wrapped
	// so that the number of retries is capped at 3.
	defaultStrategy := backoff.WithMaxRetries(
		backoff.NewConstantBackOff(250*time.Millisecond),
		3,
	)

	return NewArchivePoolWithBackoff(archiveURLs, opts, defaultStrategy)
}

// NewArchivePoolWithBackoff builds an archive pool from the given URLs and
// stores the supplied backoff strategy on it; runRoundRobin later passes that
// strategy to backoff.Retry.
//
// Each URL is passed to Connect with opts. URLs that fail to connect are
// skipped, and the wrapped error of the most recent failure is remembered. An
// error is returned when no URLs are provided, or when none of them connect
// (in which case it is the last connection error).
func NewArchivePoolWithBackoff(archiveURLs []string, opts ArchiveOptions, strategy backoff.BackOff) (ArchiveInterface, error) {
if len(archiveURLs) <= 0 {
return nil, errors.New("No history archives provided")
}

var lastErr error = nil
ap := ArchivePool{
pool: make([]ArchiveInterface, 0, len(archiveURLs)),
backoff: strategy,
}
var lastErr error

// Try connecting to all of the listed archives, but only store valid ones.
var validArchives ArchivePool
for _, url := range archiveURLs {
archive, err := Connect(
url,
opts,
)

archive, err := Connect(url, opts)
if err != nil {
// Remember the failure but keep going: one bad archive should not
// prevent the remaining ones from being used.
lastErr = errors.Wrapf(err, "Error connecting to history archive (%s)", url)
continue
}

validArchives = append(validArchives, archive)
ap.pool = append(ap.pool, archive)
}

if len(validArchives) == 0 {
if len(ap.pool) == 0 {
return nil, lastErr
}

return validArchives, nil
ap.curr = rand.Intn(len(ap.pool)) // don't necessarily start at zero
return &ap, nil
}

func (pa ArchivePool) GetStats() []ArchiveStats {
func (pa *ArchivePool) GetStats() []ArchiveStats {
stats := []ArchiveStats{}
for _, archive := range pa {
for _, archive := range pa.pool {
stats = append(stats, archive.GetStats()...)
}
return stats
Expand All @@ -62,80 +79,189 @@ func (pa ArchivePool) GetStats() []ArchiveStats {
// Compile-time check that *ArchivePool satisfies ArchiveInterface.
var _ ArchiveInterface = (*ArchivePool)(nil)

// Below are the ArchiveInterface method implementations.
//
// These are helpers to round-robin calls through archives.
//

func (pa ArchivePool) GetAnyArchive() ArchiveInterface {
return pa[rand.Intn(len(pa))]
// getNextArchive advances the pool's cursor by one position, wrapping back to
// the start after the last archive, and returns the archive now under the
// cursor. The cursor is stored on the pool, so successive calls walk through
// the archives in order.
func (pa *ArchivePool) getNextArchive() ArchiveInterface {
	idx := (pa.curr + 1) % len(pa.pool)
	pa.curr = idx
	return pa.pool[idx]
}

func (pa ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetPathHAS(path)
// runRoundRobin executes runner against archives from the pool, retrying under
// the pool's configured backoff strategy. Every attempt selects the next
// archive via getNextArchive, so consecutive retries land on different
// archives whenever the pool has more than one.
//
// It returns nil as soon as an attempt succeeds. Context cancellation and
// deadline errors are wrapped as permanent so they are not retried. Any other
// error is logged (when the archive reports stats to name it by) and handed
// back to backoff.Retry, which decides whether another attempt is made.
func (pa *ArchivePool) runRoundRobin(runner func(ai ArchiveInterface) error) error {
	attempt := func() error {
		archive := pa.getNextArchive()

		err := runner(archive)
		switch {
		case err == nil:
			return nil
		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
			// The caller gave up; retrying is pointless, and these errors
			// are deliberately left out of the warning log below.
			return backoff.Permanent(err)
		}

		stats := archive.GetStats()
		if len(stats) > 0 {
			log.WithField("error", err).Warnf(
				"Encountered an error with archive '%s'",
				stats[0].GetBackendName())
		}

		return err
	}

	return backoff.Retry(attempt, pa.backoff)
}

func (pa ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutPathHAS(path, has, opts)
//
// Below are the ArchiveInterface method implementations.
//

// GetPathHAS fetches the history archive state stored at path, trying
// archives from the pool through runRoundRobin. The returned state is whatever
// the last attempted archive produced.
func (pa *ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) {
	var state HistoryArchiveState

	fetch := func(archive ArchiveInterface) (err error) {
		state, err = archive.GetPathHAS(path)
		return err
	}

	err := pa.runRoundRobin(fetch)
	return state, err
}

func (pa ArchivePool) BucketExists(bucket Hash) (bool, error) {
return pa.GetAnyArchive().BucketExists(bucket)
// PutPathHAS writes has to path on an archive from the pool, going through
// runRoundRobin so that a failed write is retried on the next archive.
func (pa *ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
	put := func(archive ArchiveInterface) error {
		return archive.PutPathHAS(path, has, opts)
	}
	return pa.runRoundRobin(put)
}

func (pa ArchivePool) BucketSize(bucket Hash) (int64, error) {
return pa.GetAnyArchive().BucketSize(bucket)
// BucketExists asks archives from the pool whether bucket exists, retrying
// through runRoundRobin. It returns the answer from the last attempted archive
// together with the retry outcome.
func (pa *ArchivePool) BucketExists(bucket Hash) (bool, error) {
	var exists bool

	// Finish the retry loop before building the return values: Go does not
	// specify the evaluation order between a plain variable and a function
	// call in the same expression list, so `return exists, pa.runRoundRobin(...)`
	// is not guaranteed to observe the closure's write.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		exists, innerErr = ai.BucketExists(bucket)
		return innerErr
	})
	return exists, err
}

func (pa ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk)
// BucketSize asks archives from the pool for the size of bucket, retrying
// through runRoundRobin. It returns the size reported by the last attempted
// archive together with the retry outcome.
func (pa *ArchivePool) BucketSize(bucket Hash) (int64, error) {
	var size int64

	// Run the retry loop to completion first; reading size in the same
	// return expression as the call would rely on an evaluation order the Go
	// specification leaves unspecified.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		size, innerErr = ai.BucketSize(bucket)
		return innerErr
	})
	return size, err
}

func (pa ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) {
return pa.GetAnyArchive().GetLedgerHeader(chk)
func (pa *ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
var ok bool
return ok, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
ok, err = ai.CategoryCheckpointExists(cat, chk)
return err
})
}

func (pa ArchivePool) GetRootHAS() (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetRootHAS()
// GetLedgerHeader fetches the ledger header history entry for checkpoint chk,
// trying archives from the pool through runRoundRobin. It returns the entry
// produced by the last attempted archive together with the retry outcome.
func (pa *ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) {
	var entry xdr.LedgerHeaderHistoryEntry

	// Run the retry loop to completion before returning: the Go
	// specification does not define whether `entry` would be read before or
	// after the call if both appeared in the same return expression.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		entry, innerErr = ai.GetLedgerHeader(chk)
		return innerErr
	})
	return entry, err
}

func (pa ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) {
return pa.GetAnyArchive().GetLedgers(start, end)
// GetRootHAS fetches the root history archive state, trying archives from the
// pool through runRoundRobin. It returns the state produced by the last
// attempted archive together with the retry outcome.
func (pa *ArchivePool) GetRootHAS() (HistoryArchiveState, error) {
	var state HistoryArchiveState

	// Finish retrying before the return statement so the value written by
	// the closure is the one returned; mixing the variable and the call in
	// one return expression relies on unspecified evaluation order.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		state, innerErr = ai.GetRootHAS()
		return innerErr
	})
	return state, err
}

func (pa ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetCheckpointHAS(chk)
// GetLedgers fetches the ledgers for the range given by start and end, trying
// archives from the pool through runRoundRobin. It returns the map produced by
// the last attempted archive together with the retry outcome.
func (pa *ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) {
	var ledgers map[uint32]*Ledger

	// Complete the retry loop first. Returning the variable and the call in
	// a single expression list would depend on an evaluation order that the
	// Go specification leaves unspecified.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		ledgers, innerErr = ai.GetLedgers(start, end)
		return innerErr
	})
	return ledgers, err
}

func (pa ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts)
// GetCheckpointHAS fetches the history archive state for checkpoint chk,
// trying archives from the pool through runRoundRobin. It returns the state
// produced by the last attempted archive together with the retry outcome.
func (pa *ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) {
	var state HistoryArchiveState

	// Run the retry loop before returning so `state` is read only after the
	// closure has assigned it; the Go specification does not order a variable
	// read against a call in the same return expression.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		state, innerErr = ai.GetCheckpointHAS(chk)
		return innerErr
	})
	return state, err
}

func (pa ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutRootHAS(has, opts)
// PutCheckpointHAS writes has for checkpoint chk on an archive from the pool,
// going through runRoundRobin so that a failed write is retried on the next
// archive.
func (pa *ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
	put := func(archive ArchiveInterface) error {
		return archive.PutCheckpointHAS(chk, has, opts)
	}
	return pa.runRoundRobin(put)
}

func (pa ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) {
return pa.GetAnyArchive().ListBucket(dp)
// PutRootHAS writes has as the root history archive state on an archive from
// the pool, going through runRoundRobin so that a failed write is retried on
// the next archive.
func (pa *ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error {
	put := func(archive ArchiveInterface) error {
		return archive.PutRootHAS(has, opts)
	}
	return pa.runRoundRobin(put)
}

func (pa ArchivePool) ListAllBuckets() (chan string, chan error) {
return pa.GetAnyArchive().ListAllBuckets()
func (pa *ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) {
var c chan string
var err chan error
pa.runRoundRobin(func(ai ArchiveInterface) error {
c, err = ai.ListBucket(dp)
return <-err
// TODO: Does the above block in the absence of errors?? Otherwise, can do this:
//
// if errCount := drainErrors(err); errCount > 0 {
// return fmt.Errorf("encountered %d errors with ListBucket on %+v", errCount, dp)
// }
// return nil
})
return c, err
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: Finish the below once it's clear what the better channel <-> error
// resolution strategy should be.

// ListAllBuckets delegates to the next archive in the round-robin rotation and
// returns its channels as-is. The call is made once; it is not retried.
func (pa *ArchivePool) ListAllBuckets() (chan string, chan error) {
	archive := pa.getNextArchive()
	return archive.ListAllBuckets()
}

func (pa ArchivePool) ListAllBucketHashes() (chan Hash, chan error) {
return pa.GetAnyArchive().ListAllBucketHashes()
// ListAllBucketHashes delegates to the next archive in the round-robin
// rotation and returns its channels as-is. The call is made once; it is not
// retried.
func (pa *ArchivePool) ListAllBucketHashes() (chan Hash, chan error) {
	archive := pa.getNextArchive()
	return archive.ListAllBucketHashes()
}

func (pa ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) {
return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth)
// ListCategoryCheckpoints delegates to the next archive in the round-robin
// rotation, forwarding cat and pth unchanged, and returns its channels as-is.
// The call is made once; it is not retried.
func (pa *ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) {
	archive := pa.getNextArchive()
	return archive.ListCategoryCheckpoints(cat, pth)
}

func (pa ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStreamForHash(hash)
// GetXdrStreamForHash opens an XDR stream for hash, trying archives from the
// pool through runRoundRobin. It returns the stream produced by the last
// attempted archive together with the retry outcome.
func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
	var stream *XdrStream

	// Finish the retry loop before returning. Reading `stream` in the same
	// return expression as the call would rely on an evaluation order the Go
	// specification leaves unspecified, risking a stale nil stream.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		stream, innerErr = ai.GetXdrStreamForHash(hash)
		return innerErr
	})
	return stream, err
}

func (pa ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStream(pth)
// GetXdrStream opens an XDR stream for the file at pth, trying archives from
// the pool through runRoundRobin. It returns the stream produced by the last
// attempted archive together with the retry outcome.
func (pa *ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
	var stream *XdrStream

	// Finish the retry loop before returning. Reading `stream` in the same
	// return expression as the call would rely on an evaluation order the Go
	// specification leaves unspecified, risking a stale nil stream.
	err := pa.runRoundRobin(func(ai ArchiveInterface) error {
		var innerErr error
		stream, innerErr = ai.GetXdrStream(pth)
		return innerErr
	})
	return stream, err
}

func (pa ArchivePool) GetCheckpointManager() CheckpointManager {
return pa.GetAnyArchive().GetCheckpointManager()
// GetCheckpointManager returns the checkpoint manager of the next archive in
// the round-robin rotation. Note that calling it advances the rotation.
func (pa *ArchivePool) GetCheckpointManager() CheckpointManager {
	archive := pa.getNextArchive()
	return archive.GetCheckpointManager()
}
Loading
Loading