diff --git a/internal/component/postgresql/migrations/migrations.go b/internal/component/sql/migrations/postgres/postgres_migrations.go similarity index 100% rename from internal/component/postgresql/migrations/migrations.go rename to internal/component/sql/migrations/postgres/postgres_migrations.go diff --git a/internal/component/sql/migrations/sqlite/sqlite_migrations.go b/internal/component/sql/migrations/sqlite/sqlite_migrations.go new file mode 100644 index 0000000000..2a9b6820ed --- /dev/null +++ b/internal/component/sql/migrations/sqlite/sqlite_migrations.go @@ -0,0 +1,153 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqlitemigrations + +import ( + "context" + "database/sql" + "fmt" + "time" + + sqlinternal "github.com/dapr/components-contrib/internal/component/sql" + "github.com/dapr/kit/logger" +) + +// Migrations performs migrations for the database schema +type Migrations struct { + Pool *sql.DB + Logger logger.Logger + MetadataTableName string + + conn *sql.Conn +} + +// Perform the required migrations +func (m *Migrations) Perform(ctx context.Context, migrationFns []sqlinternal.MigrationFn) (err error) { + // Get a connection so we can create a transaction + m.conn, err = m.Pool.Conn(ctx) + if err != nil { + return fmt.Errorf("failed to get a connection from the pool: %w", err) + } + defer m.conn.Close() + + // Begin an exclusive transaction + // We can't use Begin because that doesn't allow us setting the level of transaction + queryCtx, cancel := context.WithTimeout(ctx, time.Minute) + _, err = m.conn.ExecContext(queryCtx, "BEGIN EXCLUSIVE TRANSACTION") + cancel() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + + // Rollback the transaction in a deferred statement to catch errors + success := false + defer func() { + if success { + return + } + queryCtx, cancel = context.WithTimeout(ctx, time.Minute) + _, err = m.conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION") + cancel() + if err != nil { + // Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around + m.Logger.Fatalf("Failed to rollback transaction: %v", err) + } + }() + + // Perform the migrations + err = sqlinternal.Migrate(ctx, sqlinternal.AdaptDatabaseSQLConn(m.conn), sqlinternal.MigrationOptions{ + Logger: m.Logger, + GetVersionQuery: fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, m.MetadataTableName), + UpdateVersionQuery: func(version string) (string, any) { + return fmt.Sprintf(`REPLACE INTO %s (key, value) VALUES ('migrations', ?)`, m.MetadataTableName), + version + }, + EnsureMetadataTable: func(ctx context.Context) error { + // Check if the metadata table exists, which we also use to store the migration level + queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second) + var exists bool + exists, err = m.tableExists(queryCtx, m.conn, m.MetadataTableName) + cancel() + if err != nil { + return fmt.Errorf("failed to check if the metadata table exists: %w", err) + } + + // If the table doesn't exist, create it + if !exists { + queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second) + err = m.createMetadataTable(queryCtx, m.conn) + cancel() + if err != nil { + return fmt.Errorf("failed to create metadata table: %w", err) + } + } + + return nil + }, + Migrations: migrationFns, + }) + if err != nil { + return err + } + + // Commit the transaction + queryCtx, cancel = context.WithTimeout(ctx, time.Minute) + _, err = m.conn.ExecContext(queryCtx, "COMMIT TRANSACTION") + cancel() + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + // Set success to true so we don't also run a rollback + success = true + + return nil +} + +// GetConn returns the active connection. +func (m *Migrations) GetConn() *sql.Conn { + return m.conn +} + +// Returns true if a table exists +func (m Migrations) tableExists(parentCtx context.Context, db sqlinternal.DatabaseSQLConn, tableName string) (bool, error) { + ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) + defer cancel() + + var exists string + // Returns 1 or 0 as a string if the table exists or not. + const q = `SELECT EXISTS ( + SELECT name FROM sqlite_master WHERE type='table' AND name = ? + ) AS 'exists'` + err := db.QueryRowContext(ctx, q, m.MetadataTableName). + Scan(&exists) + return exists == "1", err +} + +func (m Migrations) createMetadataTable(ctx context.Context, db sqlinternal.DatabaseSQLConn) error { + m.Logger.Infof("Creating metadata table '%s' if it doesn't exist", m.MetadataTableName) + // Add an "IF NOT EXISTS" in case another Dapr sidecar is creating the same table at the same time + // In the next step we'll acquire a lock so there won't be issues with concurrency + _, err := db.ExecContext(ctx, fmt.Sprintf( + `CREATE TABLE IF NOT EXISTS %s ( + key text NOT NULL PRIMARY KEY, + value text NOT NULL + )`, + m.MetadataTableName, + )) + if err != nil { + return fmt.Errorf("failed to create metadata table: %w", err) + } + return nil +} diff --git a/state/postgresql/migrations.go b/state/postgresql/migrations.go index d18c8593f7..adeaffdd3d 100644 --- a/state/postgresql/migrations.go +++ b/state/postgresql/migrations.go @@ -19,8 +19,8 @@ import ( "github.com/dapr/components-contrib/internal/component/postgresql" pginterfaces "github.com/dapr/components-contrib/internal/component/postgresql/interfaces" - pgmigrations "github.com/dapr/components-contrib/internal/component/postgresql/migrations" sqlinternal "github.com/dapr/components-contrib/internal/component/sql" + pgmigrations "github.com/dapr/components-contrib/internal/component/sql/migrations/postgres" ) // Performs the required migrations diff --git a/state/sqlite/sqlite_dbaccess.go b/state/sqlite/sqlite_dbaccess.go index bf9001c54e..e05b5c5032 100644 --- a/state/sqlite/sqlite_dbaccess.go +++ b/state/sqlite/sqlite_dbaccess.go @@ -97,13 +97,10 @@ func (a *sqliteDBAccess) Init(ctx context.Context, md state.Metadata) error { } // Performs migrations - migrate := &migrations{ - Logger: a.logger, - Pool: a.db, - MetadataTableName: a.metadata.MetadataTableName, + err = performMigrations(ctx, a.db, a.logger, migrationOptions{ StateTableName: a.metadata.TableName, - } - err = migrate.Perform(ctx) + MetadataTableName: a.metadata.MetadataTableName, + }) if err != nil { return fmt.Errorf("failed to perform migrations: %w", err) } diff --git a/state/sqlite/sqlite_migrations.go b/state/sqlite/sqlite_migrations.go index 77b56c23a0..f44b41c847 100644 --- a/state/sqlite/sqlite_migrations.go +++ b/state/sqlite/sqlite_migrations.go @@ -17,92 +17,34 @@ import ( "context" "database/sql" "fmt" - "time" sqlinternal "github.com/dapr/components-contrib/internal/component/sql" + sqlitemigrations "github.com/dapr/components-contrib/internal/component/sql/migrations/sqlite" "github.com/dapr/kit/logger" ) -// Performs migrations for the database schema -type migrations struct { - Logger logger.Logger - Pool *sql.DB +type migrationOptions struct { StateTableName string MetadataTableName string } // Perform the required migrations -func (m *migrations) Perform(ctx context.Context) error { - // Get a connection so we can create a transaction - conn, err := m.Pool.Conn(ctx) - if err != nil { - return fmt.Errorf("failed to get a connection from the pool: %w", err) +func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, opts migrationOptions) error { + m := sqlitemigrations.Migrations{ + Pool: db, + Logger: logger, + MetadataTableName: opts.MetadataTableName, } - defer conn.Close() - // Begin an exclusive transaction - // We can't use Begin because that doesn't allow us setting the level of transaction - queryCtx, cancel := context.WithTimeout(ctx, time.Minute) - _, err = conn.ExecContext(queryCtx, "BEGIN EXCLUSIVE TRANSACTION") - cancel() - if err != nil { - return fmt.Errorf("faild to begin transaction: %w", err) - } - - // Rollback the transaction in a deferred statement to catch errors - success := false - defer func() { - if success { - return - } - queryCtx, cancel = context.WithTimeout(ctx, time.Minute) - _, err = conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION") - cancel() - if err != nil { - // Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around - m.Logger.Fatalf("Failed to rollback transaction: %v", err) - } - }() - - // Perform the migrations - err = sqlinternal.Migrate(ctx, sqlinternal.AdaptDatabaseSQLConn(conn), sqlinternal.MigrationOptions{ - Logger: m.Logger, - GetVersionQuery: fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, m.MetadataTableName), - UpdateVersionQuery: func(version string) (string, any) { - return fmt.Sprintf(`REPLACE INTO %s (key, value) VALUES ('migrations', ?)`, m.MetadataTableName), - version - }, - EnsureMetadataTable: func(ctx context.Context) error { - // Check if the metadata table exists, which we also use to store the migration level - queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second) - var exists bool - exists, err = m.tableExists(queryCtx, conn, m.MetadataTableName) - cancel() - if err != nil { - return fmt.Errorf("failed to check if the metadata table exists: %w", err) - } - - // If the table doesn't exist, create it - if !exists { - queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second) - err = m.createMetadataTable(queryCtx, conn) - cancel() - if err != nil { - return fmt.Errorf("failed to create metadata table: %w", err) - } - } - - return nil - }, - Migrations: []sqlinternal.MigrationFn{ - // Migration 0: create the state table - func(ctx context.Context) error { - // We need to add an "IF NOT EXISTS" because we may be migrating from when we did not use a metadata table - m.Logger.Infof("Creating state table '%s'", m.StateTableName) - _, err = conn.ExecContext( - ctx, - fmt.Sprintf( - `CREATE TABLE IF NOT EXISTS %s ( + return m.Perform(ctx, []sqlinternal.MigrationFn{ + // Migration 0: create the state table + func(ctx context.Context) error { + // We need to add an "IF NOT EXISTS" because we may be migrating from when we did not use a metadata table + logger.Infof("Creating state table '%s'", opts.StateTableName) + _, err := m.GetConn().ExecContext( + ctx, + fmt.Sprintf( + `CREATE TABLE IF NOT EXISTS %s ( key TEXT NOT NULL PRIMARY KEY, value TEXT NOT NULL, is_binary BOOLEAN NOT NULL, @@ -110,62 +52,13 @@ func (m *migrations) Perform(ctx context.Context) error { expiration_time TIMESTAMP DEFAULT NULL, update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP )`, - m.StateTableName, - ), - ) - if err != nil { - return fmt.Errorf("failed to create state table: %w", err) - } - return nil - }, + opts.StateTableName, + ), + ) + if err != nil { + return fmt.Errorf("failed to create state table: %w", err) + } + return nil }, }) - if err != nil { - return err - } - - // Commit the transaction - queryCtx, cancel = context.WithTimeout(ctx, time.Minute) - _, err = conn.ExecContext(queryCtx, "COMMIT TRANSACTION") - cancel() - if err != nil { - return fmt.Errorf("failed to commit transaction") - } - - // Set success to true so we don't also run a rollback - success = true - - return nil -} - -// Returns true if a table exists -func (m migrations) tableExists(parentCtx context.Context, db querier, tableName string) (bool, error) { - ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) - defer cancel() - - var exists string - // Returns 1 or 0 as a string if the table exists or not. - const q = `SELECT EXISTS ( - SELECT name FROM sqlite_master WHERE type='table' AND name = ? - ) AS 'exists'` - err := db.QueryRowContext(ctx, q, m.MetadataTableName). - Scan(&exists) - return exists == "1", err -} - -func (m migrations) createMetadataTable(ctx context.Context, db querier) error { - m.Logger.Infof("Creating metadata table '%s' if it doesn't exist", m.MetadataTableName) - // Add an "IF NOT EXISTS" in case another Dapr sidecar is creating the same table at the same time - // In the next step we'll acquire a lock so there won't be issues with concurrency - _, err := db.ExecContext(ctx, fmt.Sprintf( - `CREATE TABLE IF NOT EXISTS %s ( - key text NOT NULL PRIMARY KEY, - value text NOT NULL - )`, - m.MetadataTableName, - )) - if err != nil { - return fmt.Errorf("failed to create metadata table: %w", err) - } - return nil }