Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrating new loaders and builders into processors #5083

Merged
merged 14 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/horizon/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
horizon-postgres:
image: postgres:9.6.17-alpine
image: postgres:postgres:12-bullseye
restart: on-failure
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
Expand Down
4 changes: 1 addition & 3 deletions services/horizon/docker/verify-range/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
FROM ubuntu:20.04

MAINTAINER Bartek Nowotarski <[email protected]>

ARG STELLAR_CORE_VERSION
ENV STELLAR_CORE_VERSION=${STELLAR_CORE_VERSION:-*}
# to remove tzdata interactive flow
ENV DEBIAN_FRONTEND=noninteractive

ADD dependencies /
RUN ["chmod", "+x", "dependencies"]
RUN ["chmod", "+x", "/dependencies"]
RUN /dependencies

ADD stellar-core.cfg /
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/docker/verify-range/dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ echo "deb https://apt.stellar.org $(lsb_release -cs) stable" | sudo tee -a /etc/
apt-get update
apt-get install -y stellar-core=${STELLAR_CORE_VERSION}

wget -q https://dl.google.com/go/go1.18.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.18.linux-amd64.tar.gz
wget -q https://dl.google.com/go/go1.20.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.20.linux-amd64.tar.gz

git clone https://github.com/stellar/go.git stellar-go
cd stellar-go
# By default "git fetch" only fetches refs/<branchname>
# Below ensures we also fetch PR refs
git config --add remote.origin.fetch "+refs/pull/*/head:refs/remotes/origin/pull/*"
git fetch --force --quiet origin
/usr/local/go/bin/go build -v ./services/horizon
/usr/local/go/bin/go build -v ./services/horizon/.
sreuland marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions services/horizon/internal/actions/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func checkOuterHashResponse(
}

func TestFeeBumpTransactionPage(t *testing.T) {

tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
Expand Down
15 changes: 10 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address), nil
return a.loader.GetNow(a.address)
}

// AccountLoader will map account addresses to their history
Expand Down Expand Up @@ -71,11 +71,15 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
func (a *AccountLoader) GetNow(address string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid account loader state,
Exec was not called yet to properly seal and resolve %v id`, address)
}
if internalID, ok := a.ids[address]; !ok {
return 0, fmt.Errorf(`account loader address %q was not found`, address)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -205,5 +209,6 @@ func NewAccountLoaderStub() AccountLoaderStub {
// Insert updates the wrapped AccountLoader so that the given account
// address is mapped to the provided history account id
func (a AccountLoaderStub) Insert(address string, id int64) {
a.Loader.sealed = true
a.Loader.ids[address] = id
}
24 changes: 10 additions & 14 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@
}

loader := NewAccountLoader()
var futures []FutureAccountID
for _, address := range addresses {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid account loader state,`)
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -42,15 +37,16 @@
})

q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.GetNow(address)
val, err := future.Value()
for _, address := range addresses {
internalId, err := loader.GetNow(address)

Check failure on line 41 in services/horizon/internal/db2/history/account_loader_test.go

View workflow job for this annotation

GitHub Actions / golangci

ST1003: var internalId should be internalID (stylecheck)
assert.NoError(t, err)
assert.Equal(t, id, val)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, id)
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

_, err := loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
44 changes: 24 additions & 20 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql/driver"
"fmt"
"sort"
"strings"

sq "github.com/Masterminds/squirrel"

Expand All @@ -21,11 +22,18 @@ type AssetKey struct {
Issuer string
}

func (key AssetKey) String() string {
if key.Type == xdr.AssetTypeToString[xdr.AssetTypeAssetTypeNative] {
return key.Type
}
return key.Type + "/" + key.Code + "/" + key.Issuer
sreuland marked this conversation as resolved.
Show resolved Hide resolved
}

// AssetKeyFromXDR constructs an AssetKey from an xdr asset
func AssetKeyFromXDR(asset xdr.Asset) AssetKey {
return AssetKey{
Type: xdr.AssetTypeToString[asset.Type],
Code: asset.GetCode(),
Code: strings.TrimRight(asset.GetCode(), "\x00"),
Issuer: asset.GetIssuer(),
}
}
Expand All @@ -41,7 +49,7 @@ type FutureAssetID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.GetNow(a.asset), nil
return a.loader.GetNow(a.asset)
}

