Skip to content

Commit

Permalink
Moved to the sql package
Browse files Browse the repository at this point in the history
Migrated sqlite too
  • Loading branch information
ItalyPaleAle committed Sep 18, 2023
1 parent df6dfe2 commit 637bf85
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 137 deletions.
153 changes: 153 additions & 0 deletions internal/component/sql/migrations/sqlite/sqlite_migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sqlitemigrations

import (
"context"
"database/sql"
"fmt"
"time"

sqlinternal "github.com/dapr/components-contrib/internal/component/sql"
"github.com/dapr/kit/logger"
)

// Migrations performs migrations for the database schema
type Migrations struct {
Pool *sql.DB
Logger logger.Logger
MetadataTableName string

conn *sql.Conn
}

// Perform the required migrations
func (m *Migrations) Perform(ctx context.Context, migrationFns []sqlinternal.MigrationFn) (err error) {
// Get a connection so we can create a transaction
m.conn, err = m.Pool.Conn(ctx)
if err != nil {
return fmt.Errorf("failed to get a connection from the pool: %w", err)
}
defer m.conn.Close()

// Begin an exclusive transaction
// We can't use Begin because that doesn't allow us setting the level of transaction
queryCtx, cancel := context.WithTimeout(ctx, time.Minute)
_, err = m.conn.ExecContext(queryCtx, "BEGIN EXCLUSIVE TRANSACTION")
cancel()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}

// Rollback the transaction in a deferred statement to catch errors
success := false
defer func() {
if success {
return
}
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = m.conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION")
cancel()
if err != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around
m.Logger.Fatalf("Failed to rollback transaction: %v", err)
}
}()

// Perform the migrations
err = sqlinternal.Migrate(ctx, sqlinternal.AdaptDatabaseSQLConn(m.conn), sqlinternal.MigrationOptions{
Logger: m.Logger,
GetVersionQuery: fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, m.MetadataTableName),
UpdateVersionQuery: func(version string) (string, any) {
return fmt.Sprintf(`REPLACE INTO %s (key, value) VALUES ('migrations', ?)`, m.MetadataTableName),
version
},
EnsureMetadataTable: func(ctx context.Context) error {
// Check if the metadata table exists, which we also use to store the migration level
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
var exists bool
exists, err = m.tableExists(queryCtx, m.conn, m.MetadataTableName)
cancel()
if err != nil {
return fmt.Errorf("failed to check if the metadata table exists: %w", err)
}

// If the table doesn't exist, create it
if !exists {
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
err = m.createMetadataTable(queryCtx, m.conn)
cancel()
if err != nil {
return fmt.Errorf("failed to create metadata table: %w", err)
}
}

return nil
},
Migrations: migrationFns,
})
if err != nil {
return err
}

// Commit the transaction
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = m.conn.ExecContext(queryCtx, "COMMIT TRANSACTION")
cancel()
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

// Set success to true so we don't also run a rollback
success = true

return nil
}

// GetConn returns the active connection.
func (m *Migrations) GetConn() *sql.Conn {
return m.conn
}

// Returns true if a table exists
func (m Migrations) tableExists(parentCtx context.Context, db sqlinternal.DatabaseSQLConn, tableName string) (bool, error) {
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
defer cancel()

var exists string
// Returns 1 or 0 as a string if the table exists or not.
const q = `SELECT EXISTS (
SELECT name FROM sqlite_master WHERE type='table' AND name = ?
) AS 'exists'`
err := db.QueryRowContext(ctx, q, m.MetadataTableName).
Scan(&exists)
return exists == "1", err
}

