Skip to content

Commit

Permalink
Create xdr.EncodingBuffer, which reduces buffer allocations (#4056)
Browse files Browse the repository at this point in the history
The improvement is considerable. It reduces the allocations (and CPU consumption) by roughly half.

```
goos: darwin
goarch: amd64
pkg: github.com/stellar/go/benchmarks
cpu: Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
BenchmarkXDRMarshal
BenchmarkXDRMarshal-8                          	 1000000	      1050 ns/op	    1352 B/op	      14 allocs/op
BenchmarkXDRMarshalWithEncoder
BenchmarkXDRMarshalWithEncoder-8               	 1909111	       619.1 ns/op	     176 B/op	       9 allocs/op
BenchmarkGXDRMarshal
BenchmarkGXDRMarshal-8                         	  151929	      7880 ns/op	    2152 B/op	     157 allocs/op
BenchmarkXDRMarshalHex
BenchmarkXDRMarshalHex-8                       	  473662	      2199 ns/op	    3640 B/op	      19 allocs/op
BenchmarkXDRMarshalHexWithEncoder
BenchmarkXDRMarshalHexWithEncoder-8            	  846926	      1406 ns/op	    1072 B/op	      10 allocs/op
BenchmarkXDRUnsafeMarshalHexWithEncoder
BenchmarkXDRUnsafeMarshalHexWithEncoder-8      	 1000000	      1137 ns/op	     176 B/op	       9 allocs/op
BenchmarkXDRMarshalBase64
BenchmarkXDRMarshalBase64-8                    	  555267	      1918 ns/op	    3000 B/op	      19 allocs/op
BenchmarkXDRMarshalBase64WithEncoder
BenchmarkXDRMarshalBase64WithEncoder-8         	  998617	      1217 ns/op	     752 B/op	      10 allocs/op
BenchmarkXDRUnsafeMarshalBase64WithEncoder
BenchmarkXDRUnsafeMarshalBase64WithEncoder-8   	 1000000	      1048 ns/op	     176 B/op	       9 allocs/op
PASS
```
  • Loading branch information
2opremio authored Nov 10, 2021
1 parent 371f727 commit 5db7e40
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 65 deletions.
128 changes: 86 additions & 42 deletions benchmarks/xdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/stellar/go/gxdr"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/require"
goxdr "github.com/xdrpp/goxdr/xdr"
)

Expand All @@ -21,69 +20,114 @@ var input = func() []byte {
return decoded
}()

func BenchmarkXDRUnmarshal(b *testing.B) {
b.StopTimer()
te := xdr.TransactionEnvelope{}
var xdrInput = func() xdr.TransactionEnvelope {
var te xdr.TransactionEnvelope
if err := te.UnmarshalBinary(input); err != nil {
panic(err)
}
return te
}()

// Make sure the input is valid.
err := te.UnmarshalBinary(input)
require.NoError(b, err)
b.StartTimer()
// Benchmark.
var gxdrInput = func() gxdr.TransactionEnvelope {
var te gxdr.TransactionEnvelope
// note goxdr will panic if there's a marshaling error.
te.XdrMarshal(&goxdr.XdrIn{In: bytes.NewReader(input)}, "")
return te
}()

func BenchmarkXDRUnmarshalWithReflection(b *testing.B) {
var (
r bytes.Reader
te xdr.TransactionEnvelope
)
for i := 0; i < b.N; i++ {
r.Reset(input)
_, _ = xdr.Unmarshal(&r, &te)
}
}

func BenchmarkXDRUnmarshal(b *testing.B) {
var te xdr.TransactionEnvelope
for i := 0; i < b.N; i++ {
_ = te.UnmarshalBinary(input)
}
}

func BenchmarkGXDRUnmarshal(b *testing.B) {
b.StopTimer()
te := gxdr.TransactionEnvelope{}

// Make sure the input is valid, note goxdr will panic if there's a
// marshaling error.
te.XdrMarshal(&goxdr.XdrIn{In: bytes.NewReader(input)}, "")
b.StartTimer()

// Benchmark.
r := bytes.NewReader(input)
var (
te gxdr.TransactionEnvelope
r bytes.Reader
)
for i := 0; i < b.N; i++ {
r.Reset(input)
te.XdrMarshal(&goxdr.XdrIn{In: r}, "")
te.XdrMarshal(&goxdr.XdrIn{In: &r}, "")
}
}

func BenchmarkXDRMarshalWithReflection(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = xdr.Marshal(&bytes.Buffer{}, xdrInput)
}
}

