Skip to content

Commit

Permalink
services/horizon/internal/ingest: Prevent redundant and concurrent st…
Browse files Browse the repository at this point in the history
…ate verification runs (#4821)

Disable concurrent state verification, and add configuration to specify how often state verification runs and a timeout for capping the duration of a state verification run.
  • Loading branch information
tamirms authored Apr 6, 2023
1 parent 8e1b630 commit 6c5193d
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 81 deletions.
8 changes: 8 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ type Config struct {
// IngestDisableStateVerification disables state verification
// `System.verifyState()` when set to `true`.
IngestDisableStateVerification bool
// IngestStateVerificationCheckpointFrequency configures how often state verification is performed.
// If IngestStateVerificationCheckpointFrequency is set to 1 state verification is run on every checkpoint,
// If IngestStateVerificationCheckpointFrequency is set to 2 state verification is run on every second checkpoint,
// etc...
IngestStateVerificationCheckpointFrequency uint
// IngestStateVerificationTimeout configures a timeout on the state verification routine.
// If IngestStateVerificationTimeout is set to 0 the timeout is disabled.
IngestStateVerificationTimeout time.Duration
// IngestEnableExtendedLogLedgerStats enables extended ledger stats in
// logging.
IngestEnableExtendedLogLedgerStats bool
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ type IngestionQ interface {
TruncateIngestStateTables(context.Context) error
DeleteRangeAll(ctx context.Context, start, end int64) error
DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error)
TryStateVerificationLock(ctx context.Context) (bool, error)
}

// QAccounts defines account related queries.
Expand Down
38 changes: 38 additions & 0 deletions services/horizon/internal/db2/history/verify_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
)

// stateVerificationLockId is the objid for the advisory lock acquired during
// state verification; it is the key passed to pg_try_advisory_xact_lock by
// TryStateVerificationLock. The value is arbitrary. The only requirement is
// that all ingesting nodes use the same value which is why it's hard coded
// here.
const stateVerificationLockId = 73897213

// TryStateVerificationLock attempts to acquire the state verification lock
// which gives the ingesting node exclusive access to perform state verification.
//
// The lock is taken with pg_try_advisory_xact_lock, so the call must be made
// inside a transaction; calling it outside of one returns an error.
//
// It returns true if the lock was acquired, or false if the lock could not be
// acquired because it is held by another node. A non-nil error is returned
// when the query fails or when the database answers with anything other than
// exactly one row.
func (q *Q) TryStateVerificationLock(ctx context.Context) (bool, error) {
	if tx := q.GetTx(); tx == nil {
		return false, errors.New("cannot be called outside of a transaction")
	}

	var acquired []bool
	err := q.SelectRaw(
		// Tag the query so it is accounted for as an advisory lock query.
		context.WithValue(ctx, &db.QueryTypeContextKey, db.AdvisoryLockQueryType),
		&acquired,
		"SELECT pg_try_advisory_xact_lock(?)",
		stateVerificationLockId,
	)
	if err != nil {
		return false, errors.Wrap(err, "error acquiring advisory lock for state verification")
	}
	if len(acquired) != 1 {
		// err is nil at this point, and wrapping a nil error yields nil, which
		// would make a malformed response indistinguishable from "lock held by
		// another node". Create a fresh error instead.
		return false, errors.New("invalid response from advisory lock")
	}
	return acquired[0], nil
}
47 changes: 47 additions & 0 deletions services/horizon/internal/db2/history/verify_lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package history

import (
"context"
"database/sql"
"testing"

"github.com/stellar/go/services/horizon/internal/test"
)

