From 54e6d9bc7136abb26651116ad395211acb3aba0d Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 17 Jan 2022 19:37:26 -0800 Subject: [PATCH 1/3] #3268: enabled integration test db to be deleted at end of all tests in db_test.go --- services/horizon/cmd/db.go | 23 +++++++++-------- services/horizon/internal/db2/history/main.go | 1 + services/horizon/internal/ingest/main.go | 1 + services/horizon/internal/ingest/main_test.go | 5 ++++ services/horizon/internal/ingest/parallel.go | 13 ++++++++++ .../horizon/internal/integration/db_test.go | 25 +++++++------------ .../internal/test/integration/integration.go | 11 +++++--- support/db/dbtest/db.go | 3 +++ 8 files changed, 52 insertions(+), 30 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index d4f88b9aa7..3919414a1f 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -374,17 +374,14 @@ var dbFillGapsCmd = &cobra.Command{ } func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config) error { + var err error + if reingestForce && parallelWorkers > 1 { return errors.New("--force is incompatible with --parallel-workers > 1") } - horizonSession, err := db.Open("postgres", config.DatabaseURL) - if err != nil { - return fmt.Errorf("cannot open Horizon DB: %v", err) - } ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, - HistorySession: horizonSession, HistoryArchiveURL: config.HistoryArchiveURLs[0], CheckpointFrequency: config.CheckpointFrequency, MaxReingestRetries: int(retries), @@ -398,15 +395,18 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, StellarCoreURL: config.StellarCoreURL, } - if !ingestConfig.EnableCaptiveCore { + if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + + if !config.EnableCaptiveCoreIngestion { if config.StellarCoreDatabaseURL == "" { return fmt.Errorf("flag --%s cannot be empty", horizon.StellarCoreDBURLFlagName) } - coreSession, dbErr := db.Open("postgres", config.StellarCoreDatabaseURL) - if dbErr != nil { - return fmt.Errorf("cannot open Core DB: %v", dbErr) + if ingestConfig.CoreSession, err = db.Open("postgres", config.StellarCoreDatabaseURL); err != nil { + ingestConfig.HistorySession.Close() + return fmt.Errorf("cannot open Core DB: %v", err) } - ingestConfig.CoreSession = coreSession } if parallelWorkers > 1 { @@ -425,6 +425,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, if systemErr != nil { return systemErr } + defer system.Shutdown() err = system.ReingestRange(ledgerRanges, reingestForce) if err != nil { @@ -477,6 +478,7 @@ func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) { if err != nil { return nil, err } + defer horizonSession.Close() q := &history.Q{horizonSession} return q.GetLedgerGaps(context.Background()) } @@ -486,6 +488,7 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history if err != nil { return nil, err } + defer horizonSession.Close() q := &history.Q{horizonSession} return q.GetLedgerGapsInRange(context.Background(), start, end) } diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 2ebf2d812d..82c3862929 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -265,6 +265,7 @@ type IngestionQ interface { BeginTx(*sql.TxOptions) error Commit() error CloneIngestionQ() IngestionQ + Close() error Rollback() error GetTx() *sqlx.Tx GetIngestVersion(context.Context) (int, error) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 53abbfa9e8..5b6fed4782 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -693,6 +693,7 @@ func (s *system) Shutdown() { s.cancel() // wait for ingestion state machine to terminate s.wg.Wait() + s.historyQ.Close() if err := s.ledgerBackend.Close(); err != nil { log.WithError(err).Info("could not close ledger backend") } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 63c0f19f87..91cfbba331 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -265,6 +265,11 @@ func (m *mockDBQ) Commit() error { return args.Error(0) } +func (m *mockDBQ) Close() error { + args := m.Called() + return args.Error(0) +} + func (m *mockDBQ) Rollback() error { args := m.Called() return args.Error(0) diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go index 547098ed0d..b3c163689d 100644 --- a/services/horizon/internal/ingest/parallel.go +++ b/services/horizon/internal/ingest/parallel.go @@ -47,6 +47,16 @@ func newParallelSystems(config Config, workerCount uint, systemFactory func(Conf }, nil } +func (ps *ParallelSystems) Shutdown() { + log.Info("Shutting down parallel ingestion system...") + if ps.config.HistorySession != nil { + ps.config.HistorySession.Close() + } + if ps.config.CoreSession != nil { + ps.config.CoreSession.Close() + } +} + func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan history.LedgerRange) rangeError { for { @@ -128,6 +138,9 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat // the user needs to start again to prevent the gaps. lowestRangeErr *rangeError ) + + defer ps.Shutdown() + if err := validateRanges(ledgerRanges); err != nil { return err } diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index 1d0a391327..d3a4b146be 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -3,6 +3,7 @@ package integration import ( "context" "fmt" + "os" "path/filepath" "strconv" "testing" @@ -153,21 +154,7 @@ func TestReingestDB(t *testing.T) { itest, reachedLedger := initializeDBIntegrationTest(t) tt := assert.New(t) - // Create a fresh Horizon database - newDB := dbtest.Postgres(t) - // TODO: Unfortunately Horizon's ingestion System leaves open sessions behind,leading to - // a "database is being accessed by other users" error when trying to drop it - // defer newDB.Close() - freshHorizonPostgresURL := newDB.DSN horizonConfig := itest.GetHorizonConfig() - horizonConfig.DatabaseURL = freshHorizonPostgresURL - // Initialize the DB schema - dbConn, err := db.Open("postgres", freshHorizonPostgresURL) - tt.NoError(err) - defer dbConn.Close() - _, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0) - tt.NoError(err) - t.Run("validate parallel range", func(t *testing.T) { horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db", @@ -250,6 +237,7 @@ func command(horizonConfig horizon.Config, args ...string) []string { } func TestFillGaps(t *testing.T) { + os.Setenv("HORIZON_INTEGRATION_TESTS", "true") itest, reachedLedger := initializeDBIntegrationTest(t) tt := assert.New(t) @@ -263,7 +251,13 @@ func TestFillGaps(t *testing.T) { horizonConfig.DatabaseURL = freshHorizonPostgresURL // Initialize the DB schema dbConn, err := db.Open("postgres", freshHorizonPostgresURL) - defer dbConn.Close() + tt.NoError(err) + historyQ := history.Q{dbConn} + defer func() { + historyQ.Close() + newDB.Close() + }() + _, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0) tt.NoError(err) @@ -307,7 +301,6 @@ func TestFillGaps(t *testing.T) { // subprocesses to conflict. itest.StopHorizon() - historyQ := history.Q{dbConn} var oldestLedger, latestLedger int64 tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger)) tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger)) diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index e5a265da97..f97ea2766d 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -262,10 +262,8 @@ func (i *Test) StartHorizon() error { if horizonPostgresURL == "" { postgres := dbtest.Postgres(i.t) i.shutdownCalls = append(i.shutdownCalls, func() { - // FIXME: Unfortunately, Horizon leaves open sessions behind, - // leading to a "database is being accessed by other users" - // error when trying to drop it. - // postgres.Close() + i.StopHorizon() + postgres.Close() }) horizonPostgresURL = postgres.DSN } @@ -471,6 +469,11 @@ func (i *Test) Horizon() *horizon.App { // StopHorizon shuts down the running Horizon process func (i *Test) StopHorizon() { + if i.app == nil { + // horizon has already been stopped + return + } + i.app.CloseDB() i.app.Close() diff --git a/support/db/dbtest/db.go b/support/db/dbtest/db.go index bda67c49e8..6ab32b6926 100644 --- a/support/db/dbtest/db.go +++ b/support/db/dbtest/db.go @@ -132,6 +132,9 @@ func Postgres(t testing.TB) *DB { result.DSN = fmt.Sprintf("postgres://%s@localhost/%s?sslmode=disable&timezone=UTC", pgUser, result.dbName) result.closer = func() { + // pg_terminate_backend is a best effort, it does not gaurantee that it can close any lingering connections + // it sends a quit signal to each remaining connection in the db + execStatement(t, pgUser, "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"+pq.QuoteIdentifier(result.dbName)+"';") execStatement(t, pgUser, "DROP DATABASE "+pq.QuoteIdentifier(result.dbName)) } From 0fd03ce12fe69e5e507f69dbb47a06e5218a3f91 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 18 Jan 2022 10:10:10 -0800 Subject: [PATCH 2/3] #3268: removed temp debugging code --- services/horizon/internal/integration/db_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index d3a4b146be..411c4cf4a3 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -3,7 +3,6 @@ package integration import ( "context" "fmt" - "os" "path/filepath" "strconv" "testing" @@ -237,7 +236,6 @@ func command(horizonConfig horizon.Config, args ...string) []string { } func TestFillGaps(t *testing.T) { - os.Setenv("HORIZON_INTEGRATION_TESTS", "true") itest, reachedLedger := initializeDBIntegrationTest(t) tt := assert.New(t) From b16b6846a7b0d3b7d2328fba7b63011aeb864e7b Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 10 Feb 2022 13:59:34 -0800 Subject: [PATCH 3/3] #3268: removed duplicate invocation of CloseDB, it is already triggered within Close() --- services/horizon/internal/test/integration/integration.go | 1 - 1 file changed, 1 deletion(-) diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index f97ea2766d..f913c58086 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -474,7 +474,6 @@ func (i *Test) StopHorizon() { return } - i.app.CloseDB() i.app.Close() // Wait for Horizon to shut down completely.