func BenchmarkXDRMarshal(b *testing.B) {
b.StopTimer()
te := xdr.TransactionEnvelope{}
for i := 0; i < b.N; i++ {
_, _ = xdrInput.MarshalBinary()
}
}

// Make sure the input is valid.
err := te.UnmarshalBinary(input)
require.NoError(b, err)
output, err := te.MarshalBinary()
require.NoError(b, err)
require.Equal(b, input, output)
b.StartTimer()
func BenchmarkXDRMarshalWithEncodingBuffer(b *testing.B) {
e := xdr.NewEncodingBuffer()
for i := 0; i < b.N; i++ {
_, _ = e.UnsafeMarshalBinary(xdrInput)
}
}

func BenchmarkGXDRMarshal(b *testing.B) {
var output bytes.Buffer
// Benchmark.
for i := 0; i < b.N; i++ {
_, _ = te.MarshalBinary()
output.Reset()
gxdrInput.XdrMarshal(&goxdr.XdrOut{Out: &output}, "")
}
}

func BenchmarkGXDRMarshal(b *testing.B) {
b.StopTimer()
te := gxdr.TransactionEnvelope{}
func BenchmarkXDRMarshalHex(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = xdr.MarshalHex(xdrInput)
}
}

// Make sure the input is valid, note goxdr will panic if there's a
// marshaling error.
te.XdrMarshal(&goxdr.XdrIn{In: bytes.NewReader(input)}, "")
output := bytes.Buffer{}
te.XdrMarshal(&goxdr.XdrOut{Out: &output}, "")
func BenchmarkXDRMarshalHexWithEncodingBuffer(b *testing.B) {
e := xdr.NewEncodingBuffer()
for i := 0; i < b.N; i++ {
_, _ = e.MarshalHex(xdrInput)
}
}

b.StartTimer()
// Benchmark.
func BenchmarkXDRUnsafeMarshalHexWithEncodingBuffer(b *testing.B) {
e := xdr.NewEncodingBuffer()
for i := 0; i < b.N; i++ {
output.Reset()
te.XdrMarshal(&goxdr.XdrOut{Out: &output}, "")
_, _ = e.UnsafeMarshalHex(xdrInput)
}
}

func BenchmarkXDRMarshalBase64(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = xdr.MarshalBase64(xdrInput)
}
}

func BenchmarkXDRMarshalBase64WithEncodingBuffer(b *testing.B) {
e := xdr.NewEncodingBuffer()
for i := 0; i < b.N; i++ {
_, _ = e.MarshalBase64(xdrInput)
}
}

