Skip to content

Commit

Permalink
[v8] backport #11386 #11387 (in-memory cache and sqlite sync) (#11659)
Browse files Browse the repository at this point in the history
* Always use in-memory caches (#11386)

* Always use in-memory caches

This also cleans up now-useless fields and constants related to on-disk
caches.

* Remove the cache tombstone mechanism

As we're never reopening the same cache backend twice, this is no longer
useful.

* Warn if a cache directory exists on disk

We can't remove it automatically because we might be in the middle of an
upgrade with an old version of Teleport still running.

* Default to synchronous FULL for sqlite (#11387)
  • Loading branch information
espadolini authored Apr 1, 2022
1 parent 1b30df6 commit e47e145
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 204 deletions.
1 change: 0 additions & 1 deletion integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func SetTestTimeouts(t time.Duration) {
defaults.ResyncInterval = t
defaults.SessionRefreshPeriod = t
defaults.HeartbeatCheckPeriod = t
defaults.CachePollPeriod = t
}

// TeleInstance represents an in-memory instance of a teleport
Expand Down
4 changes: 1 addition & 3 deletions lib/backend/lite/lite.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ const (
// AlternativeName is another name of this backend.
AlternativeName = "dir"

// SyncOff disables file system sync after writing.
SyncOff = "OFF"
// SyncFull fsyncs the database file on disk after every write.
SyncFull = "FULL"

Expand All @@ -66,7 +64,7 @@ const (
slowTransactionThreshold = time.Second

// defaultSync is the default value for Sync
defaultSync = SyncOff
defaultSync = SyncFull

// defaultBusyTimeout is the default value for BusyTimeout, in ms
defaultBusyTimeout = 10000
Expand Down
51 changes: 0 additions & 51 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ var (
cacheCollectors = []prometheus.Collector{cacheEventsReceived, cacheStaleEventsReceived}
)

// tombstoneKey returns the backend key used for the cache tombstone item.
// teleport.Version is one of the components passed to backend.Key.
func tombstoneKey() []byte {
return backend.Key("cache", teleport.Version, "tombstone", "ok")
}

// ForAuth sets up watch configuration for the auth server
func ForAuth(cfg Config) Config {
cfg.target = "auth"
Expand Down Expand Up @@ -606,9 +602,6 @@ const (
WatcherStarted = "watcher_started"
// WatcherFailed is emitted when event watcher has failed
WatcherFailed = "watcher_failed"
// TombstoneWritten is emitted if cache is closed in a healthy
// state and successfully writes its tombstone.
TombstoneWritten = "tombstone_written"
// Reloading is emitted when an error occurred watching events
// and the cache is waiting to create a new watcher
Reloading = "reloading_cache"
Expand Down Expand Up @@ -674,28 +667,6 @@ func New(config Config) (*Cache, error) {
}
cs.collections = collections

// if the ok tombstone is present, set the initial read state of the cache
// to ok. this tombstone's presence indicates that we are dealing with an
// on-disk cache produced by the same teleport version which gracefully shutdown
// while in an ok state. We delete the tombstone rather than check for its
// presence to ensure self-healing in the event that the tombstone wasn't actually
// valid. Note that setting the cache's read state to ok does not cause us to skip
// our normal init logic, it just means that reads against the local cache will
// be allowed in the event the init step fails before it starts applying.
// Note also that we aren't setting our event fanout system to an initialized state
// or incrementing the generation counter; this cache isn't so much "healthy" as it is
// "slightly preferable to an unreachable auth server".
err = cs.wrapper.Delete(ctx, tombstoneKey())
switch {
case err == nil:
cs.setReadOK(true)
case trace.IsNotFound(err):
// do nothing
default:
cs.Close()
return nil, trace.Wrap(err)
}

retry, err := utils.NewLinear(utils.LinearConfig{
First: utils.HalfJitter(cs.MaxRetryPeriod / 10),
Step: cs.MaxRetryPeriod / 5,
Expand Down Expand Up @@ -749,10 +720,6 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) {
c.Debugf("Cache is closing, returning from update loop.")
// ensure that close operations have been run
c.Close()
// run tombstone operations in an orphaned context
tombCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
c.writeTombstone(tombCtx)
}()
timer := time.NewTimer(c.Config.WatcherInitTimeout)
for {
Expand Down Expand Up @@ -785,24 +752,6 @@ func (c *Cache) update(ctx context.Context, retry utils.Retry) {
}
}

// writeTombstone writes the cache tombstone.
//
// Nothing is written unless the cache currently reports read-OK and its
// generation counter is non-zero. A failed Create is only logged as a
// warning; a successful one is announced with a TombstoneWritten event.
func (c *Cache) writeTombstone(ctx context.Context) {
if !c.getReadOK() || c.generation.Load() == 0 {
// state is unhealthy or was loaded from a previously
// entombed state; do nothing.
return
}
item := backend.Item{
Key: tombstoneKey(),
Value: []byte("{}"),
}
if _, err := c.wrapper.Create(ctx, item); err != nil {
c.Warningf("Failed to set tombstone: %v", err)
} else {
c.notify(ctx, Event{Type: TombstoneWritten})
}
}

func (c *Cache) notify(ctx context.Context, event Event) {
if c.EventsC == nil {
return
Expand Down
81 changes: 0 additions & 81 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,87 +520,6 @@ func TestCompletenessReset(t *testing.T) {
}
}

// TestTombstones verifies that healthy caches leave tombstones
// on closure, giving new caches the ability to start from a known
// good state if the origin state is unavailable.
func TestTombstones(t *testing.T) {
ctx := context.Background()
const caCount = 10
p := newTestPackWithoutCache(t)
t.Cleanup(p.Close)

// put lots of CAs in the backend
for i := 0; i < caCount; i++ {
ca := suite.NewTestCA(types.UserCA, fmt.Sprintf("%d.example.com", i))
require.NoError(t, p.trustS.UpsertCertAuthority(ca))
}

// first cache: created before any read error is injected into p.backend
var err error
p.cache, err = New(ForAuth(Config{
Context: ctx,
Backend: p.cacheBackend,
Events: p.eventsS,
ClusterConfig: p.clusterConfigS,
Provisioner: p.provisionerS,
Trust: p.trustS,
Users: p.usersS,
Access: p.accessS,
DynamicAccess: p.dynamicAccessS,
Presence: p.presenceS,
AppSession: p.appSessionS,
WebSession: p.webSessionS,
WebToken: p.webTokenS,
Restrictions: p.restrictions,
Apps: p.apps,
Databases: p.databases,
WindowsDesktops: p.windowsDesktops,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
require.NoError(t, err)

// verify that CAs are immediately available
cas, err := p.cache.GetCertAuthorities(ctx, types.UserCA, false)
require.NoError(t, err)
require.Len(t, cas, caCount)

require.NoError(t, p.cache.Close())
// wait for TombstoneWritten, ignoring all other event types
waitForEvent(t, p.eventsC, TombstoneWritten, WatcherStarted, EventProcessed, WatcherFailed)
// simulate bad connection to auth server
p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable"))
p.eventsS.closeWatchers()

// second cache: built with the same configuration, including the same
// p.cacheBackend that the first cache was using
p.cache, err = New(ForAuth(Config{
Context: ctx,
Backend: p.cacheBackend,
Events: p.eventsS,
ClusterConfig: p.clusterConfigS,
Provisioner: p.provisionerS,
Trust: p.trustS,
Users: p.usersS,
Access: p.accessS,
DynamicAccess: p.dynamicAccessS,
Presence: p.presenceS,
AppSession: p.appSessionS,
WebSession: p.webSessionS,
WebToken: p.webTokenS,
Restrictions: p.restrictions,
Apps: p.apps,
Databases: p.databases,
WindowsDesktops: p.windowsDesktops,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
require.NoError(t, err)

// verify that CAs are immediately available despite the fact
// that the origin state was never available.
cas, err = p.cache.GetCertAuthorities(ctx, types.UserCA, false)
require.NoError(t, err)
require.Len(t, cas, caCount)
}

// TestInitStrategy verifies that cache uses expected init strategy
// of serving backend state when init is taking too long.
func TestInitStrategy(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion lib/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/gravitational/teleport/lib"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/lite"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/limiter"
Expand Down Expand Up @@ -290,7 +291,12 @@ func ApplyFileConfig(fc *FileConfig, cfg *service.Config) error {
}

if fc.CachePolicy.TTL != "" {
log.Warnf("cache.ttl config option is deprecated and will be ignored, caches no longer attempt to anticipate resource expiration.")
log.Warn("cache.ttl config option is deprecated and will be ignored, caches no longer attempt to anticipate resource expiration.")
}
if fc.CachePolicy.Type == memory.GetName() {
log.Debugf("cache.type config option is explicitly set to %v.", memory.GetName())
} else if fc.CachePolicy.Type != "" {
log.Warn("cache.type config option is deprecated and will be ignored, caches are always in memory in this version.")
}

// apply cache policy for node and proxy
Expand Down
12 changes: 5 additions & 7 deletions lib/config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/gravitational/teleport/lib"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/lite"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/limiter"
Expand Down Expand Up @@ -941,12 +940,11 @@ func TestParseCachePolicy(t *testing.T) {
out *service.CachePolicy
err error
}{
{in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, Type: lite.GetName()}},
{in: &CachePolicy{EnabledFlag: "yes", TTL: "10h"}, out: &service.CachePolicy{Enabled: true, Type: lite.GetName()}},
{in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false, Type: memory.GetName()}},
{in: &CachePolicy{Type: memory.GetName(), EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true, Type: memory.GetName()}},
{in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Type: lite.GetName(), Enabled: false}},
{in: &CachePolicy{Type: "memsql"}, err: trace.BadParameter("unsupported backend")},
{in: &CachePolicy{EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true}},
{in: &CachePolicy{EnabledFlag: "true", TTL: "10h"}, out: &service.CachePolicy{Enabled: true}},
{in: &CachePolicy{Type: "whatever", EnabledFlag: "false", TTL: "10h"}, out: &service.CachePolicy{Enabled: false}},
{in: &CachePolicy{Type: "name", EnabledFlag: "yes", TTL: "never"}, out: &service.CachePolicy{Enabled: true}},
{in: &CachePolicy{EnabledFlag: "no"}, out: &service.CachePolicy{Enabled: false}},
}
for i, tc := range tcs {
comment := fmt.Sprintf("test case #%v", i)
Expand Down
1 change: 0 additions & 1 deletion lib/config/fileconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ func (c *CachePolicy) Enabled() bool {
// Parse parses cache policy from Teleport config
func (c *CachePolicy) Parse() (*service.CachePolicy, error) {
out := service.CachePolicy{
Type: c.Type,
Enabled: c.Enabled(),
}
if err := out.CheckAndSetDefaults(); err != nil {
Expand Down
6 changes: 0 additions & 6 deletions lib/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,6 @@ var (
// TopRequestsCapacity sets up default top requests capacity
TopRequestsCapacity = 128

// CachePollPeriod is a period for cache internal events polling,
// used in cases when cache is being used to subscribe for events
// and this parameter controls how often cache checks for new events
// to arrive
CachePollPeriod = 500 * time.Millisecond

// AuthQueueSize is auth service queue size
AuthQueueSize = 8192

Expand Down
15 changes: 2 additions & 13 deletions lib/service/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/gravitational/teleport/lib/auth/keystore"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/lite"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/bpf"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -307,31 +306,21 @@ func (cfg *Config) DebugDumpToYAML() string {

// CachePolicy sets caching policy for proxies and nodes
type CachePolicy struct {
// Type sets the cache type
Type string
// Enabled enables or disables caching
Enabled bool
}

// CheckAndSetDefaults checks and sets default values
func (c *CachePolicy) CheckAndSetDefaults() error {
switch c.Type {
case "", lite.GetName():
c.Type = lite.GetName()
case memory.GetName():
default:
return trace.BadParameter("unsupported cache type %q, supported values are %q and %q",
c.Type, lite.GetName(), memory.GetName())
}
return nil
}

// String returns human-friendly representation of the policy
func (c CachePolicy) String() string {
if !c.Enabled {
return "no cache policy"
return "no cache"
}
return fmt.Sprintf("%v cache will store frequently accessed items", c.Type)
return "in-memory cache"
}

// ProxyConfig specifies configuration for proxy service
Expand Down
Loading

0 comments on commit e47e145

Please sign in to comment.