diff --git a/benchmarks/xdr_test.go b/benchmarks/xdr_test.go index a39f799382..d89fe1ab26 100644 --- a/benchmarks/xdr_test.go +++ b/benchmarks/xdr_test.go @@ -7,7 +7,6 @@ import ( "github.com/stellar/go/gxdr" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/require" goxdr "github.com/xdrpp/goxdr/xdr" ) @@ -21,69 +20,114 @@ var input = func() []byte { return decoded }() -func BenchmarkXDRUnmarshal(b *testing.B) { - b.StopTimer() - te := xdr.TransactionEnvelope{} +var xdrInput = func() xdr.TransactionEnvelope { + var te xdr.TransactionEnvelope + if err := te.UnmarshalBinary(input); err != nil { + panic(err) + } + return te +}() - // Make sure the input is valid. - err := te.UnmarshalBinary(input) - require.NoError(b, err) - b.StartTimer() - // Benchmark. +var gxdrInput = func() gxdr.TransactionEnvelope { + var te gxdr.TransactionEnvelope + // note goxdr will panic if there's a marshaling error. + te.XdrMarshal(&goxdr.XdrIn{In: bytes.NewReader(input)}, "") + return te +}() + +func BenchmarkXDRUnmarshalWithReflection(b *testing.B) { + var ( + r bytes.Reader + te xdr.TransactionEnvelope + ) + for i := 0; i < b.N; i++ { + r.Reset(input) + _, _ = xdr.Unmarshal(&r, &te) + } +} + +func BenchmarkXDRUnmarshal(b *testing.B) { + var te xdr.TransactionEnvelope for i := 0; i < b.N; i++ { _ = te.UnmarshalBinary(input) } } func BenchmarkGXDRUnmarshal(b *testing.B) { - b.StopTimer() - te := gxdr.TransactionEnvelope{} - - // Make sure the input is valid, note goxdr will panic if there's a - // marshaling error. - te.XdrMarshal(&goxdr.XdrIn{In: bytes.NewReader(input)}, "") - b.StartTimer() - - // Benchmark. - r := bytes.NewReader(input) + var ( + te gxdr.TransactionEnvelope + r bytes.Reader + ) for i := 0; i < b.N; i++ { r.Reset(input) - te.XdrMarshal(&goxdr.XdrIn{In: r}, "") + te.XdrMarshal(&goxdr.XdrIn{In: &r}, "") + } +} + +func BenchmarkXDRMarshalWithReflection(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _ = xdr.Marshal(&bytes.Buffer{}, xdrInput) } } func BenchmarkXDRMarshal(b *testing.B) { - b.StopTimer() - te := xdr.TransactionEnvelope{} + for i := 0; i < b.N; i++ { + _, _ = xdrInput.MarshalBinary() + } +} - // Make sure the input is valid. - err := te.UnmarshalBinary(input) - require.NoError(b, err) - output, err := te.MarshalBinary() - require.NoError(b, err) - require.Equal(b, input, output) - b.StartTimer() +func BenchmarkXDRMarshalWithEncodingBuffer(b *testing.B) { + e := xdr.NewEncodingBuffer() + for i := 0; i < b.N; i++ { + _, _ = e.UnsafeMarshalBinary(xdrInput) + } +} +func BenchmarkGXDRMarshal(b *testing.B) { + var output bytes.Buffer // Benchmark. for i := 0; i < b.N; i++ { - _, _ = te.MarshalBinary() + output.Reset() + gxdrInput.XdrMarshal(&goxdr.XdrOut{Out: &output}, "") } } -func BenchmarkGXDRMarshal(b *testing.B) { - b.StopTimer() - te := gxdr.TransactionEnvelope{} +func BenchmarkXDRMarshalHex(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _ = xdr.MarshalHex(xdrInput) + } +} - // Make sure the input is valid, note goxdr will panic if there's a - // marshaling error. - te.XdrMarshal(&goxdr.XdrIn{In: bytes.NewReader(input)}, "") - output := bytes.Buffer{} - te.XdrMarshal(&goxdr.XdrOut{Out: &output}, "") +func BenchmarkXDRMarshalHexWithEncodingBuffer(b *testing.B) { + e := xdr.NewEncodingBuffer() + for i := 0; i < b.N; i++ { + _, _ = e.MarshalHex(xdrInput) + } +} - b.StartTimer() - // Benchmark. +func BenchmarkXDRUnsafeMarshalHexWithEncodingBuffer(b *testing.B) { + e := xdr.NewEncodingBuffer() for i := 0; i < b.N; i++ { - output.Reset() - te.XdrMarshal(&goxdr.XdrOut{Out: &output}, "") + _, _ = e.UnsafeMarshalHex(xdrInput) + } +} + +func BenchmarkXDRMarshalBase64(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _ = xdr.MarshalBase64(xdrInput) + } +} + +func BenchmarkXDRMarshalBase64WithEncodingBuffer(b *testing.B) { + e := xdr.NewEncodingBuffer() + for i := 0; i < b.N; i++ { + _, _ = e.MarshalBase64(xdrInput) + } +} + +func BenchmarkXDRUnsafeMarshalBase64WithEncodingBuffer(b *testing.B) { + e := xdr.NewEncodingBuffer() + for i := 0; i < b.N; i++ { + _, _ = e.UnsafeMarshalBase64(xdrInput) } } diff --git a/ingest/change_compactor.go b/ingest/change_compactor.go index e785e1f515..dfac201269 100644 --- a/ingest/change_compactor.go +++ b/ingest/change_compactor.go @@ -49,14 +49,16 @@ import ( // already removed. type ChangeCompactor struct { // ledger key => Change - cache map[string]Change - mutex sync.Mutex + cache map[string]Change + mutex sync.Mutex + encodingBuffer *xdr.EncodingBuffer } // NewChangeCompactor returns a new ChangeCompactor. func NewChangeCompactor() *ChangeCompactor { return &ChangeCompactor{ - cache: make(map[string]Change), + cache: make(map[string]Change), + encodingBuffer: xdr.NewEncodingBuffer(), } } @@ -86,7 +88,8 @@ func (c *ChangeCompactor) AddChange(change Change) error { // addCreatedChange adds a change to the cache, but returns an error if create // change is unexpected. func (c *ChangeCompactor) addCreatedChange(change Change) error { - ledgerKey, err := change.Post.LedgerKey().MarshalBinary() + // safe, since we later cast to string (causing a copy) + ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(change.Post.LedgerKey()) if err != nil { return errors.Wrap(err, "Error MarshalBinary") } @@ -128,7 +131,8 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error { // addUpdatedChange adds a change to the cache, but returns an error if update // change is unexpected. func (c *ChangeCompactor) addUpdatedChange(change Change) error { - ledgerKey, err := change.Post.LedgerKey().MarshalBinary() + // safe, since we later cast to string (causing a copy) + ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(change.Post.LedgerKey()) if err != nil { return errors.Wrap(err, "Error MarshalBinary") } @@ -171,7 +175,8 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error { // addRemovedChange adds a change to the cache, but returns an error if remove // change is unexpected. func (c *ChangeCompactor) addRemovedChange(change Change) error { - ledgerKey, err := change.Pre.LedgerKey().MarshalBinary() + // safe, since we later cast to string (causing a copy) + ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(change.Pre.LedgerKey()) if err != nil { return errors.Wrap(err, "Error MarshalBinary") } diff --git a/services/horizon/internal/db2/history/transaction_batch_insert_builder.go b/services/horizon/internal/db2/history/transaction_batch_insert_builder.go index 50fadbea53..ddbe401222 100644 --- a/services/horizon/internal/db2/history/transaction_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/transaction_batch_insert_builder.go @@ -31,12 +31,14 @@ type TransactionBatchInsertBuilder interface { // transactionBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder type transactionBatchInsertBuilder struct { - builder db.BatchInsertBuilder + encodingBuffer *xdr.EncodingBuffer + builder db.BatchInsertBuilder } // NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder { return &transactionBatchInsertBuilder{ + encodingBuffer: xdr.NewEncodingBuffer(), builder: db.BatchInsertBuilder{ Table: q.GetTable("history_transactions"), MaxBatchSize: maxBatchSize, @@ -46,7 +48,7 @@ func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchI // Add adds a new transaction to the batch func (i *transactionBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error { - row, err := transactionToRow(transaction, sequence) + row, err := i.transactionToRow(transaction, sequence) if err != nil { return err } @@ -229,20 +231,20 @@ type TransactionWithoutLedger struct { InnerSignatures pq.StringArray `db:"inner_signatures"` } -func transactionToRow(transaction ingest.LedgerTransaction, sequence uint32) (TransactionWithoutLedger, error) { - envelopeBase64, err := xdr.MarshalBase64(transaction.Envelope) +func (i *transactionBatchInsertBuilder) transactionToRow(transaction ingest.LedgerTransaction, sequence uint32) (TransactionWithoutLedger, error) { + envelopeBase64, err := i.encodingBuffer.MarshalBase64(transaction.Envelope) if err != nil { return TransactionWithoutLedger{}, err } - resultBase64, err := xdr.MarshalBase64(transaction.Result.Result) + resultBase64, err := i.encodingBuffer.MarshalBase64(&transaction.Result.Result) if err != nil { return TransactionWithoutLedger{}, err } - metaBase64, err := xdr.MarshalBase64(transaction.UnsafeMeta) + metaBase64, err := i.encodingBuffer.MarshalBase64(transaction.UnsafeMeta) if err != nil { return TransactionWithoutLedger{}, err } - feeMetaBase64, err := xdr.MarshalBase64(transaction.FeeChanges) + feeMetaBase64, err := i.encodingBuffer.MarshalBase64(transaction.FeeChanges) if err != nil { return TransactionWithoutLedger{}, err } diff --git a/services/horizon/internal/db2/history/transaction_batch_insert_builder_test.go b/services/horizon/internal/db2/history/transaction_batch_insert_builder_test.go index 336fdc5b6e..aaf623a545 100644 --- a/services/horizon/internal/db2/history/transaction_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/transaction_batch_insert_builder_test.go @@ -84,7 +84,10 @@ func TestTransactionToMap_muxed(t *testing.T) { }, }, } - row, err := transactionToRow(tx, 20) + b := &transactionBatchInsertBuilder{ + encodingBuffer: xdr.NewEncodingBuffer(), + } + row, err := b.transactionToRow(tx, 20) assert.NoError(t, err) assert.Equal(t, innerAccountID.Address(), row.Account) @@ -165,7 +168,10 @@ func TestTransactionToMap_SourceMuxedAndFeeSourceUnmuxed(t *testing.T) { }, }, } - row, err := transactionToRow(tx, 20) + b := &transactionBatchInsertBuilder{ + encodingBuffer: xdr.NewEncodingBuffer(), + } + row, err := b.transactionToRow(tx, 20) assert.NoError(t, err) assert.Equal(t, innerAccountID.Address(), row.Account) diff --git a/services/horizon/internal/ingest/orderbook.go b/services/horizon/internal/ingest/orderbook.go index a97a273113..9ec79c7824 100644 --- a/services/horizon/internal/ingest/orderbook.go +++ b/services/horizon/internal/ingest/orderbook.go @@ -35,6 +35,7 @@ type OrderBookStream struct { LatestLedgerGauge prometheus.Gauge lastLedger uint32 lastVerification time.Time + encodingBuffer *xdr.EncodingBuffer } // NewOrderBookStream constructs and initializes an OrderBookStream instance @@ -46,6 +47,7 @@ func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *O Namespace: "horizon", Subsystem: "order_book_stream", Name: "latest_ledger", }), lastVerification: time.Now(), + encodingBuffer: xdr.NewEncodingBuffer(), } } @@ -226,11 +228,11 @@ func (o *OrderBookStream) verifyAllOffers(ctx context.Context) (bool, error) { for i, offerRow := range ingestionOffers { offerEntry := offers[i] offerRowXDR := offerToXDR(offerRow) - offerEntryBase64, err := xdr.MarshalBase64(offerEntry) + offerEntryBase64, err := o.encodingBuffer.MarshalBase64(&offerEntry) if err != nil { return false, errors.Wrap(err, "Error from marshalling offerEntry") } - offerRowBase64, err := xdr.MarshalBase64(offerRowXDR) + offerRowBase64, err := o.encodingBuffer.MarshalBase64(&offerRowXDR) if err != nil { return false, errors.Wrap(err, "Error from marshalling offerRowXDR") } @@ -275,11 +277,11 @@ func (o *OrderBookStream) verifyAllLiquidityPools(ctx context.Context) (bool, er if err != nil { return false, errors.Wrap(err, "Error from converting liquidity pool row to xdr") } - liquidityPoolEntryBase64, err := xdr.MarshalBase64(liquidityPoolEntry) + liquidityPoolEntryBase64, err := o.encodingBuffer.MarshalBase64(&liquidityPoolEntry) if err != nil { return false, errors.Wrap(err, "Error from marshalling liquidityPoolEntry") } - liquidityPoolRowBase64, err := xdr.MarshalBase64(liquidityPoolRowXDR) + liquidityPoolRowBase64, err := o.encodingBuffer.MarshalBase64(&liquidityPoolRowXDR) if err != nil { return false, errors.Wrap(err, "Error from marshalling liquidityPoolRowXDR") } diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go index d5463b6f5e..fae72f2c49 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go @@ -10,12 +10,14 @@ import ( ) type ClaimableBalancesChangeProcessor struct { + encodingBuffer *xdr.EncodingBuffer qClaimableBalances history.QClaimableBalances cache *ingest.ChangeCompactor } func NewClaimableBalancesChangeProcessor(Q history.QClaimableBalances) *ClaimableBalancesChangeProcessor { p := &ClaimableBalancesChangeProcessor{ + encodingBuffer: xdr.NewEncodingBuffer(), qClaimableBalances: Q, } p.reset() @@ -65,7 +67,7 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { case change.Pre != nil && change.Post == nil: // Removed cBalance := change.Pre.Data.MustClaimableBalance() - id, err := xdr.MarshalHex(cBalance.BalanceId) + id, err := p.encodingBuffer.MarshalHex(cBalance.BalanceId) if err != nil { return err } diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index bb5917a8df..9c5dbefacd 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -648,8 +648,9 @@ func addClaimableBalanceToStateVerifier( } var idStrings []string + e := xdr.NewEncodingBuffer() for _, id := range ids { - idString, err := xdr.MarshalHex(id) + idString, err := e.MarshalHex(id) if err != nil { return err } diff --git a/xdr/main.go b/xdr/main.go index 220eb29d5c..cd08e82676 100644 --- a/xdr/main.go +++ b/xdr/main.go @@ -47,7 +47,7 @@ func safeUnmarshalString(decoder func(reader io.Reader) io.Reader, data string, } // SafeUnmarshalBase64 first decodes the provided reader from base64 before -// decoding the xdr into the provided destination. Also ensures that the reader +// decoding the xdr into the provided destination. Also ensures that the reader // is fully consumed. func SafeUnmarshalBase64(data string, dest interface{}) error { return safeUnmarshalString( @@ -60,7 +60,7 @@ func SafeUnmarshalBase64(data string, dest interface{}) error { } // SafeUnmarshalHex first decodes the provided reader from hex before -// decoding the xdr into the provided destination. Also ensures that the reader +// decoding the xdr into the provided destination. Also ensures that the reader // is fully consumed. func SafeUnmarshalHex(data string, dest interface{}) error { return safeUnmarshalString(hex.NewDecoder, data, dest) @@ -103,6 +103,86 @@ func MarshalHex(v interface{}) (string, error) { return marshalString(hex.EncodeToString, v) } +// EncodingBuffer reuses internal buffers between invocations to minimize allocations. +// It intentionally only allows EncodeTo method arguments, to guarantee high performance encoding. +type EncodingBuffer struct { + encoder *xdr.Encoder + xdrEncoderBuf bytes.Buffer + otherEncodersBuf []byte +} + +func growSlice(old []byte, newSize int) []byte { + oldCap := cap(old) + if newSize <= oldCap { + return old[:newSize] + } + // the array doesn't fit, lets return a new one with double the capacity + // to avoid further resizing + return make([]byte, newSize, 2*newSize) +} + +type XDREncodable interface { + EncodeTo(e *xdr.Encoder) error +} + +func NewEncodingBuffer() *EncodingBuffer { + var ret EncodingBuffer + ret.encoder = xdr.NewEncoder(&ret.xdrEncoderBuf) + return &ret +} + +// UnsafeMarshalBinary marshals the input XDR binary, returning +// a slice pointing to the internal buffer. Handled with care this improveds +// performance since copying is not required. +// Subsequent calls to marshalling methods will overwrite the returned buffer. +func (e *EncodingBuffer) UnsafeMarshalBinary(encodable XDREncodable) ([]byte, error) { + e.xdrEncoderBuf.Reset() + if err := encodable.EncodeTo(e.encoder); err != nil { + return nil, err + } + return e.xdrEncoderBuf.Bytes(), nil +} + +// UnsafeMarshalBase64 is the base64 version of UnsafeMarshalBinary +func (e *EncodingBuffer) UnsafeMarshalBase64(encodable XDREncodable) ([]byte, error) { + xdrEncoded, err := e.UnsafeMarshalBinary(encodable) + if err != nil { + return nil, err + } + neededLen := base64.StdEncoding.EncodedLen(len(xdrEncoded)) + e.otherEncodersBuf = growSlice(e.otherEncodersBuf, neededLen) + base64.StdEncoding.Encode(e.otherEncodersBuf, xdrEncoded) + return e.otherEncodersBuf, nil +} + +// UnsafeMarshalHex is the hex version of UnsafeMarshalBinary +func (e *EncodingBuffer) UnsafeMarshalHex(encodable XDREncodable) ([]byte, error) { + xdrEncoded, err := e.UnsafeMarshalBinary(encodable) + if err != nil { + return nil, err + } + neededLen := hex.EncodedLen(len(xdrEncoded)) + e.otherEncodersBuf = growSlice(e.otherEncodersBuf, neededLen) + hex.Encode(e.otherEncodersBuf, xdrEncoded) + return e.otherEncodersBuf, nil +} + +func (e *EncodingBuffer) MarshalBase64(encodable XDREncodable) (string, error) { + b, err := e.UnsafeMarshalBase64(encodable) + if err != nil { + return "", err + } + return string(b), nil +} + +func (e *EncodingBuffer) MarshalHex(encodable XDREncodable) (string, error) { + b, err := e.UnsafeMarshalHex(encodable) + if err != nil { + return "", err + } + return string(b), nil +} + func MarshalFramed(w io.Writer, v interface{}) error { var tmp bytes.Buffer n, err := Marshal(&tmp, v)