func BenchmarkXDRUnsafeMarshalBase64WithEncodingBuffer(b *testing.B) {
e := xdr.NewEncodingBuffer()
for i := 0; i < b.N; i++ {
_, _ = e.UnsafeMarshalBase64(xdrInput)
}
}
17 changes: 11 additions & 6 deletions ingest/change_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ import (
// already removed.
type ChangeCompactor struct {
// ledger key => Change
cache map[string]Change
mutex sync.Mutex
cache map[string]Change
mutex sync.Mutex
encodingBuffer *xdr.EncodingBuffer
}

// NewChangeCompactor returns a new ChangeCompactor.
func NewChangeCompactor() *ChangeCompactor {
return &ChangeCompactor{
cache: make(map[string]Change),
cache: make(map[string]Change),
encodingBuffer: xdr.NewEncodingBuffer(),
}
}

Expand Down Expand Up @@ -86,7 +88,8 @@ func (c *ChangeCompactor) AddChange(change Change) error {
// addCreatedChange adds a change to the cache, but returns an error if create
// change is unexpected.
func (c *ChangeCompactor) addCreatedChange(change Change) error {
ledgerKey, err := change.Post.LedgerKey().MarshalBinary()
// safe, since we later cast to string (causing a copy)
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(change.Post.LedgerKey())
if err != nil {
return errors.Wrap(err, "Error MarshalBinary")
}
Expand Down Expand Up @@ -128,7 +131,8 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
// addUpdatedChange adds a change to the cache, but returns an error if update
// change is unexpected.
func (c *ChangeCompactor) addUpdatedChange(change Change) error {
ledgerKey, err := change.Post.LedgerKey().MarshalBinary()
// safe, since we later cast to string (causing a copy)
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(change.Post.LedgerKey())
if err != nil {
return errors.Wrap(err, "Error MarshalBinary")
}
Expand Down Expand Up @@ -171,7 +175,8 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error {
// addRemovedChange adds a change to the cache, but returns an error if remove
// change is unexpected.
func (c *ChangeCompactor) addRemovedChange(change Change) error {
ledgerKey, err := change.Pre.LedgerKey().MarshalBinary()
// safe, since we later cast to string (causing a copy)
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(change.Pre.LedgerKey())
if err != nil {
return errors.Wrap(err, "Error MarshalBinary")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ type TransactionBatchInsertBuilder interface {

// transactionBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type transactionBatchInsertBuilder struct {
builder db.BatchInsertBuilder
encodingBuffer *xdr.EncodingBuffer
builder db.BatchInsertBuilder
}

// NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance
func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
return &transactionBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("history_transactions"),
MaxBatchSize: maxBatchSize,
Expand All @@ -46,7 +48,7 @@ func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchI

// Add adds a new transaction to the batch
func (i *transactionBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error {
row, err := transactionToRow(transaction, sequence)
row, err := i.transactionToRow(transaction, sequence)
if err != nil {
return err
}
Expand Down Expand Up @@ -229,20 +231,20 @@ type TransactionWithoutLedger struct {
InnerSignatures pq.StringArray `db:"inner_signatures"`
}

func transactionToRow(transaction ingest.LedgerTransaction, sequence uint32) (TransactionWithoutLedger, error) {
envelopeBase64, err := xdr.MarshalBase64(transaction.Envelope)
func (i *transactionBatchInsertBuilder) transactionToRow(transaction ingest.LedgerTransaction, sequence uint32) (TransactionWithoutLedger, error) {
envelopeBase64, err := i.encodingBuffer.MarshalBase64(transaction.Envelope)
if err != nil {
return TransactionWithoutLedger{}, err
}
resultBase64, err := xdr.MarshalBase64(transaction.Result.Result)
resultBase64, err := i.encodingBuffer.MarshalBase64(&transaction.Result.Result)
if err != nil {
return TransactionWithoutLedger{}, err
}
metaBase64, err := xdr.MarshalBase64(transaction.UnsafeMeta)
metaBase64, err := i.encodingBuffer.MarshalBase64(transaction.UnsafeMeta)
if err != nil {
return TransactionWithoutLedger{}, err
}
feeMetaBase64, err := xdr.MarshalBase64(transaction.FeeChanges)
feeMetaBase64, err := i.encodingBuffer.MarshalBase64(transaction.FeeChanges)
if err != nil {
return TransactionWithoutLedger{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func TestTransactionToMap_muxed(t *testing.T) {
},
},
}
row, err := transactionToRow(tx, 20)
b := &transactionBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
}
row, err := b.transactionToRow(tx, 20)
assert.NoError(t, err)

assert.Equal(t, innerAccountID.Address(), row.Account)
Expand Down Expand Up @@ -165,7 +168,10 @@ func TestTransactionToMap_SourceMuxedAndFeeSourceUnmuxed(t *testing.T) {
},
},
}
row, err := transactionToRow(tx, 20)
b := &transactionBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
}
row, err := b.transactionToRow(tx, 20)
assert.NoError(t, err)

assert.Equal(t, innerAccountID.Address(), row.Account)
Expand Down
10 changes: 6 additions & 4 deletions services/horizon/internal/ingest/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type OrderBookStream struct {
LatestLedgerGauge prometheus.Gauge
lastLedger uint32
lastVerification time.Time
encodingBuffer *xdr.EncodingBuffer
}

// NewOrderBookStream constructs and initializes an OrderBookStream instance
Expand All @@ -46,6 +47,7 @@ func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *O
Namespace: "horizon", Subsystem: "order_book_stream", Name: "latest_ledger",
}),
lastVerification: time.Now(),
encodingBuffer: xdr.NewEncodingBuffer(),
}
}

Expand Down Expand Up @@ -226,11 +228,11 @@ func (o *OrderBookStream) verifyAllOffers(ctx context.Context) (bool, error) {
for i, offerRow := range ingestionOffers {
offerEntry := offers[i]
offerRowXDR := offerToXDR(offerRow)
offerEntryBase64, err := xdr.MarshalBase64(offerEntry)
offerEntryBase64, err := o.encodingBuffer.MarshalBase64(&offerEntry)
if err != nil {
return false, errors.Wrap(err, "Error from marshalling offerEntry")
}
offerRowBase64, err := xdr.MarshalBase64(offerRowXDR)
offerRowBase64, err := o.encodingBuffer.MarshalBase64(&offerRowXDR)
if err != nil {
return false, errors.Wrap(err, "Error from marshalling offerRowXDR")
}
Expand Down Expand Up @@ -275,11 +277,11 @@ func (o *OrderBookStream) verifyAllLiquidityPools(ctx context.Context) (bool, er
if err != nil {
return false, errors.Wrap(err, "Error from converting liquidity pool row to xdr")
}
liquidityPoolEntryBase64, err := xdr.MarshalBase64(liquidityPoolEntry)
liquidityPoolEntryBase64, err := o.encodingBuffer.MarshalBase64(&liquidityPoolEntry)
if err != nil {
return false, errors.Wrap(err, "Error from marshalling liquidityPoolEntry")
}
liquidityPoolRowBase64, err := xdr.MarshalBase64(liquidityPoolRowXDR)
liquidityPoolRowBase64, err := o.encodingBuffer.MarshalBase64(&liquidityPoolRowXDR)
if err != nil {
return false, errors.Wrap(err, "Error from marshalling liquidityPoolRowXDR")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
)

type ClaimableBalancesChangeProcessor struct {
encodingBuffer *xdr.EncodingBuffer
qClaimableBalances history.QClaimableBalances
cache *ingest.ChangeCompactor
}

func NewClaimableBalancesChangeProcessor(Q history.QClaimableBalances) *ClaimableBalancesChangeProcessor {
p := &ClaimableBalancesChangeProcessor{
encodingBuffer: xdr.NewEncodingBuffer(),
qClaimableBalances: Q,
}
p.reset()
Expand Down Expand Up @@ -65,7 +67,7 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
case change.Pre != nil && change.Post == nil:
// Removed
cBalance := change.Pre.Data.MustClaimableBalance()
id, err := xdr.MarshalHex(cBalance.BalanceId)
id, err := p.encodingBuffer.MarshalHex(cBalance.BalanceId)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/ingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,9 @@ func addClaimableBalanceToStateVerifier(
}

var idStrings []string
e := xdr.NewEncodingBuffer()
for _, id := range ids {
idString, err := xdr.MarshalHex(id)
idString, err := e.MarshalHex(id)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 5db7e40

Please sign in to comment.