// TestTryStateVerificationLock exercises the advisory lock with two sessions:
// the call must fail outside of a transaction, only one session may hold the
// lock at a time, and rolling back the holder's transaction frees the lock for
// the other session.
func TestTryStateVerificationLock(t *testing.T) {
	tt := test.Start(t)
	defer tt.Finish()
	test.ResetHorizonDB(t, tt.HorizonDB)

	holder := &Q{tt.HorizonSession()}
	contender := &Q{holder.Clone()}

	ctx := context.Background()
	// Each BeginTx call gets its own options value.
	readOnlyTx := func() *sql.TxOptions {
		return &sql.TxOptions{
			Isolation: sql.LevelRepeatableRead,
			ReadOnly:  true,
		}
	}

	// Without an open transaction the call is rejected.
	_, err := holder.TryStateVerificationLock(ctx)
	tt.Assert.EqualError(err, "cannot be called outside of a transaction")

	// The first session to ask gets the lock.
	tt.Assert.NoError(holder.BeginTx(readOnlyTx()))
	acquired, err := holder.TryStateVerificationLock(ctx)
	tt.Assert.NoError(err)
	tt.Assert.True(acquired)

	// The lock is already held by holder, so contender is turned away.
	tt.Assert.NoError(contender.BeginTx(readOnlyTx()))
	acquired, err = contender.TryStateVerificationLock(ctx)
	tt.Assert.NoError(err)
	tt.Assert.False(acquired)

	// Rolling back holder's transaction releases the lock...
	tt.Assert.NoError(holder.Rollback())

	// ...which lets contender acquire it.
	acquired, err = contender.TryStateVerificationLock(ctx)
	tt.Assert.NoError(err)
	tt.Assert.True(acquired)

	tt.Assert.NoError(contender.Rollback())
}
6 changes: 3 additions & 3 deletions services/horizon/internal/db2/schema/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ ALTER TABLE history_transactions DROP extra_signers;
ALTER TABLE accounts DROP sequence_ledger;
ALTER TABLE accounts DROP sequence_time;

-- we cannot restore the original type of varying(64) because there might be some
-- rows with signers that are too long.
ALTER TABLE accounts_signers
ALTER COLUMN signer TYPE character varying(64);
ALTER COLUMN signer TYPE character varying(165);
18 changes: 18 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,24 @@ func Flags() (*Config, support.ConfigOptions) {
FlagDefault: false,
Usage: "ingestion system runs a verification routing to compare state in local database with history buckets, this can be disabled however it's not recommended",
},
&support.ConfigOption{
Name: "ingest-state-verification-checkpoint-frequency",
ConfigKey: &config.IngestStateVerificationCheckpointFrequency,
OptType: types.Uint,
FlagDefault: uint(1),
Usage: "the frequency in units per checkpoint for how often state verification is executed. " +
"A value of 1 implies running state verification on every checkpoint. " +
"A value of 2 implies running state verification on every second checkpoint.",
},
&support.ConfigOption{
Name: "ingest-state-verification-timeout",
ConfigKey: &config.IngestStateVerificationTimeout,
OptType: types.Int,
FlagDefault: 0,
CustomSetValue: support.SetDurationMinutes,
Usage: "defines an upper bound in minutes for on how long state verification is allowed to run. " +
"A value of 0 disables the timeout.",
},
&support.ConfigOption{
Name: "ingest-enable-extended-log-ledger-stats",
ConfigKey: &config.IngestEnableExtendedLogLedgerStats,
Expand Down
2 changes: 0 additions & 2 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ func (r resumeState) run(s *system) (transition, error) {
log.WithError(err).Warn("error updating stellar-core cursor")
}

s.maybeVerifyState(ingestLedger)

// resume immediately so Captive-Core catchup is not slowed down
return resumeImmediately(lastIngestedLedger), nil
}
Expand Down
22 changes: 18 additions & 4 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ type Config struct {
ReingestRetryBackoffSeconds int

// The checkpoint frequency will be 64 unless you are using an exotic test setup.
CheckpointFrequency uint32
CheckpointFrequency uint32
StateVerificationCheckpointFrequency uint32
StateVerificationTimeout time.Duration

RoundingSlippageFilter int

Expand Down Expand Up @@ -223,7 +225,7 @@ type system struct {
stateVerificationRunning bool
disableStateVerification bool

checkpointManager historyarchive.CheckpointManager
runStateVerificationOnLedger func(uint32) bool

reapOffsets map[string]int64
}
Expand Down Expand Up @@ -306,13 +308,25 @@ func NewSystem(config Config) (System, error) {
historyAdapter: historyAdapter,
filters: filters,
},
checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency),
runStateVerificationOnLedger: ledgerEligibleForStateVerification(
config.CheckpointFrequency,
config.StateVerificationCheckpointFrequency,
),
}

system.initMetrics()
return system, nil
}

// ledgerEligibleForStateVerification builds the predicate used to decide
// whether state verification should run for a given ledger. The predicate
// reports true for ledgers which are checkpoints of a checkpoint manager
// created with an interval of checkpointFrequency * stateVerificationFrequency,
// i.e. a stateVerificationFrequency of 1 selects every checkpoint, 2 selects
// every second checkpoint, and so on.
func ledgerEligibleForStateVerification(checkpointFrequency, stateVerificationFrequency uint32) func(ledger uint32) bool {
	verificationInterval := checkpointFrequency * stateVerificationFrequency
	manager := historyarchive.NewCheckpointManager(verificationInterval)
	return func(seq uint32) bool {
		return manager.IsCheckpoint(seq)
	}
}

