Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v9] backport #10706 (concurrent sqlite access) #11190

Merged
merged 2 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/auth/state_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewProcessStorage(ctx context.Context, path string) (*ProcessStorage, error
litebk, err := lite.NewWithConfig(ctx, lite.Config{
Path: path,
EventsOff: true,
Sync: lite.SyncFull,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
163 changes: 113 additions & 50 deletions lib/backend/lite/lite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"net/url"
"os"
"path/filepath"
"runtime/debug"
"strconv"
"sync/atomic"
"time"

Expand All @@ -39,15 +40,36 @@ import (
)

const (
// BackendName is the name of this backend
// BackendName is the name of this backend.
BackendName = "sqlite"
// AlternativeName is another name of this backend.
AlternativeName = "dir"
defaultDirMode os.FileMode = 0770
defaultDBFile = "sqlite.db"
slowTransactionThreshold = time.Second
syncOFF = "OFF"
busyTimeout = 10000
AlternativeName = "dir"

// SyncOff disables file system sync after writing.
SyncOff = "OFF"
// SyncFull fsyncs the database file on disk after every write.
SyncFull = "FULL"

// JournalMemory keeps the rollback journal in memory instead of storing it
// on disk.
JournalMemory = "MEMORY"
)

const (
// defaultDirMode is the mode of the newly created directories that are part
// of the Path
defaultDirMode os.FileMode = 0770

// defaultDBFile is the file name of the sqlite db in the directory
// specified by Path
defaultDBFile = "sqlite.db"
slowTransactionThreshold = time.Second

// defaultSync is the default value for Sync
defaultSync = SyncOff

// defaultBusyTimeout is the default value for BusyTimeout, in ms
defaultBusyTimeout = 10000
)

// GetName is a part of backend API and it returns SQLite backend type
Expand All @@ -56,14 +78,6 @@ func GetName() string {
return BackendName
}

func init() {
sql.Register(BackendName, &sqlite3.SQLiteDriver{
ConnectHook: func(conn *sqlite3.SQLiteConn) error {
return nil
},
})
}

// Config structure represents configuration section
type Config struct {
// Path is a path to the database directory
Expand All @@ -77,15 +91,12 @@ type Config struct {
EventsOff bool `json:"events_off,omitempty"`
// Clock allows to override clock used in the backend
Clock clockwork.Clock `json:"-"`
// Sync sets synchronous pragrma
// Sync sets the synchronous pragma
Sync string `json:"sync,omitempty"`
// BusyTimeout sets busy timeout in milliseconds
BusyTimeout int `json:"busy_timeout,omitempty"`
// Memory turns memory mode of the database
Memory bool `json:"memory"`
// MemoryName sets the name of the database,
// set to "sqlite.db" by default
MemoryName string `json:"memory_name"`
// Journal sets the journal_mode pragma
Journal string `json:"journal,omitempty"`
// Mirror turns on mirror mode for the backend,
// which will use record IDs for Put and PutRange passed from
// the resources, not generate a new one
Expand All @@ -95,7 +106,7 @@ type Config struct {
// CheckAndSetDefaults is a helper returns an error if the supplied configuration
// is not enough to connect to sqlite
func (cfg *Config) CheckAndSetDefaults() error {
if cfg.Path == "" && !cfg.Memory {
if cfg.Path == "" {
return trace.BadParameter("specify directory path to the database using 'path' parameter")
}
if cfg.BufferSize == 0 {
Expand All @@ -108,17 +119,69 @@ func (cfg *Config) CheckAndSetDefaults() error {
cfg.Clock = clockwork.NewRealClock()
}
if cfg.Sync == "" {
cfg.Sync = syncOFF
cfg.Sync = defaultSync
}
if cfg.BusyTimeout == 0 {
cfg.BusyTimeout = busyTimeout
}
if cfg.MemoryName == "" {
cfg.MemoryName = defaultDBFile
cfg.BusyTimeout = defaultBusyTimeout
}
return nil
}

// ConnectionURI returns a connection string usable with sqlite according to the
// Config.
func (cfg *Config) ConnectionURI() string {
params := url.Values{}
params.Set("_busy_timeout", strconv.Itoa(cfg.BusyTimeout))
// The _txlock parameter is parsed by go-sqlite to determine if (all)
// transactions should be started with `BEGIN DEFERRED` (the default, same
// as `BEGIN`), `BEGIN IMMEDIATE` or `BEGIN EXCLUSIVE`.
//
// The way we use sqlite relies entirely on the busy timeout handler (also
// configured through the connection URL, with the _busy_timeout parameter)
// to address concurrency problems, and treats any SQLITE_BUSY errors as a
// fatal issue with the database; however, in scenarios with multiple
// readwriters it is possible to still get a busy error even with a generous
// busy timeout handler configured, as two transactions that both start off
// with a SELECT - thus acquiring a SHARED lock, see
// https://www.sqlite.org/lockingv3.html#transaction_control - then attempt
// to upgrade to a RESERVED lock to upsert or delete something can end up
// requiring one of the two transactions to forcibly rollback to avoid a
// deadlock, which is signaled by the sqlite engine with a SQLITE_BUSY error
// returned to one of the two. When that happens, a concurrent-aware program
// can just try the transaction again a few times - making sure to disregard
// what was read before the transaction actually committed.
//
// As we're not really interested in concurrent sqlite access (process
// storage has very little written to, sharing a sqlite database as the
// backend between two auths is not really supported, and caches shouldn't
// ever run on the same underlying sqlite backend) we instead start every
// transaction with `BEGIN IMMEDIATE`, which grabs a RESERVED lock
// immediately (waiting for the busy timeout in case some other connection
// to the database has the lock) at the beginning of the transaction, thus
// avoiding any spurious SQLITE_BUSY error that can happen halfway through a
// transaction.
//
// If we end up requiring better concurrent access to sqlite in the future
// we should consider enabling Write-Ahead Logging mode, to actually allow
// for reads to happen at the same time as writes, adding some amount of
// retries to inTransaction, and double-checking that all uses of it
// correctly handle the possibility of the transaction being restarted.
params.Set("_txlock", "immediate")
if cfg.Sync != "" {
params.Set("_sync", cfg.Sync)
}
if cfg.Journal != "" {
params.Set("_journal", cfg.Journal)
}

u := url.URL{
Scheme: "file",
Opaque: url.QueryEscape(filepath.Join(cfg.Path, defaultDBFile)),
RawQuery: params.Encode(),
}
return u.String()
}

// New returns a new instance of sqlite backend
func New(ctx context.Context, params backend.Params) (*Backend, error) {
var cfg *Config
Expand All @@ -135,23 +198,18 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
var connectorURL string
if !cfg.Memory {
// Ensure that the path to the root directory exists.
err := os.MkdirAll(cfg.Path, defaultDirMode)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
fullPath := filepath.Join(cfg.Path, defaultDBFile)
connectorURL = fmt.Sprintf("file:%v?_busy_timeout=%v&_sync=%v", fullPath, cfg.BusyTimeout, cfg.Sync)
} else {
connectorURL = fmt.Sprintf("file:%v?mode=memory", cfg.MemoryName)
connectionURI := cfg.ConnectionURI()
// Ensure that the path to the root directory exists.
err := os.MkdirAll(cfg.Path, defaultDirMode)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
db, err := sql.Open(BackendName, connectorURL)
db, err := sql.Open("sqlite3", cfg.ConnectionURI())
if err != nil {
return nil, trace.Wrap(err, "error opening URI: %v", connectorURL)
return nil, trace.Wrap(err, "error opening URI: %v", connectionURI)
}
// serialize access to sqlite to avoid database is locked errors
// serialize access to sqlite, as we're using immediate transactions anyway,
// and in-memory go locks are faster than sqlite locks
db.SetMaxOpenConns(1)
buf := backend.NewCircularBuffer(
backend.BufferCapacity(cfg.BufferSize),
Expand All @@ -166,9 +224,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
ctx: closeCtx,
cancel: cancel,
}
l.Debugf("Connected to: %v, poll stream period: %v", connectorURL, cfg.PollStreamPeriod)
l.Debugf("Connected to: %v, poll stream period: %v", connectionURI, cfg.PollStreamPeriod)
if err := l.createSchema(); err != nil {
return nil, trace.Wrap(err, "error creating schema: %v", connectorURL)
return nil, trace.Wrap(err, "error creating schema: %v", connectionURI)
}
if err := l.showPragmas(); err != nil {
l.Warningf("Failed to show pragma settings: %v.", err)
Expand Down Expand Up @@ -199,17 +257,22 @@ type Backend struct {
// parameters, when called, logs some key PRAGMA values
func (l *Backend) showPragmas() error {
return l.inTransaction(l.ctx, func(tx *sql.Tx) error {
row := tx.QueryRowContext(l.ctx, "PRAGMA synchronous;")
var syncValue string
if err := row.Scan(&syncValue); err != nil {
var journalMode string
row := tx.QueryRowContext(l.ctx, "PRAGMA journal_mode;")
if err := row.Scan(&journalMode); err != nil {
return trace.Wrap(err)
}
row = tx.QueryRowContext(l.ctx, "PRAGMA synchronous;")
var synchronous string
if err := row.Scan(&synchronous); err != nil {
return trace.Wrap(err)
}
var timeoutValue string
var busyTimeout string
row = tx.QueryRowContext(l.ctx, "PRAGMA busy_timeout;")
if err := row.Scan(&timeoutValue); err != nil {
if err := row.Scan(&busyTimeout); err != nil {
return trace.Wrap(err)
}
l.Debugf("Synchronous: %v, busy timeout: %v", syncValue, timeoutValue)
l.Debugf("journal_mode=%v, synchronous=%v, busy_timeout=%v", journalMode, synchronous, busyTimeout)
return nil
})
}
Expand Down
56 changes: 0 additions & 56 deletions lib/backend/lite/litemem_test.go

This file was deleted.

19 changes: 12 additions & 7 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,6 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca
lite.Config{
Path: path,
EventsOff: !cfg.events,
Memory: false,
Mirror: true,
PollStreamPeriod: 100 * time.Millisecond,
})
Expand Down Expand Up @@ -3803,12 +3802,18 @@ func (process *TeleportProcess) StartShutdown(ctx context.Context) context.Conte
process.log.Warnf("Error waiting for all services to complete: %v", err)
}
process.log.Debug("All supervisor functions are completed.")
localAuth := process.getLocalAuth()
if localAuth != nil {
if err := process.localAuth.Close(); err != nil {

if localAuth := process.getLocalAuth(); localAuth != nil {
if err := localAuth.Close(); err != nil {
process.log.Warningf("Failed closing auth server: %v.", err)
}
}

if process.storage != nil {
if err := process.storage.Close(); err != nil {
process.log.Warningf("Failed closing process storage: %v.", err)
}
}
}()
go process.printShutdownStatus(localCtx)
return localCtx
Expand All @@ -3830,9 +3835,9 @@ func (process *TeleportProcess) Close() error {
process.Config.Keygen.Close()

var errors []error
localAuth := process.getLocalAuth()
if localAuth != nil {
errors = append(errors, process.localAuth.Close())

if localAuth := process.getLocalAuth(); localAuth != nil {
errors = append(errors, localAuth.Close())
}

if process.storage != nil {
Expand Down