Skip to content

Commit

Permalink
stellar#3268: enabled integration test db to be deleted at end of all…
Browse files Browse the repository at this point in the history
… tests in db_test.go
  • Loading branch information
sreuland committed Jan 18, 2022
1 parent 483e057 commit 54e6d9b
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 30 deletions.
23 changes: 13 additions & 10 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,17 +374,14 @@ var dbFillGapsCmd = &cobra.Command{
}

func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config) error {
var err error

if reingestForce && parallelWorkers > 1 {
return errors.New("--force is incompatible with --parallel-workers > 1")
}
horizonSession, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

ingestConfig := ingest.Config{
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
CheckpointFrequency: config.CheckpointFrequency,
MaxReingestRetries: int(retries),
Expand All @@ -398,15 +395,18 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
StellarCoreURL: config.StellarCoreURL,
}

if !ingestConfig.EnableCaptiveCore {
if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

if !config.EnableCaptiveCoreIngestion {
if config.StellarCoreDatabaseURL == "" {
return fmt.Errorf("flag --%s cannot be empty", horizon.StellarCoreDBURLFlagName)
}
coreSession, dbErr := db.Open("postgres", config.StellarCoreDatabaseURL)
if dbErr != nil {
return fmt.Errorf("cannot open Core DB: %v", dbErr)
if ingestConfig.CoreSession, err = db.Open("postgres", config.StellarCoreDatabaseURL); err != nil {
ingestConfig.HistorySession.Close()
return fmt.Errorf("cannot open Core DB: %v", err)
}
ingestConfig.CoreSession = coreSession
}

if parallelWorkers > 1 {
Expand All @@ -425,6 +425,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
if systemErr != nil {
return systemErr
}
defer system.Shutdown()

err = system.ReingestRange(ledgerRanges, reingestForce)
if err != nil {
Expand Down Expand Up @@ -477,6 +478,7 @@ func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) {
if err != nil {
return nil, err
}
defer horizonSession.Close()
q := &history.Q{horizonSession}
return q.GetLedgerGaps(context.Background())
}
Expand All @@ -486,6 +488,7 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history
if err != nil {
return nil, err
}
defer horizonSession.Close()
q := &history.Q{horizonSession}
return q.GetLedgerGapsInRange(context.Background(), start, end)
}
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ type IngestionQ interface {
BeginTx(*sql.TxOptions) error
Commit() error
CloneIngestionQ() IngestionQ
Close() error
Rollback() error
GetTx() *sqlx.Tx
GetIngestVersion(context.Context) (int, error)
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func (s *system) Shutdown() {
s.cancel()
// wait for ingestion state machine to terminate
s.wg.Wait()
s.historyQ.Close()
if err := s.ledgerBackend.Close(); err != nil {
log.WithError(err).Info("could not close ledger backend")
}
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ func (m *mockDBQ) Commit() error {
return args.Error(0)
}

func (m *mockDBQ) Close() error {
args := m.Called()
return args.Error(0)
}

func (m *mockDBQ) Rollback() error {
args := m.Called()
return args.Error(0)
Expand Down
13 changes: 13 additions & 0 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func newParallelSystems(config Config, workerCount uint, systemFactory func(Conf
}, nil
}

func (ps *ParallelSystems) Shutdown() {
log.Info("Shutting down parallel ingestion system...")
if ps.config.HistorySession != nil {
ps.config.HistorySession.Close()
}
if ps.config.CoreSession != nil {
ps.config.CoreSession.Close()
}
}

func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan history.LedgerRange) rangeError {

for {
Expand Down Expand Up @@ -128,6 +138,9 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat
// the user needs to start again to prevent the gaps.
lowestRangeErr *rangeError
)

defer ps.Shutdown()

if err := validateRanges(ledgerRanges); err != nil {
return err
}
Expand Down
25 changes: 9 additions & 16 deletions services/horizon/internal/integration/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integration
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
Expand Down Expand Up @@ -153,21 +154,7 @@ func TestReingestDB(t *testing.T) {
itest, reachedLedger := initializeDBIntegrationTest(t)
tt := assert.New(t)

// Create a fresh Horizon database
newDB := dbtest.Postgres(t)
// TODO: Unfortunately Horizon's ingestion System leaves open sessions behind,leading to
// a "database is being accessed by other users" error when trying to drop it
// defer newDB.Close()
freshHorizonPostgresURL := newDB.DSN
horizonConfig := itest.GetHorizonConfig()
horizonConfig.DatabaseURL = freshHorizonPostgresURL
// Initialize the DB schema
dbConn, err := db.Open("postgres", freshHorizonPostgresURL)
tt.NoError(err)
defer dbConn.Close()
_, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0)
tt.NoError(err)

t.Run("validate parallel range", func(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig,
"db",
Expand Down Expand Up @@ -250,6 +237,7 @@ func command(horizonConfig horizon.Config, args ...string) []string {
}

func TestFillGaps(t *testing.T) {
os.Setenv("HORIZON_INTEGRATION_TESTS", "true")
itest, reachedLedger := initializeDBIntegrationTest(t)
tt := assert.New(t)

Expand All @@ -263,7 +251,13 @@ func TestFillGaps(t *testing.T) {
horizonConfig.DatabaseURL = freshHorizonPostgresURL
// Initialize the DB schema
dbConn, err := db.Open("postgres", freshHorizonPostgresURL)
defer dbConn.Close()
tt.NoError(err)
historyQ := history.Q{dbConn}
defer func() {
historyQ.Close()
newDB.Close()
}()

_, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0)
tt.NoError(err)

Expand Down Expand Up @@ -307,7 +301,6 @@ func TestFillGaps(t *testing.T) {
// subprocesses to conflict.
itest.StopHorizon()

historyQ := history.Q{dbConn}
var oldestLedger, latestLedger int64
tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger))
tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger))
Expand Down
11 changes: 7 additions & 4 deletions services/horizon/internal/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,8 @@ func (i *Test) StartHorizon() error {
if horizonPostgresURL == "" {
postgres := dbtest.Postgres(i.t)
i.shutdownCalls = append(i.shutdownCalls, func() {
// FIXME: Unfortunately, Horizon leaves open sessions behind,
// leading to a "database is being accessed by other users"
// error when trying to drop it.
// postgres.Close()
i.StopHorizon()
postgres.Close()
})
horizonPostgresURL = postgres.DSN
}
Expand Down Expand Up @@ -471,6 +469,11 @@ func (i *Test) Horizon() *horizon.App {

// StopHorizon shuts down the running Horizon process
func (i *Test) StopHorizon() {
if i.app == nil {
// horizon has already been stopped
return
}

i.app.CloseDB()
i.app.Close()

Expand Down
3 changes: 3 additions & 0 deletions support/db/dbtest/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func Postgres(t testing.TB) *DB {
result.DSN = fmt.Sprintf("postgres://%s@localhost/%s?sslmode=disable&timezone=UTC", pgUser, result.dbName)

result.closer = func() {
// pg_terminate_backend is a best effort, it does not gaurantee that it can close any lingering connections
// it sends a quit signal to each remaining connection in the db
execStatement(t, pgUser, "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"+pq.QuoteIdentifier(result.dbName)+"';")
execStatement(t, pgUser, "DROP DATABASE "+pq.QuoteIdentifier(result.dbName))
}

Expand Down

0 comments on commit 54e6d9b

Please sign in to comment.