Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: limit calls to SQL Admin API on startup #1723

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type Config struct {
// CredentialsJSON is a JSON representation of the service account key.
CredentialsJSON string

// GcloudAuth set whether to use Gcloud's config helper to retrieve a
// GcloudAuth set whether to use gcloud's config helper to retrieve a
// token for authentication.
GcloudAuth bool

Expand All @@ -133,7 +133,7 @@ type Config struct {
// increments from this value.
Port int

// APIEndpointURL is the URL of the google cloud sql api. When left blank,
// APIEndpointURL is the URL of the Google Cloud SQL Admin API. When left blank,
// the proxy will use the main public api: https://sqladmin.googleapis.com/
APIEndpointURL string

Expand Down Expand Up @@ -204,7 +204,7 @@ type Config struct {
// Prometheus enables a Prometheus endpoint served at the address and
// port specified by HTTPAddress and HTTPPort.
Prometheus bool
// PrometheusNamespace configures the namespace underwhich metrics are written.
// PrometheusNamespace configures the namespace under which metrics are written.
PrometheusNamespace string

// HealthCheck enables a health check server. It's address and port are
Expand Down Expand Up @@ -479,18 +479,13 @@ func NewClient(ctx context.Context, d cloudsql.Dialer, l cloudsql.Logger, conf *

for _, inst := range conf.Instances {
// Initiate refresh operation and warm the cache.
go func(name string) { d.EngineVersion(ctx, name) }(inst.Name)
go func(name string) { _, _ = d.EngineVersion(ctx, name) }(inst.Name)
}

var mnts []*socketMount
pc := newPortConfig(conf.Port)
for _, inst := range conf.Instances {
version, err := d.EngineVersion(ctx, inst.Name)
if err != nil {
return nil, err
}

m, err := newSocketMount(ctx, conf, pc, inst, version)
m, err := c.newSocketMount(ctx, conf, pc, inst)
if err != nil {
for _, m := range mnts {
mErr := m.Close()
Expand Down Expand Up @@ -609,7 +604,7 @@ func (m MultiErr) Error() string {
return strings.Join(errs, ", ")
}

// Close triggers the proxyClient to shutdown.
// Close triggers the proxyClient to shut down.
func (c *Client) Close() error {
mnts := c.mnts

Expand Down Expand Up @@ -704,7 +699,7 @@ func (c *Client) serveSocketMount(_ context.Context, s *socketMount) error {
sConn, err := c.dialer.Dial(ctx, s.inst, s.dialOpts...)
if err != nil {
c.logger.Errorf("[%s] failed to connect to instance: %v", s.inst, err)
cConn.Close()
_ = cConn.Close()
return
}
c.proxyConn(s.inst, cConn, sConn)
Expand All @@ -719,7 +714,7 @@ type socketMount struct {
dialOpts []cloudsqlconn.DialOption
}

func newSocketMount(ctx context.Context, conf *Config, pc *portConfig, inst InstanceConnConfig, version string) (*socketMount, error) {
func (c *Client) newSocketMount(ctx context.Context, conf *Config, pc *portConfig, inst InstanceConnConfig) (*socketMount, error) {
var (
// network is one of "tcp" or "unix"
network string
Expand Down Expand Up @@ -753,13 +748,24 @@ func newSocketMount(ctx context.Context, conf *Config, pc *portConfig, inst Inst
case conf.Port != 0:
np = pc.nextPort()
default:
version, err := c.dialer.EngineVersion(ctx, inst.Name)
if err != nil {
c.logger.Errorf("could not resolve version for %q: %v", inst.Name, err)
return nil, err
}
np = pc.nextDBPort(version)
}

address = net.JoinHostPort(a, fmt.Sprint(np))
} else {
network = "unix"

version, err := c.dialer.EngineVersion(ctx, inst.Name)
if err != nil {
c.logger.Errorf("could not resolve version for %q: %v", inst.Name, err)
return nil, err
}

address, err = newUnixSocketMount(inst, conf.UnixSocket, strings.HasPrefix(version, "POSTGRES"))
if err != nil {
return nil, err
Expand All @@ -771,7 +777,7 @@ func newSocketMount(ctx context.Context, conf *Config, pc *portConfig, inst Inst
if err != nil {
return nil, err
}
// Change file permisions to allow access for user, group, and other.
// Change file permissions to allow access for user, group, and other.
if network == "unix" {
// Best effort. If this call fails, group and other won't have write
// access.
Expand All @@ -785,7 +791,6 @@ func newSocketMount(ctx context.Context, conf *Config, pc *portConfig, inst Inst
// newUnixSocketMount parses the configuration and returns the path to the unix
// socket, or an error if that path is not valid.
func newUnixSocketMount(inst InstanceConnConfig, unixSocketDir string, postgres bool) (string, error) {

var (
// the path to the unix socket
address string
Expand Down Expand Up @@ -831,7 +836,6 @@ func newUnixSocketMount(inst InstanceConnConfig, unixSocketDir string, postgres
}

return address, nil

}

func (s *socketMount) Addr() net.Addr {
Expand All @@ -842,7 +846,7 @@ func (s *socketMount) Accept() (net.Conn, error) {
return s.listener.Accept()
}

// close stops the mount from listening for any more connections
// Close stops the mount from listening for any more connections
func (s *socketMount) Close() error {
return s.listener.Close()
}
Expand All @@ -853,8 +857,8 @@ func (c *Client) proxyConn(inst string, client, server net.Conn) {
var o sync.Once
cleanup := func(errDesc string, isErr bool) {
o.Do(func() {
client.Close()
server.Close()
_ = client.Close()
_ = server.Close()
if isErr {
c.logger.Errorf(errDesc)
} else {
Expand Down
10 changes: 2 additions & 8 deletions internal/proxy/proxy_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,9 @@ func (c *Client) Lookup(ctx context.Context, instance string, _ *fuse.EntryOut)
return l.symlink.EmbeddedInode(), fs.OK
}

version, err := c.dialer.EngineVersion(ctx, instance)
if err != nil {
c.logger.Errorf("could not resolve version for %q: %v", instance, err)
return nil, syscall.ENOENT
}

s, err := newSocketMount(
s, err := c.newSocketMount(
ctx, &Config{UnixSocket: c.fuseTempDir},
nil, InstanceConnConfig{Name: instance}, version,
nil, InstanceConnConfig{Name: instance},
)
if err != nil {
c.logger.Errorf("could not create socket for %q: %v", instance, err)
Expand Down