// AssetLoader will map assets to their history
Expand Down Expand Up @@ -81,11 +89,15 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) GetNow(asset AssetKey) int64 {
if id, ok := a.ids[asset]; !ok {
panic(fmt.Errorf("asset %v not present", asset))
func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid asset loader state,
Exec was not called yet to properly seal and resolve %v id`, asset)
}
if internalID, ok := a.ids[asset]; !ok {
return 0, fmt.Errorf(`asset loader id %v was not found`, asset)
} else {
return id
return internalID, nil
}
}

Expand Down Expand Up @@ -137,6 +149,11 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
assetIssuers := make([]string, 0, len(a.set)-len(a.ids))
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})
insert := 0
for _, key := range keys {
if _, ok := a.ids[key]; ok {
Expand All @@ -152,20 +169,6 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
return nil
}
keys = keys[:insert]
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Slice(keys, func(i, j int) bool {
if keys[i].Type < keys[j].Type {
return true
}
if keys[i].Code < keys[j].Code {
return true
}
if keys[i].Issuer < keys[j].Issuer {
return true
}
return false
})

err := bulkInsert(
ctx,
Expand Down Expand Up @@ -211,5 +214,6 @@ func NewAssetLoaderStub() AssetLoaderStub {
// Insert updates the wrapped AssetLoaderStub so that the given asset
// address is mapped to the provided history asset id
func (a AssetLoaderStub) Insert(asset AssetKey, id int64) {
a.Loader.sealed = true
a.Loader.ids[asset] = id
}
67 changes: 47 additions & 20 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ import (
"github.com/stellar/go/xdr"
)

func TestAssetKeyToString(t *testing.T) {
num4key := AssetKey{
Type: "credit_alphanum4",
Code: "USD",
Issuer: "A1B2C3",
}

num12key := AssetKey{
Type: "credit_alphanum12",
Code: "USDABC",
Issuer: "A1B2C3",
}

nativekey := AssetKey{
Type: "native",
}

assert.Equal(t, num4key.String(), "credit_alphanum4/USD/A1B2C3")
assert.Equal(t, num12key.String(), "credit_alphanum12/USDABC/A1B2C3")
assert.Equal(t, nativekey.String(), "native")
}

func TestAssetLoader(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand All @@ -22,30 +44,34 @@ func TestAssetLoader(t *testing.T) {
for i := 0; i < 100; i++ {
var key AssetKey
if i == 0 {
key.Type = "native"
key = AssetKeyFromXDR(xdr.Asset{Type: xdr.AssetTypeAssetTypeNative})
} else if i%2 == 0 {
key.Type = "credit_alphanum4"
key.Code = fmt.Sprintf("ab%d", i)
key.Issuer = keypair.MustRandom().Address()
code := [4]byte{0, 0, 0, 0}
copy(code[:], fmt.Sprintf("ab%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum4,
AlphaNum4: &xdr.AlphaNum4{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})
} else {
key.Type = "credit_alphanum12"
key.Code = fmt.Sprintf("abcdef%d", i)
key.Issuer = keypair.MustRandom().Address()
code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
copy(code[:], fmt.Sprintf("abcdef%d", i))
key = AssetKeyFromXDR(xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum12,
AlphaNum12: &xdr.AlphaNum12{
AssetCode: code,
Issuer: xdr.MustAddress(keypair.MustRandom().Address())}})

}
keys = append(keys, key)
}

loader := NewAssetLoader()
var futures []FutureAssetID
for _, key := range keys {
future := loader.GetFuture(key)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(key)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid asset loader state,`)
duplicateFuture := loader.GetFuture(key)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -56,12 +82,9 @@ func TestAssetLoader(t *testing.T) {
})

q := &Q{session}
for i, key := range keys {
future := futures[i]
internalID := loader.GetNow(key)
val, err := future.Value()
for _, key := range keys {
internalID, err := loader.GetNow(key)
assert.NoError(t, err)
assert.Equal(t, internalID, val)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
Expand All @@ -72,4 +95,8 @@ func TestAssetLoader(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}

_, err := loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureClaimableBalanceID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureClaimableBalanceID) Value() (driver.Value, error) {
return a.loader.getNow(a.id), nil
return a.loader.getNow(a.id)
}

// ClaimableBalanceLoader will map claimable balance ids to their internal
Expand Down Expand Up @@ -64,11 +64,15 @@ func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID {
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// call can succeed.
func (a *ClaimableBalanceLoader) getNow(id string) int64 {
func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) {
if !a.sealed {
return 0, fmt.Errorf(`invalid claimable balance loader state,
Exec was not called yet to properly seal and resolve %v id`, id)
}
if internalID, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id)
} else {
return internalID
return internalID, nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ func TestClaimableBalanceLoader(t *testing.T) {
for _, id := range ids {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(id)
})
assert.Panics(t, func() {
future.Value()
})
_, err := future.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `invalid claimable balance loader state,`)
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}
Expand All @@ -50,13 +47,16 @@ func TestClaimableBalanceLoader(t *testing.T) {
q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.getNow(id)
val, err := future.Value()
internalID, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
cb, err := q.ClaimableBalanceByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, cb.BalanceID, id)
assert.Equal(t, cb.InternalID, internalID)
}

futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader}
_, err := futureCb.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
Loading
Loading