Skip to content

Commit

Permalink
Gcs remote data (flyteorg#121)
Browse files Browse the repository at this point in the history
* signed GCS URL

Impersonate the signingPrincipal to get object attributes and sign the GCS URL

This will make sure only the `signingPrincipal` needs readonly permission on the
object instead of the service account associated with flyteadmin.
  • Loading branch information
honnix authored Sep 3, 2020
1 parent 48c93d6 commit 6e2c136
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 262 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/lyft/flyteadmin
go 1.13

require (
cloud.google.com/go v0.56.0
cloud.google.com/go/storage v1.6.0
github.com/NYTimes/gizmo v1.3.5
github.com/Selvatico/go-mocket v1.0.7
github.com/aws/aws-sdk-go v1.29.23
Expand All @@ -13,6 +15,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.3.5
github.com/googleapis/gax-go/v2 v2.0.5
github.com/gorilla/handlers v1.4.2
github.com/gorilla/securecookie v1.1.1
github.com/graymeta/stow v0.2.5
Expand All @@ -34,6 +37,8 @@ require (
github.com/stretchr/testify v1.6.1
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/tools v0.0.0-20200818005847-188abfa75333 // indirect
google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940
google.golang.org/grpc v1.28.0
gopkg.in/gormigrate.v1 v1.6.0
k8s.io/api v0.17.3
Expand Down
268 changes: 9 additions & 259 deletions go.sum

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions pkg/data/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type RemoteDataHandlerConfig struct {
Retries int // Number of times to attempt to initialize a new config on failure.
Region string
SignedURLDurationMinutes int
SigningPrincipal string
RemoteDataStoreClient *storage.DataStore
}

Expand All @@ -45,6 +46,12 @@ func GetRemoteDataHandler(cfg RemoteDataHandlerConfig) RemoteDataHandler {
return &remoteDataHandler{
remoteURL: implementations.NewAWSRemoteURL(awsConfig, presignedURLDuration),
}
case common.GCP:
signedURLDuration := time.Minute * time.Duration(cfg.SignedURLDurationMinutes)
return &remoteDataHandler{
remoteURL: implementations.NewGCPRemoteURL(cfg.SigningPrincipal, signedURLDuration),
}

case common.Local:
logger.Infof(context.TODO(), "setting up local signer ----- ")
// Since minio = aws s3, we are creating the same client but using the config primitives from aws
Expand Down
6 changes: 3 additions & 3 deletions pkg/data/implementations/aws_remote_url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"github.com/stretchr/testify/assert"
)

func TestSplitURI(t *testing.T) {
func TestAWSSplitURI(t *testing.T) {
remoteURL := AWSRemoteURL{}
s3Object, err := remoteURL.splitURI(context.Background(), "s3://i/am/valid")
assert.Nil(t, err)
assert.Equal(t, "i", s3Object.bucket)
assert.Equal(t, "am/valid", s3Object.key)
}

func TestSplitURI_InvalidScheme(t *testing.T) {
func TestAWSSplitURI_InvalidScheme(t *testing.T) {
remoteURL := AWSRemoteURL{}
_, err := remoteURL.splitURI(context.Background(), "azure://i/am/invalid")
assert.NotNil(t, err)
Expand All @@ -46,7 +46,7 @@ func (m *mockS3Impl) GetObjectRequest(input *s3.GetObjectInput) (req *request.Re
return m.getObjectFunc(input)
}

func TestGet(t *testing.T) {
func TestAWSGet(t *testing.T) {
contentLength := int64(100)
presignDuration := 3 * time.Minute

Expand Down
193 changes: 193 additions & 0 deletions pkg/data/implementations/gcp_remote_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package implementations

import (
"context"
"time"

gax "github.com/googleapis/gax-go/v2"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"golang.org/x/oauth2"

credentials "cloud.google.com/go/iam/credentials/apiv1"
gcs "cloud.google.com/go/storage"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/lyft/flyteadmin/pkg/data/interfaces"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/storage"
"google.golang.org/api/option"
credentialspb "google.golang.org/genproto/googleapis/iam/credentials/v1"
"google.golang.org/grpc/codes"
)

const gcsScheme = "gs"

type iamCredentialsInterface interface {
SignBlob(ctx context.Context, req *credentialspb.SignBlobRequest, opts ...gax.CallOption) (*credentialspb.SignBlobResponse, error)
GenerateAccessToken(ctx context.Context, req *credentialspb.GenerateAccessTokenRequest, opts ...gax.CallOption) (*credentialspb.GenerateAccessTokenResponse, error)
}

type gcsClientWrapper struct {
delegate *gcs.Client
}

type bucketHandleWrapper struct {
delegate *gcs.BucketHandle
}

type objectHandleWrapper struct {
delegate *gcs.ObjectHandle
}

type gcsInterface interface {
Bucket(name string) bucketHandleInterface
}

type bucketHandleInterface interface {
Object(name string) objectHandleInterface
}

type objectHandleInterface interface {
Attrs(ctx context.Context) (attrs *gcs.ObjectAttrs, err error)
}

// GCP-specific implementation of RemoteURLInterface
type GCPRemoteURL struct {
iamCredentialsClient iamCredentialsInterface
gcsClient gcsInterface
signDuration time.Duration
signingPrincipal string
}

type GCPGCSObject struct {
bucket string
object string
}

type impersonationTokenSource struct {
iamCredentialsClient iamCredentialsInterface
signingPrincipal string
}

func (c *gcsClientWrapper) Bucket(name string) bucketHandleInterface {
return &bucketHandleWrapper{delegate: c.delegate.Bucket(name)}
}

func (b *bucketHandleWrapper) Object(name string) objectHandleInterface {
return &objectHandleWrapper{delegate: b.delegate.Object(name)}
}

func (o *objectHandleWrapper) Attrs(ctx context.Context) (attrs *gcs.ObjectAttrs, err error) {
return o.delegate.Attrs(ctx)
}

func (g *GCPRemoteURL) splitURI(ctx context.Context, uri string) (GCPGCSObject, error) {
scheme, container, key, err := storage.DataReference(uri).Split()
if err != nil {
return GCPGCSObject{}, err
}
if scheme != gcsScheme {
logger.Debugf(ctx, "encountered unexpected scheme: %s for GCS URI: %s", scheme, uri)
return GCPGCSObject{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"unexpected scheme %s for GCS URI", scheme)
}
return GCPGCSObject{
bucket: container,
object: key,
}, nil
}

func (g *GCPRemoteURL) signURL(ctx context.Context, gcsURI GCPGCSObject) (string, error) {
opts := &gcs.SignedURLOptions{
Method: "GET",
GoogleAccessID: g.signingPrincipal,
SignBytes: func(b []byte) ([]byte, error) {
req := &credentialspb.SignBlobRequest{
Payload: b,
Name: "projects/-/serviceAccounts/" + g.signingPrincipal,
}
resp, err := g.iamCredentialsClient.SignBlob(ctx, req)
if err != nil {
return nil, err
}
return resp.SignedBlob, nil
},
Expires: time.Now().Add(g.signDuration),
}

return gcs.SignedURL(gcsURI.bucket, gcsURI.object, opts)
}

func (g *GCPRemoteURL) Get(ctx context.Context, uri string) (admin.UrlBlob, error) {
logger.Debugf(ctx, "Getting signed url for - %s", uri)
gcsURI, err := g.splitURI(ctx, uri)
if err != nil {
logger.Debugf(ctx, "failed to extract gcs bucket and object from uri: %s", uri)
return admin.UrlBlob{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid uri: %s", uri)
}

// First, get the size of the url blob.
attrs, err := g.gcsClient.Bucket(gcsURI.bucket).Object(gcsURI.object).Attrs(ctx)
if err != nil {
logger.Debugf(ctx, "failed to get object size for %s with %v", uri, err)
return admin.UrlBlob{}, errors.NewFlyteAdminErrorf(
codes.Internal, "failed to get object size for %s with %v", uri, err)
}

urlStr, err := g.signURL(ctx, gcsURI)
if err != nil {
logger.Warning(ctx,
"failed to presign url for uri [%s] for %v with err %v", uri, g.signDuration, err)
return admin.UrlBlob{}, errors.NewFlyteAdminErrorf(codes.Internal,
"failed to presign url for uri [%s] for %v with err %v", uri, g.signDuration, err)
}
return admin.UrlBlob{
Url: urlStr,
Bytes: attrs.Size,
}, nil
}

func (ts impersonationTokenSource) Token() (*oauth2.Token, error) {
req := credentialspb.GenerateAccessTokenRequest{
Name: "projects/-/serviceAccounts/" + ts.signingPrincipal,
Scope: []string{"https://www.googleapis.com/auth/devstorage.read_only"},
}

resp, err := ts.iamCredentialsClient.GenerateAccessToken(context.Background(), &req)
if err != nil {
return nil, err
}

return &oauth2.Token{
AccessToken: resp.AccessToken,
Expiry: asTime(resp.ExpireTime),
}, nil
}

func asTime(t *timestamp.Timestamp) time.Time {
return time.Unix(t.GetSeconds(), int64(t.GetNanos())).UTC()
}

func NewGCPRemoteURL(signingPrincipal string, signDuration time.Duration) interfaces.RemoteURLInterface {
iamCredentialsClient, err := credentials.NewIamCredentialsClient(context.Background())
if err != nil {
panic(err)
}

gcsClient, err := gcs.NewClient(context.Background(),
option.WithScopes(gcs.ScopeReadOnly),
option.WithTokenSource(impersonationTokenSource{
iamCredentialsClient: iamCredentialsClient,
signingPrincipal: signingPrincipal,
}))
if err != nil {
panic(err)
}

return &GCPRemoteURL{
iamCredentialsClient: iamCredentialsClient,
gcsClient: &gcsClientWrapper{delegate: gcsClient},
signDuration: signDuration,
signingPrincipal: signingPrincipal,
}
}
Loading

0 comments on commit 6e2c136

Please sign in to comment.