Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

historyarchive: Introduce a History Archive pool that's selected from for all calls #3402

Merged
merged 10 commits into from
Feb 16, 2021
97 changes: 97 additions & 0 deletions historyarchive/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2016 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need copyright comment on top of this file. At least we don't have it in most of other files. @ire-and-curses?

Suggested change
// Copyright 2016 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: copyright - let's just follow whatever we do everywhere else in the codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so I was confused by this, too. In the historyarchive package, all of the files have copyright notices. The years are mixed, but all of them have one. So I added it here.

package historyarchive

import (
"math/rand"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

// Type PooledArchive forwards all API calls to a random ArchiveInterface within
// its internal pool.
type PooledArchive struct {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
pool []ArchiveInterface
}

var _ ArchiveInterface = &PooledArchive{}

func CreatePool(archives ...ArchiveInterface) (*PooledArchive, error) {
if len(archives) <= 0 {
return nil, errors.New("No history archives provided")
}
return &PooledArchive{pool: archives}, nil
}

func (pa *PooledArchive) GetAnyArchive() ArchiveInterface {
return pa.pool[rand.Intn(len(pa.pool))]
}

// Below are the ArchiveInterface method implementations.

func (pa *PooledArchive) GetPathHAS(path string) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetPathHAS(path)
}

func (pa *PooledArchive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutPathHAS(path, has, opts)
}

func (pa *PooledArchive) BucketExists(bucket Hash) (bool, error) {
return pa.GetAnyArchive().BucketExists(bucket)
}

func (pa *PooledArchive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk)
}

func (pa *PooledArchive) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) {
return pa.GetAnyArchive().GetLedgerHeader(chk)
}

func (pa *PooledArchive) GetRootHAS() (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetRootHAS()
}

func (pa *PooledArchive) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetCheckpointHAS(chk)
}

func (pa *PooledArchive) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts)
}

func (pa *PooledArchive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutRootHAS(has, opts)
}

func (pa *PooledArchive) ListBucket(dp DirPrefix) (chan string, chan error) {
return pa.GetAnyArchive().ListBucket(dp)
}

func (pa *PooledArchive) ListAllBuckets() (chan string, chan error) {
return pa.GetAnyArchive().ListAllBuckets()
}

func (pa *PooledArchive) ListAllBucketHashes() (chan Hash, chan error) {
return pa.GetAnyArchive().ListAllBucketHashes()
}

func (pa *PooledArchive) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) {
return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth)
}

func (pa *PooledArchive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStreamForHash(hash)
}

func (pa *PooledArchive) GetXdrStream(pth string) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStream(pth)
}

func (pa *PooledArchive) GetCheckpointManager() CheckpointManager {
return pa.GetAnyArchive().GetCheckpointManager()
}
34 changes: 24 additions & 10 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,35 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
var cancel context.CancelFunc
config.Context, cancel = context.WithCancel(parentCtx)

archive, err := historyarchive.Connect(
config.HistoryArchiveURLs[0],
historyarchive.ConnectOptions{
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
Context: config.Context,
},
)
var validArchives []historyarchive.ArchiveInterface

for _, url := range config.HistoryArchiveURLs {
archive, err := historyarchive.Connect(
url,
historyarchive.ConnectOptions{
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
Context: config.Context,
},
)

if err != nil {
config.Log.Warnf("Error connecting to history archive (%s): %s", url, err)
continue
}

validArchives = append(validArchives, archive)
}

pool, err := historyarchive.CreatePool(validArchives...)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error connecting to history archive")
config.Log.Error("Error connecting to ALL history archives.")
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

c := &CaptiveStellarCore{
archive: archive,
archive: pool,
ledgerHashStore: config.LedgerHashStore,
cancel: cancel,
checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency),
Expand Down