File Upload Feature #1902

Merged · 54 commits · Jan 21, 2023

Commits
fcb3d09
file upload WIP
pzl Sep 22, 2022
33e9821
functional chunk uploads, basic functionality
pzl Sep 26, 2022
8da226e
cleanups
pzl Sep 27, 2022
f0cd448
use a non-retry ES client for bulk uploads
pzl Sep 27, 2022
ebcee30
index name temporary cleanup
pzl Sep 29, 2022
f9b5557
cbor: boolean byte was flipped
pzl Oct 4, 2022
79c959c
use throttle to cap upload concurrency, add tests
pzl Oct 4, 2022
76dacbf
start using real(er) document IDs and index names
pzl Oct 4, 2022
2cafe7f
update tests
pzl Oct 4, 2022
fc9e79c
max file size
pzl Oct 4, 2022
59ee0c2
write agent,action IDs to file meta doc
pzl Oct 5, 2022
3b0442d
file upload verification, incl hashes
pzl Oct 12, 2022
2fea5df
WIP add contents schema for inner-zip info
pzl Oct 18, 2022
3e64ef7
refactor out of dl, support arbitrary req payloads
pzl Nov 7, 2022
8bc0a5c
add upload authorization
pzl Nov 8, 2022
7a592e3
strip unused fields
pzl Nov 8, 2022
1249622
reset license header
pzl Nov 8, 2022
d7ef820
small fixes
pzl Nov 8, 2022
f61b67b
restore mime to checked fields
pzl Nov 8, 2022
30d80c3
update file index pattern names
pzl Nov 9, 2022
cd6bdb6
wrap initial upload op write in mutex lock
pzl Nov 9, 2022
af09cc5
return all chunk IDs for counting
pzl Nov 10, 2022
cd5870c
fix capitalization
pzl Nov 10, 2022
b3823b4
no need to close request body
pzl Nov 10, 2022
ec0ee5d
WIP
pzl Dec 19, 2022
5dc0b7b
cleanup upload writing
pzl Dec 19, 2022
26319ec
refactor, parse JSON once
pzl Dec 20, 2022
08c211a
check transit hash, request chunk fields manually
pzl Dec 20, 2022
825f487
Merge remote-tracking branch 'upstream/main' into file-upload
pzl Dec 20, 2022
50cd8ff
es client 8.5.0 update and cleanups
pzl Dec 20, 2022
aff2c6d
cleanups and some tests
pzl Dec 21, 2022
367fa00
cleanups, etc
pzl Dec 22, 2022
4b62be3
remove vestigial functions
pzl Dec 22, 2022
65b33b1
reuse trimmed hash
pzl Jan 9, 2023
b0eee36
clarify some offsets in chunk writing
pzl Jan 9, 2023
2709024
cleanup some error objects
pzl Jan 9, 2023
3004ac5
cleanup temp auth flag
pzl Jan 17, 2023
1c18186
turn down some linters for intentional decisions
pzl Jan 17, 2023
b374287
restore some of the upload tests
pzl Jan 17, 2023
f7dc8a2
ensure Info fields are populated
pzl Jan 17, 2023
6cc2b71
comment spacing
pzl Jan 17, 2023
046b310
namespace the exportable structs to avoid circular imports coming
pzl Jan 17, 2023
610d19a
use internal cache for upload infos
pzl Jan 17, 2023
882df20
fix lint
pzl Jan 17, 2023
8e38d80
fixup tests
pzl Jan 18, 2023
dcf436f
move chunk def into new package for exported defs
pzl Jan 18, 2023
e2ee184
small cleanups
pzl Jan 18, 2023
ea72b10
comment cleanups
pzl Jan 18, 2023
bf44629
fixup uploader tests
pzl Jan 19, 2023
db44cd1
add line to changelog
pzl Jan 19, 2023
f47e698
Merge branch 'main' into file-upload
pzl Jan 19, 2023
144756d
comment conventions
pzl Jan 19, 2023
b877927
a WHOLE LOT of handler tests
pzl Jan 20, 2023
ce4d2d2
Merge branch 'main' into file-upload
pzl Jan 20, 2023
390 changes: 390 additions & 0 deletions internal/pkg/api/handleUpload.go
@@ -0,0 +1,390 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package api

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"io"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
	"github.com/elastic/fleet-server/v7/internal/pkg/cache"
	"github.com/elastic/fleet-server/v7/internal/pkg/config"
	"github.com/elastic/fleet-server/v7/internal/pkg/limit"
	"github.com/elastic/fleet-server/v7/internal/pkg/logger"
	"github.com/elastic/fleet-server/v7/internal/pkg/upload"
	"github.com/elastic/fleet-server/v7/internal/pkg/upload/cbor"
	"github.com/elastic/go-elasticsearch/v7"
	"github.com/julienschmidt/httprouter"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

// UploadStatus is the set of valid upload status values according to the file storage spec.
type UploadStatus string

const (
	UploadAwaiting UploadStatus = "AWAITING_UPLOAD"
	UploadProgress UploadStatus = "UPLOADING"
	UploadDone     UploadStatus = "READY"
	UploadFail     UploadStatus = "UPLOAD_ERROR"
	UploadDel      UploadStatus = "DELETED"
)

const (
	// TODO: move to a config
	maxParallelUploadOperations = 3
	maxParallelChunks           = 4
	maxFileSize                 = 104857600 // 100 MiB
)

func (rt Router) handleUploadStart(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
	start := time.Now()

	reqID := r.Header.Get(logger.HeaderRequestID)

	zlog := log.With().
		Str(ECSHTTPRequestID, reqID).
		Logger()

	// authentication occurs inside handleUploadStart, so the API key's agent ID
	// can be checked against the agent ID in the not-yet-unmarshalled body payload
	err := rt.ut.handleUploadStart(&zlog, w, r)

	if err != nil {
		cntUpload.IncError(err)
		resp := NewHTTPErrResp(err)

		// Log this as warn for visibility that the limit has been reached.
		// This allows customers to tune the configuration on detection of the threshold.
		if errors.Is(err, limit.ErrMaxLimit) || errors.Is(err, upload.ErrMaxConcurrentUploads) {
			resp.Level = zerolog.WarnLevel
		}

		zlog.WithLevel(resp.Level).
			Err(err).
			Int(ECSHTTPResponseCode, resp.StatusCode).
			Int64(ECSEventDuration, time.Since(start).Nanoseconds()).
			Msg("fail upload initiation")

		if err := resp.Write(w); err != nil {
			zlog.Error().Err(err).Msg("fail writing error response")
		}
	}
}

func (rt Router) handleUploadChunk(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
	start := time.Now()

	id := ps.ByName("id")
	chunkID := ps.ByName("num")

	reqID := r.Header.Get(logger.HeaderRequestID)

	zlog := log.With().
		Str(LogAgentID, id).
		Str(ECSHTTPRequestID, reqID).
		Logger()

	// simpler authentication check here, since the chunk checksum must
	// ultimately match the initial hash provided under the stricter key check
	if _, err := authAPIKey(r, rt.bulker, rt.ut.cache); err != nil {
		cntUpload.IncError(err)
		resp := NewHTTPErrResp(err)
		if err := resp.Write(w); err != nil {
			zlog.Error().Err(err).Msg("failed writing error response")
		}
		return
	}

	chunkNum, err := strconv.Atoi(chunkID)
	if err != nil {
		cntUpload.IncError(err)
		resp := NewHTTPErrResp(err)
		if err := resp.Write(w); err != nil {
			zlog.Error().Err(err).Msg("fail writing error response")
		}
		return
	}
	err = rt.ut.handleUploadChunk(&zlog, w, r, id, chunkNum)

	if err != nil {
		cntUpload.IncError(err)
		resp := NewHTTPErrResp(err)

		// Log this as warn for visibility that the limit has been reached.
		// This allows customers to tune the configuration on detection of the threshold.
		if errors.Is(err, limit.ErrMaxLimit) {
			resp.Level = zerolog.WarnLevel
		}

		zlog.WithLevel(resp.Level).
			Err(err).
			Int(ECSHTTPResponseCode, resp.StatusCode).
			Int64(ECSEventDuration, time.Since(start).Nanoseconds()).
			Msg("fail upload chunk")

		if err := resp.Write(w); err != nil {
			zlog.Error().Err(err).Msg("fail writing error response")
		}
	}
}

func (rt Router) handleUploadComplete(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
	start := time.Now()

	id := ps.ByName("id")

	reqID := r.Header.Get(logger.HeaderRequestID)

	zlog := log.With().
		Str(LogAgentID, id).
		Str(ECSHTTPRequestID, reqID).
		Logger()

	// simpler authentication check here; the file integrity checksum
	// will catch directed tampering, and this route just says "I'm done"
	if _, err := authAPIKey(r, rt.bulker, rt.ut.cache); err != nil {
		cntUpload.IncError(err)
		resp := NewHTTPErrResp(err)
		if err := resp.Write(w); err != nil {
			zlog.Error().Err(err).Msg("failed writing error response")
		}
		return
	}

	err := rt.ut.handleUploadComplete(&zlog, w, r, id)

	if err != nil {
		cntUpload.IncError(err)
		resp := NewHTTPErrResp(err)

		// Log this as warn for visibility that the limit has been reached.
		// This allows customers to tune the configuration on detection of the threshold.
		if errors.Is(err, limit.ErrMaxLimit) {
			resp.Level = zerolog.WarnLevel
		}

		zlog.WithLevel(resp.Level).
			Err(err).
			Int(ECSHTTPResponseCode, resp.StatusCode).
			Int64(ECSEventDuration, time.Since(start).Nanoseconds()).
			Msg("fail upload completion")

		if err := resp.Write(w); err != nil {
			zlog.Error().Err(err).Msg("fail writing error response")
		}
	}
}

type UploadT struct {
	bulker      bulk.Bulk
	chunkClient *elasticsearch.Client
	cache       cache.Cache
	upl         *upload.Uploader
}

func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT {
	log.Info().
		Interface("limits", cfg.Limits.ArtifactLimit).
		Int64("maxFileSize", maxFileSize).
		Int("maxParallelOps", maxParallelUploadOperations).
		Int("maxParallelChunks", maxParallelChunks).
		Msg("Artifact install limits")
Review comment (Member):
nit: consistency: log messages start with lower case in fleet server I think

Reply (Member Author):
I think this was just copy-pasted from elsewhere and I didn't change the message, even though this is definitely not Artifacts. I guess you've got a capital over there in handleArtifacts.go.

	return &UploadT{
		chunkClient: chunkClient,
		bulker:      bulker,
		cache:       cache,
		upl:         upload.New(maxFileSize, maxParallelUploadOperations, maxParallelChunks),
	}
}

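// handleUploadStart initiates a new file upload operation. Illustrative request
// body (values and JSON casing are examples; the field set follows FileInfo and
// validateUploadPayload below):
//
//	{"action_id": "...", "agent_id": "...", "src": "endpoint",
//	 "file": {"name": "diag.zip", "size": 300, "mime_type": "application/zip",
//	          "hash": {"sha256": "..."}}}
//
// A successful response returns the operation ID and the chunk size the client
// should use:
//
//	{"upload_id": "...", "chunk_size": ...}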
func (ut *UploadT) handleUploadStart(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) error {
	// store the raw body, since we will json-decode it twice.
	// 2MB is a reasonable json payload size; any more might be an indication of garbage
	body, err := io.ReadAll(io.LimitReader(r.Body, 2*1024*1024))
	r.Body.Close()
	if err != nil {
		return fmt.Errorf("error reading request: %w", err)
	}

	// decode once here to access the known fields we need to parse and work with
	var fi FileInfo
	if err := json.Unmarshal(body, &fi); err != nil {
		if errors.Is(err, io.EOF) {
			return fmt.Errorf("file info body is required: %w", err)
		}
		return err
	}

	// check that the API key matches the payload agent ID
	if _, err := authAgent(r, &fi.AgentID, ut.bulker, ut.cache); err != nil {
		return err
	}

	if err := validateUploadPayload(fi); err != nil {
		return err
	}

	docID := fmt.Sprintf("%s.%s", fi.ActionID, fi.AgentID)

	var hasher hash.Hash
	var sum string
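	// prefer the stronger SHA256 hash when both are provided; if the client
	// sent neither, hasher remains nil and sum stays empty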
	switch {
	case fi.File.Hash.SHA256 != "":
		hasher = sha256.New()
		sum = fi.File.Hash.SHA256
	case fi.File.Hash.MD5 != "":
		hasher = md5.New()
		sum = fi.File.Hash.MD5
	}

	op, err := ut.upl.Begin(fi.File.Size, docID, fi.Source, sum, hasher)
	if err != nil {
		return err
	}

	// second decode here to maintain the arbitrary shape and fields we will just pass through
	var reqDoc map[string]interface{}
	if err := json.Unmarshal(body, &reqDoc); err != nil {
		return fmt.Errorf("error parsing request json: %w", err)
	}

	doc, err := uploadRequestToFileDoc(reqDoc, op.ChunkSize)
	if err != nil {
		return fmt.Errorf("unable to convert request to file metadata doc: %w", err)
	}
	ret, err := upload.CreateFileDoc(r.Context(), ut.bulker, doc, fi.Source, docID)
	if err != nil {
		return err
	}

	zlog.Info().Str("return", ret).Msg("wrote doc")

	out, err := json.Marshal(map[string]interface{}{
		"upload_id":  op.ID,
		"chunk_size": op.ChunkSize,
	})
	if err != nil {
		return err
	}
	w.Header().Set("Content-Type", "application/json")
	_, err = w.Write(out)
	if err != nil {
		return err
	}
	return nil
}

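// handleUploadChunk ingests the raw body of one chunk for upload uplID at
// position chunkID, capping the read at upload.MaxChunkSize and forwarding it
// to Elasticsearch wrapped in a CBOR envelope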
func (ut *UploadT) handleUploadChunk(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string, chunkID int) error {
	chunkInfo, err := ut.upl.Chunk(uplID, chunkID)
	if err != nil {
		return err
	}
	defer chunkInfo.Token.Release()
	if chunkInfo.FirstReceived {
		if err := updateUploadStatus(r.Context(), ut.bulker, chunkInfo.Upload, UploadProgress); err != nil {
			zlog.Warn().Err(err).Str("upload", uplID).Msg("unable to update upload status")
		}
	}

	// prevent over-sized chunks
	data := http.MaxBytesReader(w, r.Body, upload.MaxChunkSize)
	ce := cbor.NewChunkWriter(data, chunkInfo.Final, chunkInfo.Upload.DocID, chunkInfo.Upload.ChunkSize)
	if err := upload.IndexChunk(r.Context(), ut.chunkClient, ce, chunkInfo.Upload.Source, chunkInfo.Upload.DocID, chunkInfo.ID); err != nil {
		return err
	}
	return nil
}

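// handleUploadComplete finalizes upload uplID: the uploader's Complete step
// verifies the assembled file, then the file doc status is flipped to READY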
func (ut *UploadT) handleUploadComplete(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string) error {
	info, err := ut.upl.Complete(uplID, ut.bulker)
	if err != nil {
		return err
	}

	if err := updateUploadStatus(r.Context(), ut.bulker, info, UploadDone); err != nil {
		// should be 500 error probably?
		zlog.Warn().Err(err).Str("upload", uplID).Msg("unable to set upload status to complete")
		return err
	}

	_, err = w.Write([]byte(`{"status":"ok"}`))
	if err != nil {
		return err
	}
	return nil
}

// uploadRequestToFileDoc takes the arbitrary input document from an upload
// request and injects a few known fields as it passes through.
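// For example (values illustrative), a request {"file": {"name": "diag.zip", ...}}
// is stored as {"file": {"name": "diag.zip", "ChunkSize": <chunkSize>,
// "Status": "AWAITING_UPLOAD", ...}}.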
func uploadRequestToFileDoc(req map[string]interface{}, chunkSize int64) ([]byte, error) {
	fileObj, ok := req["file"].(map[string]interface{})
	if !ok {
		return nil, errors.New("invalid upload request. File is not an object")
	}

	fileObj["ChunkSize"] = chunkSize
	fileObj["Status"] = string(UploadAwaiting)

	return json.Marshal(req)
}

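// updateUploadStatus issues a partial update of the file metadata doc, e.g.
// {"doc": {"file": {"Status": "UPLOADING"}}} (status value illustrative)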
func updateUploadStatus(ctx context.Context, bulker bulk.Bulk, info upload.Info, status UploadStatus) error {
	data, err := json.Marshal(map[string]interface{}{
		"doc": map[string]interface{}{
			"file": map[string]string{
				"Status": string(status),
			},
		},
	})
	if err != nil {
		return err
	}
	return upload.UpdateFileDoc(ctx, bulker, info.Source, info.DocID, data)
}

func validateUploadPayload(fi FileInfo) error {
	required := []struct {
		Field string
		Msg   string
	}{
		{fi.File.Name, "file name"},
		{fi.File.Mime, "mime_type"},
		{fi.ActionID, "action_id"},
		{fi.AgentID, "agent_id"},
		{fi.Source, "src"},
	}

	for _, req := range required {
		if strings.TrimSpace(req.Field) == "" {
			return fmt.Errorf("%s is required", req.Msg)
		}
	}

	//@todo: valid action?
	//@todo: valid agent?
	//@todo: valid src? will that make future expansion harder and require FS updates? maybe just validate the index exists

	if fi.File.Size <= 0 {
		return errors.New("invalid file size, size is required")
	}
	return nil
}
8 changes: 8 additions & 0 deletions internal/pkg/api/handleUpload_test.go
@@ -0,0 +1,8 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !integration
// +build !integration

package api
Review comment (Contributor):
tests incoming?

Review comment (Contributor):
Are unit tests planned for this pr?

Reply (Member Author):
Yes, will add tests for the handlers. The handlers do very little aside from calling their respective operation in the uploader module, so tests were added there first.
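
A minimal sketch of the kind of test this stub could grow into, exercising validateUploadPayload from handleUpload.go. It assumes the FileInfo field layout used in that file (ActionID, AgentID, Source, and a nested File struct with Name, Mime, Size holding only value fields, so a plain struct copy is safe), plus a "testing" import in the stub above; all values are illustrative.

func TestValidateUploadPayload(t *testing.T) {
	// a payload with every required field populated
	valid := FileInfo{
		ActionID: "action-1",
		AgentID:  "agent-1",
		Source:   "endpoint",
	}
	valid.File.Name = "diag.zip"
	valid.File.Mime = "application/zip"
	valid.File.Size = 300

	if err := validateUploadPayload(valid); err != nil {
		t.Fatalf("expected valid payload to pass: %v", err)
	}

	// blanking any required field, or zeroing the size, should be rejected
	tweak := func(mutate func(fi *FileInfo)) FileInfo {
		fi := valid
		mutate(&fi)
		return fi
	}
	cases := map[string]FileInfo{
		"missing file name": tweak(func(fi *FileInfo) { fi.File.Name = "" }),
		"blank agent_id":    tweak(func(fi *FileInfo) { fi.AgentID = "  " }),
		"missing action_id": tweak(func(fi *FileInfo) { fi.ActionID = "" }),
		"zero size":         tweak(func(fi *FileInfo) { fi.File.Size = 0 }),
	}
	for name, fi := range cases {
		if err := validateUploadPayload(fi); err == nil {
			t.Errorf("%s: expected an error", name)
		}
	}
}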
