Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File Upload Feature #1902

Merged
merged 54 commits into from
Jan 21, 2023
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
fcb3d09
file upload WIP
pzl Sep 22, 2022
33e9821
functional chunk uploads, basic functionality
pzl Sep 26, 2022
8da226e
cleanups
pzl Sep 27, 2022
f0cd448
use a non-retry ES client for bulk uploads
pzl Sep 27, 2022
ebcee30
index name temporary cleanup
pzl Sep 29, 2022
f9b5557
cbor: boolean byte was flipped
pzl Oct 4, 2022
79c959c
use throttle to cap upload concurrency, add tests
pzl Oct 4, 2022
76dacbf
start using real(er) document IDs and index names
pzl Oct 4, 2022
2cafe7f
update tests
pzl Oct 4, 2022
fc9e79c
max file size
pzl Oct 4, 2022
59ee0c2
write agent,action IDs to file meta doc
pzl Oct 5, 2022
3b0442d
file upload verification, incl hashes
pzl Oct 12, 2022
2fea5df
WIP add contents schema for inner-zip info
pzl Oct 18, 2022
3e64ef7
refactor out of dl, support arbitrary req payloads
pzl Nov 7, 2022
8bc0a5c
add upload authorization
pzl Nov 8, 2022
7a592e3
strip unused fields
pzl Nov 8, 2022
1249622
reset license header
pzl Nov 8, 2022
d7ef820
small fixes
pzl Nov 8, 2022
f61b67b
restore mime to checked fields
pzl Nov 8, 2022
30d80c3
update file index pattern names
pzl Nov 9, 2022
cd6bdb6
wrap initial upload op write in mutex lock
pzl Nov 9, 2022
af09cc5
return all chunk IDs for counting
pzl Nov 10, 2022
cd5870c
fix capitalization
pzl Nov 10, 2022
b3823b4
no need to close request body
pzl Nov 10, 2022
ec0ee5d
WIP
pzl Dec 19, 2022
5dc0b7b
cleanup upload writing
pzl Dec 19, 2022
26319ec
refactor, parse JSON once
pzl Dec 20, 2022
08c211a
check transit hash, request chunk fields manually
pzl Dec 20, 2022
825f487
Merge remote-tracking branch 'upstream/main' into file-upload
pzl Dec 20, 2022
50cd8ff
es client 8.5.0 update and cleanups
pzl Dec 20, 2022
aff2c6d
cleanups and some tests
pzl Dec 21, 2022
367fa00
cleanups, etc
pzl Dec 22, 2022
4b62be3
remove vestigial functions
pzl Dec 22, 2022
65b33b1
reuse trimmed hash
pzl Jan 9, 2023
b0eee36
clarify some offsets in chunk writing
pzl Jan 9, 2023
2709024
cleanup some error objects
pzl Jan 9, 2023
3004ac5
cleanup temp auth flag
pzl Jan 17, 2023
1c18186
turn down some linters for intentional decisions
pzl Jan 17, 2023
b374287
restore some of the upload tests
pzl Jan 17, 2023
f7dc8a2
ensure Info fields are populated
pzl Jan 17, 2023
6cc2b71
comment spacing
pzl Jan 17, 2023
046b310
namespace the exportable structs to avoid circular imports coming
pzl Jan 17, 2023
610d19a
use internal cache for upload infos
pzl Jan 17, 2023
882df20
fix lint
pzl Jan 17, 2023
8e38d80
fixup tests
pzl Jan 18, 2023
dcf436f
move chunk def into new package for exported defs
pzl Jan 18, 2023
e2ee184
small cleanups
pzl Jan 18, 2023
ea72b10
comment cleanups
pzl Jan 18, 2023
bf44629
fixup uploader tests
pzl Jan 19, 2023
db44cd1
add line to changelog
pzl Jan 19, 2023
f47e698
Merge branch 'main' into file-upload
pzl Jan 19, 2023
144756d
comment conventions
pzl Jan 19, 2023
b877927
a WHOLE LOT of handler tests
pzl Jan 20, 2023
ce4d2d2
Merge branch 'main' into file-upload
pzl Jan 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package api

