Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic committed Sep 2, 2022
1 parent 2472e89 commit 934faf3
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 67 deletions.
2 changes: 1 addition & 1 deletion exp/lighthorizon/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func lightHorizonHTTPHandler(registry *prometheus.Registry, lightHorizon service
r.MethodFunc(http.MethodGet, "/operations", actions.NewOpsByAccountHandler(lightHorizon))
})

router.MethodFunc(http.MethodGet, "/", actions.ApiDocs())
router.MethodFunc(http.MethodGet, "/api", actions.ApiDocs())
router.Method(http.MethodGet, "/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))

problem.RegisterHost("")
Expand Down
47 changes: 2 additions & 45 deletions exp/lighthorizon/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@ package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
Expand All @@ -30,43 +25,8 @@ type liteIngester struct {
networkPassphrase string
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error {
return nil
}

func (i *liteIngester) NewLedgerTransactionReader(
Expand All @@ -75,9 +35,6 @@ func (i *liteIngester) NewLedgerTransactionReader(
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
i.networkPassphrase,
ledgerCloseMeta.MustV0())
if err != nil {
return nil, err
}

return &liteLedgerTransactionReader{reader}, err
}
Expand Down
56 changes: 56 additions & 0 deletions exp/lighthorizon/ingester/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

Expand All @@ -15,6 +23,7 @@ import (
type Ingester interface {
metaarchive.MetaArchive

PrepareRange(ctx context.Context, r historyarchive.Range) error
NewLedgerTransactionReader(
ledgerCloseMeta xdr.SerializedLedgerCloseMeta,
) (LedgerTransactionReader, error)
Expand All @@ -29,3 +38,50 @@ type LedgerTransaction struct {
type LedgerTransactionReader interface {
Read() (LedgerTransaction, error)
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

if config.ParallelDownloads > 1 {
log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads)
return NewParallelIngester(
source,
config.NetworkPassphrase,
config.ParallelDownloads), nil
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
}
7 changes: 4 additions & 3 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ const (
)

func main() {
log.SetLevel(logrus.InfoLevel) // default for subcommands

cmd := &cobra.Command{
Use: "lighthorizon <subcommand>",
Short: "Horizon Lite suite",
Long: "Horizon Lite suite",
Use: "lighthorizon <subcommand>",
Long: "Horizon Lite command suite",
RunE: func(cmd *cobra.Command, args []string) error {
return cmd.Usage()
},
Expand Down
30 changes: 23 additions & 7 deletions exp/lighthorizon/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,28 @@ func searchAccountTransactions(ctx context.Context,
Infof("Fulfilled request for account %s at cursor %d", accountId, cursor)
}()

checkpointMgr := historyarchive.NewCheckpointManager(0)

for {
if checkpointMgr.IsCheckpoint(nextLedger) {
r := historyarchive.Range{
Low: nextLedger,
High: checkpointMgr.NextCheckpoint(nextLedger + 1),
}
log.Infof("prepare range %d, %d", r.Low, r.High)
if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil {
log.Errorf("failed to prepare ledger range [%d, %d]: %v",
r.Low, r.High, innerErr)
}
}

start := time.Now()
ledger, ledgerErr := config.Ingester.GetLedger(ctx, nextLedger)
if ledgerErr != nil {
return errors.Wrapf(ledgerErr,
"ledger export state is out of sync at ledger %d", nextLedger)
ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to retrieve ledger %d from archive", nextLedger)
}
count++
thisFetchDuration := time.Since(start)
if thisFetchDuration > slowFetchDurationThreshold {
log.WithField("duration", thisFetchDuration).
Expand All @@ -131,9 +146,10 @@ func searchAccountTransactions(ctx context.Context,
fetchDuration += thisFetchDuration

start = time.Now()
reader, readerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if readerErr != nil {
return readerErr
reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to read ledger %d", nextLedger)
}

for {
Expand Down
4 changes: 2 additions & 2 deletions exp/lighthorizon/tools/cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package tools

import (
"context"
Expand All @@ -21,7 +21,7 @@ const (
defaultCacheCount = (60 * 60 * 24) / 5 // ~24hrs worth of ledgers
)

func addCacheCommands(parent *cobra.Command) *cobra.Command {
func AddCacheCommands(parent *cobra.Command) *cobra.Command {
cmd := &cobra.Command{
Use: "cache",
Long: "Manages the on-disk cache of ledgers.",
Expand Down
4 changes: 2 additions & 2 deletions exp/lighthorizon/tools/explorer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package tools

import (
"context"
Expand All @@ -24,7 +24,7 @@ var (
checkpointMgr = historyarchive.NewCheckpointManager(0)
)

func addIndexCommands(parent *cobra.Command) *cobra.Command {
func AddIndexCommands(parent *cobra.Command) *cobra.Command {
cmd := &cobra.Command{
Use: "index",
Long: "Lets you view details about an index source.",
Expand Down
6 changes: 3 additions & 3 deletions exp/lighthorizon/tools/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package tools

import (
"github.com/spf13/cobra"
Expand All @@ -19,7 +19,7 @@ func main() {
},
}

cmd = addCacheCommands(cmd)
cmd = addIndexCommands(cmd)
cmd = AddCacheCommands(cmd)
cmd = AddIndexCommands(cmd)
cmd.Execute()
}
6 changes: 5 additions & 1 deletion support/collections/set/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import (
)

func TestSet(t *testing.T) {
s := Set[string]{}
s := NewSet[string](10)
s.Add("sanity")
require.True(t, s.Contains("sanity"))
require.False(t, s.Contains("check"))

s.AddSlice([]string{"a", "b", "c"})
require.True(t, s.Contains("b"))
require.Equal(t, []string{"sanity", "a", "b", "c"}, s.Slice())
}
14 changes: 11 additions & 3 deletions support/storage/ondisk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/stellar/go/support/log"
)

// OnDiskCache fronts another storage with a local filesystem cache
// OnDiskCache fronts another storage with a local filesystem cache. Its
// thread-safe, meaning you can be actively caching a file and retrieve it at
// the same time without corruption, because retrieval will wait for the fetch.
type OnDiskCache struct {
Storage
dir string
Expand Down Expand Up @@ -85,6 +87,8 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) {
return remote, err
}

filepath += ".lock"

local, err = b.createLocal(filepath)
if err != nil {
// If there's some local FS error, we can still continue with the
Expand Down Expand Up @@ -219,12 +223,16 @@ func (t trc) Close() error {
return t.close()
}

func teeReadCloser(r io.ReadCloser, w io.WriteCloser) io.ReadCloser {
func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.ReadCloser {
return trc{
Reader: io.TeeReader(r, w),
close: func() error {
r.Close()
return w.Close()
err := w.Close()
if err != nil {
return err
}
return onClose()
},
}
}

0 comments on commit 934faf3

Please sign in to comment.