Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RemoteFilePersister #1202

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions storage/file_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package storage

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
)

// LocalFilePersister will persist files to the local disk.
Expand Down Expand Up @@ -43,3 +50,144 @@ func (l *LocalFilePersister) Persist(path string, data io.Reader) (err error) {

return nil
}

// RemoteFilePersister is to be used when files created by the browser module need
// to be uploaded to a remote location. This uses a preSignedURLGetterURL to
// retrieve one pre-signed URL. The pre-signed url is used to upload the file
// to the remote location.
type RemoteFilePersister struct {
preSignedURLGetterURL string
headers map[string]string
basePath string

httpClient *http.Client
}

// NewRemoteFilePersister creates a new instance of RemoteFilePersister.
func NewRemoteFilePersister(
preSignedURLGetterURL string,
headers map[string]string,
basePath string,
) *RemoteFilePersister {
return &RemoteFilePersister{
preSignedURLGetterURL: preSignedURLGetterURL,
headers: headers,
basePath: basePath,
httpClient: &http.Client{
Timeout: time.Second * 10,
},
}
}

// Persist will upload the contents of data to a remote location.
func (r *RemoteFilePersister) Persist(ctx context.Context, path string, data io.Reader) (err error) {
pURL, err := r.getPreSignedURL(ctx, path)
if err != nil {
return fmt.Errorf("getting presigned url: %w", err)
}

req, err := http.NewRequestWithContext(ctx, http.MethodPut, pURL, data)
if err != nil {
return fmt.Errorf("creating upload request: %w", err)
}

resp, err := r.httpClient.Do(req)
if err != nil {
return fmt.Errorf("performing upload request: %w", err)
}
defer resp.Body.Close() //nolint:errcheck

if _, err := io.Copy(io.Discard, resp.Body); err != nil {
return fmt.Errorf("draining upload response body: %w", err)
}

if err := checkStatusCode(resp); err != nil {
return fmt.Errorf("uploading: %w", err)
}

return nil
}

func checkStatusCode(resp *http.Response) error {
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
return fmt.Errorf("server returned %d (%s)", resp.StatusCode, strings.ToLower(http.StatusText(resp.StatusCode)))
}

return nil
}

// getPreSignedURL will retrieve the presigned url for the current file.
func (r *RemoteFilePersister) getPreSignedURL(ctx context.Context, path string) (string, error) {
b, err := buildPresignedRequestBody(r.basePath, path)
if err != nil {
return "", fmt.Errorf("building request body: %w", err)
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.preSignedURLGetterURL, bytes.NewReader(b))
if err != nil {
return "", fmt.Errorf("creating request: %w", err)
}

for k, v := range r.headers {
req.Header.Add(k, v)
}

resp, err := r.httpClient.Do(req)
if err != nil {
return "", fmt.Errorf("performing request: %w", err)
}
defer resp.Body.Close() //nolint:errcheck

if err := checkStatusCode(resp); err != nil {
return "", err
}

return readResponseBody(resp)
}

func buildPresignedRequestBody(basePath, path string) ([]byte, error) {
b := struct {
Service string `json:"service"`
Files []struct {
Name string `json:"name"`
} `json:"files"`
}{
Service: "aws_s3",
Files: []struct {
Name string `json:"name"`
}{
{
Name: filepath.Join(basePath, path),
},
},
inancgumus marked this conversation as resolved.
Show resolved Hide resolved
}

bb, err := json.Marshal(b)
if err != nil {
return nil, fmt.Errorf("marshaling request body: %w", err)
}

return bb, nil
}

func readResponseBody(resp *http.Response) (string, error) {
rb := struct {
Service string `json:"service"`
URLs []struct {
Name string `json:"name"`
PreSignedURL string `json:"pre_signed_url"` //nolint:tagliatelle
} `json:"urls"`
}{}

decoder := json.NewDecoder(resp.Body)
err := decoder.Decode(&rb)
if err != nil {
return "", fmt.Errorf("decoding response body: %w", err)
}

if len(rb.URLs) == 0 {
return "", errors.New("missing presigned url in response body")
}

return rb.URLs[0].PreSignedURL, nil
}
169 changes: 169 additions & 0 deletions storage/file_persister_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package storage

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -74,3 +79,167 @@ func TestLocalFilePersister(t *testing.T) {
})
}
}

