Skip to content

Commit

Permalink
Merge pull request #1868 from wswsmao/passthrough
Browse files Browse the repository at this point in the history
Add FUSE Passthrough Support in Stargz-Snapshotter #1867
  • Loading branch information
ktock authored Nov 21, 2024
2 parents 946a04d + d16d065 commit 2a280d6
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 17 deletions.
7 changes: 7 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type BlobCache interface {
// Reader provides random access to cached blob data and must be closed
// by the caller when no longer needed.
type Reader interface {
	io.ReaderAt
	Close() error

	// GetReaderAt returns the underlying io.ReaderAt. If a blob is backed
	// by a file, it should return *os.File so that it can be used for
	// FUSE passthrough.
	GetReaderAt() io.ReaderAt
}

// Writer enables the client to cache byte data. Commit() must be
Expand Down Expand Up @@ -414,6 +417,10 @@ type reader struct {

// Close releases the resources held by this reader by delegating to the
// close callback registered at construction time.
func (r *reader) Close() error {
	return r.closeFunc()
}

// GetReaderAt exposes the embedded io.ReaderAt so that callers (e.g. the
// FUSE passthrough path) can reach the underlying file when one backs
// this reader.
func (r *reader) GetReaderAt() io.ReaderAt { return r.ReaderAt }

type writer struct {
io.WriteCloser
commitFunc func() error
Expand Down
7 changes: 7 additions & 0 deletions cmd/containerd-stargz-grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ func main() {
// Create a gRPC server
rpc := grpc.NewServer()

// Configure FUSE passthrough
// Always set Direct to true to ensure that
// *directoryCache.Get always return *os.File instead of buffer
if config.Config.Config.FuseConfig.PassThrough {
config.Config.Config.DirectoryCacheConfig.Direct = true
}

// Configure keychain
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
if config.Config.KubeconfigKeychainConfig.EnableKeychain {
Expand Down
3 changes: 3 additions & 0 deletions fs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,7 @@ type FuseConfig struct {

// EntryTimeout defines TTL for directory, name lookup in seconds.
EntryTimeout int64 `toml:"entry_timeout"`

// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false.
PassThrough bool `toml:"passthrough" default:"false"`
}
7 changes: 5 additions & 2 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
}

// Combine layer information together and cache it.
l := newLayer(r, desc, blobR, vr)
l := newLayer(r, desc, blobR, vr, r.config.FuseConfig.PassThrough)
r.layerCacheMu.Lock()
cachedL, done2, added := r.layerCache.Add(name, l)
r.layerCacheMu.Unlock()
Expand Down Expand Up @@ -375,13 +375,15 @@ func newLayer(
desc ocispec.Descriptor,
blob *blobRef,
vr *reader.VerifiableReader,
pth bool,
) *layer {
return &layer{
resolver: resolver,
desc: desc,
blob: blob,
verifiableReader: vr,
prefetchWaiter: newWaiter(),
passThrough: pth,
}
}

Expand All @@ -402,6 +404,7 @@ type layer struct {

prefetchOnce sync.Once
backgroundFetchOnce sync.Once
passThrough bool
}

func (l *layer) Info() Info {
Expand Down Expand Up @@ -583,7 +586,7 @@ func (l *layer) RootNode(baseInode uint32) (fusefs.InodeEmbedder, error) {
if l.r == nil {
return nil, fmt.Errorf("layer hasn't been verified yet")
}
return newNode(l.desc.Digest, l.r, l.blob, baseInode, l.resolver.overlayOpaqueType)
return newNode(l.desc.Digest, l.r, l.blob, baseInode, l.resolver.overlayOpaqueType, l.passThrough)
}

func (l *layer) ReadAt(p []byte, offset int64, opts ...remote.Option) (int, error) {
Expand Down
35 changes: 32 additions & 3 deletions fs/layer/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var opaqueXattrs = map[OverlayOpaqueType][]string{
OverlayOpaqueUser: {"user.overlay.opaque"},
}

func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType) (fusefs.InodeEmbedder, error) {
func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType, pth bool) (fusefs.InodeEmbedder, error) {
rootID := r.Metadata().RootID()
rootAttr, err := r.Metadata().GetAttr(rootID)
if err != nil {
Expand All @@ -92,6 +92,7 @@ func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseIno
baseInode: baseInode,
rootID: rootID,
opaqueXattrs: opq,
passThrough: pth,
}
ffs.s = ffs.newState(layerDgst, blob)
return &node{
Expand All @@ -109,6 +110,7 @@ type fs struct {
baseInode uint32
rootID uint32
opaqueXattrs []string
passThrough bool
}

func (fs *fs) inodeOfState() uint64 {
Expand Down Expand Up @@ -344,10 +346,25 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu
n.fs.s.report(fmt.Errorf("node.Open: %v", err))
return nil, 0, syscall.EIO
}
return &file{

f := &file{
n: n,
ra: ra,
}, fuse.FOPEN_KEEP_CACHE, 0
}

if n.fs.passThrough {
if getter, ok := ra.(reader.PassthroughFdGetter); ok {
fd, err := getter.GetPassthroughFd()
if err != nil {
n.fs.s.report(fmt.Errorf("passThrough model failed due to node.Open: %v", err))
n.fs.passThrough = false
} else {
f.InitFd(int(fd))
}
}
}

return f, fuse.FOPEN_KEEP_CACHE, 0
}

var _ = (fusefs.NodeGetattrer)((*node)(nil))
Expand Down Expand Up @@ -424,6 +441,7 @@ func (n *node) Statfs(ctx context.Context, out *fuse.StatfsOut) syscall.Errno {
// file is the FUSE file handle implementation for a regular file entry.
type file struct {
	n  *node       // owning filesystem node
	ra io.ReaderAt // reader serving the file contents
	fd int         // host fd for FUSE passthrough; zero value when InitFd was never called — TODO confirm no valid fd 0 is expected here
}

var _ = (fusefs.FileReader)((*file)(nil))
Expand Down Expand Up @@ -451,6 +469,17 @@ func (f *file) Getattr(ctx context.Context, out *fuse.AttrOut) syscall.Errno {
return 0
}

// Implement PassthroughFd to enable go-fuse passthrough
var _ = (fusefs.FilePassthroughFder)((*file)(nil))

// PassthroughFd returns the host file descriptor the kernel should use to
// serve reads directly, and whether passthrough is possible for this handle.
//
// f.fd is only set via InitFd from node.Open; when passthrough is disabled
// or GetPassthroughFd failed, it stays at its zero value. Returning
// (0, true) in that case would hand the kernel fd 0 (stdin), so report
// passthrough as unavailable instead.
func (f *file) PassthroughFd() (int, bool) {
	if f.fd <= 0 {
		return 0, false
	}
	return f.fd, true
}

// InitFd records the host file descriptor to be reported via PassthroughFd.
// It is called from node.Open after GetPassthroughFd succeeds.
func (f *file) InitFd(fd int) {
	f.fd = fd
}

// whiteout is a whiteout abstraction compliant to overlayfs.
type whiteout struct {
fusefs.Inode
Expand Down
22 changes: 16 additions & 6 deletions fs/layer/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var srcCompressions = map[string]tutil.CompressionFactory{
// TestSuiteLayer runs the full layer test suite (prefetch, node read,
// passthrough read, and node semantics) against the given metadata store
// implementation.
func TestSuiteLayer(t *testing.T, store metadata.Store) {
	testPrefetch(t, store)
	testNodeRead(t, store)
	testPassThroughRead(t, store)
	testNodes(t, store)
}

Expand Down Expand Up @@ -218,6 +219,7 @@ func testPrefetch(t *testing.T, factory metadata.Store) {
ocispec.Descriptor{Digest: testStateLayerDigest},
&blobRef{blob, func() {}},
vr,
false,
)
if err := l.Verify(dgst); err != nil {
t.Errorf("failed to verify reader: %v", err)
Expand Down Expand Up @@ -379,7 +381,15 @@ const (
lastChunkOffset1 = sampleChunkSize * (int64(len(sampleData1)) / sampleChunkSize)
)

// testPassThroughRead runs the node read tests with FUSE passthrough enabled.
func testPassThroughRead(t *testing.T, factory metadata.Store) {
	nodeRead(t, factory, true)
}

// testNodeRead runs the node read tests with FUSE passthrough disabled.
func testNodeRead(t *testing.T, factory metadata.Store) {
	nodeRead(t, factory, false)
}

func nodeRead(t *testing.T, factory metadata.Store, pth bool) {
sizeCond := map[string]int64{
"single_chunk": sampleChunkSize - sampleMiddleOffset,
"multi_chunks": sampleChunkSize + sampleMiddleOffset,
Expand Down Expand Up @@ -428,7 +438,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
}

// data we get from the file node.
f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl)
f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl, pth)
defer closeFn()
tmpbuf := make([]byte, size) // fuse library can request bigger than remain
rr, errno := f.Read(context.Background(), tmpbuf, offset)
Expand Down Expand Up @@ -459,7 +469,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
}
}

func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression) (_ *file, closeFn func() error) {
func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression, pth bool) (_ *file, closeFn func() error) {
testName := "test"
sr, tocDgst, err := tutil.BuildEStargz(
[]tutil.TarEntry{tutil.File(testName, string(contents))},
Expand All @@ -472,7 +482,7 @@ func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metada
if err != nil {
t.Fatalf("failed to create reader: %v", err)
}
rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache())
rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache(), pth)
var eo fuse.EntryOut
inode, errno := rootNode.Lookup(context.Background(), testName, &eo)
if errno != 0 {
Expand Down Expand Up @@ -724,7 +734,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
}
defer r.Close()
mcache := cache.NewMemoryCache()
rootNode := getRootNode(t, r, opaque, tocDgst, mcache)
rootNode := getRootNode(t, r, opaque, tocDgst, mcache, false)
for _, want := range tt.want {
want(t, rootNode, mcache, testR)
}
Expand All @@ -733,7 +743,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
}
}

func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache) *node {
func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache, pth bool) *node {
vr, err := reader.NewReader(r, cc, digest.FromString(""))
if err != nil {
t.Fatalf("failed to create reader: %v", err)
Expand All @@ -742,7 +752,7 @@ func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocD
if err != nil {
t.Fatalf("failed to verify reader: %v", err)
}
rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque)
rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque, pth)
if err != nil {
t.Fatalf("failed to get root node: %v", err)
}
Expand Down
Loading

0 comments on commit 2a280d6

Please sign in to comment.