Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/integrations: db_test should drop test db instances when finished #4185

Merged
merged 10 commits into from
May 9, 2022
Merged
23 changes: 13 additions & 10 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,17 +374,14 @@ var dbFillGapsCmd = &cobra.Command{
}

func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config) error {
var err error

if reingestForce && parallelWorkers > 1 {
return errors.New("--force is incompatible with --parallel-workers > 1")
}
horizonSession, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

ingestConfig := ingest.Config{
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
CheckpointFrequency: config.CheckpointFrequency,
MaxReingestRetries: int(retries),
Expand All @@ -398,15 +395,18 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
StellarCoreURL: config.StellarCoreURL,
}

if !ingestConfig.EnableCaptiveCore {
if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}

if !config.EnableCaptiveCoreIngestion {
if config.StellarCoreDatabaseURL == "" {
return fmt.Errorf("flag --%s cannot be empty", horizon.StellarCoreDBURLFlagName)
}
coreSession, dbErr := db.Open("postgres", config.StellarCoreDatabaseURL)
if dbErr != nil {
return fmt.Errorf("cannot open Core DB: %v", dbErr)
if ingestConfig.CoreSession, err = db.Open("postgres", config.StellarCoreDatabaseURL); err != nil {
ingestConfig.HistorySession.Close()
return fmt.Errorf("cannot open Core DB: %v", err)
}
ingestConfig.CoreSession = coreSession
}

if parallelWorkers > 1 {
Expand All @@ -425,6 +425,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
if systemErr != nil {
return systemErr
}
defer system.Shutdown()

err = system.ReingestRange(ledgerRanges, reingestForce)
if err != nil {
Expand Down Expand Up @@ -477,6 +478,7 @@ func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) {
if err != nil {
return nil, err
}
defer horizonSession.Close()
q := &history.Q{horizonSession}
return q.GetLedgerGaps(context.Background())
}
Expand All @@ -486,6 +488,7 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history
if err != nil {
return nil, err
}
defer horizonSession.Close()
q := &history.Q{horizonSession}
return q.GetLedgerGapsInRange(context.Background(), start, end)
}
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ type IngestionQ interface {
BeginTx(*sql.TxOptions) error
Commit() error
CloneIngestionQ() IngestionQ
Close() error
Rollback() error
GetTx() *sqlx.Tx
GetIngestVersion(context.Context) (int, error)
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func (s *system) Shutdown() {
s.cancel()
// wait for ingestion state machine to terminate
s.wg.Wait()
s.historyQ.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Session.Close() is calling Session.DB.Close() and DB can be shared with other Session instances outside ingestion package. I think we shouldn't call it here. DB should be closed when all Horizon subsystems are closed and this is done in App.CloseDB().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I stepped through this in debug, using PGAdmin to view open sessions on the test db instance as horizon app shuts down from a app.Close() invocation, and what I observed:

(1) waitForDone() https://github.com/sreuland/go/blob/3268_test_db_cleanup/services/horizon/internal/app.go#L170 runs first which then calls this ingest.Shutdown()
(2) App.CloseDB() https://github.com/sreuland/go/blob/3268_test_db_cleanup/services/horizon/internal/app.go#L157 runs second after waitForDone() finishes

horizon.waitForDone() and cmd.runDBReingestRange() are the only callers of Shutdown()

When horizon App shuts down, App.CloseDB() does not result in all the active sessions on the db instance being closed, there are 2 or 3 left open, when the historyQ.Close() is invoked from within ingest.Shutdown(), it results in those remaining sessions on db to be closed, maybe it's gc/memory thing since the DB session from horizon app is cloned and placed into a new history.Q struct instance which lives inside the ingest system struct scope, the sessions seem unique to their parent object references, which kind of contrasts the 'shared' aspect since, it appears as though each session instance has exclusive unique db sessions, and they won't be affected by the other, only when both of the sessions are closed, then the integration test cleanup routine that tries to drop the test db will work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bartekn , did the session closing behavior i mentioned sound sensible? when ingest initiates shutdown it's from horizon waitForDone() which means overall app is going down anyway or from a db reingest command via runDBREingestRange(), the process is ending in both cases when ingest shutdown initiates the close on session which leads to DB. Without invoking the close on this ingest session instance, the underlying cloned session tends to lead to sessions left open on the db server, I haven't been able to remove them otherwise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@2opremio , wanted to get your opinion in this fix too, do you see value in taking this approach to delete test db instances, or is it just introducing unnecessary risk, etc. I was thinking to close this PR rather than lean on it anymore or adjust it if something more palpable is suggested. thanks for any insights!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for a huge delay answering this. I missed the notification somehow. I prepared the code sample below that shows that closing a cloned session actually closes the underlying DB connection for both sessions:

package main

import (
	"context"

	"github.com/stellar/go/support/db"
)

func main() {
	session, err := db.Open("postgres", "postgres://bartek@localhost:5432/test?sslmode=disable")
	if err != nil {
		panic(err)
	}

	c1 := session.Clone()
	c2 := session.Clone()

	_, err = c1.ExecRaw(context.Background(), "SELECT 1;")
	if err != nil {
		panic(err)
	}

	err = c1.Close()
	if err != nil {
		panic(err)
	}

	_, err = c2.ExecRaw(context.Background(), "SELECT 1;")
	if err != nil {
		panic(err)
	}
}

Result:

panic: exec failed: sql: database is closed

goroutine 1 [running]:
main.main()
	/Users/bartek/go/src/github.com/stellar/go/shawn_test/main.go:30 +0x132
exit status 2

The reason why the code works in this PR is that in waitForDone the ingestion system is shut down after webserver (a.ingester.Shutdown()). But if we ever move it before a webserver or add another service that will send DB connections in the background and will be used after a.ingester.Shutdown() this will generate errors and bugs (like background worker will not be able to commit it's job).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for verifying the go db/session close behavior, very helpful to rule that out!
yes, we don't want implicit coupling like that either, ok, will look into more. w/o this DB close in ingest shutdown, we strangely get the opposite effect, which is when App.CloseDB() is executed, the sessions related to ingestion don't get closed on the db server side as observed in PGAdmin, that is, until the go o/s process is terminated at which point they are gone.

Copy link
Contributor

@2opremio 2opremio May 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

horizon.waitForDone() and cmd.runDBReingestRange() are the only callers of Shutdown()

All tests call it implitcitly on shutdown due to this call in integration.go:

	i.t.Cleanup(i.Shutdown)

if err := s.ledgerBackend.Close(); err != nil {
log.WithError(err).Info("could not close ledger backend")
}
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ func (m *mockDBQ) Commit() error {
return args.Error(0)
}

func (m *mockDBQ) Close() error {
args := m.Called()
return args.Error(0)
}

func (m *mockDBQ) Rollback() error {
args := m.Called()
return args.Error(0)
Expand Down
13 changes: 13 additions & 0 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func newParallelSystems(config Config, workerCount uint, systemFactory func(Conf
}, nil
}

func (ps *ParallelSystems) Shutdown() {
log.Info("Shutting down parallel ingestion system...")
if ps.config.HistorySession != nil {
ps.config.HistorySession.Close()
}
if ps.config.CoreSession != nil {
ps.config.CoreSession.Close()
}
}

func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan history.LedgerRange) rangeError {

for {
Expand Down Expand Up @@ -128,6 +138,9 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat
// the user needs to start again to prevent the gaps.
lowestRangeErr *rangeError
)

defer ps.Shutdown()

if err := validateRanges(ledgerRanges); err != nil {
return err
}
Expand Down
23 changes: 7 additions & 16 deletions services/horizon/internal/integration/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,7 @@ func TestReingestDB(t *testing.T) {
itest, reachedLedger := initializeDBIntegrationTest(t)
tt := assert.New(t)

// Create a fresh Horizon database
newDB := dbtest.Postgres(t)
// TODO: Unfortunately Horizon's ingestion System leaves open sessions behind,leading to
// a "database is being accessed by other users" error when trying to drop it
// defer newDB.Close()
freshHorizonPostgresURL := newDB.DSN
horizonConfig := itest.GetHorizonConfig()
horizonConfig.DatabaseURL = freshHorizonPostgresURL
// Initialize the DB schema
dbConn, err := db.Open("postgres", freshHorizonPostgresURL)
tt.NoError(err)
defer dbConn.Close()
_, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0)
tt.NoError(err)

t.Run("validate parallel range", func(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig,
"db",
Expand Down Expand Up @@ -263,7 +249,13 @@ func TestFillGaps(t *testing.T) {
horizonConfig.DatabaseURL = freshHorizonPostgresURL
// Initialize the DB schema
dbConn, err := db.Open("postgres", freshHorizonPostgresURL)
defer dbConn.Close()
tt.NoError(err)
historyQ := history.Q{dbConn}
defer func() {
historyQ.Close()
newDB.Close()
}()

_, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0)
tt.NoError(err)

Expand Down Expand Up @@ -307,7 +299,6 @@ func TestFillGaps(t *testing.T) {
// subprocesses to conflict.
itest.StopHorizon()

historyQ := history.Q{dbConn}
var oldestLedger, latestLedger int64
tt.NoError(historyQ.ElderLedger(context.Background(), &oldestLedger))
tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger))
Expand Down
11 changes: 7 additions & 4 deletions services/horizon/internal/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,8 @@ func (i *Test) StartHorizon() error {
if horizonPostgresURL == "" {
postgres := dbtest.Postgres(i.t)
i.shutdownCalls = append(i.shutdownCalls, func() {
// FIXME: Unfortunately, Horizon leaves open sessions behind,
// leading to a "database is being accessed by other users"
// error when trying to drop it.
// postgres.Close()
i.StopHorizon()
postgres.Close()
})
horizonPostgresURL = postgres.DSN
}
Expand Down Expand Up @@ -471,6 +469,11 @@ func (i *Test) Horizon() *horizon.App {

// StopHorizon shuts down the running Horizon process
func (i *Test) StopHorizon() {
if i.app == nil {
// horizon has already been stopped
return
}

i.app.CloseDB()
i.app.Close()

Expand Down
3 changes: 3 additions & 0 deletions support/db/dbtest/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func Postgres(t testing.TB) *DB {
result.DSN = fmt.Sprintf("postgres://%s@localhost/%s?sslmode=disable&timezone=UTC", pgUser, result.dbName)

result.closer = func() {
// pg_terminate_backend is a best effort, it does not gaurantee that it can close any lingering connections
// it sends a quit signal to each remaining connection in the db
execStatement(t, pgUser, "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"+pq.QuoteIdentifier(result.dbName)+"';")
execStatement(t, pgUser, "DROP DATABASE "+pq.QuoteIdentifier(result.dbName))
}

Expand Down