func (m Migrations) createMetadataTable(ctx context.Context, db sqlinternal.DatabaseSQLConn) error {
m.Logger.Infof("Creating metadata table '%s' if it doesn't exist", m.MetadataTableName)
// Add an "IF NOT EXISTS" in case another Dapr sidecar is creating the same table at the same time
// In the next step we'll acquire a lock so there won't be issues with concurrency
_, err := db.ExecContext(ctx, fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s (
key text NOT NULL PRIMARY KEY,
value text NOT NULL
)`,
m.MetadataTableName,
))
if err != nil {
return fmt.Errorf("failed to create metadata table: %w", err)
}
return nil
}
2 changes: 1 addition & 1 deletion state/postgresql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/dapr/components-contrib/internal/component/postgresql"
pginterfaces "github.com/dapr/components-contrib/internal/component/postgresql/interfaces"
pgmigrations "github.com/dapr/components-contrib/internal/component/postgresql/migrations"
sqlinternal "github.com/dapr/components-contrib/internal/component/sql"
pgmigrations "github.com/dapr/components-contrib/internal/component/sql/migrations/postgres"
)

// Performs the required migrations
Expand Down
9 changes: 3 additions & 6 deletions state/sqlite/sqlite_dbaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,10 @@ func (a *sqliteDBAccess) Init(ctx context.Context, md state.Metadata) error {
}

// Performs migrations
migrate := &migrations{
Logger: a.logger,
Pool: a.db,
MetadataTableName: a.metadata.MetadataTableName,
err = performMigrations(ctx, a.db, a.logger, migrationOptions{
StateTableName: a.metadata.TableName,
}
err = migrate.Perform(ctx)
MetadataTableName: a.metadata.MetadataTableName,
})
if err != nil {
return fmt.Errorf("failed to perform migrations: %w", err)
}
Expand Down
153 changes: 23 additions & 130 deletions state/sqlite/sqlite_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,155 +17,48 @@ import (
"context"
"database/sql"
"fmt"
"time"

sqlinternal "github.com/dapr/components-contrib/internal/component/sql"
sqlitemigrations "github.com/dapr/components-contrib/internal/component/sql/migrations/sqlite"
"github.com/dapr/kit/logger"
)

// Performs migrations for the database schema
type migrations struct {
Logger logger.Logger
Pool *sql.DB
type migrationOptions struct {
StateTableName string
MetadataTableName string
}

// Perform the required migrations
func (m *migrations) Perform(ctx context.Context) error {
// Get a connection so we can create a transaction
conn, err := m.Pool.Conn(ctx)
if err != nil {
return fmt.Errorf("failed to get a connection from the pool: %w", err)
func performMigrations(ctx context.Context, db *sql.DB, logger logger.Logger, opts migrationOptions) error {
m := sqlitemigrations.Migrations{
Pool: db,
Logger: logger,
MetadataTableName: opts.MetadataTableName,
}
defer conn.Close()

// Begin an exclusive transaction
// We can't use Begin because that doesn't allow us setting the level of transaction
queryCtx, cancel := context.WithTimeout(ctx, time.Minute)
_, err = conn.ExecContext(queryCtx, "BEGIN EXCLUSIVE TRANSACTION")
cancel()
if err != nil {
return fmt.Errorf("faild to begin transaction: %w", err)
}

// Rollback the transaction in a deferred statement to catch errors
success := false
defer func() {
if success {
return
}
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION")
cancel()
if err != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around
m.Logger.Fatalf("Failed to rollback transaction: %v", err)
}
}()

// Perform the migrations
err = sqlinternal.Migrate(ctx, sqlinternal.AdaptDatabaseSQLConn(conn), sqlinternal.MigrationOptions{
Logger: m.Logger,
GetVersionQuery: fmt.Sprintf(`SELECT value FROM %s WHERE key = 'migrations'`, m.MetadataTableName),
UpdateVersionQuery: func(version string) (string, any) {
return fmt.Sprintf(`REPLACE INTO %s (key, value) VALUES ('migrations', ?)`, m.MetadataTableName),
version
},
EnsureMetadataTable: func(ctx context.Context) error {
// Check if the metadata table exists, which we also use to store the migration level
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
var exists bool
exists, err = m.tableExists(queryCtx, conn, m.MetadataTableName)
cancel()
if err != nil {
return fmt.Errorf("failed to check if the metadata table exists: %w", err)
}

// If the table doesn't exist, create it
if !exists {
queryCtx, cancel = context.WithTimeout(ctx, 30*time.Second)
err = m.createMetadataTable(queryCtx, conn)
cancel()
if err != nil {
return fmt.Errorf("failed to create metadata table: %w", err)
}
}

return nil
},
Migrations: []sqlinternal.MigrationFn{
// Migration 0: create the state table
func(ctx context.Context) error {
// We need to add an "IF NOT EXISTS" because we may be migrating from when we did not use a metadata table
m.Logger.Infof("Creating state table '%s'", m.StateTableName)
_, err = conn.ExecContext(
ctx,
fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s (
return m.Perform(ctx, []sqlinternal.MigrationFn{
// Migration 0: create the state table
func(ctx context.Context) error {
// We need to add an "IF NOT EXISTS" because we may be migrating from when we did not use a metadata table
logger.Infof("Creating state table '%s'", opts.StateTableName)
_, err := m.GetConn().ExecContext(
ctx,
fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s (
key TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL,
is_binary BOOLEAN NOT NULL,
etag TEXT NOT NULL,
expiration_time TIMESTAMP DEFAULT NULL,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
m.StateTableName,
),
)
if err != nil {
return fmt.Errorf("failed to create state table: %w", err)
}
return nil
},
opts.StateTableName,
),
)
if err != nil {
return fmt.Errorf("failed to create state table: %w", err)
}
return nil
},
})
if err != nil {
return err
}

// Commit the transaction
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = conn.ExecContext(queryCtx, "COMMIT TRANSACTION")
cancel()
if err != nil {
return fmt.Errorf("failed to commit transaction")
}

// Set success to true so we don't also run a rollback
success = true

return nil
}

// Returns true if a table exists
func (m migrations) tableExists(parentCtx context.Context, db querier, tableName string) (bool, error) {
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
defer cancel()

var exists string
// Returns 1 or 0 as a string if the table exists or not.
const q = `SELECT EXISTS (
SELECT name FROM sqlite_master WHERE type='table' AND name = ?
) AS 'exists'`
err := db.QueryRowContext(ctx, q, m.MetadataTableName).
Scan(&exists)
return exists == "1", err
}

func (m migrations) createMetadataTable(ctx context.Context, db querier) error {
m.Logger.Infof("Creating metadata table '%s' if it doesn't exist", m.MetadataTableName)
// Add an "IF NOT EXISTS" in case another Dapr sidecar is creating the same table at the same time
// In the next step we'll acquire a lock so there won't be issues with concurrency
_, err := db.ExecContext(ctx, fmt.Sprintf(
`CREATE TABLE IF NOT EXISTS %s (
key text NOT NULL PRIMARY KEY,
value text NOT NULL
)`,
m.MetadataTableName,
))
if err != nil {
return fmt.Errorf("failed to create metadata table: %w", err)
}
return nil
}

0 comments on commit 637bf85

Please sign in to comment.