Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon/index/cmd: Fix index single watch, slow down the retry on not-found ledgers #4582

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,16 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error {
//
// [1]: https://stellarfoundation.slack.com/archives/C02B04RMK/p1654903342555669

// We sleep with linear backoff starting with 1s. Ledgers get posted
// We sleep with linear backoff starting with 6s. Ledgers get posted
// every 5-7s on average, but to be extra careful, let's give it a full
// minute before we give up entirely.
timedCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

sleepTime := time.Second

sleepTime := (6 * time.Second)
outer:
for {
time.Sleep(sleepTime)
Copy link
Contributor Author

@sreuland sreuland Sep 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an up-front wait before the first attempt at the next ledger's tx meta: I noticed it was always getting not-found on the first attempt of a new iteration. Watching the logs with this in place, it pretty consistently shows not-found on the first attempt, and the second attempt usually resolves by getting the ledger's tx meta.

[edit] I increased the first wait time to 6s before attempting to get the exported tx meta from S3 for the 'latest' ledger; this produces a cleaner 'watch' loop that doesn't typically fall into not-found/back-off/retry at all:

time="2022-09-11T01:28:57.453Z" level=info msg="Awaiting next ledger (42615214)" pid=7
time="2022-09-11T01:29:07.769Z" level=info msg="Awaiting next ledger (42615215)" pid=7
time="2022-09-11T01:29:15.744Z" level=info msg="Awaiting next ledger (42615216)" pid=7
time="2022-09-11T01:29:25.881Z" level=info msg="Awaiting next ledger (42615217)" pid=7
....

select {
case <-timedCtx.Done():
return errors.Wrap(timedCtx.Err(), "awaiting next ledger failed")
Expand All @@ -318,8 +318,7 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error {
}

if os.IsNotExist(buildErr) {
time.Sleep(sleepTime)
sleepTime += 2
sleepTime += (time.Second * 2)
continue
}

Expand Down
2 changes: 1 addition & 1 deletion support/storage/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func checkResp(r *http.Response) error {
if r.StatusCode >= 200 && r.StatusCode < 400 {
return nil
} else {
return fmt.Errorf("Bad HTTP response '%s' for %s '%s'",
return fmt.Errorf("bad HTTP response '%s' for %s '%s'",
r.Status, r.Request.Method, r.Request.URL.String())
}
}
Expand Down
48 changes: 39 additions & 9 deletions support/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,48 @@ import (
"context"
"io"
"net/http"
"os"
"path"

log "github.com/sirupsen/logrus"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stellar/go/support/errors"
)

type s3HttpProxy interface {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To enable test access, I refactored the S3 Send into an app-level interface so that tests can override it and simulate errors; the native S3 SDK type is a struct and couldn't be mocked.

Send(*s3.GetObjectInput) (io.ReadCloser, error)
}

// defaultS3HttpProxy is the s3HttpProxy implementation that S3Storage falls
// back to when no override is set in its s3Http field. It embeds the owning
// *S3Storage so that Send can reach the storage's S3 client, context and
// signing configuration.
type defaultS3HttpProxy struct {
*S3Storage
}

// Send issues the GetObject request described by params through the embedded
// storage's S3 client and returns the body of the response.
//
// The request is bound to the storage's context and, when the storage is
// configured with unsignedRequests, its signing handlers are cleared before
// it is sent. The outgoing HTTP request and the resulting HTTP response are
// handed to logReq and logResp respectively.
//
// On failure Send returns a nil body together with the unmodified error from
// the SDK, so callers can still inspect it (for example as an awserr.Error)
// and never receive a body alongside a non-nil error.
func (proxy *defaultS3HttpProxy) Send(params *s3.GetObjectInput) (io.ReadCloser, error) {
	req, resp := proxy.svc.GetObjectRequest(params)
	if proxy.unsignedRequests {
		req.Handlers.Sign.Clear() // makes this request unsigned
	}
	req.SetContext(proxy.ctx)
	logReq(req.HTTPRequest)
	err := req.Send()
	// Log the response even when Send failed, exactly as before.
	logResp(req.HTTPResponse)
	if err != nil {
		// The output value is meaningless on error; do not leak it to callers.
		return nil, err
	}

	return resp.Body, nil
}

// S3Storage bundles the state needed to talk to S3: the client, the request
// context and a handful of per-storage settings.
type S3Storage struct {
// ctx is attached to outgoing GetObject requests via SetContext.
ctx context.Context
// svc is the S3 client used to build requests.
svc s3iface.S3API
// bucket and prefix presumably identify the bucket and key prefix this
// storage operates on — their use is not visible in this excerpt; confirm
// against GetFile.
bucket string
prefix string
// unsignedRequests, when true, makes GetObject requests go out with their
// signing handlers cleared.
unsignedRequests bool
// writeACLrule presumably overrides the canned ACL applied on writes —
// TODO confirm against GetACLWriteRule.
writeACLrule string
// s3Http optionally overrides how GetObject requests are sent; when nil,
// s3HttpProxy() falls back to a defaultS3HttpProxy wrapping this storage.
s3Http s3HttpProxy
}

func NewS3Storage(
Expand Down Expand Up @@ -67,19 +91,16 @@ func (b *S3Storage) GetFile(pth string) (io.ReadCloser, error) {
Key: aws.String(key),
}

req, resp := b.svc.GetObjectRequest(params)
if b.unsignedRequests {
req.Handlers.Sign.Clear() // makes this request unsigned
}
req.SetContext(b.ctx)
logReq(req.HTTPRequest)
err := req.Send()
logResp(req.HTTPResponse)
resp, err := b.s3HttpProxy().Send(params)

if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchKey {
return nil, os.ErrNotExist
}
return nil, err
}

return resp.Body, nil
return resp, nil
}

func (b *S3Storage) Head(pth string) (*http.Response, error) {
Expand Down Expand Up @@ -237,3 +258,12 @@ func (b *S3Storage) CanListFiles() bool {
// Close performs no work and always returns nil.
func (b *S3Storage) Close() error {
return nil
}

// s3HttpProxy returns the proxy used to send GetObject requests: the override
// stored in the s3Http field when one is set, otherwise a fresh
// defaultS3HttpProxy wrapping b.
func (b *S3Storage) s3HttpProxy() s3HttpProxy {
	if override := b.s3Http; override != nil {
		return override
	}
	return &defaultS3HttpProxy{S3Storage: b}
}
62 changes: 62 additions & 0 deletions support/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ package storage

import (
"context"
"errors"
"io"
"os"
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/stretchr/testify/assert"
Expand All @@ -19,6 +24,19 @@ type MockS3 struct {
s3iface.S3API
}

// MockS3HttpProxy is a test double for s3HttpProxy. It embeds mock.Mock to
// record calls and supply canned return values, and embeds the s3HttpProxy
// interface itself.
type MockS3HttpProxy struct {
mock.Mock
s3HttpProxy
}

// Send records the call on the embedded mock and returns whatever values the
// test configured for it. A nil first return value is handled separately
// because it cannot be type-asserted to io.ReadCloser.
func (m *MockS3HttpProxy) Send(input *s3.GetObjectInput) (io.ReadCloser, error) {
	results := m.Called(input)
	switch body := results.Get(0); body {
	case nil:
		return nil, results.Error(1)
	default:
		return body.(io.ReadCloser), results.Error(1)
	}
}

func TestWriteACLRuleOverride(t *testing.T) {

mockS3 := &MockS3{}
Expand Down Expand Up @@ -50,3 +68,47 @@ func TestWriteACLRuleDefault(t *testing.T) {
aclRule := s3Storage.GetACLWriteRule()
assert.Equal(t, aclRule, s3.ObjectCannedACLPublicRead)
}

// TestGetFileNotFound verifies that GetFile translates an S3 NoSuchKey error
// reported by the HTTP proxy into os.ErrNotExist.
func TestGetFileNotFound(t *testing.T) {
	mockS3 := &MockS3{}
	mockS3HttpProxy := &MockS3HttpProxy{}

	mockS3HttpProxy.On("Send", mock.Anything).Return(nil,
		awserr.New(s3.ErrCodeNoSuchKey, "message", errors.New("not found")))

	s3Storage := S3Storage{
		ctx:              context.Background(),
		svc:              mockS3,
		bucket:           "bucket",
		prefix:           "prefix",
		unsignedRequests: false,
		writeACLrule:     "",
		s3Http:           mockS3HttpProxy,
	}

	_, err := s3Storage.GetFile("path")

	// testify takes (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	assert.Equal(t, os.ErrNotExist, err)
	// Make sure the result really came from the injected proxy.
	mockS3HttpProxy.AssertExpectations(t)
}

// TestGetFileFound verifies that GetFile hands back the reader produced by
// the HTTP proxy, with no error, when the request succeeds.
func TestGetFileFound(t *testing.T) {
	mockS3 := &MockS3{}
	mockS3HttpProxy := &MockS3HttpProxy{}
	testCloser := io.NopCloser(strings.NewReader(""))

	mockS3HttpProxy.On("Send", mock.Anything).Return(testCloser, nil)

	s3Storage := S3Storage{
		ctx:              context.Background(),
		svc:              mockS3,
		bucket:           "bucket",
		prefix:           "prefix",
		unsignedRequests: false,
		writeACLrule:     "",
		s3Http:           mockS3HttpProxy,
	}

	closer, err := s3Storage.GetFile("path")

	// NoError reports the error text on failure, unlike a bare Nil check.
	assert.NoError(t, err)
	// testify takes (expected, actual).
	assert.Equal(t, testCloser, closer)
	// Make sure the result really came from the injected proxy.
	mockS3HttpProxy.AssertExpectations(t)
}