diff --git a/cache/cache.go b/cache/cache.go
index 417ae4b2b..ea43a6690 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -82,6 +82,9 @@ type BlobCache interface {
 type Reader interface {
 	io.ReaderAt
 	Close() error
+
+	// If a blob is backed by a file, GetReaderAt should return the underlying *os.File so that it can be used for FUSE passthrough
+	GetReaderAt() io.ReaderAt
 }

 // Writer enables the client to cache byte data. Commit() must be
@@ -414,6 +417,10 @@ type reader struct {

 func (r *reader) Close() error { return r.closeFunc() }

+func (r *reader) GetReaderAt() io.ReaderAt {
+	return r.ReaderAt
+}
+
 type writer struct {
 	io.WriteCloser
 	commitFunc func() error
diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go
index 82931fd06..ff078ab2c 100644
--- a/cmd/containerd-stargz-grpc/main.go
+++ b/cmd/containerd-stargz-grpc/main.go
@@ -132,6 +132,13 @@ func main() {
 	// Create a gRPC server
 	rpc := grpc.NewServer()

+	// Configure FUSE passthrough.
+	// Always set Direct to true to ensure that
+	// *directoryCache.Get always returns an *os.File instead of a buffer.
+	if config.Config.Config.FuseConfig.PassThrough {
+		config.Config.Config.DirectoryCacheConfig.Direct = true
+	}
+
 	// Configure keychain
 	credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
 	if config.Config.KubeconfigKeychainConfig.EnableKeychain {
diff --git a/fs/config/config.go b/fs/config/config.go
index 890aded74..44b87e8d6 100644
--- a/fs/config/config.go
+++ b/fs/config/config.go
@@ -148,4 +148,7 @@ type FuseConfig struct {

 	// EntryTimeout defines TTL for directory, name lookup in seconds.
 	EntryTimeout int64 `toml:"entry_timeout"`
+
+	// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false.
+	PassThrough bool `toml:"passthrough" default:"false"`
 }
diff --git a/fs/layer/layer.go b/fs/layer/layer.go
index a71848dbb..4efc32b24 100644
--- a/fs/layer/layer.go
+++ b/fs/layer/layer.go
@@ -315,7 +315,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
 	}

 	// Combine layer information together and cache it.
-	l := newLayer(r, desc, blobR, vr)
+	l := newLayer(r, desc, blobR, vr, r.config.FuseConfig.PassThrough)
 	r.layerCacheMu.Lock()
 	cachedL, done2, added := r.layerCache.Add(name, l)
 	r.layerCacheMu.Unlock()
@@ -375,6 +375,7 @@ func newLayer(
 	desc ocispec.Descriptor,
 	blob *blobRef,
 	vr *reader.VerifiableReader,
+	pth bool,
 ) *layer {
 	return &layer{
 		resolver:         resolver,
@@ -382,6 +383,7 @@ func newLayer(
 		blob:             blob,
 		verifiableReader: vr,
 		prefetchWaiter:   newWaiter(),
+		passThrough:      pth,
 	}
 }

@@ -402,6 +404,7 @@ type layer struct {

 	prefetchOnce        sync.Once
 	backgroundFetchOnce sync.Once
+	passThrough         bool
 }

 func (l *layer) Info() Info {
@@ -583,7 +586,7 @@ func (l *layer) RootNode(baseInode uint32) (fusefs.InodeEmbedder, error) {
 	if l.r == nil {
 		return nil, fmt.Errorf("layer hasn't been verified yet")
 	}
-	return newNode(l.desc.Digest, l.r, l.blob, baseInode, l.resolver.overlayOpaqueType)
+	return newNode(l.desc.Digest, l.r, l.blob, baseInode, l.resolver.overlayOpaqueType, l.passThrough)
 }

 func (l *layer) ReadAt(p []byte, offset int64, opts ...remote.Option) (int, error) {
diff --git a/fs/layer/node.go b/fs/layer/node.go
index b6306b9b5..02f9e6488 100644
--- a/fs/layer/node.go
+++ b/fs/layer/node.go
@@ -76,7 +76,7 @@ var opaqueXattrs = map[OverlayOpaqueType][]string{
 	OverlayOpaqueUser: {"user.overlay.opaque"},
 }

-func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType) (fusefs.InodeEmbedder, error) {
+func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType, pth bool) (fusefs.InodeEmbedder, error) {
 	rootID := r.Metadata().RootID()
 	rootAttr, err := r.Metadata().GetAttr(rootID)
 	if err != nil {
@@ -92,6 +92,7 @@ func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseIno
 		baseInode:    baseInode,
 		rootID:       rootID,
 		opaqueXattrs: opq,
+		passThrough:  pth,
 	}
 	ffs.s = ffs.newState(layerDgst, blob)
 	return &node{
@@ -109,6 +110,7 @@ type fs struct {
 	baseInode    uint32
 	rootID       uint32
 	opaqueXattrs []string
+	passThrough  bool
 }

 func (fs *fs) inodeOfState() uint64 {
@@ -344,10 +346,25 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu
 		n.fs.s.report(fmt.Errorf("node.Open: %v", err))
 		return nil, 0, syscall.EIO
 	}
-	return &file{
+
+	f := &file{
 		n:  n,
 		ra: ra,
-	}, fuse.FOPEN_KEEP_CACHE, 0
+	}
+
+	if n.fs.passThrough {
+		if getter, ok := ra.(reader.PassthroughFdGetter); ok {
+			fd, err := getter.GetPassthroughFd()
+			if err != nil {
+				n.fs.s.report(fmt.Errorf("passthrough mode failed in node.Open: %v", err))
+				n.fs.passThrough = false
+			} else {
+				f.InitFd(int(fd))
+			}
+		}
+	}
+
+	return f, fuse.FOPEN_KEEP_CACHE, 0
 }

 var _ = (fusefs.NodeGetattrer)((*node)(nil))
@@ -424,6 +441,7 @@ func (n *node) Statfs(ctx context.Context, out *fuse.StatfsOut) syscall.Errno {
 type file struct {
 	n  *node
 	ra io.ReaderAt
+	fd int
 }

 var _ = (fusefs.FileReader)((*file)(nil))
@@ -451,6 +469,17 @@ func (f *file) Getattr(ctx context.Context, out *fuse.AttrOut) syscall.Errno {
 	return 0
 }

+// Implement fusefs.FilePassthroughFder to enable go-fuse passthrough
+var _ = (fusefs.FilePassthroughFder)((*file)(nil))
+
+func (f *file) PassthroughFd() (int, bool) {
+	return f.fd, true
+}
+
+func (f *file) InitFd(fd int) {
+	f.fd = fd
+}
+
 // whiteout is a whiteout abstraction compliant to overlayfs.
 type whiteout struct {
 	fusefs.Inode
diff --git a/fs/layer/testutil.go b/fs/layer/testutil.go
index 5a38825d0..3729340c4 100644
--- a/fs/layer/testutil.go
+++ b/fs/layer/testutil.go
@@ -74,6 +74,7 @@ var srcCompressions = map[string]tutil.CompressionFactory{
 func TestSuiteLayer(t *testing.T, store metadata.Store) {
 	testPrefetch(t, store)
 	testNodeRead(t, store)
+	testPassThroughRead(t, store)
 	testNodes(t, store)
 }

@@ -218,6 +219,7 @@ func testPrefetch(t *testing.T, factory metadata.Store) {
 				ocispec.Descriptor{Digest: testStateLayerDigest},
 				&blobRef{blob, func() {}},
 				vr,
+				false,
 			)
 			if err := l.Verify(dgst); err != nil {
 				t.Errorf("failed to verify reader: %v", err)
@@ -379,7 +381,15 @@ const (
 	lastChunkOffset1 = sampleChunkSize * (int64(len(sampleData1)) / sampleChunkSize)
 )

+func testPassThroughRead(t *testing.T, factory metadata.Store) {
+	nodeRead(t, factory, true)
+}
+
 func testNodeRead(t *testing.T, factory metadata.Store) {
+	nodeRead(t, factory, false)
+}
+
+func nodeRead(t *testing.T, factory metadata.Store, pth bool) {
 	sizeCond := map[string]int64{
 		"single_chunk": sampleChunkSize - sampleMiddleOffset,
 		"multi_chunks": sampleChunkSize + sampleMiddleOffset,
@@ -428,7 +438,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
 					}

 					// data we get from the file node.
-					f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl)
+					f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl, pth)
 					defer closeFn()
 					tmpbuf := make([]byte, size) // fuse library can request bigger than remain
 					rr, errno := f.Read(context.Background(), tmpbuf, offset)
@@ -459,7 +469,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
 	}
 }

-func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression) (_ *file, closeFn func() error) {
+func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression, pth bool) (_ *file, closeFn func() error) {
 	testName := "test"
 	sr, tocDgst, err := tutil.BuildEStargz(
 		[]tutil.TarEntry{tutil.File(testName, string(contents))},
@@ -472,7 +482,7 @@ func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metada
 	if err != nil {
 		t.Fatalf("failed to create reader: %v", err)
 	}
-	rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache())
+	rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache(), pth)
 	var eo fuse.EntryOut
 	inode, errno := rootNode.Lookup(context.Background(), testName, &eo)
 	if errno != 0 {
@@ -724,7 +734,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
 			}
 			defer r.Close()
 			mcache := cache.NewMemoryCache()
-			rootNode := getRootNode(t, r, opaque, tocDgst, mcache)
+			rootNode := getRootNode(t, r, opaque, tocDgst, mcache, false)
 			for _, want := range tt.want {
 				want(t, rootNode, mcache, testR)
 			}
@@ -733,7 +743,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
 	}
 }
-func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache) *node {
+func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache, pth bool) *node {
 	vr, err := reader.NewReader(r, cc, digest.FromString(""))
 	if err != nil {
 		t.Fatalf("failed to create reader: %v", err)
 	}
@@ -742,7 +752,7 @@ func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocD
 	if err != nil {
 		t.Fatalf("failed to verify reader: %v", err)
 	}
-	rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque)
+	rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque, pth)
 	if err != nil {
 		t.Fatalf("failed to get root node: %v", err)
 	}
diff --git a/fs/reader/reader.go b/fs/reader/reader.go
index c860b87de..70625f1b7 100644
--- a/fs/reader/reader.go
+++ b/fs/reader/reader.go
@@ -53,6 +53,10 @@ type Reader interface {
 	LastOnDemandReadTime() time.Time
 }

+type PassthroughFdGetter interface {
+	GetPassthroughFd() (uintptr, error)
+}
+
 // VerifiableReader produces a Reader with a given verifier.
 type VerifiableReader struct {
 	r *reader
@@ -490,18 +494,120 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {
 	return nr, nil
 }

-func (gr *reader) verifyAndCache(entryID uint32, ip []byte, chunkDigestStr string, cacheID string) error {
+func (sf *file) GetPassthroughFd() (uintptr, error) {
+	var (
+		offset           int64
+		firstChunkOffset int64 = -1
+		totalSize        int64
+	)
+
+	for {
+		chunkOffset, chunkSize, _, ok := sf.fr.ChunkEntryForOffset(offset)
+		if !ok {
+			break
+		}
+		if firstChunkOffset == -1 {
+			firstChunkOffset = chunkOffset
+		}
+		totalSize += chunkSize
+		offset = chunkOffset + chunkSize
+	}
+
+	id := genID(sf.id, firstChunkOffset, totalSize)
+
+	for {
+		r, err := sf.gr.cache.Get(id)
+		if err != nil {
+			if err := sf.prefetchEntireFile(); err != nil {
+				return 0, err
+			}
+			continue
+		}
+
+		readerAt := r.GetReaderAt()
+		file, ok := readerAt.(*os.File)
+		if !ok {
+			r.Close()
+			return 0, fmt.Errorf("the cached ReaderAt is not an *os.File; cannot obtain the passthrough fd")
+		}
+
+		fd := file.Fd()
+		r.Close()
+		return fd, nil
+	}
+}
+
+func (sf *file) prefetchEntireFile() error {
+	var (
+		offset           int64
+		firstChunkOffset int64 = -1
+		totalSize        int64
+	)
+	combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer)
+	combinedBuffer.Reset()
+	defer sf.gr.putBuffer(combinedBuffer)
+
+	for {
+		chunkOffset, chunkSize, chunkDigestStr, ok := sf.fr.ChunkEntryForOffset(offset)
+		if !ok {
+			break
+		}
+		if firstChunkOffset == -1 {
+			firstChunkOffset = chunkOffset
+		}
+
+		id := genID(sf.id, chunkOffset, chunkSize)
+		b := sf.gr.bufPool.Get().(*bytes.Buffer)
+		b.Reset()
+		b.Grow(int(chunkSize))
+		ip := b.Bytes()[:chunkSize]
+
+		// Check if the content exists in the cache
+		if r, err := sf.gr.cache.Get(id); err == nil {
+			n, err := r.ReadAt(ip, 0)
+			if (err == nil || err == io.EOF) && int64(n) == chunkSize {
+				combinedBuffer.Write(ip[:n])
+				totalSize += int64(n)
+				offset = chunkOffset + int64(n)
+				r.Close()
+				sf.gr.putBuffer(b)
+				continue
+			}
+			r.Close()
+		}
+
+		// Cache miss: fetch and verify the whole chunk
+		if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
+			sf.gr.putBuffer(b)
+			return fmt.Errorf("failed to read data: %w", err)
+		}
+		if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil {
+			sf.gr.putBuffer(b)
+			return err
+		}
+		combinedBuffer.Write(ip)
+		totalSize += chunkSize
+		offset = chunkOffset + chunkSize
+		sf.gr.putBuffer(b)
+	}
+	combinedIP := combinedBuffer.Bytes()
+	combinedID := genID(sf.id, firstChunkOffset, totalSize)
+	sf.gr.cacheData(combinedIP, combinedID)
+	return nil
+}
+
+func (gr *reader) verifyOneChunk(entryID uint32, ip []byte, chunkDigestStr string) error {
 	// We can end up doing on demand registry fetch when aligning the chunk
-	commonmetrics.IncOperationCount(commonmetrics.OnDemandRemoteRegistryFetchCount, gr.layerSha) // increment the number of on demand file fetches from remote registry
-	commonmetrics.AddBytesCount(commonmetrics.OnDemandBytesFetched, gr.layerSha, int64(len(ip))) // record total bytes fetched
+	commonmetrics.IncOperationCount(commonmetrics.OnDemandRemoteRegistryFetchCount, gr.layerSha)
+	commonmetrics.AddBytesCount(commonmetrics.OnDemandBytesFetched, gr.layerSha, int64(len(ip)))
 	gr.setLastReadTime(time.Now())
-
-	// Verify this chunk
 	if err := gr.verifyChunk(entryID, ip, chunkDigestStr); err != nil {
 		return fmt.Errorf("invalid chunk: %w", err)
 	}
+	return nil
+}

-	// Cache this chunk
+func (gr *reader) cacheData(ip []byte, cacheID string) {
 	if w, err := gr.cache.Add(cacheID); err == nil {
 		if cn, err := w.Write(ip); err != nil || cn != len(ip) {
 			w.Abort()
@@ -510,7 +616,13 @@ func (gr *reader) verifyAndCache(entryID uint32, ip []byte, chunkDigestStr strin
 		}
 		w.Close()
 	}
+}

+func (gr *reader) verifyAndCache(entryID uint32, ip []byte, chunkDigestStr string, cacheID string) error {
+	if err := gr.verifyOneChunk(entryID, ip, chunkDigestStr); err != nil {
+		return err
+	}
+	gr.cacheData(ip, cacheID)
 	return nil
 }
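
The new cache.Reader.GetReaderAt method and the forced Direct mode are easiest to see together. Below is a minimal, self-contained sketch (the fileEntry and passthroughFd names are hypothetical, not part of this change) of how a file-backed cache entry satisfies the extended interface and how a raw fd is recovered from it. A memory-backed entry would fail the *os.File assertion, which is why main.go flips DirectoryCacheConfig.Direct to true whenever passthrough is enabled in the FUSE config.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// Reader mirrors the extended cache.Reader contract introduced above:
// GetReaderAt exposes the underlying io.ReaderAt so that a file-backed
// entry can be unwrapped to an *os.File for FUSE passthrough.
type Reader interface {
	io.ReaderAt
	Close() error
	GetReaderAt() io.ReaderAt
}

// fileEntry is a hypothetical file-backed cache entry, similar to what the
// directory cache hands out when Direct mode is enabled.
type fileEntry struct {
	*os.File
}

func (e *fileEntry) GetReaderAt() io.ReaderAt { return e.File }

// passthroughFd unwraps a cache entry into a raw fd, mirroring the type
// assertion in (*file).GetPassthroughFd. A memory-backed entry fails the
// assertion, so passthrough only works with file-backed cache entries.
func passthroughFd(r Reader) (uintptr, error) {
	f, ok := r.GetReaderAt().(*os.File)
	if !ok {
		return 0, fmt.Errorf("cache entry is not file-backed")
	}
	return f.Fd(), nil
}

func main() {
	tmp, err := os.CreateTemp("", "chunk-")
	if err != nil {
		panic(err)
	}
	defer os.Remove(tmp.Name())
	defer tmp.Close()

	fd, err := passthroughFd(&fileEntry{tmp})
	if err != nil {
		panic(err)
	}
	fmt.Println("passthrough fd:", fd)
}
```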
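GetPassthroughFd and prefetchEntireFile both walk the file's chunks with ChunkEntryForOffset to derive the key of the combined, whole-file cache entry: the first chunk offset plus the total size. Here is a simplified sketch of that walk, with hypothetical chunk/span/lookup helpers standing in for the real metadata reader:

```go
package main

import "fmt"

// chunk is a simplified stand-in for the metadata returned by
// ChunkEntryForOffset: each chunk covers [offset, offset+size) of the file.
type chunk struct {
	offset int64
	size   int64
}

// span walks the chunks the same way GetPassthroughFd and prefetchEntireFile
// do: starting at offset 0, look up the chunk covering the current offset,
// advance to chunkOffset+chunkSize, and stop when no chunk is found. The
// returned first offset and total size identify the combined cache entry.
func span(lookup func(off int64) (chunk, bool)) (first, total int64) {
	first = -1
	var off int64
	for {
		c, ok := lookup(off)
		if !ok {
			break
		}
		if first == -1 {
			first = c.offset
		}
		total += c.size
		off = c.offset + c.size
	}
	return first, total
}

func main() {
	chunks := []chunk{{0, 4 << 20}, {4 << 20, 4 << 20}, {8 << 20, 1 << 20}}
	lookup := func(off int64) (chunk, bool) {
		for _, c := range chunks {
			if off >= c.offset && off < c.offset+c.size {
				return c, true
			}
		}
		return chunk{}, false
	}
	first, total := span(lookup)
	fmt.Printf("first chunk offset=%d, total size=%d\n", first, total) // 0, 9 MiB
}
```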