import (
"context"
"crypto/sha256"
pzl marked this conversation as resolved.
Show resolved Hide resolved
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/uploader"
"github.com/elastic/fleet-server/v7/internal/pkg/uploader/cbor"
"github.com/elastic/go-elasticsearch/v8"
"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

const (
// TODO: move to a config
maxFileSize = 104857600 // 100 MiB
maxUploadTimer = 24 * time.Hour
)

func (rt Router) handleUploadStart(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()
reqID := r.Header.Get(logger.HeaderRequestID)
zlog := log.With().
Str(ECSHTTPRequestID, reqID).
Logger()

// authentication occurs inside here
// to check that key agent ID matches the ID in the body payload yet-to-be unmarshalled
if err := rt.ut.handleUploadStart(&zlog, w, r); err != nil {
writeUploadError(err, w, zlog, start, "error initiating upload process")
return
}
}

func (rt Router) handleUploadChunk(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

id := ps.ByName("id")
chunkID := ps.ByName("num")

reqID := r.Header.Get(logger.HeaderRequestID)

zlog := log.With().
Str(LogAgentID, id).
Str(ECSHTTPRequestID, reqID).
Logger()

// simpler authentication check, for high chunk throughput
// since chunk checksums must match transit hash
// AND optionally the initial hash, both having stricter auth checks
if _, err := authAPIKey(r, rt.bulker, rt.ut.cache); err != nil {
writeUploadError(err, w, zlog, start, "authentication failure for chunk write")
return
}

chunkNum, err := strconv.Atoi(chunkID)
if err != nil {
writeUploadError(err, w, zlog, start, "error parsing chunk index")
return
}
if err := rt.ut.handleUploadChunk(&zlog, w, r, id, chunkNum); err != nil {
writeUploadError(err, w, zlog, start, "error uploading chunk")
return
}
}

func (rt Router) handleUploadComplete(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

id := ps.ByName("id")

reqID := r.Header.Get(logger.HeaderRequestID)

zlog := log.With().
Str(LogAgentID, id).
Str(ECSHTTPRequestID, reqID).
Logger()

// authentication occurs inside here, to ensure key agent ID
// matches the same agent ID the operation started with
if err := rt.ut.handleUploadComplete(&zlog, w, r, id); err != nil {
writeUploadError(err, w, zlog, start, "error finalizing upload")
return
}
}

type UploadT struct {
bulker bulk.Bulk
chunkClient *elasticsearch.Client
cache cache.Cache
uploader *uploader.Uploader
}

func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *UploadT {
log.Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int64("maxFileSize", maxFileSize).
Msg("upload limits")

return &UploadT{
chunkClient: chunkClient,
bulker: bulker,
cache: cache,
uploader: uploader.New(chunkClient, bulker, cache, maxFileSize, maxUploadTimer),
}
}

func (ut *UploadT) handleUploadStart(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) error { //nolint:unparam // log is standard first arg for the handlers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another way is to do something like this

func (ut *UploadT) handleUploadStart(_ *zerolog.Logger, w http.ResponseWriter, r *http.Request) error

// decode early to match agentID in the payload
payload, err := uploader.ReadDict(r.Body)
if err != nil {
if errors.Is(err, io.EOF) {
return fmt.Errorf("file info body is required: %w", err)
}
return err
}

// check API key matches payload agent ID
agentID, ok := payload.Str("agent_id")
if !ok || agentID == "" {
return errors.New("required field agent_id is missing")
}
if _, err := authAgent(r, &agentID, ut.bulker, ut.cache); err != nil {
return err
}

// validate payload, enrich with additional fields, and write metadata doc to ES
info, err := ut.uploader.Begin(r.Context(), payload)
if err != nil {
return err
}

// prepare and write response
out, err := json.Marshal(map[string]interface{}{
"upload_id": info.ID,
"chunk_size": info.ChunkSize,
})
if err != nil {
return err
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(out)
if err != nil {
return err
}
return nil
}

func (ut *UploadT) handleUploadChunk(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string, chunkID int) error {
chunkHash := strings.TrimSpace(r.Header.Get("X-Chunk-Sha2"))
if chunkHash == "" {
return errors.New("chunk hash header required")
}

upinfo, chunkInfo, err := ut.uploader.Chunk(r.Context(), uplID, chunkID, chunkHash)
if err != nil {
return err
}

// prevent over-sized chunks
data := http.MaxBytesReader(w, r.Body, uploader.MaxChunkSize)

// compute hash as we stream it
hash := sha256.New()
copier := io.TeeReader(data, hash)

ce := cbor.NewChunkWriter(copier, chunkInfo.Last, chunkInfo.BID, chunkInfo.SHA2, upinfo.ChunkSize)
if err := uploader.IndexChunk(r.Context(), ut.chunkClient, ce, upinfo.Source, chunkInfo.BID, chunkInfo.Pos); err != nil {
return err
}

hashsum := hex.EncodeToString(hash.Sum(nil))

if !strings.EqualFold(chunkHash, hashsum) {
// delete document, since we wrote it, but the hash was invalid
// context scoped to allow this operation to finish even if client disconnects
if err := uploader.DeleteChunk(context.Background(), ut.bulker, upinfo.Source, chunkInfo.BID, chunkInfo.Pos); err != nil {
zlog.Warn().Err(err).
Str("source", upinfo.Source).
Str("fileID", chunkInfo.BID).
Int("chunkNum", chunkInfo.Pos).
Msg("a chunk hash mismatch occurred, and fleet server was unable to remove the invalid chunk")
}
return uploader.ErrHashMismatch
}

return nil
}

func (ut *UploadT) handleUploadComplete(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string) error { //nolint:unparam // log is standard first arg for the handlers
info, err := ut.uploader.GetUploadInfo(r.Context(), uplID)
if err != nil {
return err
}
// need to auth that it matches the ID in the initial
// doc, but that means we had to doc-lookup early
if _, err := authAgent(r, &info.AgentID, ut.bulker, ut.cache); err != nil {
return fmt.Errorf("Error authenticating for upload finalization: %w", err)
}

var req UploadCompleteRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return errors.New("unable to parse request body")
}

hash := strings.TrimSpace(req.TransitHash.SHA256)
if hash == "" {
return errors.New("transit hash required")
}

info, err = ut.uploader.Complete(r.Context(), uplID, hash)
if err != nil {
return err
}

_, err = w.Write([]byte(`{"status":"ok"}`))
if err != nil {
return err
}
return nil
}

// helper function for doing all the error responsibilities
// at the HTTP edge
func writeUploadError(err error, w http.ResponseWriter, zlog zerolog.Logger, start time.Time, msg string) {
cntUpload.IncError(err)
resp := NewHTTPErrResp(err)

zlog.WithLevel(resp.Level).
Err(err).
Int(ECSHTTPResponseCode, resp.StatusCode).
Int64(ECSEventDuration, time.Since(start).Nanoseconds()).
Msg(msg)
if e := resp.Write(w); e != nil {
zlog.Error().Err(e).Msg("failure writing error response")
}
}

type UploadCompleteRequest struct {
TransitHash struct {
SHA256 string `json:"sha256"`
} `json:"transithash"`
}
8 changes: 8 additions & 0 deletions internal/pkg/api/handleUpload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !integration
// +build !integration

package api
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests incoming?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are unit tests planned for this pr?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will add tests for the handlers. The handlers do very little aside from calling their respective operation in uploader module, so tests were added there first.

2 changes: 2 additions & 0 deletions internal/pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
cntEnroll routeStats
cntAcks routeStats
cntStatus routeStats
cntUpload routeStats
cntArtifacts artifactStats
)

