diff --git a/cache/cache.go b/cache/cache.go index c21399f7f..ea43a6690 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -82,6 +82,8 @@ type BlobCache interface { type Reader interface { io.ReaderAt Close() error + + // If a blob is backed by a file, it should return *os.File so that it can be used for FUSE passthrough GetReaderAt() io.ReaderAt } diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go index df12670a2..ff078ab2c 100644 --- a/cmd/containerd-stargz-grpc/main.go +++ b/cmd/containerd-stargz-grpc/main.go @@ -17,7 +17,6 @@ package main import ( - "bytes" "context" "flag" "fmt" @@ -27,10 +26,8 @@ import ( "net" "net/http" "os" - "os/exec" "os/signal" "path/filepath" - "strings" "time" snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1" @@ -136,8 +133,10 @@ func main() { rpc := grpc.NewServer() // Configure FUSE passthrough - if config.Config.Config.FuseConfig.PassThrough, err = isFusePthEnable(); err != nil { - log.G(ctx).Warnf("failed to check FUSE passthrough support") + // Always set Direct to true to ensure that + // *directoryCache.Get always return *os.File instead of buffer + if config.Config.Config.FuseConfig.PassThrough { + config.Config.Config.DirectoryCacheConfig.Direct = true } // Configure keychain @@ -193,18 +192,6 @@ func main() { log.G(ctx).Info("Exiting") } -// isFusePthEnable prevents users from enabling passthrough mode on unsupported kernel versions -func isFusePthEnable() (bool, error) { - cmd := exec.Command("sh", "-c", "grep 'CONFIG_FUSE_PASSTHROUGH=y' /boot/config-$(uname -r)") - var out bytes.Buffer - cmd.Stdout = &out - cmd.Stderr = &out - if err := cmd.Run(); err != nil { - return false, err - } - return strings.Contains(out.String(), "CONFIG_FUSE_PASSTHROUGH=y"), nil -} - func serve(ctx context.Context, rpc *grpc.Server, addr string, rs snapshots.Snapshotter, config snapshotterConfig) (bool, error) { // Convert the snapshotter to a gRPC service, snsvc := snapshotservice.FromSnapshotter(rs) diff --git a/fs/config/config.go b/fs/config/config.go index 4d9db30f2..44b87e8d6 100644 --- a/fs/config/config.go +++ b/fs/config/config.go @@ -150,5 +150,5 @@ type FuseConfig struct { EntryTimeout int64 `toml:"entry_timeout"` // PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false. - PassThrough bool `toml:"passthrough"` + PassThrough bool `toml:"passthrough" default:"false"` } diff --git a/fs/layer/node.go b/fs/layer/node.go index 0c30e80bd..02f9e6488 100644 --- a/fs/layer/node.go +++ b/fs/layer/node.go @@ -356,10 +356,11 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu if getter, ok := ra.(reader.PassthroughFdGetter); ok { fd, err := getter.GetPassthroughFd() if err != nil { - n.fs.s.report(fmt.Errorf("node.Open: %v", err)) - return nil, 0, syscall.EIO + n.fs.s.report(fmt.Errorf("passThrough model failed due to node.Open: %v", err)) + n.fs.passThrough = false + } else { + f.InitFd(int(fd)) } - f.InitFd(int(fd)) } } diff --git a/fs/layer/testutil.go b/fs/layer/testutil.go index c1c6d3972..3729340c4 100644 --- a/fs/layer/testutil.go +++ b/fs/layer/testutil.go @@ -74,6 +74,7 @@ var srcCompressions = map[string]tutil.CompressionFactory{ func TestSuiteLayer(t *testing.T, store metadata.Store) { testPrefetch(t, store) testNodeRead(t, store) + testPassThroughRead(t, store) testNodes(t, store) } @@ -380,7 +381,15 @@ const ( lastChunkOffset1 = sampleChunkSize * (int64(len(sampleData1)) / sampleChunkSize) ) +func testPassThroughRead(t *testing.T, factory metadata.Store) { + nodeRead(t, factory, true) +} + func testNodeRead(t *testing.T, factory metadata.Store) { + nodeRead(t, factory, false) +} + +func nodeRead(t *testing.T, factory metadata.Store, pth bool) { sizeCond := map[string]int64{ "single_chunk": sampleChunkSize - sampleMiddleOffset, "multi_chunks": sampleChunkSize + sampleMiddleOffset, @@ -429,7 +438,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) { } // data we get from the file node. - f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl) + f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl, pth) defer closeFn() tmpbuf := make([]byte, size) // fuse library can request bigger than remain rr, errno := f.Read(context.Background(), tmpbuf, offset) @@ -460,7 +469,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) { } } -func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression) (_ *file, closeFn func() error) { +func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression, pth bool) (_ *file, closeFn func() error) { testName := "test" sr, tocDgst, err := tutil.BuildEStargz( []tutil.TarEntry{tutil.File(testName, string(contents))}, @@ -473,7 +482,7 @@ func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metada if err != nil { t.Fatalf("failed to create reader: %v", err) } - rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache()) + rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache(), pth) var eo fuse.EntryOut inode, errno := rootNode.Lookup(context.Background(), testName, &eo) if errno != 0 { @@ -725,7 +734,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa } defer r.Close() mcache := cache.NewMemoryCache() - rootNode := getRootNode(t, r, opaque, tocDgst, mcache) + rootNode := getRootNode(t, r, opaque, tocDgst, mcache, false) for _, want := range tt.want { want(t, rootNode, mcache, testR) } @@ -734,7 +743,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa } } -func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache) *node { +func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache, pth bool) *node { vr, err := reader.NewReader(r, cc, digest.FromString("")) if err != nil { t.Fatalf("failed to create reader: %v", err) @@ -743,7 +752,7 @@ func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocD if err != nil { t.Fatalf("failed to verify reader: %v", err) } - rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque, false) + rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque, pth) if err != nil { t.Fatalf("failed to get root node: %v", err) } diff --git a/fs/reader/reader.go b/fs/reader/reader.go index 086fb031b..70625f1b7 100644 --- a/fs/reader/reader.go +++ b/fs/reader/reader.go @@ -496,9 +496,9 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) { func (sf *file) GetPassthroughFd() (uintptr, error) { var ( - offset int64 = 0 + offset int64 firstChunkOffset int64 = -1 - totalSize int64 = 0 + totalSize int64 ) for { @@ -525,22 +525,23 @@ func (sf *file) GetPassthroughFd() (uintptr, error) { } readerAt := r.GetReaderAt() - if file, ok := readerAt.(*os.File); ok { - fd := file.Fd() - r.Close() - return fd, nil - } else { + file, ok := readerAt.(*os.File) + if !ok { r.Close() return 0, fmt.Errorf("The cached ReaderAt is not of type *os.File, fd obtain failed") } + + fd := file.Fd() + r.Close() + return fd, nil } } func (sf *file) prefetchEntireFile() error { var ( - offset int64 = 0 + offset int64 firstChunkOffset int64 = -1 - totalSize int64 = 0 + totalSize int64 ) combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer) combinedBuffer.Reset() @@ -554,10 +555,28 @@ func (sf *file) prefetchEntireFile() error { if firstChunkOffset == -1 { firstChunkOffset = chunkOffset } + + id := genID(sf.id, chunkOffset, chunkSize) b := sf.gr.bufPool.Get().(*bytes.Buffer) b.Reset() b.Grow(int(chunkSize)) ip := b.Bytes()[:chunkSize] + + // Check if the content exists in the cache + if r, err := sf.gr.cache.Get(id); err == nil { + n, err := r.ReadAt(ip, 0) + if (err == nil || err == io.EOF) && int64(n) == chunkSize { + combinedBuffer.Write(ip[:n]) + totalSize += int64(n) + offset = chunkOffset + int64(n) + r.Close() + sf.gr.putBuffer(b) + continue + } + r.Close() + } + + // cache miss, prefetch the whole chunk if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF { sf.gr.putBuffer(b) return fmt.Errorf("failed to read data: %w", err) @@ -571,9 +590,9 @@ func (sf *file) prefetchEntireFile() error { offset = chunkOffset + chunkSize sf.gr.putBuffer(b) } - combinedIp := combinedBuffer.Bytes() - id := genID(sf.id, firstChunkOffset, totalSize) - sf.gr.cacheData(combinedIp, id) + combinedIP := combinedBuffer.Bytes() + combinedID := genID(sf.id, firstChunkOffset, totalSize) + sf.gr.cacheData(combinedIP, combinedID) return nil }