Merge branch 'lighthorizon' into lighthorizon_refactorArchive
Shaptic committed Sep 8, 2022
2 parents 934faf3 + 3c53909 commit ac90d1f
Showing 12 changed files with 128 additions and 61 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/horizon.yml
@@ -122,6 +122,7 @@ jobs:
- if: github.ref == 'refs/heads/master'
name: Push to DockerHub
run: docker push stellar/horizon-verify-range:latest

horizon-light:
name: Test and Push the horizon light images
runs-on: ubuntu-latest
@@ -141,18 +142,18 @@ jobs:
docker run -e ARCHIVE_TARGET=file:///ledgerexport\
-e START=5\
-e END=150\
-e NETWORK_PASSPHRASE="Test SDF Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-testnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-testnet/core_testnet_001,https://history.stellar.org/prd/core-testnet/core_testnet_002"\
-e NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-pubnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-live/core_live_001"\
-v $PWD/ledgerexport:/ledgerexport\
stellar/lighthorizon-ledgerexporter
# run map job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
docker run -e NETWORK_PASSPHRASE='pubnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
-e WORKER_COUNT=1 -e RUN_MODE=map -v $PWD/ledgerexport:/ledgermeta -e TXMETA_SOURCE=file:///ledgermeta -v $PWD/index:/index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch
# run reduce job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
docker run -e NETWORK_PASSPHRASE='pubnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
-e WORKER_COUNT=1 -e RUN_MODE=reduce -v $PWD/index:/index -e INDEX_SOURCE_ROOT=file:///index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch
# Push images
2 changes: 1 addition & 1 deletion exp/lighthorizon/build/index-batch/README.md
@@ -1,4 +1,4 @@
# `stellar/horizon-indexer`
# `stellar/lighthorizon-index-batch`

This docker image contains the ledger/checkpoint indexing executables. It allows running multiple instances of `map`/`reduce` on a single machine or running it in [AWS Batch](https://aws.amazon.com/batch/).

@@ -20,11 +20,11 @@ spec:
- name: RUN_MODE
value: "reduce"
- name: MAP_JOB_COUNT
value: 52
value: "52"
- name: REDUCE_JOB_COUNT
value: 52
value: "52"
- name: WORKER_COUNT
value: 8
value: "8"
- name: INDEX_SOURCE_ROOT
value: "<url of index location>"
- name: JOB_INDEX_ENV
1 change: 0 additions & 1 deletion exp/lighthorizon/build/k8s/lighthorizon_index.yml
@@ -47,7 +47,6 @@ spec:
fluxcd.io/ignore: "true"
prometheus.io/port: "6060"
prometheus.io/scrape: "false"
creationTimestamp: null
labels:
app: lighthorizon-pubnet-index
spec:
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/file_test.go
@@ -15,7 +15,7 @@ func TestSimpleFileStore(t *testing.T) {
// Create a large (beyond a single chunk) list of arbitrary accounts, some
// regular and some muxed.
accountList := make([]string, 123)
for i, _ := range accountList {
for i := range accountList {
var err error
var muxed xdr.MuxedAccount
address := keypair.MustRandom().Address()
39 changes: 24 additions & 15 deletions exp/lighthorizon/index/cmd/batch/reduce/main.go
@@ -2,16 +2,15 @@ package main

import (
"encoding/hex"
"fmt"
"hash/fnv"
"os"
"path/filepath"
"strconv"
"strings"
"sync"

"github.com/stellar/go/exp/lighthorizon/index"
types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)
@@ -42,7 +41,7 @@ func ReduceConfigFromEnvironment() (*ReduceConfig, error) {
indexTargetEnv = "INDEX_TARGET"
)

jobIndexEnv := os.Getenv(jobIndexEnvName)
jobIndexEnv := strings.TrimSpace(os.Getenv(jobIndexEnvName))
if jobIndexEnv == "" {
return nil, errors.New("env variable can't be empty " + jobIndexEnvName)
}
@@ -110,16 +109,19 @@ func main() {
}

func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
doneAccounts := NewSafeStringSet()
doneAccounts := set.NewSafeSet[string](512)
for i := uint32(0); i < config.MapJobCount; i++ {
jobLogger := log.WithField("job", i)

url := filepath.Join(config.IndexRootSource, "job_"+strconv.FormatUint(uint64(i), 10))
jobLogger.Infof("Connecting to %s", url)
jobSubPath := "job_" + strconv.FormatUint(uint64(i), 10)
jobLogger.Infof("Connecting to url %s, sub-path %s", config.IndexRootSource, jobSubPath)
outerJobStore, err := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: jobSubPath,
})

outerJobStore, err := index.Connect(url)
if err != nil {
return errors.Wrapf(err, "failed to connect to indices at %s", url)
return errors.Wrapf(err, "failed to connect to indices at %s, sub-path %s", config.IndexRootSource, jobSubPath)
}

accounts, err := outerJobStore.ReadAccounts()
@@ -201,16 +203,20 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
// indices from all jobs that touched this account.
for k := uint32(0); k < config.MapJobCount; k++ {
var jobErr error
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))

// FIXME: This could probably come from a pool. Every
// worker needs to have a connection to every index
// store, so there's no reason to re-open these for each
// inner loop.
innerJobStore, jobErr := index.Connect(url)
innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
innerJobStore, jobErr := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: innerJobSubPath,
})

if jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Failed to open index at %s", url)
Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}

@@ -227,7 +233,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Merge failure for index at %s", url)
Errorf("Merge failure for index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}
}
@@ -281,12 +287,15 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

prefix := hex.EncodeToString([]byte{b})
for k := uint32(0); k < config.MapJobCount; k++ {
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
var innerErr error
innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
innerJobStore, innerErr := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: innerJobSubPath,
})

innerJobStore, innerErr := index.Connect(url)
if innerErr != nil {
txLog.WithError(innerErr).Errorf("Failed to open index at %s", url)
txLog.WithError(innerErr).Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(innerErr)
}

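The recurring change in this file replaces `index.Connect(filepath.Join(root, "job_N"))` with `index.ConnectWithConfig` and an explicit `URLSubPath`. Below is a minimal, self-contained sketch of that pattern as shown in the diff; the root URL and job count are placeholders, not values from the commit.

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/stellar/go/exp/lighthorizon/index"
)

func main() {
	// Placeholder root; any scheme the index store supports ("file", "s3") works.
	const indexRoot = "file:///index"

	// Each map job writes its partial index under a "job_<N>" sub-path,
	// so the reduce step opens one store per job.
	for i := uint32(0); i < 2; i++ {
		store, err := index.ConnectWithConfig(index.StoreConfig{
			URL:        indexRoot,
			URLSubPath: "job_" + strconv.FormatUint(uint64(i), 10),
		})
		if err != nil {
			panic(err)
		}

		accounts, err := store.ReadAccounts()
		if err != nil {
			panic(err)
		}
		fmt.Printf("job %d indexed %d accounts\n", i, len(accounts))
	}
}
```

Keeping the base URL and the per-job sub-path separate lets the store decide how to combine them for each scheme, which is what the connect.go change below does.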
29 changes: 0 additions & 29 deletions exp/lighthorizon/index/cmd/batch/reduce/set.go

This file was deleted.

10 changes: 10 additions & 0 deletions exp/lighthorizon/index/connect.go
@@ -24,6 +24,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
}
switch parsed.Scheme {
case "s3":
s3Url := fmt.Sprintf("%s/%s", config.URL, config.URLSubPath)
parsed, err = url.Parse(s3Url)
if err != nil {
return nil, err
}
awsConfig := &aws.Config{}
query := parsed.Query()
if region := query.Get("region"); region != "" {
@@ -33,6 +38,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
return NewS3Store(awsConfig, parsed.Host, parsed.Path, config)

case "file":
fileUrl := filepath.Join(config.URL, config.URLSubPath)
parsed, err = url.Parse(fileUrl)
if err != nil {
return nil, err
}
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), config)

default:
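As a rough illustration of the branch-specific handling above, the standalone sketch below mirrors only the joining and re-parsing steps for the two schemes; the URLs are examples, and the real code goes on to call `NewS3Store`/`NewFileStore` with the parsed result.

```go
package main

import (
	"fmt"
	"net/url"
	"path/filepath"
)

func main() {
	// "file" scheme: the sub-path is joined onto the base URL with filepath.Join,
	// then re-parsed; host+path becomes the local directory for the file store.
	fileURL := filepath.Join("file:///index", "job_0")
	parsed, err := url.Parse(fileURL)
	if err != nil {
		panic(err)
	}
	fmt.Println("file store directory:", filepath.Join(parsed.Host, parsed.Path))

	// "s3" scheme: the sub-path is appended with a plain "/" before re-parsing;
	// the host is the bucket and the path becomes the key prefix.
	parsed, err = url.Parse(fmt.Sprintf("%s/%s", "s3://indices", "job_0"))
	if err != nil {
		panic(err)
	}
	fmt.Println("s3 bucket:", parsed.Host, "prefix:", parsed.Path)
}
```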
7 changes: 5 additions & 2 deletions exp/lighthorizon/index/store.go
@@ -38,8 +38,11 @@ type Store interface {

type StoreConfig struct {
// init time config
URL string
Workers uint32
// the base url for the store resource
URL string
// optional url path to append to the base url to realize the complete url
URLSubPath string
Workers uint32

// runtime config
ClearMemoryOnFlush bool
51 changes: 51 additions & 0 deletions support/collections/set/safeset.go
@@ -0,0 +1,51 @@
package set

import (
"sync"

"golang.org/x/exp/constraints"
)

// safeSet is a simple, thread-safe set implementation. It must be created via
// NewSafeSet.
type safeSet[T constraints.Ordered] struct {
Set[T]
lock sync.RWMutex
}

func NewSafeSet[T constraints.Ordered](capacity int) *safeSet[T] {
return &safeSet[T]{
Set: NewSet[T](capacity),
lock: sync.RWMutex{},
}
}

func (s *safeSet[T]) Add(item T) {
s.lock.Lock()
defer s.lock.Unlock()
s.Set.Add(item)
}

func (s *safeSet[T]) AddSlice(items []T) {
s.lock.Lock()
defer s.lock.Unlock()
s.Set.AddSlice(items)
}

func (s *safeSet[T]) Remove(item T) {
s.lock.Lock()
defer s.lock.Unlock()
s.Set.Remove(item)
}

func (s *safeSet[T]) Contains(item T) bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.Set.Contains(item)
}

func (s *safeSet[T]) Slice() []T {
s.lock.RLock()
defer s.lock.RUnlock()
return s.Set.Slice()
}
7 changes: 7 additions & 0 deletions support/collections/set/set_test.go
@@ -16,3 +16,10 @@ func TestSet(t *testing.T) {
require.True(t, s.Contains("b"))
require.Equal(t, []string{"sanity", "a", "b", "c"}, s.Slice())
}

func TestSafeSet(t *testing.T) {
s := NewSafeSet[string](0)
s.Add("sanity")
require.True(t, s.Contains("sanity"))
require.False(t, s.Contains("check"))
}
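`TestSafeSet` above only covers single-goroutine use. As a hedged sketch of the concurrent access the `RWMutex` in `safeset.go` guards against, the example below hammers a safe set from several goroutines using only the methods added in this commit; the worker counts and account strings are made up.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/stellar/go/support/collections/set"
)

func main() {
	accounts := set.NewSafeSet[string](512)

	var wg sync.WaitGroup
	for worker := 0; worker < 4; worker++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				// Concurrent Adds are safe: each one takes the write lock.
				accounts.Add(fmt.Sprintf("account-%d-%d", w, i))
			}
		}(worker)
	}
	wg.Wait()

	fmt.Println("total entries:", len(accounts.Slice()))
}
```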
24 changes: 20 additions & 4 deletions support/storage/ondisk_cache.go
@@ -67,7 +67,10 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) {
L := b.log.WithField("key", filepath)
localPath := path.Join(b.dir, filepath)

if _, ok := b.lru.Get(localPath); !ok {
// If the lockfile exists, we should defer to the remote source.
_, statErr := os.Stat(nameLockfile(localPath))

if _, ok := b.lru.Get(localPath); !ok || statErr == nil {
// If it doesn't exist in the cache, it might still exist on the disk if
// we've restarted from an existing directory.
local, err := os.Open(localPath)
@@ -98,7 +101,9 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) {
return remote, nil
}

return teeReadCloser(remote, local), nil
return teeReadCloser(remote, local, func() error {
return os.Remove(nameLockfile(localPath))
}), nil
}

// The cache claims it exists, so just give it a read and send it.
@@ -147,7 +152,8 @@ func (b *OnDiskCache) Size(filepath string) (int64, error) {
}

L.WithError(err).Debug("retrieving size of cached ledger failed")
b.lru.Remove(localPath) // stale cache?
b.lru.Remove(localPath) // stale cache?
os.Remove(nameLockfile(localPath)) // errors don't matter
}

return b.Storage.Size(filepath)
Expand All @@ -166,7 +172,9 @@ func (b *OnDiskCache) PutFile(filepath string, in io.ReadCloser) error {
L.WithError(err).Error("failed to put file locally")
} else {
// tee upload data into our local file
in = teeReadCloser(in, local)
in = teeReadCloser(in, local, func() error {
return os.Remove(nameLockfile(path.Join(b.dir, filepath)))
})
}

return b.Storage.PutFile(filepath, in)
@@ -206,11 +214,19 @@ func (b *OnDiskCache) createLocal(filepath string) (*os.File, error) {
if err != nil {
return nil, err
}
_, err = os.Create(nameLockfile(localPath))
if err != nil {
return nil, err
}

b.lru.Add(localPath, struct{}{}) // just use the cache as an array
return local, nil
}

func nameLockfile(file string) string {
return file + ".lock"
}

// The below is a helper interface so that we can use io.TeeReader to write
// data locally immediately as we read it remotely.

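The cache changes above add a `<file>.lock` marker: it is created next to the local copy, removed only after the tee'd download completes, and its presence makes `GetFile` (and `Size`) treat the cached copy as incomplete and defer to the remote source. Below is a standalone, hedged sketch of that guard pattern; only `nameLockfile` comes from the diff, the other helper names are illustrative.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// nameLockfile mirrors the helper added in the diff.
func nameLockfile(file string) string {
	return file + ".lock"
}

// writeWithLock is an illustrative stand-in for createLocal + teeReadCloser:
// create the lockfile first, copy the remote bytes, and clear the lockfile
// only once the copy has finished.
func writeWithLock(localPath string, remote io.Reader) error {
	if _, err := os.Create(nameLockfile(localPath)); err != nil {
		return err
	}
	local, err := os.Create(localPath)
	if err != nil {
		return err
	}
	defer local.Close()

	if _, err := io.Copy(local, remote); err != nil {
		return err // lockfile stays behind, marking the copy as incomplete
	}
	return os.Remove(nameLockfile(localPath))
}

// isCacheComplete is the read-side check: a missing lockfile means the cached
// file can be served; an existing one means fall back to the remote source.
func isCacheComplete(localPath string) bool {
	_, err := os.Stat(nameLockfile(localPath))
	return err != nil
}

func main() {
	path := "/tmp/ledger-42.xdr" // placeholder path
	if err := writeWithLock(path, strings.NewReader("fake ledger bytes")); err != nil {
		panic(err)
	}
	fmt.Println("cache complete:", isCacheComplete(path))
}
```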
