Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
add s3 permission check and deprecated skip-check-path
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Apr 20, 2021
1 parent 667b999 commit c16890a
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 69 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ error = '''
invalid external storage config
'''

["BR:ExternalStorage:ErrStorageInvalidPermission"]
error = '''
external storage permission
'''

["BR:ExternalStorage:ErrStorageUnknown"]
error = '''
unknown external storage error
Expand Down
5 changes: 3 additions & 2 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ var (

ErrPiTRInvalidCDCLogFormat = errors.Normalize("invalid cdc log format", errors.RFCCodeText("BR:PiTR:ErrPiTRInvalidCDCLogFormat"))

ErrStorageUnknown = errors.Normalize("unknown external storage error", errors.RFCCodeText("BR:ExternalStorage:ErrStorageUnknown"))
ErrStorageInvalidConfig = errors.Normalize("invalid external storage config", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidConfig"))
ErrStorageUnknown = errors.Normalize("unknown external storage error", errors.RFCCodeText("BR:ExternalStorage:ErrStorageUnknown"))
ErrStorageInvalidConfig = errors.Normalize("invalid external storage config", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidConfig"))
ErrStorageInvalidPermission = errors.Normalize("external storage permission", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidPermission"))

// Errors reported from TiKV.
ErrKVUnknown = errors.Normalize("unknown tikv error", errors.RFCCodeText("BR:KV:ErrKVUnknown"))
Expand Down
6 changes: 1 addition & 5 deletions pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
if err != nil {
return errors.Annotate(err, "parse backend failed")
}
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{
// we skip check path in favor of delaying the error to when we actually access the file.
// on S3, performing "check path" requires the additional "s3:ListBucket" permission.
SkipCheckPath: true,
})
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return errors.Annotate(err, "create storage failed")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewMyDumpLoader(ctx context.Context, cfg *config.Config) (*MDLoader, error)
if err != nil {
return nil, errors.Trace(err)
}
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{SkipCheckPath: true})
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
)

const (
pdWriteFlow = "/pd/api/v1/regions/writeflow"
pdReadFlow = "/pd/api/v1/regions/readflow"
pdWriteFlow = "/pd/api/v1/regions/writeflow"
pdReadFlow = "/pd/api/v1/regions/readflow"

// OnlineBytesLimitation/OnlineKeysLimitation is the statistics of
// Bytes/Keys used per region from pdWriteFlow/pdReadFlow
Expand Down Expand Up @@ -199,9 +199,15 @@ func (rc *Controller) StoragePermission(ctx context.Context) error {
if err != nil {
return errors.Annotate(err, "parse backend failed")
}
s3 := u.GetS3()
if s3 != nil {
// TODO finish s3 permission check
_, err = storage.New(ctx, u, &storage.ExternalStorageOptions{
CheckPermissions: []storage.Permission{
storage.ListObjects,
storage.GetObject,
},
})
if err != nil {
passed = false
message = err.Error()
}
return nil
}
Expand Down
34 changes: 16 additions & 18 deletions pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,13 +1425,13 @@ func (s *restoreSchemaSuite) TestRestoreSchemaContextCancel(c *C) {
}

func (s *tableRestoreSuite) TestCheckClusterIsOnline(c *C) {
cases := []struct{
cases := []struct {
mockHttpResponse []byte
expectMsg string
expectResult bool
expectWarnCount int
expectMsg string
expectResult bool
expectWarnCount int
expectErrorCount int
} {
}{
{
[]byte(`{
"count": 1,
Expand Down Expand Up @@ -1502,13 +1502,13 @@ func (s *tableRestoreSuite) TestCheckClusterIsOnline(c *C) {
}

func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
cases := []struct{
mockStoreResponse []byte
cases := []struct {
mockStoreResponse []byte
mockReplicaResponse []byte
expectMsg string
expectResult bool
expectErrorCount int
} {
expectMsg string
expectResult bool
expectErrorCount int
}{
{
[]byte(`{
"count": 1,
Expand Down Expand Up @@ -1551,7 +1551,6 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
false,
1,
},

}

ctx := context.Background()
Expand Down Expand Up @@ -1595,13 +1594,12 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
}

func (s *tableRestoreSuite) TestCheckClusterAvailable(c *C) {

cases := []struct{
cases := []struct {
checkRequirements bool
expectMsg string
expectResult bool
expectErrorCount int
} {
expectMsg string
expectResult bool
expectErrorCount int
}{
{
false,
"(.*)Cluster's available check is skipped by user requirement(.*)",
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func newGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage
// so we need find sst in slash directory
gcs.Prefix += "//"
}
// TODO remove it after BR remove cfg skip-check-path
if !opts.SkipCheckPath {
// check bucket exists
_, err = bucket.Attrs(ctx)
Expand Down
42 changes: 21 additions & 21 deletions pkg/storage/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func (r *testStorageSuite) TestGCS(c *C) {
CredentialsBlob: "Fake Credentials",
}
stg, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: false,
HTTPClient: server.HTTPClient(),
SendCredentials: false,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
c.Assert(err, IsNil)

Expand Down Expand Up @@ -82,9 +82,9 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
CredentialsBlob: "FakeCredentials",
}
_, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: false,
HTTPClient: server.HTTPClient(),
SendCredentials: true,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, "FakeCredentials")
Expand All @@ -99,9 +99,9 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
CredentialsBlob: "FakeCredentials",
}
_, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: false,
HTTPClient: server.HTTPClient(),
SendCredentials: false,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, "")
Expand All @@ -128,9 +128,9 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
CredentialsBlob: "",
}
_, err = newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: false,
HTTPClient: server.HTTPClient(),
SendCredentials: true,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, `{"type": "service_account"}`)
Expand All @@ -157,9 +157,9 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
CredentialsBlob: "",
}
s, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: false,
HTTPClient: server.HTTPClient(),
SendCredentials: false,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, "")
Expand All @@ -176,9 +176,9 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
CredentialsBlob: "",
}
_, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: false,
HTTPClient: server.HTTPClient(),
SendCredentials: true,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
c.Assert(err, NotNil)
}
Expand All @@ -192,9 +192,9 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
CredentialsBlob: "FakeCredentials",
}
s, err := newGCSStorage(ctx, gcs, &ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: false,
HTTPClient: server.HTTPClient(),
SendCredentials: false,
CheckPermissions: []Permission{AccessBuckets},
HTTPClient: server.HTTPClient(),
})
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, "")
Expand Down
58 changes: 53 additions & 5 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ const (
hardcodedS3ChunkSize = 5 * 1024 * 1024
)

var permissionCheckFn = map[Permission]func(*s3.S3, *backuppb.S3) error{
AccessBuckets: checkS3Bucket,
ListObjects: listObjects,
GetObject: getObject,
}

// S3Storage info for s3 storage.
type S3Storage struct {
session *session.Session
Expand Down Expand Up @@ -228,8 +234,8 @@ func NewS3Storage( // revive:disable-line:flag-parameter
sendCredential bool,
) (*S3Storage, error) {
return newS3Storage(backend, &ExternalStorageOptions{
SendCredentials: sendCredential,
SkipCheckPath: false,
SendCredentials: sendCredential,
CheckPermissions: []Permission{AccessBuckets},
})
}

Expand Down Expand Up @@ -277,16 +283,25 @@ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (*S3Storag
}

c := s3.New(ses)
// TODO remove it after BR remove cfg skip-check-path
if !opts.SkipCheckPath {
err = checkS3Bucket(c, qs.Bucket)
err = checkS3Bucket(c, &qs)
if err != nil {
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "Bucket %s is not accessible: %v", qs.Bucket, err)
}
}

if len(qs.Prefix) > 0 && !strings.HasSuffix(qs.Prefix, "/") {
qs.Prefix += "/"
}

for _, p := range opts.CheckPermissions {
err := permissionCheckFn[p](c, &qs)
if err != nil {
return nil, errors.Annotatef(berrors.ErrStorageInvalidPermission, "check permission %s failed due to %v", p, err)
}
}

return &S3Storage{
session: ses,
svc: c,
Expand All @@ -295,14 +310,47 @@ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (*S3Storag
}

// checkBucket checks if a bucket exists.
func checkS3Bucket(svc *s3.S3, bucket string) error {
func checkS3Bucket(svc *s3.S3, qs *backuppb.S3) error {
input := &s3.HeadBucketInput{
Bucket: aws.String(bucket),
Bucket: aws.String(qs.Bucket),
}
_, err := svc.HeadBucket(input)
return errors.Trace(err)
}

// listObjects checks the permission of listObjects
func listObjects(svc *s3.S3, qs *backuppb.S3) error {
input := &s3.ListObjectsInput{
Bucket: aws.String(qs.Bucket),
Prefix: aws.String(qs.Prefix),
MaxKeys: aws.Int64(1),
}
_, err := svc.ListObjects(input)
if err != nil {
return errors.Trace(err)
}
return nil
}

// getObject checks the permission of getObject
func getObject(svc *s3.S3, qs *backuppb.S3) error {
input := &s3.GetObjectInput{
Bucket: aws.String(qs.Bucket),
Key: aws.String("not-exists"),
}
_, err := svc.GetObject(input)
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == "NoSuchKey" {
// if key not exists and we reach this error, that
// means we have the correct permission to GetObject
// other we will get another error
return nil
}
return errors.Trace(err)
}
return nil
}

// WriteFile writes data to a file to storage.
func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error {
input := &s3.PutObjectInput{
Expand Down
Loading

0 comments on commit c16890a

Please sign in to comment.