From 9b8123faf84ea77f21da33cd1847ef3ed64647c7 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 9 Sep 2022 20:10:18 -0700 Subject: [PATCH 1/3] #4317: fixed s3 KeyNotFound translation to ErrNotExist --- support/storage/http.go | 2 +- support/storage/s3.go | 46 ++++++++++++++++++++++++++++++-------- support/storage/s3_test.go | 39 ++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 10 deletions(-) diff --git a/support/storage/http.go b/support/storage/http.go index ed68d49bd0..87e9221e6c 100644 --- a/support/storage/http.go +++ b/support/storage/http.go @@ -30,7 +30,7 @@ func checkResp(r *http.Response) error { if r.StatusCode >= 200 && r.StatusCode < 400 { return nil } else { - return fmt.Errorf("Bad HTTP response '%s' for %s '%s'", + return fmt.Errorf("bad HTTP response '%s' for %s '%s'", r.Status, r.Request.Method, r.Request.URL.String()) } } diff --git a/support/storage/s3.go b/support/storage/s3.go index ac7b13fa80..b7ddb8029d 100644 --- a/support/storage/s3.go +++ b/support/storage/s3.go @@ -5,17 +5,40 @@ import ( "context" "io" "net/http" + "os" "path" log "github.com/sirupsen/logrus" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/stellar/go/support/errors" ) +type s3HttpProxy interface { + Send(*s3.GetObjectInput) (io.ReadCloser, error) +} + +type defaultS3HttpProxy struct { + *S3Storage +} + +func (proxy *defaultS3HttpProxy) Send(params *s3.GetObjectInput) (io.ReadCloser, error) { + req, resp := proxy.svc.GetObjectRequest(params) + if proxy.unsignedRequests { + req.Handlers.Sign.Clear() // makes this request unsigned + } + req.SetContext(proxy.ctx) + logReq(req.HTTPRequest) + err := req.Send() + logResp(req.HTTPResponse) + + return resp.Body, err +} + type S3Storage struct { ctx context.Context svc s3iface.S3API @@ -23,6 +46,7 @@ type S3Storage struct { prefix string unsignedRequests bool writeACLrule string + s3Http s3HttpProxy } func NewS3Storage( @@ -67,19 +91,16 @@ func (b *S3Storage) GetFile(pth string) (io.ReadCloser, error) { Key: aws.String(key), } - req, resp := b.svc.GetObjectRequest(params) - if b.unsignedRequests { - req.Handlers.Sign.Clear() // makes this request unsigned - } - req.SetContext(b.ctx) - logReq(req.HTTPRequest) - err := req.Send() - logResp(req.HTTPResponse) + resp, err := b.s3HttpProxy().Send(params) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchKey { + return nil, os.ErrNotExist + } return nil, err } - return resp.Body, nil + return resp, nil } func (b *S3Storage) Head(pth string) (*http.Response, error) { @@ -237,3 +258,10 @@ func (b *S3Storage) CanListFiles() bool { func (b *S3Storage) Close() error { return nil } + +func (b *S3Storage) s3HttpProxy() s3HttpProxy { + if b.s3Http != nil { + return b.s3Http + } + return &defaultS3HttpProxy{} +} diff --git a/support/storage/s3_test.go b/support/storage/s3_test.go index 4064af9539..d8ff80b0bd 100644 --- a/support/storage/s3_test.go +++ b/support/storage/s3_test.go @@ -6,8 +6,12 @@ package storage import ( "context" + "errors" + "io" + "os" "testing" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/stretchr/testify/assert" @@ -19,6 +23,19 @@ type MockS3 struct { s3iface.S3API } +type MockS3HttpProxy struct { + mock.Mock + s3HttpProxy +} + +func (m *MockS3HttpProxy) Send(input *s3.GetObjectInput) (io.ReadCloser, error) { + args := m.Called(input) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(io.ReadCloser), args.Error(1) +} + func TestWriteACLRuleOverride(t *testing.T) { mockS3 := &MockS3{} @@ -50,3 +67,25 @@ func TestWriteACLRuleDefault(t *testing.T) { aclRule := s3Storage.GetACLWriteRule() assert.Equal(t, aclRule, s3.ObjectCannedACLPublicRead) } + +func TestGetFileNotFound(t *testing.T) { + mockS3 := &MockS3{} + mockS3HttpProxy := &MockS3HttpProxy{} + + mockS3HttpProxy.On("Send", mock.Anything).Return(nil, + awserr.New(s3.ErrCodeNoSuchKey, "message", errors.New("not found"))) + + s3Storage := S3Storage{ + ctx: context.Background(), + svc: mockS3, + bucket: "bucket", + prefix: "prefix", + unsignedRequests: false, + writeACLrule: "", + s3Http: mockS3HttpProxy, + } + + _, err := s3Storage.GetFile("path") + + assert.Equal(t, err, os.ErrNotExist) +} From 5a0272bf356f2604c539bac30fd8e0685d5745e2 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Sat, 10 Sep 2022 13:31:59 -0700 Subject: [PATCH 2/3] #4317: enabled more time between latest ledger read attempts during single cmd watch iterations --- exp/lighthorizon/index/builder.go | 9 ++++----- support/storage/s3.go | 4 +++- support/storage/s3_test.go | 23 +++++++++++++++++++++++ 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index b3ce5509f5..c221be598e 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -294,16 +294,16 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error { // // [1]: https://stellarfoundation.slack.com/archives/C02B04RMK/p1654903342555669 - // We sleep with linear backoff starting with 1s. Ledgers get posted + // We sleep with linear backoff starting with 3s. Ledgers get posted // every 5-7s on average, but to be extra careful, let's give it a full // minute before we give up entirely. timedCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() - sleepTime := time.Second - + sleepTime := (3 * time.Second) outer: for { + time.Sleep(sleepTime) select { case <-timedCtx.Done(): return errors.Wrap(timedCtx.Err(), "awaiting next ledger failed") @@ -318,8 +318,7 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error { } if os.IsNotExist(buildErr) { - time.Sleep(sleepTime) - sleepTime += 2 + sleepTime += (time.Second * 2) continue } diff --git a/support/storage/s3.go b/support/storage/s3.go index b7ddb8029d..87790e7426 100644 --- a/support/storage/s3.go +++ b/support/storage/s3.go @@ -263,5 +263,7 @@ func (b *S3Storage) s3HttpProxy() s3HttpProxy { if b.s3Http != nil { return b.s3Http } - return &defaultS3HttpProxy{} + return &defaultS3HttpProxy{ + S3Storage: b, + } } diff --git a/support/storage/s3_test.go b/support/storage/s3_test.go index d8ff80b0bd..e1b5f7c1e0 100644 --- a/support/storage/s3_test.go +++ b/support/storage/s3_test.go @@ -9,6 +9,7 @@ import ( "errors" "io" "os" + "strings" "testing" "github.com/aws/aws-sdk-go/aws/awserr" @@ -89,3 +90,25 @@ func TestGetFileNotFound(t *testing.T) { assert.Equal(t, err, os.ErrNotExist) } + +func TestGetFileFound(t *testing.T) { + mockS3 := &MockS3{} + mockS3HttpProxy := &MockS3HttpProxy{} + testCloser := io.NopCloser(strings.NewReader("")) + + mockS3HttpProxy.On("Send", mock.Anything).Return(testCloser, nil) + + s3Storage := S3Storage{ + ctx: context.Background(), + svc: mockS3, + bucket: "bucket", + prefix: "prefix", + unsignedRequests: false, + writeACLrule: "", + s3Http: mockS3HttpProxy, + } + + closer, err := s3Storage.GetFile("path") + assert.Nil(t, err) + assert.Equal(t, closer, testCloser) +} From f47eb7e807bd4fc913f3020458767460fef318ea Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Sat, 10 Sep 2022 18:28:35 -0700 Subject: [PATCH 3/3] #4317: wait a miniumu of 6 seconds inbetween each attept to get tx meta data from s3 for 'latest' ledger --- exp/lighthorizon/index/builder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index c221be598e..71b29314ed 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -294,13 +294,13 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error { // // [1]: https://stellarfoundation.slack.com/archives/C02B04RMK/p1654903342555669 - // We sleep with linear backoff starting with 3s. Ledgers get posted + // We sleep with linear backoff starting with 6s. Ledgers get posted // every 5-7s on average, but to be extra careful, let's give it a full // minute before we give up entirely. timedCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() - sleepTime := (3 * time.Second) + sleepTime := (6 * time.Second) outer: for { time.Sleep(sleepTime)