Skip to content

Commit

Permalink
[WIP][skip ci] Support multipart uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Sep 25, 2024
1 parent b43d12e commit 2a3de4b
Show file tree
Hide file tree
Showing 13 changed files with 480 additions and 261 deletions.
2 changes: 1 addition & 1 deletion agent/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 55 additions & 30 deletions api/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,47 @@ type Artifact struct {
}

type ArtifactBatch struct {
ID string `json:"id"`
Artifacts []*Artifact `json:"artifacts"`
UploadDestination string `json:"upload_destination"`
ID string `json:"id"`
Artifacts []*Artifact `json:"artifacts"`
UploadDestination string `json:"upload_destination"`
MultipartSupported bool `json:"multipart_supported,omitempty"`
}

// ArtifactUploadInstructions describes how to upload an artifact to Buildkite
// artifact storage.
type ArtifactUploadInstructions struct {
Data map[string]string `json:"data"`
// Used for a single-part upload.
Action ArtifactUploadAction `json:"action"`

// Used for a multi-part upload.
Actions []ArtifactUploadAction `json:"actions"`

// Contains other data necessary for interpreting instructions.
Data map[string]string `json:"data"`
}

// ArtifactUploadAction describes one action needed to upload an artifact or
// part of an artifact to Buildkite artifact storage.
type ArtifactUploadAction struct {
URL string `json:"url,omitempty"`
Method string `json:"method"`
Path string `json:"path"`
FileInput string `json:"file_input"`
URL string `json:"url,omitempty"`
Method string `json:"method"`
Path string `json:"path"`
FileInput string `json:"file_input"`
PartNumber int `json:"part_number,omitempty"`
}

type ArtifactBatchCreateResponse struct {
ID string `json:"id"`
ArtifactIDs []string `json:"artifact_ids"`
UploadInstructions *ArtifactUploadInstructions `json:"upload_instructions"`
ID string `json:"id"`
ArtifactIDs []string `json:"artifact_ids"`

// These instructions apply to all artifacts. The template contains
// variable interpolations such as ${artifact:path}.
InstructionsTemplate *ArtifactUploadInstructions `json:"upload_instructions"`

// These instructions apply to specific artifacts, necessary for multipart
// uploads. It overrides InstructionTemplate and should not contain
// interpolations. Map: artifact ID -> instructions for that artifact.
PerArtifactInstructions map[string]*ArtifactUploadInstructions `json:"per_artifact_instructions"`
}

// ArtifactSearchOptions specifies the optional parameters to the
Expand All @@ -84,18 +104,30 @@ type ArtifactSearchOptions struct {
IncludeDuplicates bool `url:"include_duplicates,omitempty"`
}

type ArtifactBatchUpdateArtifact struct {
// ArtifactState represents the state of a single artifact, when calling UpdateArtifacts.
type ArtifactState struct {
ID string `json:"id"`
State string `json:"state"`
// If this artifact was a multipart upload and is complete, we need the
// the ETag from each uploaded part so that they can be joined together.
// The upload ID is also required.
MultipartETags []ArtifactPartETag `json:"multipart_etags,omitempty"`
MultipartUploadID string `json:"multipart_upload_id,omitempty"`
}

// ArtifactPartETag associates an ETag to a part number for a multipart upload.
type ArtifactPartETag struct {
PartNumber int `json:"part_number"`
ETag string `json:"etag"`
}

type ArtifactBatchUpdateRequest struct {
Artifacts []*ArtifactBatchUpdateArtifact `json:"artifacts"`
Artifacts []ArtifactState `json:"artifacts"`
}

// CreateArtifacts takes a slice of artifacts, and creates them on Buildkite as a batch.
func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
func (c *Client) CreateArtifacts(ctx context.Context, jobID string, batch *ArtifactBatch) (*ArtifactBatchCreateResponse, *Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID))

req, err := c.newRequest(ctx, "POST", u, batch)
if err != nil {
Expand All @@ -111,31 +143,24 @@ func (c *Client) CreateArtifacts(ctx context.Context, jobId string, batch *Artif
return createResponse, resp, err
}

// Updates a particular artifact
func (c *Client) UpdateArtifacts(ctx context.Context, jobId string, artifactStates map[string]string) (*Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobId))
payload := ArtifactBatchUpdateRequest{}

