From e47e145daaa7e5f112574a149aececed80beb804 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Fri, 1 Apr 2022 15:58:12 +0200 Subject: [PATCH] [v8] backport #11386 #11387 (in-memory cache and sqlite sync) (#11659) * Always use in-memory caches (#11386) * Always use in-memory caches This also cleans up now-useless fields and constants related to on-disk caches. * Remove the cache tombstone mechanism As we're never reopening the same cache backend twice, this is no longer useful. * Warn if a cache directory exists on disk We can't remove it automatically because we might be in the middle of an upgrade with a old version of Teleport still running. * Default to synchronous FULL for sqlite (#11387) --- integration/helpers.go | 1 - lib/backend/lite/lite.go | 4 +- lib/cache/cache.go | 51 -------------------- lib/cache/cache_test.go | 81 -------------------------------- lib/config/configuration.go | 8 +++- lib/config/configuration_test.go | 12 ++--- lib/config/fileconf.go | 1 - lib/defaults/defaults.go | 6 --- lib/service/cfg.go | 15 +----- lib/service/service.go | 53 +++++---------------- 10 files changed, 28 insertions(+), 204 deletions(-) diff --git a/integration/helpers.go b/integration/helpers.go index c4605664531ff..9af6c95c0e5f0 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -89,7 +89,6 @@ func SetTestTimeouts(t time.Duration) { defaults.ResyncInterval = t defaults.SessionRefreshPeriod = t defaults.HeartbeatCheckPeriod = t - defaults.CachePollPeriod = t } // TeleInstance represents an in-memory instance of a teleport diff --git a/lib/backend/lite/lite.go b/lib/backend/lite/lite.go index 0aced3788b1d6..790f81ef31d66 100644 --- a/lib/backend/lite/lite.go +++ b/lib/backend/lite/lite.go @@ -45,8 +45,6 @@ const ( // AlternativeName is another name of this backend. AlternativeName = "dir" - // SyncOff disables file system sync after writing. - SyncOff = "OFF" // SyncFull fsyncs the database file on disk after every write. SyncFull = "FULL" @@ -66,7 +64,7 @@ const ( slowTransactionThreshold = time.Second // defaultSync is the default value for Sync - defaultSync = SyncOff + defaultSync = SyncFull // defaultBusyTimeout is the default value for BusyTimeout, in ms defaultBusyTimeout = 10000 diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 456434df67d94..49eb10abe4239 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -60,10 +60,6 @@ var ( cacheCollectors = []prometheus.Collector{cacheEventsReceived, cacheStaleEventsReceived} ) -func tombstoneKey() []byte { - return backend.Key("cache", teleport.Version, "tombstone", "ok") -} - // ForAuth sets up watch configuration for the auth server func ForAuth(cfg Config) Config { cfg.target = "auth" @@ -606,9 +602,6 @@ const ( WatcherStarted = "watcher_started" // WatcherFailed is emitted when event watcher has failed WatcherFailed = "watcher_failed" - // TombstoneWritten is emitted if cache is closed in a healthy - // state and successfully writes its tombstone. - TombstoneWritten = "tombstone_written" // Reloading is emitted when an error occurred watching events // and the cache is waiting to create a new watcher Reloading = "reloading_cache" @@ -674,28 +667,6 @@ func New(config Config) (*Cache, error) { } cs.collections = collections - // if the ok tombstone is present, set the initial read state of the cache - // to ok. this tombstone's presence indicates that we are dealing with an - // on-disk cache produced by the same teleport version which gracefully shutdown - // while in an ok state. We delete the tombstone rather than check for its - // presence to ensure self-healing in the event that the tombstone wasn't actually - // valid. Note that setting the cache's read state to ok does not cause us to skip - // our normal init logic, it just means that reads against the local cache will - // be allowed in the event the init step fails before it starts applying. - // Note also that we aren't setting our event fanout system to an initialized state - // or incrementing the generation counter; this cache isn't so much "healthy" as it is - // "slightly preferable to an unreachable auth server". - err = cs.wrapper.Delete(ctx, tombstoneKey()) - switch { - case err == nil: - cs.setReadOK(true) - case trace.IsNotFound(err): - // do nothing - default: - cs.Close() - return nil, trace.Wrap(err) - } - retry, err := utils.NewLinear(utils.LinearConfig{ First: utils.HalfJitter(cs.MaxRetryPeriod / 10), Step: cs.MaxRetryPeriod / 5, @@ -749,10 +720,6 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) { c.Debugf("Cache is closing, returning from update loop.") // ensure that close operations have been run c.Close() - // run tombstone operations in an orphaned context - tombCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - c.writeTombstone(tombCtx) }() timer := time.NewTimer(c.Config.WatcherInitTimeout) for { @@ -785,24 +752,6 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) { } } -// writeTombstone writes the cache tombstone. -func (c *Cache) writeTombstone(ctx context.Context) { - if !c.getReadOK() || c.generation.Load() == 0 { - // state is unhealthy or was loaded from a previously - // entombed state; do nothing. - return - } - item := backend.Item{ - Key: tombstoneKey(), - Value: []byte("{}"), - } - if _, err := c.wrapper.Create(ctx, item); err != nil { - c.Warningf("Failed to set tombstone: %v", err) - } else { - c.notify(ctx, Event{Type: TombstoneWritten}) - } -} - func (c *Cache) notify(ctx context.Context, event Event) { if c.EventsC == nil { return diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 660a23ec6ca8c..2b58a35c5b601 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -520,87 +520,6 @@ func TestCompletenessReset(t *testing.T) { } } -// TestTombstones verifies that healthy caches leave tombstones -// on closure, giving new caches the ability to start from a known -// good state if the origin state is unavailable. -func TestTombstones(t *testing.T) { - ctx := context.Background() - const caCount = 10 - p := newTestPackWithoutCache(t) - t.Cleanup(p.Close) - - // put lots of CAs in the backend - for i := 0; i < caCount; i++ { - ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i)) - require.NoError(t, p.trustS.UpsertCertAuthority(ca)) - } - - var err error - p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - Apps: p.apps, - Databases: p.databases, - WindowsDesktops: p.windowsDesktops, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, - })) - require.NoError(t, err) - - // verify that CAs are immediately available - cas, err := p.cache.GetCertAuthorities(ctx, types.UserCA, false) - require.NoError(t, err) - require.Len(t, cas, caCount) - - require.NoError(t, p.cache.Close()) - // wait for TombstoneWritten, ignoring all other event types - waitForEvent(t, p.eventsC, TombstoneWritten, WatcherStarted, EventProcessed, WatcherFailed) - // simulate bad connection to auth server - p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) - p.eventsS.closeWatchers() - - p.cache, err = New(ForAuth(Config{ - Context: ctx, - Backend: p.cacheBackend, - Events: p.eventsS, - ClusterConfig: p.clusterConfigS, - Provisioner: p.provisionerS, - Trust: p.trustS, - Users: p.usersS, - Access: p.accessS, - DynamicAccess: p.dynamicAccessS, - Presence: p.presenceS, - AppSession: p.appSessionS, - WebSession: p.webSessionS, - WebToken: p.webTokenS, - Restrictions: p.restrictions, - Apps: p.apps, - Databases: p.databases, - WindowsDesktops: p.windowsDesktops, - MaxRetryPeriod: 200 * time.Millisecond, - EventsC: p.eventsC, - })) - require.NoError(t, err) - - // verify that CAs are immediately available despite the fact - // that the origin state was never available. - cas, err = p.cache.GetCertAuthorities(ctx, types.UserCA, false) - require.NoError(t, err) - require.Len(t, cas, caCount) -} - // TestInitStrategy verifies that cache uses expected init strategy // of serving backend state when init is taking too long. func TestInitStrategy(t *testing.T) { diff --git a/lib/config/configuration.go b/lib/config/configuration.go index f00d5f5a1f3c9..c1b3aca089193 100644 --- a/lib/config/configuration.go +++ b/lib/config/configuration.go @@ -49,6 +49,7 @@ import ( "github.com/gravitational/teleport/lib" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/lite" + "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/client" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/limiter" @@ -290,7 +291,12 @@ func ApplyFileConfig(fc *FileConfig, cfg *service.Config) error { } if fc.CachePolicy.TTL != "" { - log.Warnf("cache.ttl config option is deprecated and will be ignored, caches no longer attempt to anticipate resource expiration.") + log.Warn("cache.ttl config option is deprecated and will be ignored, caches no longer attempt to anticipate resource expiration.") + } + if fc.CachePolicy.Type == memory.GetName() { + log.Debugf("cache.type config option is explicitly set to %v.", memory.GetName()) + } else if fc.CachePolicy.Type != "" { + log.Warn("cache.type config option is deprecated and will be ignored, caches are always in memory in this version.") } // apply cache policy for node and proxy diff --git a/lib/config/configuration_test.go b/lib/config/configuration_test.go index 8df9e8128528e..dcd8d590ea5a7 100644 --- a/lib/config/configuration_test.go +++ b/lib/config/configuration_test.go @@ -38,7 +38,6 @@ import ( "github.com/gravitational/teleport/lib" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/lite" - "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/limiter" @@ -941,12 +940,11 @@ func TestParseCachePolicy(t *testing.T) { out *service.CachePolicy err error }{ - {in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, Type: lite.GetName()}}, - {in: &CachePolicy{EnabledFlag: "yes", TTL: "10h"}, out: &service.CachePolicy{Enabled: true, Type: lite.GetName()}}, - {in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false, Type: memory.GetName()}}, - {in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, Type: memory.GetName()}}, - {in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Type: lite.GetName(), Enabled: false}}, - {in: &CachePolicy{Type: "memsql"}, err: trace.BadParameter("unsupported backend")}, + {in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true}}, + {in: &CachePolicy{EnabledFlag: "true", TTL: "10h"}, out: &service.CachePolicy{Enabled: true}}, + {in: &CachePolicy{Type: "whatever", EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false}}, + {in: &CachePolicy{Type: "name", EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true}}, + {in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Enabled: false}}, } for i, tc := range tcs { comment := fmt.Sprintf("test case #%v", i) diff --git a/lib/config/fileconf.go b/lib/config/fileconf.go index 3ff13c096c1e9..649efde3c9f69 100644 --- a/lib/config/fileconf.go +++ b/lib/config/fileconf.go @@ -437,7 +437,6 @@ func (c *CachePolicy) Enabled() bool { // Parse parses cache policy from Teleport config func (c *CachePolicy) Parse() (*service.CachePolicy, error) { out := service.CachePolicy{ - Type: c.Type, Enabled: c.Enabled(), } if err := out.CheckAndSetDefaults(); err != nil { diff --git a/lib/defaults/defaults.go b/lib/defaults/defaults.go index b3ee2c4b1b27c..01de84baf19bb 100644 --- a/lib/defaults/defaults.go +++ b/lib/defaults/defaults.go @@ -372,12 +372,6 @@ var ( // TopRequestsCapacity sets up default top requests capacity TopRequestsCapacity = 128 - // CachePollPeriod is a period for cache internal events polling, - // used in cases when cache is being used to subscribe for events - // and this parameter controls how often cache checks for new events - // to arrive - CachePollPeriod = 500 * time.Millisecond - // AuthQueueSize is auth service queue size AuthQueueSize = 8192 diff --git a/lib/service/cfg.go b/lib/service/cfg.go index d15cb62c20532..1eca9813fc352 100644 --- a/lib/service/cfg.go +++ b/lib/service/cfg.go @@ -42,7 +42,6 @@ import ( "github.com/gravitational/teleport/lib/auth/keystore" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/lite" - "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/bpf" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" @@ -307,31 +306,21 @@ func (cfg *Config) DebugDumpToYAML() string { // CachePolicy sets caching policy for proxies and nodes type CachePolicy struct { - // Type sets the cache type - Type string // Enabled enables or disables caching Enabled bool } // CheckAndSetDefaults checks and sets default values func (c *CachePolicy) CheckAndSetDefaults() error { - switch c.Type { - case "", lite.GetName(): - c.Type = lite.GetName() - case memory.GetName(): - default: - return trace.BadParameter("unsupported cache type %q, supported values are %q and %q", - c.Type, lite.GetName(), memory.GetName()) - } return nil } // String returns human-friendly representation of the policy func (c CachePolicy) String() string { if !c.Enabled { - return "no cache policy" + return "no cache" } - return fmt.Sprintf("%v cache will store frequently accessed items", c.Type) + return "in-memory cache" } // ProxyConfig specifies configuration for proxy service diff --git a/lib/service/service.go b/lib/service/service.go index 70995a89c7a2b..d84654b4b26e7 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -631,6 +631,10 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { } } + if fi, err := os.Stat(filepath.Join(cfg.DataDir, "cache")); err == nil && fi.IsDir() { + cfg.Log.Warnf("An old cache directory exists at %q. It can be safely deleted after ensuring that no other Teleport instance is running.", filepath.Join(cfg.DataDir, "cache")) + } + if len(cfg.FileDescriptors) == 0 { cfg.FileDescriptors, err = importFileDescriptors(cfg.Log) if err != nil { @@ -1308,7 +1312,6 @@ func (process *TeleportProcess) initAuthService() error { services: authServer.Services, setup: cache.ForAuth, cacheName: []string{teleport.ComponentAuth}, - inMemory: true, events: true, }) if err != nil { @@ -1528,13 +1531,8 @@ type accessCacheConfig struct { setup cache.SetupConfigFn // cacheName is a cache name cacheName []string - // inMemory is true if cache - // should use memory - inMemory bool // events is true if cache should turn on events events bool - // pollPeriod contains period for polling - pollPeriod time.Duration } func (c *accessCacheConfig) CheckAndSetDefaults() error { @@ -1547,9 +1545,6 @@ func (c *accessCacheConfig) CheckAndSetDefaults() error { if len(c.cacheName) == 0 { return trace.BadParameter("missing parameter cacheName") } - if c.pollPeriod == 0 { - c.pollPeriod = defaults.CachePollPeriod - } return nil } @@ -1558,39 +1553,18 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - var cacheBackend backend.Backend - if cfg.inMemory { - process.log.Debugf("Creating in-memory backend for %v.", cfg.cacheName) - mem, err := memory.New(memory.Config{ - Context: process.ExitContext(), - EventsOff: !cfg.events, - Mirror: true, - }) - if err != nil { - return nil, trace.Wrap(err) - } - cacheBackend = mem - } else { - process.log.Debugf("Creating sqlite backend for %v.", cfg.cacheName) - path := filepath.Join(append([]string{process.Config.DataDir, "cache"}, cfg.cacheName...)...) - if err := os.MkdirAll(path, teleport.SharedDirMode); err != nil { - return nil, trace.ConvertSystemError(err) - } - liteBackend, err := lite.NewWithConfig(process.ExitContext(), - lite.Config{ - Path: path, - EventsOff: !cfg.events, - Mirror: true, - PollStreamPeriod: 100 * time.Millisecond, - }) - if err != nil { - return nil, trace.Wrap(err) - } - cacheBackend = liteBackend + process.log.Debugf("Creating in-memory backend for %v.", cfg.cacheName) + mem, err := memory.New(memory.Config{ + Context: process.ExitContext(), + EventsOff: !cfg.events, + Mirror: true, + }) + if err != nil { + return nil, trace.Wrap(err) } reporter, err := backend.NewReporter(backend.ReporterConfig{ Component: teleport.ComponentCache, - Backend: cacheBackend, + Backend: mem, }) if err != nil { return nil, trace.Wrap(err) @@ -1745,7 +1719,6 @@ func (process *TeleportProcess) newLocalCacheForWindowsDesktop(clt auth.ClientI, // newLocalCache returns new instance of access point func (process *TeleportProcess) newLocalCache(clt auth.ClientI, setupConfig cache.SetupConfigFn, cacheName []string) (*cache.Cache, error) { return process.newAccessCache(accessCacheConfig{ - inMemory: process.Config.CachePolicy.Type == memory.GetName(), services: clt, setup: setupConfig, cacheName: cacheName,