Skip to content

Commit

Permalink
Compare exp_history_ledgers with history_ledgers
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Nov 21, 2019
1 parent 82d03fb commit 0f6c3ba
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 85 deletions.
68 changes: 51 additions & 17 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package history
import (
"encoding/hex"
"fmt"
"reflect"
"time"

sq "github.com/Masterminds/squirrel"
Expand All @@ -13,15 +14,26 @@ import (
"github.com/stellar/go/xdr"
)

// LegacyIngestionVersion reflects the latest version of the non-experimental ingestion
// algorithm. As rows are ingested into the horizon database, this version is
// used to tag them. In the future, any breaking changes introduced by a
// developer should be accompanied by an increase in this value.
//
// Scripts, that have yet to be ported to this codebase can then be leveraged
// to re-ingest old data with the new algorithm, providing a seamless
// transition when the ingested data's structure changes.
const LegacyIngestionVersion = 16
const (
// CurrentExpIngestVersion reflects the latest version of the experimental
// ingestion algorithm. This value is stored in KV store and is used to decide
// if there's a need to reprocess the ledger state or reingest data.
//
// Version history:
// - 1: Initial version
// - 2: Added the orderbook, offers processors and distributed ingestion.
// - 3: Fixed a bug that could potentialy result in invalid state
// (#1722). Update the version to clear the state.
// - 4: Fixed a bug in AccountSignersChanged method.
// - 5: Added trust lines.
// - 6: Added accounts and accounts data.
// - 7: Fixes a bug in AccountSignersChanged method.
// - 8: Fixes AccountSigners processor to remove preauth tx signer
// when preauth tx is failed.
// - 9: Fixes a bug in asset stats processor that counted unauthorized
// trustlines.
CurrentExpIngestVersion = 9
)

// LedgerBySequence loads the single ledger at `seq` into `dest`
func (q *Q) LedgerBySequence(dest interface{}, seq int32) error {
Expand All @@ -32,8 +44,8 @@ func (q *Q) LedgerBySequence(dest interface{}, seq int32) error {
return q.Get(dest, sql)
}

// ExpLedgerBySequence returns a row from the exp_history_ledgers table
func (q *Q) ExpLedgerBySequence(seq int32) (Ledger, error) {
// expLedgerBySequence returns a row from the exp_history_ledgers table
func (q *Q) expLedgerBySequence(seq int32) (Ledger, error) {
sql := selectLedgerFields.
From("exp_history_ledgers hl").
Limit(1).
Expand Down Expand Up @@ -109,8 +121,7 @@ func (q *LedgersQ) Select(dest interface{}) error {

// QExpLedgers defines experimental ingestion ledger related queries.
type QExpLedgers interface {
ExpLedgerBySequence(seq int32) (Ledger, error)

CheckExpLeger(seq int32) (bool, error)
InsertExpLedger(
ledger xdr.LedgerHeaderHistoryEntry,
successTxsCount int,
Expand All @@ -119,6 +130,32 @@ type QExpLedgers interface {
) (int64, error)
}

// CheckExpLeger checks that the ledger in exp_history_ledgers
// matches the one in history_ledgers for a given sequence number
func (q *Q) CheckExpLeger(seq int32) (bool, error) {
expLedger, err := q.expLedgerBySequence(seq)
if err != nil {
return false, err
}

var ledger Ledger
err = q.LedgerBySequence(&ledger, seq)
if err != nil {
return false, err
}

// ignore importer version created time, and updated time
expLedger.ImporterVersion = ledger.ImporterVersion
expLedger.CreatedAt = ledger.CreatedAt
expLedger.UpdatedAt = ledger.UpdatedAt

// compare ClosedAt separately because reflect.DeepEqual does not handle time.Time
expClosedAt := expLedger.ClosedAt
expLedger.ClosedAt = ledger.ClosedAt

return expClosedAt.Equal(ledger.ClosedAt) && reflect.DeepEqual(expLedger, ledger), nil
}

// InsertExpLedger creates a row in the exp_history_ledgers table.
// Returns number of rows affected and error.
func (q *Q) InsertExpLedger(
Expand Down Expand Up @@ -158,10 +195,7 @@ func ledgerHeaderToMap(
}
closeTime := time.Unix(int64(ledger.Header.ScpValue.CloseTime), 0).UTC()
return map[string]interface{}{
// when it comes to ingesting ledgers, the experimental ingestion system is compatible with
// the legacy ingestion system which is why we use the same importer version as the legacy
// ingestion system
"importer_version": LegacyIngestionVersion,
"importer_version": CurrentExpIngestVersion,
"id": toid.New(int32(ledger.Header.LedgerSeq), 0, 0).ToInt64(),
"sequence": ledger.Header.LedgerSeq,
"ledger_hash": hex.EncodeToString(ledger.Hash[:]),
Expand Down
124 changes: 117 additions & 7 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/services/horizon/internal/toid"
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestInsertLedger(t *testing.T) {
LedgerHash: "4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
PreviousLedgerHash: null.NewString("4b0b8bace3b2438b2404776ce57643966855487ba6384724a3c664c7aa4cd9e4", true),
TotalOrderID: TotalOrderID{toid.New(int32(69859), 0, 0).ToInt64()},
ImporterVersion: LegacyIngestionVersion,
ImporterVersion: CurrentExpIngestVersion,
TransactionCount: 12,
SuccessfulTransactionCount: new(int32),
FailedTransactionCount: new(int32),
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestInsertLedger(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Equal(rowsAffected, int64(1))

ledgerFromDB, err := q.ExpLedgerBySequence(69859)
ledgerFromDB, err := q.expLedgerBySequence(69859)
tt.Assert.NoError(err)

expectedLedger.CreatedAt = ledgerFromDB.CreatedAt
Expand All @@ -124,10 +125,119 @@ func TestInsertLedger(t *testing.T) {
tt.Assert.True(expectedLedger.ClosedAt.Equal(ledgerFromDB.ClosedAt))
expectedLedger.ClosedAt = ledgerFromDB.ClosedAt

tt.Assert.Equal(*expectedLedger.SuccessfulTransactionCount, *ledgerFromDB.SuccessfulTransactionCount)
tt.Assert.Equal(*expectedLedger.FailedTransactionCount, *ledgerFromDB.FailedTransactionCount)
expectedLedger.SuccessfulTransactionCount = ledgerFromDB.SuccessfulTransactionCount
expectedLedger.FailedTransactionCount = ledgerFromDB.FailedTransactionCount

tt.Assert.Equal(expectedLedger, ledgerFromDB)
}

func ledgerToMap(ledger Ledger) map[string]interface{} {
return map[string]interface{}{
"importer_version": ledger.ImporterVersion,
"id": ledger.TotalOrderID.ID,
"sequence": ledger.Sequence,
"ledger_hash": ledger.LedgerHash,
"previous_ledger_hash": ledger.PreviousLedgerHash,
"total_coins": ledger.TotalCoins,
"fee_pool": ledger.FeePool,
"base_fee": ledger.BaseFee,
"base_reserve": ledger.BaseReserve,
"max_tx_set_size": ledger.MaxTxSetSize,
"closed_at": ledger.ClosedAt,
"created_at": ledger.CreatedAt,
"updated_at": ledger.UpdatedAt,
"transaction_count": ledger.SuccessfulTransactionCount,
"successful_transaction_count": ledger.SuccessfulTransactionCount,
"failed_transaction_count": ledger.FailedTransactionCount,
"operation_count": ledger.OperationCount,
"protocol_version": ledger.ProtocolVersion,
"ledger_header": ledger.LedgerHeaderXDR,
}
}

func TestCheckExpLedger(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

ledger := Ledger{
Sequence: 69859,
LedgerHash: "4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
PreviousLedgerHash: null.NewString("4b0b8bace3b2438b2404776ce57643966855487ba6384724a3c664c7aa4cd9e4", true),
TotalOrderID: TotalOrderID{toid.New(int32(69859), 0, 0).ToInt64()},
ImporterVersion: 321,
TransactionCount: 12,
SuccessfulTransactionCount: new(int32),
FailedTransactionCount: new(int32),
OperationCount: 23,
TotalCoins: 23451,
FeePool: 213,
BaseReserve: 687,
MaxTxSetSize: 345,
ProtocolVersion: 12,
BaseFee: 100,
ClosedAt: time.Now().UTC().Truncate(time.Second),
LedgerHeaderXDR: null.NewString("temp", true),
}
*ledger.SuccessfulTransactionCount = 12
*ledger.FailedTransactionCount = 3

_, err := q.CheckExpLeger(ledger.Sequence)
tt.Assert.Equal(err, sql.ErrNoRows)

insertSQL := sq.Insert("exp_history_ledgers").SetMap(ledgerToMap(ledger))
_, err = q.Exec(insertSQL)
tt.Assert.NoError(err)

_, err = q.CheckExpLeger(ledger.Sequence)
tt.Assert.Equal(err, sql.ErrNoRows)

ledger.CreatedAt = time.Now()
ledger.UpdatedAt = time.Now()
ledger.ImporterVersion = 123

insertSQL = sq.Insert("history_ledgers").SetMap(ledgerToMap(ledger))
_, err = q.Exec(insertSQL)
tt.Assert.NoError(err)

valid, err := q.CheckExpLeger(ledger.Sequence)
tt.Assert.NoError(err)
tt.Assert.True(valid)

for fieldName, value := range map[string]interface{}{
"closed_at": time.Now().Add(time.Minute).UTC().Truncate(time.Second),
"ledger_hash": "hash",
"previous_ledger_hash": "previous",
"id": 999,
"total_coins": 9999,
"fee_pool": 9999,
"base_fee": 9999,
"base_reserve": 9999,
"max_tx_set_size": 9999,
"transaction_count": 9999,
"successful_transaction_count": 9999,
"failed_transaction_count": 9999,
"operation_count": 9999,
"protocol_version": 9999,
"ledger_header": "ledger header",
} {
updateSQL := sq.Update("history_ledgers").
Set(fieldName, value).
Where("sequence = ?", ledger.Sequence)
_, err = q.Exec(updateSQL)
tt.Assert.NoError(err)

valid, err = q.CheckExpLeger(ledger.Sequence)
tt.Assert.NoError(err)
tt.Assert.False(valid)

_, err = q.Exec(sq.Delete("history_ledgers").Where("sequence = ?", ledger.Sequence))
tt.Assert.NoError(err)

insertSQL = sq.Insert("history_ledgers").SetMap(ledgerToMap(ledger))
_, err = q.Exec(insertSQL)
tt.Assert.NoError(err)

valid, err := q.CheckExpLeger(ledger.Sequence)
tt.Assert.NoError(err)
tt.Assert.True(valid)
}
}
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/mock_q_ledgers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (m *MockQLedgers) InsertExpLedger(
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQLedgers) ExpLedgerBySequence(seq int32) (Ledger, error) {
func (m *MockQLedgers) CheckExpLeger(seq int32) (bool, error) {
a := m.Called(seq)
return a.Get(0).(Ledger), a.Error(1)
return a.Get(0).(bool), a.Error(1)
}
23 changes: 1 addition & 22 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,6 @@ import (
"github.com/stellar/go/xdr"
)

const (
// CurrentVersion reflects the latest version of the ingestion
// algorithm. This value is stored in KV store and is used to decide
// if there's a need to reprocess the ledger state or reingest data.
//
// Version history:
// - 1: Initial version
// - 2: Added the orderbook, offers processors and distributed ingestion.
// - 3: Fixed a bug that could potentialy result in invalid state
// (#1722). Update the version to clear the state.
// - 4: Fixed a bug in AccountSignersChanged method.
// - 5: Added trust lines.
// - 6: Added accounts and accounts data.
// - 7: Fixes a bug in AccountSignersChanged method.
// - 8: Fixes AccountSigners processor to remove preauth tx signer
// when preauth tx is failed.
// - 9: Fixes a bug in asset stats processor that counted unauthorized
// trustlines.
CurrentVersion = 9
)

var log = ilog.DefaultLogger.WithField("service", "expingest")

type Config struct {
Expand Down Expand Up @@ -239,7 +218,7 @@ func (s *System) Run() {
return errors.Wrap(err, "Error getting exp ingest version")
}

if ingestVersion != CurrentVersion || lastIngestedLedger == 0 {
if ingestVersion != history.CurrentExpIngestVersion || lastIngestedLedger == 0 {
// This block is either starting from empty state or ingestion
// version upgrade.
// This will always run on a single instance due to the fact that
Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/expingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package expingest
import (
"testing"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stretchr/testify/assert"
)

func TestCheckVerifyStateVersion(t *testing.T) {
assert.Equal(
t,
CurrentVersion,
history.CurrentExpIngestVersion,
stateVerifierExpectedIngestionVersion,
"State verifier is outdated, update it, then update stateVerifierExpectedIngestionVersion value",
)
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/expingest/pipeline_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestPostProcessingHook(t *testing.T) {
// check that the ingest version and the ingest sequence was updated
version, err := historyQ.GetExpIngestVersion()
tt.Assert.NoError(err)
tt.Assert.Equal(version, CurrentVersion)
tt.Assert.Equal(version, history.CurrentExpIngestVersion)
seq, err := historyQ.GetLastLedgerExpIngestNonBlocking()
tt.Assert.NoError(err)
tt.Assert.Equal(seq, testCase.pipelineLedger)
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/expingest/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func postProcessingHook(
return errors.Wrap(err, "Error updating last ingested ledger")
}

if err = historyQ.UpdateExpIngestVersion(CurrentVersion); err != nil {
if err = historyQ.UpdateExpIngestVersion(history.CurrentExpIngestVersion); err != nil {
return errors.Wrap(err, "Error updating expingest version")
}

Expand Down
Loading

0 comments on commit 0f6c3ba

Please sign in to comment.