for id, state := range artifactStates {
payload.Artifacts = append(payload.Artifacts, &ArtifactBatchUpdateArtifact{id, state})
// UpdateArtifacts updates Buildkite with one or more artifact states.
func (c *Client) UpdateArtifacts(ctx context.Context, jobID string, artifactStates []ArtifactState) (*Response, error) {
u := fmt.Sprintf("jobs/%s/artifacts", railsPathEscape(jobID))
payload := ArtifactBatchUpdateRequest{
Artifacts: artifactStates,
}

req, err := c.newRequest(ctx, "PUT", u, payload)
if err != nil {
return nil, err
}

resp, err := c.doRequest(req, nil)
if err != nil {
return resp, err
}

return resp, err
return c.doRequest(req, nil)
}

// SearchArtifacts searches Buildkite for a set of artifacts
func (c *Client) SearchArtifacts(ctx context.Context, buildId string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) {
u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildId))
func (c *Client) SearchArtifacts(ctx context.Context, buildID string, opt *ArtifactSearchOptions) ([]*Artifact, *Response, error) {
u := fmt.Sprintf("builds/%s/artifacts/search", railsPathEscape(buildID))
u, err := addOptions(u, opt)
if err != nil {
return nil, nil, err
Expand Down
24 changes: 17 additions & 7 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"net/http/httputil"
"net/textproto"
"net/url"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -268,12 +270,12 @@ func (c *Client) doRequest(req *http.Request, v any) (*Response, error) {
// file contents into the debug log (especially if it's been
// gzipped)
var requestDump []byte
if strings.Contains(req.Header.Get("Content-Type"), "multipart/form-data") {
requestDump, err = httputil.DumpRequestOut(req, false)
} else {
requestDump, err = httputil.DumpRequestOut(req, true)
}
contentType := req.Header.Get("Content-Type")
includeBody := strings.Contains(contentType, "multipart/form-data") ||
contentType == "application/octet-stream" ||
req.Header.Get("Content-Encoding") != ""

requestDump, err := httputil.DumpRequestOut(req, includeBody)
if err != nil {
c.logger.Debug("ERR: %s\n%s", err, string(requestDump))
} else {
Expand Down Expand Up @@ -316,9 +318,17 @@ func (c *Client) doRequest(req *http.Request, v any) (*Response, error) {
responseDump, err := httputil.DumpResponse(resp, true)
if err != nil {
c.logger.Debug("\nERR: %s\n%s", err, string(responseDump))
} else {
c.logger.Debug("\n%s", string(responseDump))
}
// else {
// c.logger.Debug("\n%s", string(responseDump))
// }
fname := fmt.Sprintf("dumpresp-%d.txt", time.Now().UnixNano())
fpath := filepath.Join(os.TempDir(), fname)
if err := os.WriteFile(fpath, responseDump, os.FileMode(0o666)); err != nil {
c.logger.Error("Coudn't write response dump to file: %v", err)
return response, err
}
c.logger.Debug("Wrote response dump to %q", fpath)
}

if c.conf.TraceHTTP {
Expand Down
8 changes: 8 additions & 0 deletions clicommand/artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ArtifactUploadConfig struct {
// Uploader flags
GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"`
UploadSkipSymlinks bool `cli:"upload-skip-symlinks"`
NoMultipartUpload bool `cli:"no-multipart-upload"`

// deprecated
FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"`
Expand Down Expand Up @@ -120,6 +121,11 @@ var ArtifactUploadCommand = cli.Command{
Usage: "After the glob has been resolved to a list of files to upload, skip uploading those that are symlinks to files",
EnvVar: "BUILDKITE_ARTIFACT_UPLOAD_SKIP_SYMLINKS",
},
cli.BoolFlag{
Name: "no-multipart-upload",
Usage: "For Buildkite-hosted artifacts, disables the use of multipart uploads. Has no effect on uploads to other destinations such as custom cloud buckets",
EnvVar: "BUILDKITE_ARTIFACT_NO_MULTIPART_UPLOAD",
},
cli.BoolFlag{ // Deprecated
Name: "follow-symlinks",
Usage: "Follow symbolic links while resolving globs. Note this argument is deprecated. Use `--glob-resolve-follow-symlinks` instead",
Expand Down Expand Up @@ -155,6 +161,8 @@ var ArtifactUploadCommand = cli.Command{
ContentType: cfg.ContentType,
DebugHTTP: cfg.DebugHTTP,

AllowMultipart: !cfg.NoMultipartUpload,

// If the deprecated flag was set to true, pretend its replacement was set to true too
// this works as long as the user only sets one of the two flags
GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks),
Expand Down
2 changes: 1 addition & 1 deletion internal/artifact/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import (
type APIClient interface {
CreateArtifacts(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error)
SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error)
UpdateArtifacts(context.Context, string, map[string]string) (*api.Response, error)
UpdateArtifacts(context.Context, string, []api.ArtifactState) (*api.Response, error)
}
16 changes: 8 additions & 8 deletions internal/artifact/artifactory_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ func (u *artifactoryUploaderWork) Description() string {
return singleUnitDescription(u.artifact)
}

func (u *artifactoryUploaderWork) DoWork(context.Context) error {
func (u *artifactoryUploaderWork) DoWork(context.Context) (*api.ArtifactPartETag, error) {
// Open file from filesystem
u.logger.Debug("Reading file %q", u.artifact.AbsolutePath)
f, err := os.Open(u.artifact.AbsolutePath)
if err != nil {
return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
}

// Upload the file to Artifactory.
Expand All @@ -131,32 +131,32 @@ func (u *artifactoryUploaderWork) DoWork(context.Context) error {
req, err := http.NewRequest("PUT", u.URL(u.artifact), f)
req.SetBasicAuth(u.user, u.password)
if err != nil {
return err
return nil, err
}

md5Checksum, err := checksumFile(md5.New(), u.artifact.AbsolutePath)
if err != nil {
return err
return nil, err
}
req.Header.Add("X-Checksum-MD5", md5Checksum)

sha1Checksum, err := checksumFile(sha1.New(), u.artifact.AbsolutePath)
if err != nil {
return err
return nil, err
}
req.Header.Add("X-Checksum-SHA1", sha1Checksum)

sha256Checksum, err := checksumFile(sha256.New(), u.artifact.AbsolutePath)
if err != nil {
return err
return nil, err
}
req.Header.Add("X-Checksum-SHA256", sha256Checksum)

res, err := u.client.Do(req)
if err != nil {
return err
return nil, err
}
return checkResponse(res)
return nil, checkResponse(res)
}

func checksumFile(hasher hash.Hash, path string) (string, error) {
Expand Down
6 changes: 3 additions & 3 deletions internal/artifact/azure_blob_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ func (u *azureBlobUploaderWork) Description() string {
}

// DoWork uploads an artifact file.
func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error {
func (u *azureBlobUploaderWork) DoWork(ctx context.Context) (*api.ArtifactPartETag, error) {
u.logger.Debug("Reading file %q", u.artifact.AbsolutePath)
f, err := os.Open(u.artifact.AbsolutePath)
if err != nil {
return fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
return nil, fmt.Errorf("failed to open file %q (%w)", u.artifact.AbsolutePath, err)
}
defer f.Close()

Expand All @@ -121,5 +121,5 @@ func (u *azureBlobUploaderWork) DoWork(ctx context.Context) error {

bbc := u.client.NewContainerClient(u.loc.ContainerName).NewBlockBlobClient(blobName)
_, err = bbc.UploadFile(ctx, f, nil)
return err
return nil, err
}
5 changes: 4 additions & 1 deletion internal/artifact/batch_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type BatchCreatorConfig struct {
// CreateArtifactsTimeout, sets a context.WithTimeout around the CreateArtifacts API.
// If it's zero, there's no context timeout and the default HTTP timeout will prevail.
CreateArtifactsTimeout time.Duration

// Whether to allow multipart uploads to the BK-hosted bucket.
AllowMultipart bool
}

type BatchCreator struct {
Expand Down Expand Up @@ -63,7 +66,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
ID: api.NewUUID(),
Artifacts: theseArtifacts,
UploadDestination: a.conf.UploadDestination,
MultipartSupported: true,
MultipartSupported: a.conf.AllowMultipart,
}

a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length)
Expand Down
Loading

0 comments on commit 2a3de4b

Please sign in to comment.