func (s *system) initMetrics() {
s.metrics.MaxSupportedProtocolVersion = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "max_supported_protocol_version",
Expand Down Expand Up @@ -679,7 +693,7 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) {
// Run verification routine only when...
if !stateInvalid && // state has not been proved to be invalid...
!s.disableStateVerification && // state verification is not disabled...
s.checkpointManager.IsCheckpoint(lastIngestedLedger) { // it's a checkpoint ledger.
s.runStateVerificationOnLedger(lastIngestedLedger) { // it's a ledger eligible for state verification.
s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand Down
27 changes: 20 additions & 7 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -79,6 +78,15 @@ func TestCheckVerifyStateVersion(t *testing.T) {
)
}

// TestLedgerEligibleForStateVerification checks that, with a checkpoint
// frequency of 64 and a state verification frequency of 3, only ledgers that
// close every third checkpoint are reported as eligible.
func TestLedgerEligibleForStateVerification(t *testing.T) {
	const (
		checkpointFrequency   = 64
		verificationFrequency = 3
	)
	isEligible := ledgerEligibleForStateVerification(checkpointFrequency, verificationFrequency)

	for seq := uint32(1); seq < checkpointFrequency*6; seq++ {
		// A ledger is eligible when the following ledger starts a new
		// verification interval.
		want := (seq+1)%(checkpointFrequency*verificationFrequency) == 0
		assert.Equal(t, want, isEligible(seq))
	}
}

func TestNewSystem(t *testing.T) {
config := Config{
CoreSession: &db.Session{DB: &sqlx.DB{}},
Expand Down Expand Up @@ -165,9 +173,9 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing.
func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) {
historyQ := &mockDBQ{}
system := &system{
historyQ: historyQ,
ctx: context.Background(),
checkpointManager: historyarchive.NewCheckpointManager(64),
historyQ: historyQ,
ctx: context.Background(),
runStateVerificationOnLedger: ledgerEligibleForStateVerification(64, 1),
}

var out bytes.Buffer
Expand Down Expand Up @@ -200,9 +208,9 @@ func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) {
func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) {
historyQ := &mockDBQ{}
system := &system{
historyQ: historyQ,
ctx: context.Background(),
checkpointManager: historyarchive.NewCheckpointManager(64),
historyQ: historyQ,
ctx: context.Background(),
runStateVerificationOnLedger: ledgerEligibleForStateVerification(64, 1),
}

var out bytes.Buffer
Expand Down Expand Up @@ -284,6 +292,11 @@ func (m *mockDBQ) Rollback() error {
return args.Error(0)
}

// TryStateVerificationLock records the call on the mock and hands back the
// (acquired, error) pair configured for it by the test.
func (m *mockDBQ) TryStateVerificationLock(ctx context.Context) (bool, error) {
	results := m.Called(ctx)
	acquired := results.Get(0).(bool)
	return acquired, results.Error(1)
}

func (m *mockDBQ) GetTx() *sqlx.Tx {
args := m.Called()
if args.Get(0) == nil {
Expand Down
18 changes: 7 additions & 11 deletions services/horizon/internal/ingest/resume_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -38,13 +37,13 @@ func (s *ResumeTestTestSuite) SetupTest() {
s.runner = &mockProcessorsRunner{}
s.stellarCoreClient = &mockStellarCoreClient{}
s.system = &system{
ctx: s.ctx,
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
runner: s.runner,
ledgerBackend: s.ledgerBackend,
stellarCoreClient: s.stellarCoreClient,
checkpointManager: historyarchive.NewCheckpointManager(64),
ctx: s.ctx,
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
runner: s.runner,
ledgerBackend: s.ledgerBackend,
stellarCoreClient: s.stellarCoreClient,
runStateVerificationOnLedger: ledgerEligibleForStateVerification(64, 1),
}
s.system.initMetrics()

Expand Down Expand Up @@ -311,9 +310,6 @@ func (s *ResumeTestTestSuite) TestBumpIngestLedger() {
int32(101),
).Return(errors.New("my error")).Once()

// Skips state verification but ensures maybeVerifyState called
s.historyQ.On("GetExpStateInvalid", s.ctx).Return(true, nil).Once()

next, err := resumeState{latestSuccessfullyProcessedLedger: 99}.run(s.system)
s.Assert().NoError(err)
s.Assert().Equal(
Expand Down
Loading

0 comments on commit 6c5193d

Please sign in to comment.