diff --git a/internal/proxy/fuse.go b/internal/proxy/fuse.go index f03125b6f..63aa242dd 100644 --- a/internal/proxy/fuse.go +++ b/internal/proxy/fuse.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !windows +// +build !windows + package proxy import ( diff --git a/internal/proxy/fuse_test.go b/internal/proxy/fuse_test.go index 35e1d851e..fa629af66 100644 --- a/internal/proxy/fuse_test.go +++ b/internal/proxy/fuse_test.go @@ -310,3 +310,11 @@ func TestFUSEClose(t *testing.T) { t.Fatal("net.Dial() should fail") } } + +func TestFUSEWithBadDir(t *testing.T) { + conf := &proxy.Config{FUSEDir: "/not/a/dir", FUSETempDir: randTmpDir(t)} + _, err := proxy.NewClient(context.Background(), &fakeDialer{}, testLogger, conf) + if err == nil { + t.Fatal("proxy client should fail with bad dir") + } +} diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 7683f1685..834a0077d 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -20,19 +20,15 @@ import ( "io" "net" "os" - "path/filepath" "regexp" "strings" "sync" "sync/atomic" - "syscall" "time" "cloud.google.com/go/cloudsqlconn" "github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/cloudsql" "github.com/GoogleCloudPlatform/cloud-sql-proxy/v2/internal/gcloud" - "github.com/hanwen/go-fuse/v2/fs" - "github.com/hanwen/go-fuse/v2/fuse" "golang.org/x/oauth2" ) @@ -274,11 +270,6 @@ func (c *portConfig) nextDBPort(version string) int { } } -type socketSymlink struct { - socket *socketMount - symlink *symlink -} - // Client proxies connections from a local client to the remote server side // proxy for multiple Cloud SQL instances. type Client struct { @@ -301,22 +292,7 @@ type Client struct { logger cloudsql.Logger - // fuseDir specifies the directory where a FUSE server is mounted. The value - // is empty if FUSE is not enabled. The directory holds symlinks to Unix - // domain sockets in the fuseTmpDir. - fuseDir string - fuseTempDir string - // fuseMu protects access to fuseSockets. - fuseMu sync.Mutex - // fuseSockets is a map of instance connection name to socketMount and - // symlink. - fuseSockets map[string]socketSymlink - fuseServerMu sync.Mutex - fuseServer *fuse.Server - fuseWg sync.WaitGroup - - // Inode adds support for FUSE operations. - fs.Inode + fuseMount } // NewClient completes the initial setup required to get the proxy to a "steady" @@ -343,13 +319,7 @@ func NewClient(ctx context.Context, d cloudsql.Dialer, l cloudsql.Logger, conf * } if conf.FUSEDir != "" { - if err := os.MkdirAll(conf.FUSETempDir, 0777); err != nil { - return nil, err - } - c.fuseDir = conf.FUSEDir - c.fuseTempDir = conf.FUSETempDir - c.fuseSockets = map[string]socketSymlink{} - return c, nil + return configureFUSE(c, conf) } for _, inst := range conf.Instances { @@ -383,86 +353,6 @@ func NewClient(ctx context.Context, d cloudsql.Dialer, l cloudsql.Logger, conf * return c, nil } -// Readdir returns a list of all active Unix sockets in addition to the README. -func (c *Client) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) { - entries := []fuse.DirEntry{ - {Name: "README", Mode: 0555 | fuse.S_IFREG}, - } - var active []string - c.fuseMu.Lock() - for k := range c.fuseSockets { - active = append(active, k) - } - c.fuseMu.Unlock() - - for _, a := range active { - entries = append(entries, fuse.DirEntry{ - Name: a, - Mode: 0777 | syscall.S_IFSOCK, - }) - } - return fs.NewListDirStream(entries), fs.OK -} - -// Lookup implements the fs.NodeLookuper interface and returns an index node -// (inode) for a symlink that points to a Unix domain socket. The Unix domain -// socket is connected to the requested Cloud SQL instance. Lookup returns a -// symlink (instead of the socket itself) so that multiple callers all use the -// same Unix socket. -func (c *Client) Lookup(ctx context.Context, instance string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno) { - if instance == "README" { - return c.NewInode(ctx, &readme{}, fs.StableAttr{}), fs.OK - } - - if _, err := parseConnName(instance); err != nil { - return nil, syscall.ENOENT - } - - c.fuseMu.Lock() - defer c.fuseMu.Unlock() - if l, ok := c.fuseSockets[instance]; ok { - return l.symlink.EmbeddedInode(), fs.OK - } - - version, err := c.dialer.EngineVersion(ctx, instance) - if err != nil { - c.logger.Errorf("could not resolve version for %q: %v", instance, err) - return nil, syscall.ENOENT - } - - s, err := newSocketMount( - ctx, &Config{UnixSocket: c.fuseTempDir}, - nil, InstanceConnConfig{Name: instance}, version, - ) - if err != nil { - c.logger.Errorf("could not create socket for %q: %v", instance, err) - return nil, syscall.ENOENT - } - - c.fuseWg.Add(1) - go func() { - defer c.fuseWg.Done() - sErr := c.serveSocketMount(ctx, s) - if sErr != nil { - c.fuseMu.Lock() - delete(c.fuseSockets, instance) - c.fuseMu.Unlock() - } - }() - - // Return a symlink that points to the actual Unix socket within the - // temporary directory. For Postgres, return a symlink that points to the - // directory which holds the ".s.PGSQL.5432" Unix socket. - sl := &symlink{path: filepath.Join(c.fuseTempDir, instance)} - c.fuseSockets[instance] = socketSymlink{ - socket: s, - symlink: sl, - } - return c.NewInode(ctx, sl, fs.StableAttr{ - Mode: 0777 | fuse.S_IFLNK}, - ), fs.OK -} - // CheckConnections dials each registered instance and reports any errors that // may have occurred. func (c *Client) CheckConnections(ctx context.Context) error { @@ -472,12 +362,7 @@ func (c *Client) CheckConnections(ctx context.Context) error { mnts = c.mnts ) if c.fuseDir != "" { - mnts = []*socketMount{} - c.fuseMu.Lock() - for _, m := range c.fuseSockets { - mnts = append(mnts, m.socket) - } - c.fuseMu.Unlock() + mnts = c.fuseMounts() } for _, mnt := range mnts { wg.Add(1) @@ -524,18 +409,7 @@ func (c *Client) Serve(ctx context.Context, notify func()) error { defer cancel() if c.fuseDir != "" { - srv, err := fs.Mount(c.fuseDir, c, &fs.Options{ - MountOptions: fuse.MountOptions{AllowOther: true}, - }) - if err != nil { - return fmt.Errorf("FUSE mount failed: %q: %v", c.fuseDir, err) - } - c.fuseServerMu.Lock() - c.fuseServer = srv - c.fuseServerMu.Unlock() - notify() - <-ctx.Done() - return ctx.Err() + return c.serveFuse(ctx, notify) } exitCh := make(chan error) @@ -580,21 +454,13 @@ func (m MultiErr) Error() string { func (c *Client) Close() error { mnts := c.mnts - c.fuseServerMu.Lock() - hasFuseServer := c.fuseServer != nil - c.fuseServerMu.Unlock() - var mErr MultiErr - if hasFuseServer { - if err := c.fuseServer.Unmount(); err != nil { + + if c.fuseDir != "" { + if err := c.unmountFUSE(); err != nil { mErr = append(mErr, err) } - mnts = []*socketMount{} - c.fuseMu.Lock() - for _, m := range c.fuseSockets { - mnts = append(mnts, m.socket) - } - c.fuseMu.Unlock() + mnts = c.fuseMounts() } // First, close all open socket listeners to prevent additional connections. @@ -604,8 +470,8 @@ func (c *Client) Close() error { mErr = append(mErr, err) } } - if hasFuseServer { - c.fuseWg.Wait() + if c.fuseDir != "" { + c.waitForFUSEMounts() } // Next, close the dialer to prevent any additional refreshes. cErr := c.dialer.Close() @@ -688,7 +554,6 @@ func (c *Client) serveSocketMount(ctx context.Context, s *socketMount) error { // socketMount is a tcp/unix socket that listens for a Cloud SQL instance. type socketMount struct { - fs.Inode inst string listener net.Listener dialOpts []cloudsqlconn.DialOption diff --git a/internal/proxy/proxy_other.go b/internal/proxy/proxy_other.go index 782b6691e..61bc1273f 100644 --- a/internal/proxy/proxy_other.go +++ b/internal/proxy/proxy_other.go @@ -17,7 +17,17 @@ package proxy -import "path/filepath" +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "syscall" + + "github.com/hanwen/go-fuse/v2/fs" + "github.com/hanwen/go-fuse/v2/fuse" +) // UnixAddress is defined as a function to distinguish between Linux-based // implementations where the dir and inst and simply joins, and Windows-based @@ -25,3 +35,160 @@ import "path/filepath" func UnixAddress(dir, inst string) string { return filepath.Join(dir, inst) } + +type socketSymlink struct { + socket *socketMount + symlink *symlink +} + +func configureFUSE(c *Client, conf *Config) (*Client, error) { + if _, err := os.Stat(conf.FUSEDir); err != nil { + return nil, err + } + if err := os.MkdirAll(conf.FUSETempDir, 0777); err != nil { + return nil, err + } + c.fuseMount = fuseMount{ + fuseDir: conf.FUSEDir, + fuseTempDir: conf.FUSETempDir, + fuseSockets: map[string]socketSymlink{}, + // Use pointers for the following mutexes so fuseMount may be embedded + // as a value and support zero value lookups on fuseDir. + fuseMu: &sync.Mutex{}, + fuseServerMu: &sync.Mutex{}, + fuseWg: &sync.WaitGroup{}, + } + return c, nil +} + +type fuseMount struct { + // fuseDir specifies the directory where a FUSE server is mounted. The value + // is empty if FUSE is not enabled. The directory holds symlinks to Unix + // domain sockets in the fuseTmpDir. + fuseDir string + fuseTempDir string + // fuseMu protects access to fuseSockets. + fuseMu *sync.Mutex + // fuseSockets is a map of instance connection name to socketMount and + // symlink. + fuseSockets map[string]socketSymlink + fuseServerMu *sync.Mutex + fuseServer *fuse.Server + fuseWg *sync.WaitGroup + + // Inode adds support for FUSE operations. + fs.Inode +} + +// Readdir returns a list of all active Unix sockets in addition to the README. +func (c *Client) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) { + entries := []fuse.DirEntry{ + {Name: "README", Mode: 0555 | fuse.S_IFREG}, + } + var active []string + c.fuseMu.Lock() + for k := range c.fuseSockets { + active = append(active, k) + } + c.fuseMu.Unlock() + + for _, a := range active { + entries = append(entries, fuse.DirEntry{ + Name: a, + Mode: 0777 | syscall.S_IFSOCK, + }) + } + return fs.NewListDirStream(entries), fs.OK +} + +// Lookup implements the fs.NodeLookuper interface and returns an index node +// (inode) for a symlink that points to a Unix domain socket. The Unix domain +// socket is connected to the requested Cloud SQL instance. Lookup returns a +// symlink (instead of the socket itself) so that multiple callers all use the +// same Unix socket. +func (c *Client) Lookup(ctx context.Context, instance string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno) { + if instance == "README" { + return c.NewInode(ctx, &readme{}, fs.StableAttr{}), fs.OK + } + + if _, err := parseConnName(instance); err != nil { + return nil, syscall.ENOENT + } + + c.fuseMu.Lock() + defer c.fuseMu.Unlock() + if l, ok := c.fuseSockets[instance]; ok { + return l.symlink.EmbeddedInode(), fs.OK + } + + version, err := c.dialer.EngineVersion(ctx, instance) + if err != nil { + c.logger.Errorf("could not resolve version for %q: %v", instance, err) + return nil, syscall.ENOENT + } + + s, err := newSocketMount( + ctx, &Config{UnixSocket: c.fuseTempDir}, + nil, InstanceConnConfig{Name: instance}, version, + ) + if err != nil { + c.logger.Errorf("could not create socket for %q: %v", instance, err) + return nil, syscall.ENOENT + } + + c.fuseWg.Add(1) + go func() { + defer c.fuseWg.Done() + sErr := c.serveSocketMount(ctx, s) + if sErr != nil { + c.fuseMu.Lock() + delete(c.fuseSockets, instance) + c.fuseMu.Unlock() + } + }() + + // Return a symlink that points to the actual Unix socket within the + // temporary directory. For Postgres, return a symlink that points to the + // directory which holds the ".s.PGSQL.5432" Unix socket. + sl := &symlink{path: filepath.Join(c.fuseTempDir, instance)} + c.fuseSockets[instance] = socketSymlink{ + socket: s, + symlink: sl, + } + return c.NewInode(ctx, sl, fs.StableAttr{ + Mode: 0777 | fuse.S_IFLNK}, + ), fs.OK +} + +func (c *Client) serveFuse(ctx context.Context, notify func()) error { + srv, err := fs.Mount(c.fuseDir, c, &fs.Options{ + MountOptions: fuse.MountOptions{AllowOther: true}, + }) + if err != nil { + return fmt.Errorf("FUSE mount failed: %q: %v", c.fuseDir, err) + } + c.fuseServerMu.Lock() + c.fuseServer = srv + c.fuseServerMu.Unlock() + notify() + <-ctx.Done() + return ctx.Err() +} + +func (c *Client) fuseMounts() []*socketMount { + var mnts []*socketMount + c.fuseMu.Lock() + for _, m := range c.fuseSockets { + mnts = append(mnts, m.socket) + } + c.fuseMu.Unlock() + return mnts +} + +func (c *Client) unmountFUSE() error { + c.fuseServerMu.Lock() + defer c.fuseServerMu.Unlock() + return c.fuseServer.Unmount() +} + +func (c *Client) waitForFUSEMounts() { c.fuseWg.Wait() } diff --git a/internal/proxy/proxy_windows.go b/internal/proxy/proxy_windows.go index 0ce536fa4..2f4fe9924 100644 --- a/internal/proxy/proxy_windows.go +++ b/internal/proxy/proxy_windows.go @@ -15,13 +15,28 @@ package proxy import ( + "context" + "errors" "path/filepath" "strings" ) +var errFUSENotSupported = errors.New("FUSE is not supported on Windows") + // UnixAddress returns the Unix socket for a given instance in the provided // directory, by replacing all colons in the instance's name with periods. func UnixAddress(dir, inst string) string { inst2 := strings.ReplaceAll(inst, ":", ".") return filepath.Join(dir, inst2) } + +type fuseMount struct { + // fuseDir is always an empty string on Windows. + fuseDir string +} + +func configureFUSE(c *Client, conf *Config) (*Client, error) { return nil, errFUSENotSupported } +func (c *Client) fuseMounts() []*socketMount { return nil } +func (c *Client) serveFuse(ctx context.Context, notify func()) error { return errFUSENotSupported } +func (c *Client) unmountFUSE() error { return nil } +func (c *Client) waitForFUSEMounts() {}