Expand Down Expand Up @@ -104,6 +105,7 @@ func init() {
cntArtifacts.Register(routesRegistry.NewRegistry("artifacts"))
cntAcks.Register(routesRegistry.NewRegistry("acks"))
cntStatus.Register(routesRegistry.NewRegistry("status"))
cntUpload.Register(routesRegistry.NewRegistry("upload"))
}

func (rt *routeStats) IncError(err error) {
Expand Down
32 changes: 26 additions & 6 deletions internal/pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import (
)

const (
RouteStatus = "/api/status"
RouteEnroll = "/api/fleet/agents/:id"
RouteCheckin = "/api/fleet/agents/:id/checkin"
RouteAcks = "/api/fleet/agents/:id/acks"
RouteArtifacts = "/api/fleet/artifacts/:id/:sha2"
RouteStatus = "/api/status"
RouteEnroll = "/api/fleet/agents/:id"
RouteCheckin = "/api/fleet/agents/:id/checkin"
RouteAcks = "/api/fleet/agents/:id/acks"
RouteArtifacts = "/api/fleet/artifacts/:id/:sha2"
RouteUploadBegin = "/api/fleet/uploads"
RouteUploadChunk = "/api/fleet/uploads/:id/:num"
RouteUploadComplete = "/api/fleet/uploads/:id"
)

type Router struct {
Expand All @@ -41,12 +44,13 @@ type Router struct {
at *ArtifactT
ack *AckT
st *StatusT
ut *UploadT
sm policy.SelfMonitor
tracer *apm.Tracer
bi build.Info
}

func NewRouter(cfg *config.Server, bulker bulk.Bulk, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, sm policy.SelfMonitor, tracer *apm.Tracer, bi build.Info) *Router {
func NewRouter(cfg *config.Server, bulker bulk.Bulk, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, ut *UploadT, sm policy.SelfMonitor, tracer *apm.Tracer, bi build.Info) *Router {
rt := &Router{
cfg: cfg,
bulker: bulker,
Expand All @@ -57,6 +61,7 @@ func NewRouter(cfg *config.Server, bulker bulk.Bulk, ct *CheckinT, et *EnrollerT
ack: ack,
st: st,
tracer: tracer,
ut: ut,
bi: bi,
}

Expand Down Expand Up @@ -98,6 +103,21 @@ func (rt *Router) newHTTPRouter(addr string) *httprouter.Router {
RouteArtifacts,
limiter.WrapArtifact(rt.handleArtifacts, &cntArtifacts),
},
{
http.MethodPost,
RouteUploadBegin,
rt.handleUploadStart,
},
{
http.MethodPut,
RouteUploadChunk,
rt.handleUploadChunk,
},
{
http.MethodPost,
RouteUploadComplete,
rt.handleUploadComplete,
},
}

router := httprouter.New()
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestRun(t *testing.T) {
et, err := NewEnrollerT(verCon, cfg, nil, c)
require.NoError(t, err)

router := NewRouter(cfg, bulker, ct, et, nil, nil, nil, nil, nil, fbuild.Info{})
router := NewRouter(cfg, bulker, ct, et, nil, nil, nil, nil, nil, nil, fbuild.Info{})
errCh := make(chan error)

var wg sync.WaitGroup
Expand Down
Loading