Skip to content

Commit

Permalink
MySQL 8: use low lock_wait_timeout with workspace=temp-schema
Browse files Browse the repository at this point in the history
This commit adjusts DDL calls in workspace=temp-schema to use a session-level
lock_wait_timeout=5 (seconds) if the database is running MySQL 8. This is a
safety precaution in case cross-schema foreign keys are present. MySQL 8
made some changes to foreign key metadata locking, which means the workspace
DDL is not fully isolated from other workloads/queries' metadata locking if
cross-schema foreign keys are in use.

Background:

MySQL and MariaDB use metadata locking (MDL) to ensure table structure does
not change while transactions are already in-progress. Generally, read and
write queries obtain shared MDL on any table mentioned in the query. DDL
requires an exclusive lock; it queues behind any in-progress transactions, but
in front of any other new pending transactions that come in after the DDL.

The lock_wait_timeout session variable controls how long a query will wait to
obtain a metadata lock before timing out. However, this variable has extremely
high defaults: 1 year in MySQL, or 1 day in MariaDB 10.2+. This means that in
the event of pending/queued metadata locking, by default the observed effect
will appear to be a complete stall of the affected queries.

Ordinarily, Skeema's default setting of workspace=temp-schema is isolated from
the metadata locking of other workloads/queries, since it uses a dedicated
database schema (_skeema_tmp by default) for performing all operations. In
other words, DDL performed in _skeema_tmp wouldn't interact with metadata
locking of things outside of _skeema_tmp.

MySQL 8's change:

In MySQL 8, metadata locking is also extended to the "parent" side of a
foreign key (FK) constraint. This means that if you create a table with an
FK (which is always specified on the "child" side), this DDL will be affected
by any MDL on the "parent" (referenced) side table as well. If there's already
a slow transaction querying a table, and you attempt to execute a CREATE TABLE
with a child-side FK referencing this parent table, the CREATE TABLE will
block until the slow transaction completes. Not only that, but all subsequent
queries (of any type!) on the parent table will now block as well, since they
are queued behind the blocked CREATE TABLE.

Impact in Skeema (MySQL 8 + cross-schema FKs + workspace=temp-schema):

In Skeema, this MySQL 8 change can be problematic whenever cross-schema FKs
are in-use: that is, the child and parent tables are located in different
schemas. Skeema's workspace model operates on one schema at a time, running
the *.sql CREATEs in a temporary location so that they can be introspected
from information_schema. With cross-schema FKs, this means the "parent" side
is a real non-workspace table. Previously, this had no harmful impact. But
with MySQL 8 extending MDL to the parent, it can be prone to the stall
situations described above.

Note that same-schema FKs are unaffected by this problem in Skeema, since both
sides of the FK will be in the workspace (e.g. _skeema_tmp), so there's no
exposure to MDL conflicts with "real" application/service query workloads.

Also note that workspace=docker is unaffected by this problem, since it moves
workspace operations to an isolated ephemeral containerized database. And
since workspace operations use session-level foreign_key_checks=0, it isn't
a problem if the parent side schema or table of a cross-schema FK simply does
not exist at all in the containerized database.

The improvement in this commit:

Whenever using workspace=temp-schema in MySQL 8, this commit now applies a
session-level lock_wait_timeout=5 to DDL operations in workspaces, with up to
one retry per DDL. This means that in the case of a single slow transaction
on the parent side of a cross-schema FK, the maximum stall time should be
about 10 seconds, rather than 1 year. Skeema will emit an error and move on
to the next subdir in this situation, which is non-ideal, but still much
preferable to a long stall requiring manual query-killing.

Even so, MySQL 8 users with cross-schema foreign key constraints should
consider using workspace=docker whenever possible, to ensure full isolation of
metadata locks. Additional workspace options may be introduced in the future
to provide other alternatives; for example see #94 which will be prioritized
in the coming months.
  • Loading branch information
