Skip to content

Commit

Permalink
feat: add support for a lazy refresh
Browse files Browse the repository at this point in the history
When creating a Dialer with the WithLazyRefresh option, the connection
info and ephemeral certificate will be refreshed only when the cache
certificate has expired. No background goroutines run with this option,
making it ideal for use in Cloud Run and other serverless environments
where the CPU may be throttled.
  • Loading branch information
enocom committed Apr 15, 2024
1 parent d326558 commit bc6566f
Show file tree
Hide file tree
Showing 6 changed files with 419 additions and 3 deletions.
24 changes: 21 additions & 3 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ type Dialer struct {
sqladmin *sqladmin.Service
logger debug.Logger

// lazyRefresh determines what kind of caching is used for ephemeral
// certificates. When lazyRefresh is true, the dialer will use a lazy
// cache, refresh certificates only when a connection attempt needs a fresh
// certificate. Otherwise, a refresh ahead cache will be used. The refresh
// ahead cache assumes a background goroutine may run consistently.
lazyRefresh bool

// defaultDialConfig holds the constructor level DialOptions, so that it
// can be copied and mutated by the Dial function.
defaultDialConfig dialConfig
Expand Down Expand Up @@ -221,6 +228,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
d := &Dialer{
closed: make(chan struct{}),
cache: make(map[instance.ConnName]monitoredCache),
lazyRefresh: cfg.lazyRefresh,
key: cfg.rsaKey,
refreshTimeout: cfg.refreshTimeout,
sqladmin: client,
Expand Down Expand Up @@ -470,15 +478,25 @@ func (d *Dialer) connectionInfoCache(
useIAMAuthNDial = *useIAMAuthN
}
d.logger.Debugf("[%v] Connection info added to cache", cn.String())
c = monitoredCache{
connectionInfoCache: cloudsql.NewRefreshAheadCache(
var cache connectionInfoCache
if d.lazyRefresh {
cache = cloudsql.NewLazyRefreshCache(
cn,
d.logger,
d.sqladmin, d.key,
d.refreshTimeout, d.iamTokenSource,
d.dialerID, useIAMAuthNDial,
)
} else {
cache = cloudsql.NewRefreshAheadCache(
cn,
d.logger,
d.sqladmin, d.key,
d.refreshTimeout, d.iamTokenSource,
d.dialerID, useIAMAuthNDial,
),
)
}
c = monitoredCache{connectionInfoCache: cache}
d.cache[cn] = c
}
}
Expand Down
35 changes: 35 additions & 0 deletions dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,38 @@ func TestDialerCloseReportsFriendlyError(t *testing.T) {
t.Fatalf("want = %v, got = %v", ErrDialerClosed, err)
}
}

func TestDialerInitializesLazyCache(t *testing.T) {
cn, _ := instance.ParseConnName("my-project:my-region:my-instance")
inst := mock.NewFakeCSQLInstance(
cn.Project(), cn.Region(), cn.Name(),
)
d := setupDialer(t, setupConfig{
testInstance: inst,
reqs: []*mock.Request{
mock.InstanceGetSuccess(inst, 1),
mock.CreateEphemeralSuccess(inst, 1),
},
dialerOptions: []Option{
WithTokenSource(mock.EmptyTokenSource{}),
WithLazyRefresh(),
},
})

// Initialize the connection info cache
_, err := d.Dial(context.Background(), inst.String())
if err != nil {
t.Fatal(err)
}

c, ok := d.cache[cn]
if !ok {
t.Fatal("cache was not populated")
}
switch tt := c.connectionInfoCache.(type) {
case *cloudsql.LazyRefreshCache:
// Pass -- the cache was initialized with the correct type
default:
t.Fatalf("dialer was initialized with non-lazy type: %T", tt)
}
}
41 changes: 41 additions & 0 deletions e2e_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,47 @@ func TestPostgresConnectWithIAMUser(t *testing.T) {
t.Log(now)
}

