Skip to content

Commit

Permalink
Use encoding buffer in state verifier (#4069)
Browse files Browse the repository at this point in the history
Profiling shows there are a lot of allocations due to buffer resizing.
  • Loading branch information
2opremio authored Nov 13, 2021
1 parent 9271fc3 commit 9b9ddf1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 28 deletions.
4 changes: 1 addition & 3 deletions services/horizon/internal/ingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
}
defer stateReader.Close()

verifier := &verify.StateVerifier{
StateReader: stateReader,
}
verifier := verify.NewStateVerifier(stateReader, nil)

assetStats := processors.AssetStatSet{}
total := int64(0)
Expand Down
44 changes: 27 additions & 17 deletions services/horizon/internal/ingest/verify/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x
// StateVerifier verifies if ledger entries provided by Add method are the same
// as in the checkpoint ledger entries provided by CheckpointChangeReader.
// The algorithm works in the following way:
// 0. Develop TransformFunction. It should remove all fields and objects not
// 0. Develop `transformFunction`. It should remove all fields and objects not
// stored in your app. For example, if you only store accounts, all other
// ledger entry types should be ignored (return ignore = true).
// 1. In a loop, get entries from history archive by calling GetEntries()
Expand All @@ -32,19 +32,28 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x
// entries in your storage (to find if some extra entires exist in your
// storage).
// Functions will return StateError type if state is found to be incorrect.
// It's user responsibility to call `StateReader.Close()` when reading is done.
// It's user responsibility to call `stateReader.Close()` when reading is done.
// Check Horizon for an example how to use this tool.
type StateVerifier struct {
StateReader ingest.ChangeReader
// TransformFunction transforms (or ignores) ledger entries streamed from
stateReader ingest.ChangeReader
// transformFunction transforms (or ignores) ledger entries streamed from
// checkpoint buckets to match the form added by `Write`. Read
// TransformLedgerEntryFunction godoc for more information.
TransformFunction TransformLedgerEntryFunction
transformFunction TransformLedgerEntryFunction

readEntries int
readingDone bool

currentEntries map[string]xdr.LedgerEntry
encodingBuffer *xdr.EncodingBuffer
}

func NewStateVerifier(stateReader ingest.ChangeReader, tf TransformLedgerEntryFunction) *StateVerifier {
return &StateVerifier{
stateReader: stateReader,
transformFunction: tf,
encodingBuffer: xdr.NewEncodingBuffer(),
}
}

// GetLedgerKeys returns up to `count` ledger keys from history buckets
Expand All @@ -59,7 +68,7 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {
v.currentEntries = make(map[string]xdr.LedgerEntry)

for count > 0 {
entryChange, err := v.StateReader.Read()
entryChange, err := v.stateReader.Read()
if err != nil {
if err == io.EOF {
v.readingDone = true
Expand All @@ -70,15 +79,15 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {

entry := *entryChange.Post

if v.TransformFunction != nil {
ignore, _ := v.TransformFunction(entry)
if v.transformFunction != nil {
ignore, _ := v.transformFunction(entry)
if ignore {
continue
}
}

ledgerKey := entry.LedgerKey()
key, err := ledgerKey.MarshalBinary()
key, err := v.encodingBuffer.MarshalBinary(ledgerKey)
if err != nil {
return keys, errors.Wrap(err, "Error marshaling ledgerKey")
}
Expand All @@ -101,12 +110,13 @@ func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {
// Any `StateError` returned by this method indicates invalid state!
func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
actualEntry := entry.Normalize()
actualEntryMarshaled, err := actualEntry.MarshalBinary()
actualEntryMarshaled, err := v.encodingBuffer.MarshalBinary(actualEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling actualEntry")
}

key, err := actualEntry.LedgerKey().MarshalBinary()
// safe, since we convert to string right away (causing a copy)
key, err := v.encodingBuffer.UnsafeMarshalBinary(actualEntry.LedgerKey())
if err != nil {
return errors.Wrap(err, "Error marshaling ledgerKey")
}
Expand All @@ -122,25 +132,25 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
delete(v.currentEntries, string(key))

preTransformExpectedEntry := expectedEntry
preTransformExpectedEntryMarshaled, err := preTransformExpectedEntry.MarshalBinary()
preTransformExpectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&preTransformExpectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling preTransformExpectedEntry")
}

if v.TransformFunction != nil {
if v.transformFunction != nil {
var ignore bool
ignore, expectedEntry = v.TransformFunction(expectedEntry)
ignore, expectedEntry = v.transformFunction(expectedEntry)
// Extra check: if entry was ignored in GetEntries, it shouldn't be
// ignored here.
if ignore {
return errors.Errorf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.",
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.",
base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled),
)
}
}

expectedEntryMarshaled, err := expectedEntry.MarshalBinary()
expectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&expectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling expectedEntry")
}
Expand Down Expand Up @@ -195,7 +205,7 @@ func (v *StateVerifier) checkUnreadEntries() error {
}

// Ignore error as StateError below is more important
entryString, _ := xdr.MarshalBase64(entry)
entryString, _ := v.encodingBuffer.MarshalBase64(&entry)
return ingest.NewStateError(errors.Errorf(
"Entries (%d) not found locally, example: %s",
len(v.currentEntries),
Expand Down
14 changes: 6 additions & 8 deletions services/horizon/internal/ingest/verify/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ type StateVerifierTestSuite struct {

func (s *StateVerifierTestSuite) SetupTest() {
s.mockStateReader = &ingest.MockChangeReader{}
s.verifier = &StateVerifier{
StateReader: s.mockStateReader,
}
s.verifier = NewStateVerifier(s.mockStateReader, nil)
}

func (s *StateVerifierTestSuite) TearDownTest() {
Expand Down Expand Up @@ -103,7 +101,7 @@ func (s *StateVerifierTestSuite) TestTransformFunction() {

s.mockStateReader.On("Read").Return(ingest.Change{}, io.EOF).Once()

s.verifier.TransformFunction =
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
// Leave Account ID only for accounts, ignore the rest
switch entry.Data.Type {
Expand Down Expand Up @@ -190,7 +188,7 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() {
Post: &accountEntry,
}, nil).Once()

s.verifier.TransformFunction =
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
return false, xdr.LedgerEntry{}
}
Expand All @@ -199,16 +197,16 @@ func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() {
s.Assert().NoError(err)
s.Assert().Len(keys, 1)

// Check the behaviour of TransformFunction to code path to test.
s.verifier.TransformFunction =
// Check the behaviour of transformFunction to code path to test.
s.verifier.transformFunction =
func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) {
return true, xdr.LedgerEntry{}
}

entryBase64, err := xdr.MarshalBase64(accountEntry)
s.Assert().NoError(err)
errorMsg := fmt.Sprintf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.",
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.",
entryBase64,
)
err = s.verifier.Write(accountEntry)
Expand Down
10 changes: 10 additions & 0 deletions xdr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ func (e *EncodingBuffer) UnsafeMarshalHex(encodable XDREncodable) ([]byte, error
return e.otherEncodersBuf, nil
}

func (e *EncodingBuffer) MarshalBinary(encodable XDREncodable) ([]byte, error) {
xdrEncoded, err := e.UnsafeMarshalBinary(encodable)
if err != nil {
return nil, err
}
ret := make([]byte, len(xdrEncoded))
copy(ret, xdrEncoded)
return ret, nil
}

func (e *EncodingBuffer) MarshalBase64(encodable XDREncodable) (string, error) {
b, err := e.UnsafeMarshalBase64(encodable)
if err != nil {
Expand Down

0 comments on commit 9b9ddf1

Please sign in to comment.