Skip to content

Commit

Permalink
horizon/ingest: ingest effects from offer created/updated/deleted
Browse files Browse the repository at this point in the history
This PR starts to ingest effects of an offer created/updated/deleted
with a trade. However, it can't handle the case where an offer is
affected by a path payment.

Close stellar#166
  • Loading branch information
howardtw committed Mar 22, 2019
1 parent d329514 commit a89d713
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 68 deletions.
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/batch_insert_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (b *BatchInsertBuilder) Values(params ...interface{}) error {
b.initOnce.Do(b.init)

if len(params) != len(b.Columns) {
return errors.New(fmt.Sprintf("Number of values doesn't match columns in %s", b.TableName))
return errors.Errorf("Number of values doesn't match columns in %s", b.TableName)
}

b.rows = append(b.rows, params)
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/effect_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func (ei *EffectIngestion) Add(aid xdr.AccountId, typ history.EffectType, detail
return false
}

// This variable doesn't seem to be thread safe. Perhaps it doesn't have to be at this moment?
ei.added++

ei.err = ei.Dest.Effect(Address(aid.Address()), ei.OperationID, ei.added, typ, details)
Expand Down
8 changes: 5 additions & 3 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ type Config struct {
type EffectIngestion struct {
Dest *Ingestion
OperationID int64
err error
added int
parent *Ingestion
// TODO: take err out
err error
added int
parent *Ingestion
}

// LedgerBundle represents a single ledger's worth of novelty created by one
Expand Down Expand Up @@ -193,6 +194,7 @@ type Session struct {
// Results fields
//

// TODO: take Err out
// Err is the error that caused this session to fail, if any.
Err error
// Ingested is the number of ledgers that were successfully ingested during
Expand Down
46 changes: 39 additions & 7 deletions services/horizon/internal/ingest/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,23 +196,26 @@ func (is *Session) ingestEffects() {
is.assetDetails(dets, op.SendAsset, "")
effects.Add(source, history.EffectAccountDebited, dets)

is.ingestTradeEffects(effects, source, resultSuccess.Offers)
is.ingestTradeEffects(effects, source, resultSuccess.Offers, nil)

case xdr.OperationTypeManageOffer:
result := is.Cursor.OperationResult().MustManageOfferResult().MustSuccess()
is.ingestTradeEffects(effects, source, result.OffersClaimed)
is.ingestTradeEffects(effects, source, result.OffersClaimed, &result.Offer)

case xdr.OperationTypeCreatePassiveOffer:
claims := []xdr.ClaimOfferAtom{}
var offerSuccess xdr.ManageOfferSuccessResult
result := is.Cursor.OperationResult()

// KNOWN ISSUE: stellar-core creates results for CreatePassiveOffer operations
// with the wrong result arm set.
if result.Type == xdr.OperationTypeManageOffer {
claims = result.MustManageOfferResult().MustSuccess().OffersClaimed
offerSuccess = result.MustManageOfferResult().MustSuccess()
} else {
claims = result.MustCreatePassiveOfferResult().MustSuccess().OffersClaimed
offerSuccess = result.MustCreatePassiveOfferResult().MustSuccess()
}

is.ingestTradeEffects(effects, source, claims)
is.ingestTradeEffects(effects, source, offerSuccess.OffersClaimed, &offerSuccess.Offer)

case xdr.OperationTypeSetOptions:
op := opbody.MustSetOptionsOp()

Expand Down Expand Up @@ -598,7 +601,7 @@ func (is *Session) ingestTrades() {
}
}

func (is *Session) ingestTradeEffects(effects *EffectIngestion, buyer xdr.AccountId, claims []xdr.ClaimOfferAtom) {
func (is *Session) ingestTradeEffects(effects *EffectIngestion, buyer xdr.AccountId, claims []xdr.ClaimOfferAtom, offer *xdr.ManageOfferSuccessResultOffer) {
if is.Err != nil {
return
}
Expand All @@ -613,6 +616,35 @@ func (is *Session) ingestTradeEffects(effects *EffectIngestion, buyer xdr.Accoun
effects.Add(buyer, history.EffectTrade, bd)
effects.Add(seller, history.EffectTrade, sd)
}

// in case of path payments
if offer == nil {
return
}

offerDetails := map[string]interface{}{
"offer_id": (*offer).Offer.OfferId,
"seller": (*offer).Offer.SellerId.Address(),
"amount": amount.String((*offer).Offer.Amount),
"price": (*offer).Offer.Price.String(),
"price_r": map[string]interface{}{
"n": (*offer).Offer.Price.N,
"d": (*offer).Offer.Price.D,
},
}
is.assetDetails(offerDetails, (*offer).Offer.Buying, "buying_")
is.assetDetails(offerDetails, (*offer).Offer.Selling, "selling_")

switch (*offer).Effect {
case xdr.ManageOfferEffectManageOfferCreated:
effects.Add(buyer, history.EffectOfferCreated, offerDetails)

case xdr.ManageOfferEffectManageOfferUpdated:
effects.Add(buyer, history.EffectOfferUpdated, offerDetails)

case xdr.ManageOfferEffectManageOfferDeleted:
effects.Add(buyer, history.EffectOfferRemoved, offerDetails)
}
}

func (is *Session) tradeDetails(buyer, seller xdr.AccountId, claim xdr.ClaimOfferAtom) (bd map[string]interface{}, sd map[string]interface{}) {
Expand Down
101 changes: 44 additions & 57 deletions services/horizon/internal/resourceadapter/effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stellar/go/protocols/horizon/effects"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/httpx"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
)

Expand Down Expand Up @@ -39,95 +40,81 @@ var EffectTypeNames = map[history.EffectType]string{

// NewEffect creates a new effect resource from the provided database representation
// of the effect.
func NewEffect(
ctx context.Context,
row history.Effect,
ledger history.Ledger,
) (result hal.Pageable, err error) {

func NewEffect(ctx context.Context, row history.Effect, ledger history.Ledger) (hal.Pageable, error) {
basev := effects.Base{}
PopulateBaseEffect(ctx, &basev, row, ledger)

var (
e interface{}
result hal.Pageable
)
switch row.Type {
case history.EffectAccountCreated:
e := effects.AccountCreated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.AccountCreated{Base: basev}

case history.EffectAccountCredited:
e := effects.AccountCredited{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.AccountCredited{Base: basev}

case history.EffectAccountDebited:
e := effects.AccountDebited{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.AccountDebited{Base: basev}

case history.EffectAccountThresholdsUpdated:
e := effects.AccountThresholdsUpdated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.AccountThresholdsUpdated{Base: basev}

case history.EffectAccountHomeDomainUpdated:
e := effects.AccountHomeDomainUpdated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.AccountHomeDomainUpdated{Base: basev}

case history.EffectAccountFlagsUpdated:
e := effects.AccountFlagsUpdated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.AccountFlagsUpdated{Base: basev}

case history.EffectSignerCreated:
e := effects.SignerCreated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.SignerCreated{Base: basev}

case history.EffectSignerUpdated:
e := effects.SignerUpdated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.SignerUpdated{Base: basev}

case history.EffectSignerRemoved:
e := effects.SignerRemoved{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.SignerRemoved{Base: basev}

case history.EffectTrustlineCreated:
e := effects.TrustlineCreated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.TrustlineCreated{Base: basev}

case history.EffectTrustlineUpdated:
e := effects.TrustlineUpdated{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.TrustlineUpdated{Base: basev}

case history.EffectTrustlineRemoved:
e := effects.TrustlineRemoved{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.TrustlineRemoved{Base: basev}

case history.EffectTrustlineAuthorized:
e := effects.TrustlineAuthorized{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.TrustlineAuthorized{Base: basev}

case history.EffectTrustlineDeauthorized:
e := effects.TrustlineDeauthorized{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.TrustlineDeauthorized{Base: basev}

case history.EffectTrade:
e := effects.Trade{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.Trade{Base: basev}

case history.EffectSequenceBumped:
e := effects.SequenceBumped{Base: basev}
err = row.UnmarshalDetails(&e)
result = e
e = effects.SequenceBumped{Base: basev}

default:
result = basev
}

err := row.UnmarshalDetails(&e)
if err != nil {
return
return result, errors.Wrap(err, "unmarshaling effect details")
}

rh, ok := result.(base.Rehydratable)
result = e.(hal.Pageable)

rh, ok := result.(base.Rehydratable)
if ok {
// TODO: remove the returned error as it's always nil
err = rh.Rehydrate()
}

return
return result, errors.Wrap(err, "rehydrating")
}

// Populate loads this resource from `row`
Expand Down

0 comments on commit a89d713

Please sign in to comment.