func TestPostgresConnectWithLazyRefresh(t *testing.T) {
if testing.Short() {
t.Skip("skipping Postgres integration tests")
}
requirePostgresVars(t)

ctx := context.Background()

// password is intentionally blank
dsn := fmt.Sprintf("user=%s password=\"\" dbname=%s sslmode=disable", postgresUserIAM, postgresDB)
config, err := pgx.ParseConfig(dsn)
if err != nil {
t.Fatalf("failed to parse pgx config: %v", err)
}
d, err := cloudsqlconn.NewDialer(
ctx,
cloudsqlconn.WithLazyRefresh(),
cloudsqlconn.WithIAMAuthN(), // use IAM AuthN to exercise all paths
)
if err != nil {
t.Fatalf("failed to initiate Dialer: %v", err)
}
defer d.Close()
config.DialFunc = func(ctx context.Context, _ string, _ string) (net.Conn, error) {
return d.Dial(ctx, postgresConnName)
}

conn, connErr := pgx.ConnectConfig(ctx, config)
if connErr != nil {
t.Fatalf("failed to connect: %s", connErr)
}
defer conn.Close(ctx)

var now time.Time
err = conn.QueryRow(context.Background(), "SELECT NOW()").Scan(&now)
if err != nil {
t.Fatalf("QueryRow failed: %s", err)
}
t.Log(now)
}

func TestPostgresEngineVersion(t *testing.T) {
if testing.Short() {
t.Skip("skipping Postgres integration tests")
Expand Down
139 changes: 139 additions & 0 deletions internal/cloudsql/lazy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cloudsql

import (
"context"
"crypto/rsa"
"sync"
"time"

"cloud.google.com/go/cloudsqlconn/debug"
"cloud.google.com/go/cloudsqlconn/instance"
"golang.org/x/oauth2"
sqladmin "google.golang.org/api/sqladmin/v1beta4"
)

// LazyRefreshCache is caches connection info and refreshes the cache only when
// a caller requests connection info and the current certificate is expired.
type LazyRefreshCache struct {
connName instance.ConnName
logger debug.Logger
key *rsa.PrivateKey
r refresher
mu sync.Mutex
useIAMAuthNDial bool
needsRefresh bool
cached ConnectionInfo
}

// NewLazyRefreshCache initializes a new LazyRefreshCache.
func NewLazyRefreshCache(
cn instance.ConnName,
l debug.Logger,
client *sqladmin.Service,
key *rsa.PrivateKey,
_ time.Duration,
ts oauth2.TokenSource,
dialerID string,
useIAMAuthNDial bool,
) *LazyRefreshCache {
return &LazyRefreshCache{
connName: cn,
logger: l,
key: key,
r: newRefresher(
l,
client,
ts,
dialerID,
),
useIAMAuthNDial: useIAMAuthNDial,
}
}

// ConnectionInfo returns connection info for the associated instance. New
// connection info is retrieved under two conditions:
// - the current connection info's certificate has expired, or
// - a caller has separately called ForceRefresh
func (c *LazyRefreshCache) ConnectionInfo(
ctx context.Context,
) (ConnectionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
// strip monotonic clock with UTC()
now := time.Now().UTC()
// Pad expiration with a buffer to give the client plenty of time to
// establish a connection to the server with the certificate.
exp := c.cached.Expiration.UTC().Add(-refreshBuffer)
if !c.needsRefresh && now.Before(exp) {
c.logger.Debugf(
"[%v] Connection info is still valid, using cached info",
c.connName.String(),
)
return c.cached, nil
}

c.logger.Debugf(
"[%v] Connection info refresh operation started",
c.connName.String(),
)
ci, err := c.r.ConnectionInfo(ctx, c.connName, c.key, c.useIAMAuthNDial)
if err != nil {
c.logger.Debugf(
"[%v] Connection info refresh operation failed, err = %v",
c.connName.String(),
err,
)
return ConnectionInfo{}, err
}
c.logger.Debugf(
"[%v] Connection info refresh operation complete",
c.connName.String(),
)
c.logger.Debugf(
"[%v] Current certificate expiration = %v",
c.connName.String(),
ci.Expiration.UTC().Format(time.RFC3339),
)
c.cached = ci
c.needsRefresh = false
return ci, nil
}

// UpdateRefresh updates the refresh operation to either enable or disable IAM
// authentication for the cached connection info.
func (c *LazyRefreshCache) UpdateRefresh(useIAMAuthNDial *bool) {
c.mu.Lock()
defer c.mu.Unlock()
if useIAMAuthNDial != nil && *useIAMAuthNDial != c.useIAMAuthNDial {
c.useIAMAuthNDial = *useIAMAuthNDial
c.needsRefresh = true
}
}

// ForceRefresh invalidates the caches and configures the next call to
// ConnectionInfo to retrieve a fresh connection info.
func (c *LazyRefreshCache) ForceRefresh() {
c.mu.Lock()
defer c.mu.Unlock()
c.needsRefresh = true
}

// Close is a no-op and provided purely for a consistent interface with other
// caching types.
func (c *LazyRefreshCache) Close() error {
return nil
}
Loading

0 comments on commit bc6566f

Please sign in to comment.