Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LedgerEntryChangeCache #2004

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions exp/ingest/errors/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package errors

// StateError are errors indicating invalid state. Type is used to differentiate
// between network, i/o, marshaling, bad usage etc. errors and actual state errors.
// You can use type assertion or type switch to check for type.
type StateError struct {
error
}

// NewStateError creates a new StateError.
func NewStateError(err error) StateError {
return StateError{err}
}
217 changes: 217 additions & 0 deletions exp/ingest/io/ledger_entry_change_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package io

import (
"sync"

ingesterrors "github.com/stellar/go/exp/ingest/errors"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

// LedgerEntryChangeCache is a cache of ledger entry changes that squashes all
bartekn marked this conversation as resolved.
Show resolved Hide resolved
// changes within a single ledger. By doing this, it decreases number of DB
// queries sent to a DB to update the current state of the ledger.
// It has integrity checks built in so ex. removing an account that was
// previously removed returns an error. In such case verify.StateError is
// returned.
//
// It applies changes to the cache using the following algorithm:
//
// 1. If the change is CREATED it checks if any change connected to given entry
// is already in the cache. If not, it adds CREATED change. Otherwise, if
// existing change is:
// a. CREATED it returns error because we can't add an entry that already
// exists.
// b. UPDATED it returns error because we can't add an entry that already
// exists.
// c. REMOVED it means that due to previous transitions we want to remove
// this from a DB what means that it already exists in a DB so we need to
// update the type of change to UPDATED.
// 2. If the change is UPDATE it checks if any change connected to given entry
// is already in the cache. If not, it adds UPDATE change. Otherwise, if
// existing change is:
// a. CREATED it means that due to previous transitions we want to create
// this in a DB what means that it doesn't exist in a DB so we need to
// update the entry but stay with CREATED type.
// b. UPDATED we simply update it with the new value.
// c. REMOVED it means that at this point in the ledger the entry is removed
// so updating it returns an error.
// 3. If the change is REMOVE it checks if any change connected to given entry
// is already in the cache. If not, it adds REMOVE change. Otherwise, if
// existing change is:
// a. CREATED it means that due to previous transitions we want to create
// this in a DB what means that it doesn't exist in a DB. If it was
// created and removed in the same ledger it's a noop so we remove entry
// from the cache.
// b. UPDATED we simply update it to be a REMOVE change because the UPDATE
// change means the entry exists in a DB.
// c. REMOVED it returns error because we can't remove an entry that was
// already removed.
type LedgerEntryChangeCache struct {
// ledger key => Change
cache map[string]Change
mutex sync.Mutex
}

// NewLedgerEntryChangeCache returns a new LedgerEntryChangeCache.
func NewLedgerEntryChangeCache() *LedgerEntryChangeCache {
return &LedgerEntryChangeCache{
cache: make(map[string]Change),
}
}

// AddChange adds a change to LedgerEntryChangeCache. All changes are stored
// in memory. To get the final, squashed changes call GetChanges.
//
// Please note that the current ledger capacity in pubnet (max 1000 ops/ledger)
// makes LedgerEntryChangeCache safe to use in terms of memory usage. If the
// cache takes too much memory, you apply changes returned by GetChanges and
// create a new LedgerEntryChangeCache object to continue ingestion.
func (c *LedgerEntryChangeCache) AddChange(change Change) error {
c.mutex.Lock()
defer c.mutex.Unlock()

switch {
case change.Pre == nil && change.Post != nil:
return c.addCreatedChange(change)
case change.Pre != nil && change.Post != nil:
return c.addUpdatedChange(change)
case change.Pre != nil && change.Post == nil:
return c.addRemovedChange(change)
default:
return errors.New("Unknown entry change state")
}
}

// addCreatedChange adds a change to the cache, but returns an error if create
// change is unexpected.
func (c *LedgerEntryChangeCache) addCreatedChange(change Change) error {
ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64()
if err != nil {
return errors.Wrap(err, "Error MarshalBinaryBase64")
}

existingChange, exist := c.cache[ledgerKeyString]
if !exist {
c.cache[ledgerKeyString] = change
return nil
}

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
return ingesterrors.NewStateError(errors.Errorf(
"can't create an entry that already exists (ledger key = %s)",
ledgerKeyString,
))
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
return ingesterrors.NewStateError(errors.Errorf(
"can't create an entry that already exists (ledger key = %s)",
ledgerKeyString,
))
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
// If existing type is removed it means that this entry does exist
// in a DB so we update entry change.
c.cache[ledgerKeyString] = Change{
Type: change.Post.LedgerKey().Type,
Pre: existingChange.Pre,
Post: change.Post,
}
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
}

return nil
}

// addUpdatedChange adds a change to the cache, but returns an error if update
// change is unexpected.
func (c *LedgerEntryChangeCache) addUpdatedChange(change Change) error {
ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64()
if err != nil {
return errors.Wrap(err, "Error MarshalBinaryBase64")
}

existingChange, exist := c.cache[ledgerKeyString]
if !exist {
c.cache[ledgerKeyString] = change
return nil
}

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
// If existing type is created it means that this entry does not
// exist in a DB so we update entry change.
c.cache[ledgerKeyString] = Change{
Type: change.Post.LedgerKey().Type,
Pre: existingChange.Pre, // = nil
Post: change.Post,
}
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
c.cache[ledgerKeyString] = Change{
Type: change.Post.LedgerKey().Type,
Pre: existingChange.Pre,
Post: change.Post,
}
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return ingesterrors.NewStateError(errors.Errorf(
"can't update an entry that was previously removed (ledger key = %s)",
ledgerKeyString,
))
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type)
}

return nil
}

// addRemovedChange adds a change to the cache, but returns an error if remove
// change is unexpected.
func (c *LedgerEntryChangeCache) addRemovedChange(change Change) error {
ledgerKeyString, err := change.Pre.LedgerKey().MarshalBinaryBase64()
if err != nil {
return errors.Wrap(err, "Error MarshalBinaryBase64")
}

existingChange, exist := c.cache[ledgerKeyString]
if !exist {
c.cache[ledgerKeyString] = change
return nil
}

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
// If existing type is created it means that this will be no op.
// Entry was created and is now removed in a single ledger.
delete(c.cache, ledgerKeyString)
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
c.cache[ledgerKeyString] = Change{
Type: change.Pre.LedgerKey().Type,
Pre: existingChange.Pre,
Post: nil,
}
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return ingesterrors.NewStateError(errors.Errorf(
"can't remove an entry that was previously removed (ledger key = %s)",
ledgerKeyString,
))
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type)
}

return nil
}

// GetChanges returns a slice of Change's in the cache. The of changes is random
bartekn marked this conversation as resolved.
Show resolved Hide resolved
// but each change is connected to a separate entry.
func (c *LedgerEntryChangeCache) GetChanges() []Change {
c.mutex.Lock()
defer c.mutex.Unlock()

changes := make([]Change, 0, len(c.cache))

for _, entryChange := range c.cache {
changes = append(changes, entryChange)
}

return changes
}
Loading