fuse passthrough: fix some review comments
Signed-off-by: abushwang <[email protected]>
wswsmao committed Nov 21, 2024
1 parent 39a2e55 commit d16d065
Showing 6 changed files with 57 additions and 39 deletions.
2 changes: 2 additions & 0 deletions cache/cache.go
@@ -82,6 +82,8 @@ type BlobCache interface {
type Reader interface {
io.ReaderAt
Close() error
+
+ // If a blob is backed by a file, it should return *os.File so that it can be used for FUSE passthrough
GetReaderAt() io.ReaderAt
}
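
For context, a minimal sketch of how a passthrough consumer could use GetReaderAt to recover the backing file descriptor; the passthroughFd helper below is hypothetical, not part of this change:

package example

import (
	"fmt"
	"os"

	"github.com/containerd/stargz-snapshotter/cache"
)

// passthroughFd recovers the backing fd from a cache.Reader.
// Only a reader backed by *os.File (i.e. a direct, unbuffered cache entry)
// can support FUSE passthrough.
func passthroughFd(r cache.Reader) (uintptr, error) {
	if f, ok := r.GetReaderAt().(*os.File); ok {
		return f.Fd(), nil
	}
	return 0, fmt.Errorf("reader is not backed by *os.File")
}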

21 changes: 4 additions & 17 deletions cmd/containerd-stargz-grpc/main.go
@@ -17,7 +17,6 @@
package main

import (
"bytes"
"context"
"flag"
"fmt"
@@ -27,10 +26,8 @@ import (
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"time"

snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
@@ -136,8 +133,10 @@ func main() {
rpc := grpc.NewServer()

// Configure FUSE passthrough
- if config.Config.Config.FuseConfig.PassThrough, err = isFusePthEnable(); err != nil {
- log.G(ctx).Warnf("failed to check FUSE passthrough support")
+ // Always set Direct to true to ensure that
+ // *directoryCache.Get always returns *os.File instead of a buffer
+ if config.Config.Config.FuseConfig.PassThrough {
+ config.Config.Config.DirectoryCacheConfig.Direct = true
}

// Configure keychain
@@ -193,18 +192,6 @@ func main() {
log.G(ctx).Info("Exiting")
}

- // isFusePthEnable prevents users from enabling passthrough mode on unsupported kernel versions
- func isFusePthEnable() (bool, error) {
- cmd := exec.Command("sh", "-c", "grep 'CONFIG_FUSE_PASSTHROUGH=y' /boot/config-$(uname -r)")
- var out bytes.Buffer
- cmd.Stdout = &out
- cmd.Stderr = &out
- if err := cmd.Run(); err != nil {
- return false, err
- }
- return strings.Contains(out.String(), "CONFIG_FUSE_PASSTHROUGH=y"), nil
- }

func serve(ctx context.Context, rpc *grpc.Server, addr string, rs snapshots.Snapshotter, config snapshotterConfig) (bool, error) {
// Convert the snapshotter to a gRPC service,
snsvc := snapshotservice.FromSnapshotter(rs)
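
Forcing Direct to true matters because a buffered cache read returns an in-memory reader that has no file descriptor for FUSE passthrough to map. A standalone sketch of the distinction (illustrative only; the temp file stands in for a direct cache entry):

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	// Buffered path: a bytes.Reader satisfies io.ReaderAt but has no fd.
	var buffered io.ReaderAt = bytes.NewReader([]byte("chunk data"))
	_, ok := buffered.(*os.File)
	fmt.Println("buffered reader supports passthrough:", ok) // false

	// Direct path: the entry is backed by a real file, so Fd() is available.
	f, err := os.CreateTemp("", "direct-cache-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()
	var direct io.ReaderAt = f
	if df, ok := direct.(*os.File); ok {
		fmt.Println("direct reader fd:", df.Fd()) // usable for passthrough
	}
}
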
2 changes: 1 addition & 1 deletion fs/config/config.go
@@ -150,5 +150,5 @@ type FuseConfig struct {
EntryTimeout int64 `toml:"entry_timeout"`

// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false.
PassThrough bool `toml:"passthrough"`
PassThrough bool `toml:"passthrough" default:"false"`
}
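
Given the struct tag above, enabling the feature from the snapshotter's TOML config would look roughly like this; the [fuse] table name is inferred from the config structs, so verify it against your stargz-snapshotter version:

# config.toml for containerd-stargz-grpc (sketch)
[fuse]
# Enable FUSE passthrough; requires a kernel built with CONFIG_FUSE_PASSTHROUGH=y
passthrough = true
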
7 changes: 4 additions & 3 deletions fs/layer/node.go
@@ -356,10 +356,11 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu
if getter, ok := ra.(reader.PassthroughFdGetter); ok {
fd, err := getter.GetPassthroughFd()
if err != nil {
n.fs.s.report(fmt.Errorf("node.Open: %v", err))
return nil, 0, syscall.EIO
n.fs.s.report(fmt.Errorf("passThrough model failed due to node.Open: %v", err))
n.fs.passThrough = false
} else {
f.InitFd(int(fd))
}
f.InitFd(int(fd))
}
}

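
The revised error path degrades gracefully: a failed GetPassthroughFd now disables passthrough for the filesystem and falls back to buffered reads, instead of failing the open with EIO. A generic sketch of that pattern, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

// tryPassthrough stands in for getter.GetPassthroughFd in fs/layer/node.go.
func tryPassthrough() (int, error) { return -1, errors.New("no fd available") }

func main() {
	if fd, err := tryPassthrough(); err != nil {
		// Report and fall back to buffered reads for the whole filesystem,
		// rather than returning EIO for this single open.
		fmt.Println("passthrough disabled:", err)
	} else {
		fmt.Println("passthrough enabled, fd:", fd)
	}
}
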
21 changes: 15 additions & 6 deletions fs/layer/testutil.go
@@ -74,6 +74,7 @@ var srcCompressions = map[string]tutil.CompressionFactory{
func TestSuiteLayer(t *testing.T, store metadata.Store) {
testPrefetch(t, store)
testNodeRead(t, store)
+ testPassThroughRead(t, store)
testNodes(t, store)
}

@@ -380,7 +381,15 @@ const (
lastChunkOffset1 = sampleChunkSize * (int64(len(sampleData1)) / sampleChunkSize)
)

+ func testPassThroughRead(t *testing.T, factory metadata.Store) {
+ nodeRead(t, factory, true)
+ }
+
func testNodeRead(t *testing.T, factory metadata.Store) {
+ nodeRead(t, factory, false)
+ }
+
+ func nodeRead(t *testing.T, factory metadata.Store, pth bool) {
sizeCond := map[string]int64{
"single_chunk": sampleChunkSize - sampleMiddleOffset,
"multi_chunks": sampleChunkSize + sampleMiddleOffset,
@@ -429,7 +438,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
}

// data we get from the file node.
- f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl)
+ f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl, pth)
defer closeFn()
tmpbuf := make([]byte, size) // fuse library can request bigger than remain
rr, errno := f.Read(context.Background(), tmpbuf, offset)
@@ -460,7 +469,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
}
}

- func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression) (_ *file, closeFn func() error) {
+ func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression, pth bool) (_ *file, closeFn func() error) {
testName := "test"
sr, tocDgst, err := tutil.BuildEStargz(
[]tutil.TarEntry{tutil.File(testName, string(contents))},
@@ -473,7 +482,7 @@ func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metada
if err != nil {
t.Fatalf("failed to create reader: %v", err)
}
- rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache())
+ rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache(), pth)
var eo fuse.EntryOut
inode, errno := rootNode.Lookup(context.Background(), testName, &eo)
if errno != 0 {
@@ -725,7 +734,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
}
defer r.Close()
mcache := cache.NewMemoryCache()
- rootNode := getRootNode(t, r, opaque, tocDgst, mcache)
+ rootNode := getRootNode(t, r, opaque, tocDgst, mcache, false)
for _, want := range tt.want {
want(t, rootNode, mcache, testR)
}
@@ -734,7 +743,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
}
}

- func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache) *node {
+ func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache, pth bool) *node {
vr, err := reader.NewReader(r, cc, digest.FromString(""))
if err != nil {
t.Fatalf("failed to create reader: %v", err)
@@ -743,7 +752,7 @@ func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocD
if err != nil {
t.Fatalf("failed to verify reader: %v", err)
}
- rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque, false)
+ rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque, pth)
if err != nil {
t.Fatalf("failed to get root node: %v", err)
}
43 changes: 31 additions & 12 deletions fs/reader/reader.go
@@ -496,9 +496,9 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {

func (sf *file) GetPassthroughFd() (uintptr, error) {
var (
- offset int64 = 0
+ offset int64
firstChunkOffset int64 = -1
- totalSize int64 = 0
+ totalSize int64
)

for {
@@ -525,22 +525,23 @@
}

readerAt := r.GetReaderAt()
- if file, ok := readerAt.(*os.File); ok {
- fd := file.Fd()
- r.Close()
- return fd, nil
- } else {
+ file, ok := readerAt.(*os.File)
+ if !ok {
r.Close()
return 0, fmt.Errorf("the cached ReaderAt is not of type *os.File; failed to obtain fd")
}
+
+ fd := file.Fd()
+ r.Close()
+ return fd, nil
}
}

func (sf *file) prefetchEntireFile() error {
var (
- offset int64 = 0
+ offset int64
firstChunkOffset int64 = -1
- totalSize int64 = 0
+ totalSize int64
)
combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer)
combinedBuffer.Reset()
@@ -554,10 +555,28 @@
if firstChunkOffset == -1 {
firstChunkOffset = chunkOffset
}

+ id := genID(sf.id, chunkOffset, chunkSize)
b := sf.gr.bufPool.Get().(*bytes.Buffer)
b.Reset()
b.Grow(int(chunkSize))
ip := b.Bytes()[:chunkSize]
+
+ // Check if the content exists in the cache
+ if r, err := sf.gr.cache.Get(id); err == nil {
+ n, err := r.ReadAt(ip, 0)
+ if (err == nil || err == io.EOF) && int64(n) == chunkSize {
+ combinedBuffer.Write(ip[:n])
+ totalSize += int64(n)
+ offset = chunkOffset + int64(n)
+ r.Close()
+ sf.gr.putBuffer(b)
+ continue
+ }
+ r.Close()
+ }
+
+ // cache miss, prefetch the whole chunk
if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF {
sf.gr.putBuffer(b)
return fmt.Errorf("failed to read data: %w", err)
@@ -571,9 +590,9 @@ func (sf *file) prefetchEntireFile() error {
offset = chunkOffset + chunkSize
sf.gr.putBuffer(b)
}
- combinedIp := combinedBuffer.Bytes()
- id := genID(sf.id, firstChunkOffset, totalSize)
- sf.gr.cacheData(combinedIp, id)
+ combinedIP := combinedBuffer.Bytes()
+ combinedID := genID(sf.id, firstChunkOffset, totalSize)
+ sf.gr.cacheData(combinedIP, combinedID)
return nil
}

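
An invariant worth noting: prefetchEntireFile stores the stitched-together file as a single cache entry keyed by (id, firstChunkOffset, totalSize), and GetPassthroughFd recomputes that same key to find it. A self-contained sketch of the keying idea; the hash and signature below are assumptions for illustration, not this package's actual genID:

package main

import (
	"crypto/sha256"
	"fmt"
)

// genID sketches the cache-key scheme: one key per (blob id, offset, size) span.
func genID(id uint32, offset, size int64) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%d-%d-%d", id, offset, size)))
	return fmt.Sprintf("%x", sum)
}

func main() {
	// prefetchEntireFile caches the combined buffer under this key ...
	put := genID(7, 0, 4096)
	// ... and GetPassthroughFd must derive the identical key to get a hit.
	get := genID(7, 0, 4096)
	fmt.Println(put == get) // true
}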
