-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add FUSE Passthrough Support in Stargz-Snapshotter #1867 #1868
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,6 +82,7 @@ type BlobCache interface { | |
type Reader interface { | ||
io.ReaderAt | ||
Close() error | ||
GetReaderAt() io.ReaderAt | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When FUSE passthrough is enabled, we should always set |
||
} | ||
|
||
// Writer enables the client to cache byte data. Commit() must be | ||
|
@@ -414,6 +415,10 @@ type reader struct { | |
|
||
func (r *reader) Close() error { return r.closeFunc() } | ||
|
||
func (r *reader) GetReaderAt() io.ReaderAt { | ||
return r.ReaderAt | ||
} | ||
|
||
type writer struct { | ||
io.WriteCloser | ||
commitFunc func() error | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
package main | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"flag" | ||
"fmt" | ||
|
@@ -26,8 +27,10 @@ import ( | |
"net" | ||
"net/http" | ||
"os" | ||
"os/exec" | ||
"os/signal" | ||
"path/filepath" | ||
"strings" | ||
"time" | ||
|
||
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1" | ||
|
@@ -132,6 +135,11 @@ func main() { | |
// Create a gRPC server | ||
rpc := grpc.NewServer() | ||
|
||
// Configure FUSE passthrough | ||
if config.Config.Config.FuseConfig.PassThrough, err = isFusePthEnable(); err != nil { | ||
log.G(ctx).Warnf("failed to check FUSE passthrough support") | ||
} | ||
|
||
// Configure keychain | ||
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)} | ||
if config.Config.KubeconfigKeychainConfig.EnableKeychain { | ||
|
@@ -185,6 +193,18 @@ func main() { | |
log.G(ctx).Info("Exiting") | ||
} | ||
|
||
// isFusePthEnable prevents users from enabling passthrough mode on unsupported kernel versions | ||
func isFusePthEnable() (bool, error) { | ||
cmd := exec.Command("sh", "-c", "grep 'CONFIG_FUSE_PASSTHROUGH=y' /boot/config-$(uname -r)") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: Let's try not to rely on the shell commands for now. Instead of having this check, let's put a document about how to check if passthrough is supported on the node (maybe in the following PRs) |
||
var out bytes.Buffer | ||
cmd.Stdout = &out | ||
cmd.Stderr = &out | ||
if err := cmd.Run(); err != nil { | ||
return false, err | ||
} | ||
return strings.Contains(out.String(), "CONFIG_FUSE_PASSTHROUGH=y"), nil | ||
} | ||
|
||
func serve(ctx context.Context, rpc *grpc.Server, addr string, rs snapshots.Snapshotter, config snapshotterConfig) (bool, error) { | ||
// Convert the snapshotter to a gRPC service, | ||
snsvc := snapshotservice.FromSnapshotter(rs) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,4 +148,7 @@ type FuseConfig struct { | |
|
||
// EntryTimeout defines TTL for directory, name lookup in seconds. | ||
EntryTimeout int64 `toml:"entry_timeout"` | ||
|
||
// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false. | ||
PassThrough bool `toml:"passthrough"` | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we test this in our CI? |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,7 +76,7 @@ var opaqueXattrs = map[OverlayOpaqueType][]string{ | |
OverlayOpaqueUser: {"user.overlay.opaque"}, | ||
} | ||
|
||
func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType) (fusefs.InodeEmbedder, error) { | ||
func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseInode uint32, opaque OverlayOpaqueType, pth bool) (fusefs.InodeEmbedder, error) { | ||
rootID := r.Metadata().RootID() | ||
rootAttr, err := r.Metadata().GetAttr(rootID) | ||
if err != nil { | ||
|
@@ -92,6 +92,7 @@ func newNode(layerDgst digest.Digest, r reader.Reader, blob remote.Blob, baseIno | |
baseInode: baseInode, | ||
rootID: rootID, | ||
opaqueXattrs: opq, | ||
passThrough: pth, | ||
} | ||
ffs.s = ffs.newState(layerDgst, blob) | ||
return &node{ | ||
|
@@ -109,6 +110,7 @@ type fs struct { | |
baseInode uint32 | ||
rootID uint32 | ||
opaqueXattrs []string | ||
passThrough bool | ||
} | ||
|
||
func (fs *fs) inodeOfState() uint64 { | ||
|
@@ -344,10 +346,24 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fu | |
n.fs.s.report(fmt.Errorf("node.Open: %v", err)) | ||
return nil, 0, syscall.EIO | ||
} | ||
return &file{ | ||
|
||
f := &file{ | ||
n: n, | ||
ra: ra, | ||
}, fuse.FOPEN_KEEP_CACHE, 0 | ||
} | ||
|
||
if n.fs.passThrough { | ||
if getter, ok := ra.(reader.PassthroughFdGetter); ok { | ||
fd, err := getter.GetPassthroughFd() | ||
if err != nil { | ||
n.fs.s.report(fmt.Errorf("node.Open: %v", err)) | ||
return nil, 0, syscall.EIO | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we continue opening this as a non-passthrough file, instead of returning EIO? |
||
} | ||
f.InitFd(int(fd)) | ||
} | ||
} | ||
|
||
return f, fuse.FOPEN_KEEP_CACHE, 0 | ||
} | ||
|
||
var _ = (fusefs.NodeGetattrer)((*node)(nil)) | ||
|
@@ -424,6 +440,7 @@ func (n *node) Statfs(ctx context.Context, out *fuse.StatfsOut) syscall.Errno { | |
type file struct { | ||
n *node | ||
ra io.ReaderAt | ||
fd int | ||
} | ||
|
||
var _ = (fusefs.FileReader)((*file)(nil)) | ||
|
@@ -451,6 +468,17 @@ func (f *file) Getattr(ctx context.Context, out *fuse.AttrOut) syscall.Errno { | |
return 0 | ||
} | ||
|
||
// Implement PassthroughFd to enable go-fuse passthrough | ||
var _ = (fusefs.FilePassthroughFder)((*file)(nil)) | ||
|
||
func (f *file) PassthroughFd() (int, bool) { | ||
return f.fd, true | ||
} | ||
|
||
func (f *file) InitFd(fd int) { | ||
f.fd = fd | ||
} | ||
|
||
// whiteout is a whiteout abstraction compliant to overlayfs. | ||
type whiteout struct { | ||
fusefs.Inode | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,10 @@ | |
LastOnDemandReadTime() time.Time | ||
} | ||
|
||
type PassthroughFdGetter interface { | ||
GetPassthroughFd() (uintptr, error) | ||
} | ||
|
||
// VerifiableReader produces a Reader with a given verifier. | ||
type VerifiableReader struct { | ||
r *reader | ||
|
@@ -490,18 +494,101 @@ | |
return nr, nil | ||
} | ||
|
||
func (gr *reader) verifyAndCache(entryID uint32, ip []byte, chunkDigestStr string, cacheID string) error { | ||
func (sf *file) GetPassthroughFd() (uintptr, error) { | ||
var ( | ||
offset int64 = 0 | ||
firstChunkOffset int64 = -1 | ||
totalSize int64 = 0 | ||
) | ||
|
||
for { | ||
chunkOffset, chunkSize, _, ok := sf.fr.ChunkEntryForOffset(offset) | ||
if !ok { | ||
break | ||
} | ||
if firstChunkOffset == -1 { | ||
firstChunkOffset = chunkOffset | ||
} | ||
totalSize += chunkSize | ||
offset = chunkOffset + chunkSize | ||
} | ||
|
||
id := genID(sf.id, firstChunkOffset, totalSize) | ||
|
||
for { | ||
r, err := sf.gr.cache.Get(id) | ||
if err != nil { | ||
if err := sf.prefetchEntireFile(); err != nil { | ||
return 0, err | ||
} | ||
continue | ||
} | ||
|
||
readerAt := r.GetReaderAt() | ||
if file, ok := readerAt.(*os.File); ok { | ||
fd := file.Fd() | ||
r.Close() | ||
return fd, nil | ||
} else { | ||
r.Close() | ||
return 0, fmt.Errorf("The cached ReaderAt is not of type *os.File, fd obtain failed") | ||
} | ||
} | ||
} | ||
|
||
func (sf *file) prefetchEntireFile() error { | ||
var ( | ||
offset int64 = 0 | ||
firstChunkOffset int64 = -1 | ||
totalSize int64 = 0 | ||
) | ||
combinedBuffer := sf.gr.bufPool.Get().(*bytes.Buffer) | ||
combinedBuffer.Reset() | ||
defer sf.gr.putBuffer(combinedBuffer) | ||
|
||
for { | ||
chunkOffset, chunkSize, chunkDigestStr, ok := sf.fr.ChunkEntryForOffset(offset) | ||
if !ok { | ||
break | ||
} | ||
if firstChunkOffset == -1 { | ||
firstChunkOffset = chunkOffset | ||
} | ||
b := sf.gr.bufPool.Get().(*bytes.Buffer) | ||
b.Reset() | ||
b.Grow(int(chunkSize)) | ||
ip := b.Bytes()[:chunkSize] | ||
if _, err := sf.fr.ReadAt(ip, chunkOffset); err != nil && err != io.EOF { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we check if the data can be read from the cache? |
||
sf.gr.putBuffer(b) | ||
return fmt.Errorf("failed to read data: %w", err) | ||
} | ||
if err := sf.gr.verifyOneChunk(sf.id, ip, chunkDigestStr); err != nil { | ||
sf.gr.putBuffer(b) | ||
return err | ||
} | ||
combinedBuffer.Write(ip) | ||
totalSize += chunkSize | ||
offset = chunkOffset + chunkSize | ||
sf.gr.putBuffer(b) | ||
} | ||
combinedIp := combinedBuffer.Bytes() | ||
id := genID(sf.id, firstChunkOffset, totalSize) | ||
sf.gr.cacheData(combinedIp, id) | ||
return nil | ||
} | ||
|
||
func (gr *reader) verifyOneChunk(entryID uint32, ip []byte, chunkDigestStr string) error { | ||
// We can end up doing on demand registry fetch when aligning the chunk | ||
commonmetrics.IncOperationCount(commonmetrics.OnDemandRemoteRegistryFetchCount, gr.layerSha) // increment the number of on demand file fetches from remote registry | ||
commonmetrics.AddBytesCount(commonmetrics.OnDemandBytesFetched, gr.layerSha, int64(len(ip))) // record total bytes fetched | ||
commonmetrics.IncOperationCount(commonmetrics.OnDemandRemoteRegistryFetchCount, gr.layerSha) | ||
commonmetrics.AddBytesCount(commonmetrics.OnDemandBytesFetched, gr.layerSha, int64(len(ip))) | ||
gr.setLastReadTime(time.Now()) | ||
|
||
// Verify this chunk | ||
if err := gr.verifyChunk(entryID, ip, chunkDigestStr); err != nil { | ||
return fmt.Errorf("invalid chunk: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
// Cache this chunk | ||
func (gr *reader) cacheData(ip []byte, cacheID string) { | ||
if w, err := gr.cache.Add(cacheID); err == nil { | ||
if cn, err := w.Write(ip); err != nil || cn != len(ip) { | ||
w.Abort() | ||
|
@@ -510,7 +597,13 @@ | |
} | ||
w.Close() | ||
} | ||
} | ||
|
||
func (gr *reader) verifyAndCache(entryID uint32, ip []byte, chunkDigestStr string, cacheID string) error { | ||
if err := gr.verifyOneChunk(entryID, ip, chunkDigestStr); err != nil { | ||
return err | ||
} | ||
gr.cacheData(ip, cacheID) | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment like
If a blob is backed by a file, it should return *os.File so that it can be used for FUSE passthrough.
?