Skip to content

Commit

Permalink
Merge #58384
Browse files Browse the repository at this point in the history
58384: cloudimpl: add ReadFileAt to ExternalStorage API r=dt a=dt

This adds `ReadFileAt` to the external storage API.

`ReadFileAt` returns a Reader, like ReadFile, but starting from the specified offset.

Unlike `ReadFile`, `ReadFileAt` also returns the total size of the file. This is added because callers reading at offsets often need to know the total file size, for example to compute the offset at which to read the footer of an sstable. Most remote storage providers already include this information in their object fetch responses, so passing it along to callers can let them avoid making a separate RPC in some cases.

Support is added for each storage provider in individual commits, and then to the API in the last commit. A follow-up change will then use this API to implement pebble's vfs.File API.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed Jan 1, 2021
2 parents d7bbe00 + 5f1b7d1 commit 7d60e44
Show file tree
Hide file tree
Showing 21 changed files with 373 additions and 219 deletions.
2 changes: 1 addition & 1 deletion pkg/blobs/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func benchmarkStreamingReadFile(b *testing.B, tc *benchmarkTestCase) {
b.ResetTimer()
b.SetBytes(tc.fileSize)
for i := 0; i < b.N; i++ {
reader, err := tc.blobClient.ReadFile(context.Background(), tc.fileName)
reader, _, err := tc.blobClient.ReadFile(context.Background(), tc.fileName, 0)
if err != nil {
b.Fatal(err)
}
Expand Down
115 changes: 72 additions & 43 deletions pkg/blobs/blobspb/blobs.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/blobs/blobspb/blobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import "gogoproto/gogo.proto";
// an absolute path, which must be contained in external IO dir.
message GetRequest {
// filename is the path of the file whose contents are requested.
string filename = 1;
// offset is the byte offset into the file at which reading starts;
// 0 reads from the beginning of the file.
int64 offset = 2;
}

// GetResponse returns contents of the file requested by GetRequest.
Expand Down
21 changes: 13 additions & 8 deletions pkg/blobs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type BlobClient interface {
// ReadFile fetches the named payload from the requested node,
// and stores it in memory. It then returns an io.ReadCloser to
// read the contents starting at the given offset, along with
// the total size of the file.
ReadFile(ctx context.Context, file string) (io.ReadCloser, error)
ReadFile(ctx context.Context, file string, offset int64) (io.ReadCloser, int64, error)

// WriteFile sends the named payload to the requested node.
// This method will read entire content of file and send
Expand Down Expand Up @@ -60,16 +60,19 @@ func newRemoteClient(blobClient blobspb.BlobClient) BlobClient {
return &remoteClient{blobClient: blobClient}
}

func (c *remoteClient) ReadFile(ctx context.Context, file string) (io.ReadCloser, error) {
// Check that file exists before reading from it
_, err := c.Stat(ctx, file)
// ReadFile opens a stream over the contents of file on the remote node,
// starting at the given byte offset. It first issues a Stat call, which both
// verifies that the file exists and supplies the file size returned alongside
// the reader. On any error the returned reader is nil and the size is 0.
func (c *remoteClient) ReadFile(
	ctx context.Context, file string, offset int64,
) (io.ReadCloser, int64, error) {
	// Check that file exists before reading from it and get size to return.
	st, err := c.Stat(ctx, file)
	if err != nil {
		return nil, 0, err
	}
	stream, err := c.blobClient.GetStream(ctx, &blobspb.GetRequest{
		Filename: file,
		Offset:   offset,
	})
	if err != nil {
		// Do not hand back a reader wrapping a nil stream: a caller that
		// touched it would dereference nil. Return zero values with the error.
		return nil, 0, errors.Wrap(err, "fetching file")
	}
	return newGetStreamReader(stream), st.Filesize, nil
}

func (c *remoteClient) WriteFile(
Expand Down Expand Up @@ -134,8 +137,10 @@ func newLocalClient(externalIODir string) (BlobClient, error) {
return &localClient{localStorage: storage}, nil
}

func (c *localClient) ReadFile(ctx context.Context, file string) (io.ReadCloser, error) {
return c.localStorage.ReadFile(file)
// ReadFile delegates to c.localStorage.ReadFile, forwarding file and offset
// and returning its reader, size and error unchanged. ctx is not used.
func (c *localClient) ReadFile(
ctx context.Context, file string, offset int64,
) (io.ReadCloser, int64, error) {
return c.localStorage.ReadFile(file, offset)
}

func (c *localClient) WriteFile(ctx context.Context, file string, content io.ReadSeeker) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/blobs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestBlobClientReadFile(t *testing.T) {
if err != nil {
t.Fatal(err)
}
reader, err := blobClient.ReadFile(ctx, tc.filename)
reader, _, err := blobClient.ReadFile(ctx, tc.filename, 0)
if err != nil {
if testutils.IsError(err, tc.err) {
// correct error was returned
Expand Down
21 changes: 15 additions & 6 deletions pkg/blobs/local_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,16 @@ func (l *LocalStorage) WriteFile(filename string, content io.Reader) (err error)
}

// ReadFile prepends IO dir to filename and reads the content of that local file.
func (l *LocalStorage) ReadFile(filename string) (res io.ReadCloser, err error) {
func (l *LocalStorage) ReadFile(
filename string, offset int64,
) (res io.ReadCloser, size int64, err error) {
fullPath, err := l.prependExternalIODir(filename)
if err != nil {
return nil, err
return nil, 0, err
}
f, err := os.Open(fullPath)
if err != nil {
return nil, err
return nil, 0, err
}
defer func() {
if err != nil {
Expand All @@ -145,12 +147,19 @@ func (l *LocalStorage) ReadFile(filename string) (res io.ReadCloser, err error)
}()
fi, err := f.Stat()
if err != nil {
return nil, err
return nil, 0, err
}
if fi.IsDir() {
return nil, errors.Errorf("expected a file but %q is a directory", fi.Name())
return nil, 0, errors.Errorf("expected a file but %q is a directory", fi.Name())
}
if offset != 0 {
if ret, err := f.Seek(offset, 0); err != nil {
return nil, 0, err
} else if ret != offset {
return nil, 0, errors.Errorf("seek to offset %d returned %d", offset, ret)
}
}
return f, nil
return f, fi.Size(), nil
}

// List prepends IO dir to pattern and glob matches all local files against that pattern.
Expand Down
2 changes: 1 addition & 1 deletion pkg/blobs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewBlobService(externalIODir string) (*Service, error) {

// GetStream implements the gRPC service.
func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStreamServer) error {
content, err := s.localStorage.ReadFile(req.Filename)
content, _, err := s.localStorage.ReadFile(req.Filename, req.Offset)
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ func (es *generatorExternalStorage) ReadFile(
return es.gen.Open()
}

// ReadFileAt is not supported by generatorExternalStorage: it ignores its
// arguments and always panics with "unimplemented".
func (es *generatorExternalStorage) ReadFileAt(
ctx context.Context, basename string, offset int64,
) (io.ReadCloser, int64, error) {
panic("unimplemented")
}

// Close is a no-op and always returns nil.
func (es *generatorExternalStorage) Close() error {
return nil
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ type ExternalStorage interface {
// ExternalStorage implementation.
Settings() *cluster.Settings

// ReadFile should return a Reader for requested name.
// ReadFile is shorthand for ReadFileAt with offset 0.
ReadFile(ctx context.Context, basename string) (io.ReadCloser, error)

// ReadFileAt returns a Reader for requested name reading at offset, along
// with the total size of the file.
// ErrFileDoesNotExist is raised if `basename` cannot be located in storage.
// This can be leveraged for an existence check.
ReadFile(ctx context.Context, basename string) (io.ReadCloser, error)
ReadFileAt(ctx context.Context, basename string, offset int64) (io.ReadCloser, int64, error)

// WriteFile should write the content to requested name.
WriteFile(ctx context.Context, basename string, content io.ReadSeeker) error
Expand Down
Loading

0 comments on commit 7d60e44

Please sign in to comment.