evanelias committed Sep 10, 2022
1 parent c0267f4 commit 762da89
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 64 deletions.
12 changes: 6 additions & 6 deletions internal/dumper/dumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,12 @@ func (s *IntegrationSuite) setupDirAndDB(t *testing.T, subdir string) {
s.setupScratchDir(t, subdir)

wsOpts := workspace.Options{
Type: workspace.TypeTempSchema,
Instance: s.d.Instance,
CleanupAction: workspace.CleanupActionDrop,
SchemaName: "dumper_test",
LockWaitTimeout: 30 * time.Second,
Concurrency: 5,
Type: workspace.TypeTempSchema,
Instance: s.d.Instance,
CleanupAction: workspace.CleanupActionDrop,
SchemaName: "dumper_test",
LockTimeout: 30 * time.Second,
Concurrency: 5,
}
wsSchema, err := workspace.ExecLogicalSchema(s.scratchDir.LogicalSchemas[0], wsOpts)
if err != nil {
Expand Down
63 changes: 41 additions & 22 deletions internal/tengo/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@ import (

// Instance represents a single database server running on a specific host or address.
type Instance struct {
BaseDSN string // DSN ending in trailing slash; i.e. no schema name or params
Driver string
User string
Password string
Host string
Port int
SocketPath string
defaultParams map[string]string
connectionPool map[string]*sqlx.DB // key is in format "schema?params"
m *sync.Mutex // protects unexported fields for concurrent operations
flavor Flavor
grants []string
waitTimeout int
maxUserConns int
bufferPoolSize int64
lowerCaseNames int
sqlMode []string
valid bool // true if any conn has ever successfully been made yet
BaseDSN string // DSN ending in trailing slash; i.e. no schema name or params
Driver string
User string
Password string
Host string
Port int
SocketPath string
defaultParams map[string]string
connectionPool map[string]*sqlx.DB // key is in format "schema?params"
m *sync.Mutex // protects unexported fields for concurrent operations
flavor Flavor
grants []string
waitTimeout int
lockWaitTimeout int
maxUserConns int
bufferPoolSize int64
lowerCaseNames int
sqlMode []string
valid bool // true if any conn has ever successfully been made yet
}

// NewInstance returns a pointer to a new Instance corresponding to the
Expand Down Expand Up @@ -262,6 +263,15 @@ func (instance *Instance) NameCaseMode() NameCaseMode {
return NameCaseMode(instance.lowerCaseNames)
}

// LockWaitTimeout returns the default session lock_wait_timeout for connections
// to this instance, or 0 if it could not be queried.
func (instance *Instance) LockWaitTimeout() int {
if ok, _ := instance.Valid(); !ok {
return 0
}
return instance.lockWaitTimeout
}

// hydrateVars populates several non-exported Instance fields by querying
// various global and session variables. Failures are ignored; these variables
// are designed to help inform behavior but are not strictly mandatory.
Expand All @@ -279,6 +289,7 @@ func (instance *Instance) hydrateVars(db *sqlx.DB, lock bool) {
Version string
SQLMode string
WaitTimeout int
LockWaitTimeout int
MaxUserConns int
MaxConns int
BufferPoolSize int64
Expand All @@ -289,6 +300,7 @@ func (instance *Instance) hydrateVars(db *sqlx.DB, lock bool) {
@@global.version AS version,
@@session.sql_mode AS sqlmode,
@@session.wait_timeout AS waittimeout,
@@session.lock_wait_timeout AS lockwaittimeout,
@@global.innodb_buffer_pool_size AS bufferpoolsize,
@@global.lower_case_table_names as lowercasetablenames,
@@session.max_user_connections AS maxuserconns,
Expand All @@ -300,6 +312,7 @@ func (instance *Instance) hydrateVars(db *sqlx.DB, lock bool) {
instance.flavor = IdentifyFlavor(result.Version, result.VersionComment)
instance.sqlMode = strings.Split(result.SQLMode, ",")
instance.waitTimeout = result.WaitTimeout
instance.lockWaitTimeout = result.LockWaitTimeout
instance.bufferPoolSize = result.BufferPoolSize
instance.lowerCaseNames = result.LowerCaseTableNames
if result.MaxUserConns > 0 {
Expand Down Expand Up @@ -742,14 +755,19 @@ type BulkDropOptions struct {
MaxConcurrency int // Max objects to drop at once
SkipBinlog bool // If true, use session sql_log_bin=0 (requires superuser)
PartitionsFirst bool // If true, drop RANGE/LIST partitioned tables one partition at a time
LockWaitTimeout int // If greater than 0, limit how long to wait for metadata locks (in seconds)
Schema *Schema // If non-nil, obtain object lists from Schema instead of running I_S queries
}

func (opts BulkDropOptions) params() string {
values := []string{"foreign_key_checks=0"}
if opts.SkipBinlog {
return "foreign_key_checks=0&sql_log_bin=0"
values = append(values, "sql_log_bin=0")
}
if opts.LockWaitTimeout > 0 {
values = append(values, fmt.Sprintf("lock_wait_timeout=%d", opts.LockWaitTimeout))
}
return "foreign_key_checks=0"
return strings.Join(values, "&")
}

// Concurrency returns the concurrency, with a minimum value of 1.
Expand Down Expand Up @@ -812,8 +830,9 @@ func (instance *Instance) DropTablesInSchema(schema string, opts BulkDropOptions
_, err = db.ExecContext(ctx, "DROP TABLE "+EscapeIdentifier(name))
// With the new data dictionary added in MySQL 8.0, attempting to
// concurrently drop two tables that have a foreign key constraint between
// them can deadlock
if IsDatabaseError(err, mysqlerr.ER_LOCK_DEADLOCK) {
// them can deadlock, so we try them serially later. Metadata lock timeouts
// are also more common with FKs in MySQL 8.0, so we retry those as well.
if IsDatabaseError(err, mysqlerr.ER_LOCK_DEADLOCK, mysqlerr.ER_LOCK_WAIT_TIMEOUT) {
retries <- name
err = nil
}
Expand Down
14 changes: 14 additions & 0 deletions internal/tengo/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,20 @@ func (s TengoIntegrationSuite) TestInstanceNameCaseMode(t *testing.T) {
}
}

func (s TengoIntegrationSuite) TestInstanceLockWaitTimeout(t *testing.T) {
var expected int
// lock_wait_timeout defaults to a ridiculous 1 year in MySQL. MariaDB lowered
// it to a slightly-less-ridiculous 1 day in MariaDB 10.2.
if s.d.Flavor().Min(FlavorMariaDB102) {
expected = 86400
} else {
expected = 86400 * 365
}
if actual := s.d.LockWaitTimeout(); actual != expected {
t.Errorf("Expected LockWaitTimeout to return %d, instead found %d", expected, actual)
}
}

func (s TengoIntegrationSuite) TestInstanceCloseAll(t *testing.T) {
makePool := func(defaultSchema, params string) {
t.Helper()
Expand Down
2 changes: 1 addition & 1 deletion internal/workspace/localdocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewLocalDocker(opts Options) (_ *LocalDocker, retErr error) {
}

lockName := fmt.Sprintf("skeema.%s", ld.schemaName)
if ld.releaseLock, err = getLock(ld.d.Instance, lockName, opts.LockWaitTimeout); err != nil {
if ld.releaseLock, err = getLock(ld.d.Instance, lockName, opts.LockTimeout); err != nil {
return nil, fmt.Errorf("Unable to obtain lock on %s: %s", ld.d.Instance, err)
}
// If this function returns an error, don't continue to hold the lock
Expand Down
8 changes: 4 additions & 4 deletions internal/workspace/localdocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s WorkspaceIntegrationSuite) TestLocalDockerErrors(t *testing.T) {
DefaultCharacterSet: "latin1",
DefaultCollation: "latin1_swedish_ci",
RootPassword: "",
LockWaitTimeout: 100 * time.Millisecond,
LockTimeout: 100 * time.Millisecond,
Concurrency: 10,
}

Expand All @@ -43,7 +43,7 @@ func (s WorkspaceIntegrationSuite) TestLocalDocker(t *testing.T) {
DefaultCollation: "latin1_swedish_ci",
DefaultConnParams: "wait_timeout=123",
RootPassword: "",
LockWaitTimeout: 100 * time.Millisecond,
LockTimeout: 100 * time.Millisecond,
Concurrency: 10,
}

Expand Down Expand Up @@ -77,7 +77,7 @@ func (s WorkspaceIntegrationSuite) TestLocalDockerShutdown(t *testing.T) {
DefaultCharacterSet: "latin1",
DefaultCollation: "latin1_swedish_ci",
RootPassword: "",
LockWaitTimeout: 100 * time.Millisecond,
LockTimeout: 100 * time.Millisecond,
Concurrency: 10,
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func (s WorkspaceIntegrationSuite) TestLocalDockerConnParams(t *testing.T) {
DefaultCollation: "latin1_swedish_ci",
DefaultConnParams: "wait_timeout=123",
RootPassword: "",
LockWaitTimeout: 100 * time.Millisecond,
LockTimeout: 100 * time.Millisecond,
Concurrency: 10,
}

Expand Down
49 changes: 35 additions & 14 deletions internal/workspace/tempschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package workspace
import (
"errors"
"fmt"
"os"
"strings"

"github.com/jmoiron/sqlx"
"github.com/skeema/skeema/internal/tengo"
Expand All @@ -18,6 +20,7 @@ type TempSchema struct {
skipBinlog bool
inst *tengo.Instance
releaseLock releaseFunc
mdlTimeout int // metadata lock wait timeout, in seconds; 0 for session default
}

// NewTempSchema creates a temporary schema on the supplied instance and returns
Expand All @@ -42,7 +45,7 @@ func NewTempSchema(opts Options) (_ *TempSchema, retErr error) {
}

lockName := fmt.Sprintf("skeema.%s", ts.schemaName)
if ts.releaseLock, err = getLock(ts.inst, lockName, opts.LockWaitTimeout); err != nil {
if ts.releaseLock, err = getLock(ts.inst, lockName, opts.LockTimeout); err != nil {
return nil, fmt.Errorf("Unable to lock temporary schema on %s: %s", ts.inst, err)
}

Expand All @@ -53,6 +56,20 @@ func NewTempSchema(opts Options) (_ *TempSchema, retErr error) {
}
}()

// MySQL 8 extends foreign key metadata locks to the "parent" side of the FK,
// which means the TempSchema may not be fully isolated from non-workspace
// workloads and their own usage of metadata locks. As a result, we must force
// a low lock_wait_timeout on any TempSchema DDL in MySQL 8.
if ts.inst.Flavor().Min(tengo.FlavorMySQL80) {
wantLockWait := 5
if strings.HasSuffix(os.Args[0], ".test") || strings.HasSuffix(os.Args[0], ".test.exe") {
wantLockWait = 2 // use lower value in test suites so MDL-related tests aren't super slow
}
if ts.inst.LockWaitTimeout() > wantLockWait {
ts.mdlTimeout = wantLockWait
}
}

createOpts := tengo.SchemaCreationOptions{
DefaultCharSet: opts.DefaultCharacterSet,
DefaultCollation: opts.DefaultCollation,
Expand All @@ -63,12 +80,7 @@ func NewTempSchema(opts Options) (_ *TempSchema, retErr error) {
} else if has {
// Attempt to drop any tables already present in tempSchema, but fail if
// any of them actually have 1 or more rows
dropOpts := tengo.BulkDropOptions{
MaxConcurrency: ts.concurrency,
OnlyIfEmpty: true,
SkipBinlog: opts.SkipBinlog,
PartitionsFirst: true,
}
dropOpts := ts.bulkDropOptions()
if err := ts.inst.DropTablesInSchema(ts.schemaName, dropOpts); err != nil {
return nil, fmt.Errorf("Cannot drop existing temp schema tables on %s: %s", ts.inst, err)
}
Expand All @@ -84,9 +96,22 @@ func NewTempSchema(opts Options) (_ *TempSchema, retErr error) {
return ts, nil
}

func (ts *TempSchema) bulkDropOptions() tengo.BulkDropOptions {
return tengo.BulkDropOptions{
MaxConcurrency: ts.concurrency,
OnlyIfEmpty: true,
SkipBinlog: ts.skipBinlog,
PartitionsFirst: true,
LockWaitTimeout: ts.mdlTimeout,
}
}

// ConnectionPool returns a connection pool (*sqlx.DB) to the temporary
// workspace schema, using the supplied connection params (which may be blank).
func (ts *TempSchema) ConnectionPool(params string) (*sqlx.DB, error) {
if ts.mdlTimeout > 0 && !strings.Contains(params, "lock_wait_timeout") {
params = strings.TrimLeft(fmt.Sprintf("%s&lock_wait_timeout=%d", params, ts.mdlTimeout), "&")
}
return ts.inst.CachedConnectionPool(ts.schemaName, params)
}

Expand All @@ -108,13 +133,9 @@ func (ts *TempSchema) Cleanup(schema *tengo.Schema) error {
ts.releaseLock = nil
}()

dropOpts := tengo.BulkDropOptions{
MaxConcurrency: ts.concurrency,
OnlyIfEmpty: true,
SkipBinlog: ts.skipBinlog,
PartitionsFirst: true,
Schema: schema, // may be nil, not a problem
}
dropOpts := ts.bulkDropOptions()
dropOpts.Schema = schema // may be nil, not a problem

if ts.keepSchema {
if err := ts.inst.DropTablesInSchema(ts.schemaName, dropOpts); err != nil {
return fmt.Errorf("Cannot drop tables in temporary schema on %s: %s", ts.inst, err)
Expand Down
Loading

0 comments on commit 762da89

Please sign in to comment.