Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon: Add basic scaffolding for metrics. #4456

Merged
merged 4 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions exp/lighthorizon/actions/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func Transactions(archiveWrapper archive.Wrapper, indexStore index.Store) func(h

for _, txn := range txns {
var response hProtocol.Transaction
txn.NetworkPassphrase = archiveWrapper.Passphrase
response, err = adapters.PopulateTransaction(r, &txn)
if err != nil {
log.Error(err)
Expand Down
2 changes: 2 additions & 0 deletions exp/lighthorizon/archive/ingest_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ func NewIngestArchive(sourceUrl string, networkPassphrase string) (Archive, erro
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
return ingestArchive{ledgerBackend}, nil
}

var _ Archive = (*ingestArchive)(nil) // ensure conformity to the interface
9 changes: 8 additions & 1 deletion exp/lighthorizon/common/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"encoding/hex"
"errors"

"github.com/stellar/go/network"
"github.com/stellar/go/toid"
Expand All @@ -13,10 +14,16 @@ type Transaction struct {
TransactionResult *xdr.TransactionResult
LedgerHeader *xdr.LedgerHeader
TxIndex int32

NetworkPassphrase string
}

func (o *Transaction) TransactionHash() (string, error) {
hash, err := network.HashTransactionInEnvelope(*o.TransactionEnvelope, network.PublicNetworkPassphrase)
if o.NetworkPassphrase == "" {
return "", errors.New("network passphrase unspecified")
}

hash, err := network.HashTransactionInEnvelope(*o.TransactionEnvelope, o.NetworkPassphrase)
if err != nil {
return "", err
}
Expand Down
41 changes: 21 additions & 20 deletions exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ func BuildIndices(
modules []string,
workerCount int,
) (*IndexBuilder, error) {
L := log.Ctx(ctx)
L := log.Ctx(ctx).WithField("service", "builder")

indexStore, err := Connect(targetUrl)
indexStore, err := ConnectWithConfig(StoreConfig{
Url: targetUrl,
Workers: uint32(workerCount),
Log: L.WithField("subservice", "index"),
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -99,8 +103,8 @@ func BuildIndices(
checkpoints := historyarchive.NewCheckpointManager(0)
for ledger := range ledgerRange.GenerateCheckpoints(checkpoints) {
chunk := checkpoints.GetCheckpointRange(ledger)
chunk.High = min(chunk.High, ledgerRange.High)
chunk.Low = max(chunk.Low, ledgerRange.Low)
chunk.High = min(chunk.High, ledgerRange.High) // don't exceed upper bound
chunk.Low = max(chunk.Low, ledgerRange.Low) // nor the lower bound

ch <- chunk
}
Expand All @@ -117,11 +121,15 @@ func BuildIndices(
ledgerRange.Low, ledgerRange.High, count)

if err := indexBuilder.Build(ctx, ledgerRange); err != nil {
return errors.Wrap(err, "building indices failed")
return errors.Wrapf(err,
"building indices for ledger range [%d, %d] failed",
ledgerRange.Low, ledgerRange.High)
}

nprocessed := atomic.AddUint64(&processed, uint64(count))
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
if nprocessed%19 == 0 {
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
}

// Upload indices once per checkpoint to save memory
if err := indexStore.Flush(); err != nil {
Expand All @@ -136,19 +144,19 @@ func BuildIndices(
return indexBuilder, errors.Wrap(err, "one or more workers failed")
}

printProgress("Reading ledgers", uint64(ledgerCount), uint64(ledgerCount), startTime)

// Assertion for testing
if processed != uint64(ledgerCount) {
L.Fatalf("processed %d but expected %d", processed, ledgerCount)
}
printProgress("Reading ledgers", processed, uint64(ledgerCount), startTime)

L.Infof("Processed %d ledgers via %d workers", processed, parallel)
L.Infof("Uploading indices to %s", targetUrl)
if err := indexStore.Flush(); err != nil {
return indexBuilder, errors.Wrap(err, "flushing indices failed")
}

// Assertion for testing
if processed != uint64(ledgerCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these counts meaningful to the caller, i.e. rather than pass judgement in here, let the caller get them and choose severity, etc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah as comment notes this was mostly for debugging while I tested the builder. left it in there as a warning because they should always be equal, but we can drop them later or turn it into an error/return value! shouldn't differ while we continue on the MVP, though, so still a useful log

L.Warnf("processed %d but expected %d", processed, ledgerCount)
}

return indexBuilder, nil
}

Expand Down Expand Up @@ -214,7 +222,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
ledger, err := builder.ledgerBackend.GetLedger(ctx, ledgerSeq)
if err != nil {
if !os.IsNotExist(err) {
log.WithField("error", err).Errorf("error getting ledger %d", ledgerSeq)
log.Errorf("error getting ledger %d: %v", ledgerSeq, err)
}
return err
}
Expand Down Expand Up @@ -313,13 +321,6 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error {
}

func printProgress(prefix string, done, total uint64, startTime time.Time) {
// This should never happen, more of a runtime assertion for now.
// We can remove it when production-ready.
if done > total {
panic(fmt.Errorf("error for %s: done > total (%d > %d)",
prefix, done, total))
}

progress := float64(done) / float64(total)
elapsed := time.Since(startTime)

Expand Down
30 changes: 18 additions & 12 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,47 @@ import (
)

func Connect(backendUrl string) (Store, error) {
parsed, err := url.Parse(backendUrl)
return ConnectWithConfig(StoreConfig{Url: backendUrl})
}

func ConnectWithConfig(config StoreConfig) (Store, error) {
parsed, err := url.Parse(config.Url)
if err != nil {
return nil, err
}
switch parsed.Scheme {
case "s3":
config := &aws.Config{}
awsConfig := &aws.Config{}
query := parsed.Query()
if region := query.Get("region"); region != "" {
config.Region = aws.String(region)
awsConfig.Region = aws.String(region)
}

return NewS3Store(config, parsed.Path, 20)
config.Url = parsed.Path
return NewS3Store(awsConfig, config)

case "file":
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), 20)
config.Url = filepath.Join(parsed.Host, parsed.Path)
return NewFileStore(config)

default:
return nil, fmt.Errorf("unknown URL scheme: '%s' (from %s)",
parsed.Scheme, backendUrl)
parsed.Scheme, config.Url)
}
}

func NewFileStore(dir string, parallel uint32) (Store, error) {
backend, err := backend.NewFileBackend(dir, parallel)
func NewFileStore(config StoreConfig) (Store, error) {
backend, err := backend.NewFileBackend(config.Url, config.Workers)
if err != nil {
return nil, err
}
return NewStore(backend)
return NewStore(backend, config)
}

func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, prefix, parallel)
func NewS3Store(awsConfig *aws.Config, indexConfig StoreConfig) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, indexConfig.Url, indexConfig.Workers)
if err != nil {
return nil, err
}
return NewStore(backend)
return NewStore(backend, indexConfig)
}
Loading