Skip to content

Commit

Permalink
[Go SDK] S3 implementation of the Beam filesystem (#23992)
Browse files Browse the repository at this point in the history
* Implement filesystem for S3

* Update CHANGES.md
  • Loading branch information
johannaojeling authored Nov 9, 2022
1 parent 73142ba commit 63362f5
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
## I/Os

* Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)).
* S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)).

## New Features / Improvements

Expand Down
14 changes: 14 additions & 0 deletions sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ require (
cloud.google.com/go/profiler v0.3.0
cloud.google.com/go/pubsub v1.26.0
cloud.google.com/go/storage v1.28.0
github.com/aws/aws-sdk-go-v2 v1.7.1
github.com/aws/aws-sdk-go-v2/config v1.5.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1
github.com/docker/go-connections v0.4.0
github.com/dustin/go-humanize v1.0.0
github.com/go-sql-driver/mysql v1.6.0
Expand Down Expand Up @@ -67,6 +71,15 @@ require (
github.com/Microsoft/hcsshim v0.9.4 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.3.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.3.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.6.0 // indirect
github.com/aws/smithy-go v1.6.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
Expand All @@ -86,6 +99,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.6.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.13.1 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/moby/sys/mount v0.3.3 // indirect
Expand Down
15 changes: 15 additions & 0 deletions sdks/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,31 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.7.1 h1:TswSc7KNqZ/K1Ijt3IkpXk/2+62vi3Q82Yrr5wSbRBQ=
github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250=
github.com/aws/aws-sdk-go-v2/config v1.5.0 h1:tRQcWXVmO7wC+ApwYc2LiYKfIBoIrdzcJ+7HIh6AlR0=
github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA=
github.com/aws/aws-sdk-go-v2/credentials v1.3.1 h1:fFeqL5+9kwFKsCb2oci5yAIDsWYqn/Nga8oQ5bIasI8=
github.com/aws/aws-sdk-go-v2/credentials v1.3.1/go.mod h1:r0n73xwsIVagq8RsxmZbGSRQFj9As3je72C2WzUIToc=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0 h1:s4vtv3Mv1CisI3qm2HGHi1Ls9ZtbCOEqeQn6oz7fTyU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0/go.mod h1:2LAuqPx1I6jNfaGDucWfA2zqQCYCOMCDHiCOciALyNw=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2 h1:fzEMxnHQWh+bUV0ZzfhMbgUG8zjIPnAgApjtdHtC9Yg=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2/go.mod h1:qaqQiHSrOUVOfKe6fhgQ6UzhxjwqVW8aHNegd6Ws4w4=
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1 h1:SDLwr1NKyowP7uqxuLNdvFZhjnoVWxNv456zAp+ZFjU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1/go.mod h1:Zy8smImhTdOETZqfyn01iNOe0CNggVbPjCajyaz6Gvg=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.1 h1:s/uV8UyMB4UcO0ERHxG9BJhYJAD9MiY0QeYvJmlC7PE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.1/go.mod h1:v33JQ57i2nekYTA70Mb+O18KeH4KqhdqxTJZNK1zdRE=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.1 h1:VJe/XEhrfyfBLupcGg1BfUSK2VMZNdbDcZQ49jnp+h0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.1/go.mod h1:zceowr5Z1Nh2WVP8bf/3ikB41IZW59E4yIYbg+pC6mw=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.1 h1:1ds3HkMQEBx9XvOkqsPuqBmNFn0w8XEDuB4LOi6KepU=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.5.1/go.mod h1:6EQZIwNNvHpq/2/QSJnp4+ECvqIy55w95Ofs0ze+nGQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1 h1:HiXhafnqG0AkVJIZA/BHhFvuc/8xFdUO1uaeqF2Artc=
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1/go.mod h1:XLAGFrEjbvMCLvAtWLLP32yTv8GpBquCApZEycDLunI=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.1 h1:H2ZLWHUbbeYtghuqCY5s/7tbBM99PAwCioRJF8QvV/U=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.1/go.mod h1:J3A3RGUvuCZjvSuZEcOpHDnzZP/sKbhDWV2T1EOzFIM=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.0 h1:Y9r6mrzOyAYz4qKaluSH19zqH1236il/nGbsPKOUT0s=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BVRASvcU7gYZB9PUgPiByXg=
github.com/aws/smithy-go v1.6.0 h1:T6puApfBcYiTIsaI+SYWqanjMt5pc3aoyyDrI+0YH54=
github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -661,7 +674,9 @@ github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joefitzgerald/rainbow-reporter v0.1.0/go.mod h1:481CNgqmVHQZzdIbN52CupLJyoVwB10FQ/IQlF1pdL8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/io/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ import (

// registry maps a scheme to the constructor of the file system backend
// registered under it.
var registry = make(map[string]func(context.Context) Interface)

// wellKnownSchemeImportPaths is used for delivering useful error messages when a
// scheme is not found. It maps each well-known scheme to the import path of the
// package that provides it.
var wellKnownSchemeImportPaths = map[string]string{
"memfs": "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/memfs",
"default": "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local",
"gs": "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs",
"s3": "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/s3",
}

// Register registers a file system backend under the given scheme. For
Expand Down
208 changes: 208 additions & 0 deletions sdks/go/pkg/beam/io/filesystem/s3/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package s3 contains an AWS S3 implementation of the Beam file system.
package s3

import (
"context"
"fmt"
"io"
"path/filepath"

"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

// init registers New as the filesystem constructor for the "s3" scheme, so
// importing this package is enough to make the scheme available.
func init() {
	const scheme = "s3"
	filesystem.Register(scheme, New)
}

// fs is the S3 implementation of the Beam file system.
type fs struct {
// client is the S3 API client that every operation is issued through.
client *s3.Client
}

// New builds an S3 filesystem whose client is configured from the AWS default
// configuration sources. The signature has no error return, so a failure to
// load that configuration is reported by panicking.
func New(ctx context.Context) filesystem.Interface {
	awsConfig, loadErr := config.LoadDefaultConfig(ctx)
	if loadErr != nil {
		panic(fmt.Sprintf("error loading AWS config: %v", loadErr))
	}

	return &fs{client: s3.NewFromConfig(awsConfig)}
}

// Close closes the filesystem. It performs no work and always returns nil.
func (f *fs) Close() error {
return nil
}

// List returns the URIs of the objects in the filesystem that match the glob
// pattern.
//
// The glob must be an S3 uri in the format 's3://bucket/keyPattern'. Every
// matching key is returned as a full uri in the format 's3://bucket/key'.
// Underlying errors are wrapped, so callers can inspect them with errors.Is
// and errors.As.
func (f *fs) List(ctx context.Context, glob string) ([]string, error) {
	bucket, keyPattern, err := parseUri(glob)
	if err != nil {
		// Include the offending uri, as the other methods of fs do.
		return nil, fmt.Errorf("error parsing S3 uri %s: %w", glob, err)
	}

	keys, err := f.listObjectKeys(ctx, bucket, keyPattern)
	if err != nil {
		return nil, fmt.Errorf("error listing object keys: %w", err)
	}

	uris := make([]string, len(keys))
	for i, key := range keys {
		uris[i] = makeUri(bucket, key)
	}

	return uris, nil
}

// listObjectKeys returns the keys in the bucket that match the key pattern.
//
// The pattern is interpreted by filepath.Match. The listing request is
// narrowed to the prefix computed by getPrefix, and every returned key is then
// matched against the full pattern. A malformed pattern is rejected before any
// request is made. Underlying errors are wrapped so callers can inspect them
// with errors.Is and errors.As.
//
// NOTE(review): filepath.Match follows the host OS path rules (separator and
// escaping differ on Windows), while S3 keys always use '/'. Confirm whether
// path.Match would be the better fit.
func (f *fs) listObjectKeys(
	ctx context.Context,
	bucket string,
	keyPattern string,
) ([]string, error) {
	// Validate the pattern up front: otherwise a bad pattern is only noticed
	// once the first object comes back, and never if the listing is empty.
	if _, err := filepath.Match(keyPattern, ""); err != nil {
		return nil, fmt.Errorf("invalid key pattern: %s: %w", keyPattern, err)
	}

	params := &s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
		Prefix: aws.String(getPrefix(keyPattern)),
	}
	paginator := s3.NewListObjectsV2Paginator(f.client, params)

	var keys []string
	for paginator.HasMorePages() {
		output, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("error retrieving page: %w", err)
		}

		for _, object := range output.Contents {
			key := aws.ToString(object.Key)
			match, err := filepath.Match(keyPattern, key)
			if err != nil {
				return nil, fmt.Errorf("invalid key pattern: %s: %w", keyPattern, err)
			}

			if match {
				keys = append(keys, key)
			}
		}
	}

	return keys, nil
}

// OpenRead returns a new io.ReadCloser to read contents from the file. The caller must call Close
// on the returned io.ReadCloser when done reading.
//
// The filename must be an S3 uri in the format 's3://bucket/key'. Underlying
// errors are wrapped, so callers can inspect the cause with errors.Is and
// errors.As.
func (f *fs) OpenRead(ctx context.Context, filename string) (io.ReadCloser, error) {
	bucket, key, err := parseUri(filename)
	if err != nil {
		return nil, fmt.Errorf("error parsing S3 uri %s: %w", filename, err)
	}

	params := &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	output, err := f.client.GetObject(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("error getting object %s: %w", filename, err)
	}

	return output.Body, nil
}

// OpenWrite returns a new io.WriteCloser to write contents to the file. The caller must call Close
// on the returned io.WriteCloser when done writing.
//
// The filename must be an S3 uri in the format 's3://bucket/key'. A parsing
// failure is wrapped, so callers can inspect the cause with errors.Is and
// errors.As.
func (f *fs) OpenWrite(ctx context.Context, filename string) (io.WriteCloser, error) {
	bucket, key, err := parseUri(filename)
	if err != nil {
		return nil, fmt.Errorf("error parsing S3 uri %s: %w", filename, err)
	}

	return newWriter(ctx, f.client, bucket, key), nil
}

// Size returns the size of the file, as reported by the ContentLength of a
// HeadObject request.
//
// The filename must be an S3 uri in the format 's3://bucket/key'. On failure
// the returned size is -1 and the underlying error is wrapped, so callers can
// inspect the cause with errors.Is and errors.As.
func (f *fs) Size(ctx context.Context, filename string) (int64, error) {
	bucket, key, err := parseUri(filename)
	if err != nil {
		return -1, fmt.Errorf("error parsing S3 uri %s: %w", filename, err)
	}

	params := &s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	output, err := f.client.HeadObject(ctx, params)
	if err != nil {
		return -1, fmt.Errorf("error getting metadata for object %s: %w", filename, err)
	}

	// Return a literal nil rather than the (already nil) err variable.
	return output.ContentLength, nil
}

// Remove removes the file from the filesystem by issuing a DeleteObject
// request.
//
// The filename must be an S3 uri in the format 's3://bucket/key'. Underlying
// errors are wrapped, so callers can inspect the cause with errors.Is and
// errors.As.
func (f *fs) Remove(ctx context.Context, filename string) error {
	bucket, key, err := parseUri(filename)
	if err != nil {
		return fmt.Errorf("error parsing S3 uri %s: %w", filename, err)
	}

	params := &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}
	if _, err = f.client.DeleteObject(ctx, params); err != nil {
		return fmt.Errorf("error deleting object %s: %w", filename, err)
	}

	return nil
}

// Copy copies the file from the old path to the new path by issuing a
// CopyObject request.
//
// Both paths must be S3 uris in the format 's3://bucket/key'. Underlying
// errors are wrapped, so callers can inspect the cause with errors.Is and
// errors.As.
func (f *fs) Copy(ctx context.Context, oldpath, newpath string) error {
	sourceBucket, sourceKey, err := parseUri(oldpath)
	if err != nil {
		return fmt.Errorf("error parsing S3 source uri %s: %w", oldpath, err)
	}

	destBucket, destKey, err := parseUri(newpath)
	if err != nil {
		return fmt.Errorf("error parsing S3 destination uri %s: %w", newpath, err)
	}

	// NOTE(review): the copy source is passed as a plain "bucket/key" string.
	// The S3 API appears to expect this value URL-encoded, so keys containing
	// special characters may need escaping here - confirm against the
	// CopyObject documentation.
	copySource := fmt.Sprintf("%s/%s", sourceBucket, sourceKey)

	params := &s3.CopyObjectInput{
		Bucket:     aws.String(destBucket),
		CopySource: aws.String(copySource),
		Key:        aws.String(destKey),
	}
	if _, err = f.client.CopyObject(ctx, params); err != nil {
		return fmt.Errorf("error copying object %s: %w", oldpath, err)
	}

	return nil
}

// Compile-time checks that *fs implements filesystem.Remover and
// filesystem.Copier; the build fails if either method set is incomplete.
var (
_ filesystem.Remover = (*fs)(nil)
_ filesystem.Copier = (*fs)(nil)
)
60 changes: 60 additions & 0 deletions sdks/go/pkg/beam/io/filesystem/s3/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package s3

import (
"errors"
"fmt"
"net/url"
"strings"
)

// parseUri splits an S3 uri in the format 's3://bucket/key' into its bucket
// and key. It returns an error if the uri cannot be parsed, if its scheme is
// not 's3', or if it names no bucket. The key is empty when the uri has no
// path.
func parseUri(uri string) (string, string, error) {
	u, err := url.Parse(uri)
	switch {
	case err != nil:
		return "", "", err
	case u.Scheme != "s3":
		return "", "", errors.New("scheme must be 's3'")
	case u.Host == "":
		return "", "", errors.New("bucket must not be empty")
	}

	// A non-empty path starts with the '/' that separates it from the bucket;
	// that separator is not part of the key.
	key := ""
	if len(u.Path) > 0 {
		key = u.Path[1:]
	}

	return u.Host, key, nil
}

// makeUri assembles the S3 uri 's3://bucket/key' for the given bucket and key.
func makeUri(bucket string, key string) string {
	const layout = "s3://%s/%s"
	return fmt.Sprintf(layout, bucket, key)
}

// getPrefix returns the literal part of the key pattern that precedes its
// first glob metacharacter, or the whole pattern if it contains none.
//
// Besides '*', the metacharacters '?', '[' and the escape character '\' end
// the prefix: each can stand for text that differs from the pattern's own
// characters, so anything from that point on must not be used as a literal
// listing prefix.
func getPrefix(keyPattern string) string {
	if index := strings.IndexAny(keyPattern, `*?[\`); index >= 0 {
		return keyPattern[:index]
	}
	return keyPattern
}
Loading

0 comments on commit 63362f5

Please sign in to comment.