Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FUSE Passthrough Support in Stargz-Snapshotter #1867 #1868

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type BlobCache interface {
type Reader interface {
io.ReaderAt
Close() error

// If a blob is backed by a file, it should return *os.File so that it can be used for FUSE passthrough
GetReaderAt() io.ReaderAt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment like If a blob is backed by a file, it should return *os.File so that it can be used for FUSE passthrough.?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When FUSE passthrough is enabled, we should always set directoryCache.direct to true so that we can ensure that *directoryCache.Get always return *os.File (not a buffer).

}

// Writer enables the client to cache byte data. Commit() must be
Expand Down Expand Up @@ -414,6 +417,10 @@ type reader struct {

func (r *reader) Close() error { return r.closeFunc() }

func (r *reader) GetReaderAt() io.ReaderAt {
return r.ReaderAt
}

type writer struct {
io.WriteCloser
commitFunc func() error
Expand Down
7 changes: 7 additions & 0 deletions cmd/containerd-stargz-grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ func main() {
// Create a gRPC server
rpc := grpc.NewServer()

// Configure FUSE passthrough
// Always set Direct to true to ensure that
// *directoryCache.Get always return *os.File instead of buffer
if config.Config.Config.FuseConfig.PassThrough {
config.Config.Config.DirectoryCacheConfig.Direct = true
}

// Configure keychain
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
if config.Config.KubeconfigKeychainConfig.EnableKeychain {
Expand Down
3 changes: 3 additions & 0 deletions fs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,7 @@ type FuseConfig struct {

// EntryTimeout defines TTL for directory, name lookup in seconds.
EntryTimeout int64 `toml:"entry_timeout"`

// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false.
PassThrough bool `toml:"passthrough" default:"false"`
}
7 changes: 5 additions & 2 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
}

// Combine layer information together and cache it.
l := newLayer(r, desc, blobR, vr)
l := newLayer(r, desc, blobR, vr, r.config.FuseConfig.PassThrough)
r.layerCacheMu.Lock()
cachedL, done2, added := r.layerCache.Add(name, l)
r.layerCacheMu.Unlock()
Expand Down Expand Up @@ -375,13 +375,15 @@ func newLayer(
desc ocispec.Descriptor,
blob *blobRef,
vr *reader.VerifiableReader,
pth bool,
) *layer {
return &layer{
resolver: resolver,
desc: desc,
blob: blob,
verifiableReader: vr,
prefetchWaiter: newWaiter(),
passThrough: pth,
}
}

Expand All @@ -402,6 +404,7 @@ type layer struct {

prefetchOnce sync.Once
backgroundFetchOnce sync.Once
passThrough bool
}

func (l *layer) Info() Info {
Expand Down Expand Up @@ -583,7 +586,7 @@ func (l *layer) RootNode(baseInode uint32) (fusefs.InodeEmbedder, error) {
if l.r == nil {
return nil, fmt.Errorf("layer hasn't been verified yet")
}
return newNode(l.desc.Digest, l.r, l.blob, baseInode, l.resolver.overlayOpaqueType)
return newNode(l.desc.Digest, l.r, l.blob, baseInode, l.resolver.overlayOpaqueType, l.passThrough)
}

func (l *layer) ReadAt(p []byte, offset int64, opts ...remote.Option) (int, error) {
Expand Down
35 changes: 32 additions & 3 deletions fs/layer/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var opaqueXattrs = map[OverlayOpaqueType][]string{
OverlayOpaqueUser: {"user.overlay.opaque"},
}

func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType) (fusefs.InodeEmbedder, error) {
func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType, pth bool) (fusefs.InodeEmbedder, error) {
rootID := r.Metadata().RootID()
rootAttr, err := r.Metadata().GetAttr(rootID)
if err != nil {
Expand All @@ -92,6 +92,7 @@ func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseIno
baseInode: baseInode,
rootID: rootID,
opaqueXattrs: opq,
passThrough: pth,
}
ffs.s = ffs.newState(layerDgst, blob)
return &node{
Expand All @@ -109,6 +110,7 @@ type fs struct {
baseInode uint32
rootID uint32
opaqueXattrs []string
passThrough bool
}

func (fs *fs) inodeOfState() uint64 {
Expand Down Expand Up @@ -344,10 +346,25 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu
n.fs.s.report(fmt.Errorf("node.Open: %v", err))
return nil, 0, syscall.EIO
}
return &file{

f := &file{
n: n,
ra: ra,
}, fuse.FOPEN_KEEP_CACHE, 0
}

if n.fs.passThrough {
if getter, ok := ra.(reader.PassthroughFdGetter); ok {
fd, err := getter.GetPassthroughFd()
if err != nil {
n.fs.s.report(fmt.Errorf("passThrough model failed due to node.Open: %v", err))
n.fs.passThrough = false
} else {
f.InitFd(int(fd))
}
}
}

return f, fuse.FOPEN_KEEP_CACHE, 0
}

var _ = (fusefs.NodeGetattrer)((*node)(nil))
Expand Down Expand Up @@ -424,6 +441,7 @@ func (n *node) Statfs(ctx context.Context, out *fuse.StatfsOut) syscall.Errno {
type file struct {
n *node
ra io.ReaderAt
fd int
}

var _ = (fusefs.FileReader)((*file)(nil))
Expand Down Expand Up @@ -451,6 +469,17 @@ func (f *file) Getattr(ctx context.Context, out *fuse.AttrOut) syscall.Errno {
return 0
}

// Implement PassthroughFd to enable go-fuse passthrough
var _ = (fusefs.FilePassthroughFder)((*file)(nil))

func (f *file) PassthroughFd() (int, bool) {
return f.fd, true
}

func (f *file) InitFd(fd int) {
f.fd = fd
}

// whiteout is a whiteout abstraction compliant to overlayfs.
type whiteout struct {
fusefs.Inode
Expand Down
22 changes: 16 additions & 6 deletions fs/layer/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var srcCompressions = map[string]tutil.CompressionFactory{
func TestSuiteLayer(t *testing.T, store metadata.Store) {
testPrefetch(t, store)
testNodeRead(t, store)
testPassThroughRead(t, store)
testNodes(t, store)
}

Expand Down Expand Up @@ -218,6 +219,7 @@ func testPrefetch(t *testing.T, factory metadata.Store) {
ocispec.Descriptor{Digest: testStateLayerDigest},
&blobRef{blob, func() {}},
vr,
false,
)
if err := l.Verify(dgst); err != nil {
t.Errorf("failed to verify reader: %v", err)
Expand Down Expand Up @@ -379,7 +381,15 @@ const (
lastChunkOffset1 = sampleChunkSize * (int64(len(sampleData1)) / sampleChunkSize)
)

func testPassThroughRead(t *testing.T, factory metadata.Store) {
nodeRead(t, factory, true)
}

func testNodeRead(t *testing.T, factory metadata.Store) {
nodeRead(t, factory, false)
}

func nodeRead(t *testing.T, factory metadata.Store, pth bool) {
sizeCond := map[string]int64{
"single_chunk": sampleChunkSize - sampleMiddleOffset,
"multi_chunks": sampleChunkSize + sampleMiddleOffset,
Expand Down Expand Up @@ -428,7 +438,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
}

// data we get from the file node.
f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl)
f, closeFn := makeNodeReader(t, []byte(sampleData1)[:filesize], sampleChunkSize, factory, cl, pth)
defer closeFn()
tmpbuf := make([]byte, size) // fuse library can request bigger than remain
rr, errno := f.Read(context.Background(), tmpbuf, offset)
Expand Down Expand Up @@ -459,7 +469,7 @@ func testNodeRead(t *testing.T, factory metadata.Store) {
}
}

func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression) (_ *file, closeFn func() error) {
func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metadata.Store, cl tutil.Compression, pth bool) (_ *file, closeFn func() error) {
testName := "test"
sr, tocDgst, err := tutil.BuildEStargz(
[]tutil.TarEntry{tutil.File(testName, string(contents))},
Expand All @@ -472,7 +482,7 @@ func makeNodeReader(t *testing.T, contents []byte, chunkSize int, factory metada
if err != nil {
t.Fatalf("failed to create reader: %v", err)
}
rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache())
rootNode := getRootNode(t, r, OverlayOpaqueAll, tocDgst, cache.NewMemoryCache(), pth)
var eo fuse.EntryOut
inode, errno := rootNode.Lookup(context.Background(), testName, &eo)
if errno != 0 {
Expand Down Expand Up @@ -724,7 +734,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
}
defer r.Close()
mcache := cache.NewMemoryCache()
rootNode := getRootNode(t, r, opaque, tocDgst, mcache)
rootNode := getRootNode(t, r, opaque, tocDgst, mcache, false)
for _, want := range tt.want {
want(t, rootNode, mcache, testR)
}
Expand All @@ -733,7 +743,7 @@ func testNodesWithOpaque(t *testing.T, factory metadata.Store, opaque OverlayOpa
}
}

func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache) *node {
func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocDgst digest.Digest, cc cache.BlobCache, pth bool) *node {
vr, err := reader.NewReader(r, cc, digest.FromString(""))
if err != nil {
t.Fatalf("failed to create reader: %v", err)
Expand All @@ -742,7 +752,7 @@ func getRootNode(t *testing.T, r metadata.Reader, opaque OverlayOpaqueType, tocD
if err != nil {
t.Fatalf("failed to verify reader: %v", err)
}
rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque)
rootNode, err := newNode(testStateLayerDigest, rr, &testBlobState{10, 5}, 100, opaque, pth)
if err != nil {
t.Fatalf("failed to get root node: %v", err)
}
Expand Down
Loading
Loading