func TestRemoteFilePersister(t *testing.T) {
t.Parallel()

basePath := "screenshots"
presignedEndpoint := "/presigned"
uploadEndpoint := "/upload"

tests := []struct {
name string
path string
dataToUpload string
wantPresignedURLBody string
wantPresignedHeaders map[string]string
uploadResponse int
getPresignedURLResponse int
wantError string
}{
{
name: "upload_file",
path: "some/path/file.png",
dataToUpload: "here's some data",
wantPresignedURLBody: `{
"service":"aws_s3",
"files":[{"name":"screenshots/some/path/file.png"}]
}`,
wantPresignedHeaders: map[string]string{
"Authorization": "token asd123",
"Run_id": "123456",
},
uploadResponse: http.StatusOK,
getPresignedURLResponse: http.StatusOK,
},
{
name: "get_presigned_rate_limited",
path: "some/path/file.png",
dataToUpload: "here's some data",
wantPresignedURLBody: `{
"service":"aws_s3",
"files":[{"name":"screenshots/some/path/file.png"}]
}`,
wantPresignedHeaders: map[string]string{
"Authorization": "token asd123",
"Run_id": "123456",
},
getPresignedURLResponse: http.StatusTooManyRequests,
wantError: "getting presigned url: server returned 429 (too many requests)",
},
{
name: "get_presigned_fails",
path: "some/path/file.png",
dataToUpload: "here's some data",
wantPresignedURLBody: `{
"service":"aws_s3",
"files":[{"name":"screenshots/some/path/file.png"}]
}`,
wantPresignedHeaders: map[string]string{
"Authorization": "token asd123",
"Run_id": "123456",
},
getPresignedURLResponse: http.StatusInternalServerError,
wantError: "getting presigned url: server returned 500 (internal server error)",
},
{
name: "upload_rate_limited",
path: "some/path/file.png",
dataToUpload: "here's some data",
wantPresignedURLBody: `{
"service":"aws_s3",
"files":[{"name":"screenshots/some/path/file.png"}]
}`,
wantPresignedHeaders: map[string]string{
"Authorization": "token asd123",
"Run_id": "123456",
},
uploadResponse: http.StatusTooManyRequests,
getPresignedURLResponse: http.StatusOK,
wantError: "uploading: server returned 429 (too many requests)",
},
{
name: "upload_fails",
path: "some/path/file.png",
dataToUpload: "here's some data",
wantPresignedURLBody: `{
"service":"aws_s3",
"files":[{"name":"screenshots/some/path/file.png"}]
}`,
wantPresignedHeaders: map[string]string{
"Authorization": "token asd123",
"Run_id": "123456",
},
uploadResponse: http.StatusInternalServerError,
getPresignedURLResponse: http.StatusOK,
wantError: "uploading: server returned 500 (internal server error)",
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

mux := http.NewServeMux()
s := httptest.NewServer(mux)
defer s.Close()

// This handles the request to retrieve a presigned url.
mux.HandleFunc(presignedEndpoint, http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() //nolint:errcheck

bb, err := io.ReadAll(r.Body)
require.NoError(t, err)

// Ensures that the body of the request matches the
// expected format.
assert.JSONEq(t, tt.wantPresignedURLBody, string(bb))
inancgumus marked this conversation as resolved.
Show resolved Hide resolved

// Ensures that the headers are sent to the server from
// the browser module.
for k, v := range tt.wantPresignedHeaders {
assert.Equal(t, v, r.Header[k][0])
}

w.WriteHeader(tt.getPresignedURLResponse)
_, err = fmt.Fprintf(w, `{
"service": "aws_s3",
"urls": [{
"name": "%s",
"pre_signed_url": "%s"
}]
}`, basePath, s.URL+uploadEndpoint)

require.NoError(t, err)
},
))

// This handles the upload of the files with the presigned url that
// is retrieved from the handler above.
mux.HandleFunc(uploadEndpoint, http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() //nolint:errcheck

bb, err := io.ReadAll(r.Body)
require.NoError(t, err)

// Ensures that the data is uploaded to the server and matches
// what was sent.
assert.Equal(t, tt.dataToUpload, string(bb))

w.WriteHeader(tt.uploadResponse)
}))

r := NewRemoteFilePersister(s.URL+presignedEndpoint, tt.wantPresignedHeaders, basePath)
err := r.Persist(context.Background(), tt.path, strings.NewReader(tt.dataToUpload))
if tt.wantError != "" {
assert.EqualError(t, err, tt.wantError)
return
}

assert.NoError(t, err)
})
}
}
Loading