Commit

refactor out of dl, support arbitrary req payloads
pzl committed Nov 7, 2022
1 parent 2fea5df commit 3e64ef7
Showing 5 changed files with 43 additions and 367 deletions.
88 changes: 34 additions & 54 deletions internal/pkg/api/handleUpload.go
@@ -13,6 +13,7 @@ import (
"fmt"
"hash"
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
@@ -21,10 +22,8 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/upload"
"github.com/elastic/fleet-server/v7/internal/pkg/upload/cbor"
"github.com/elastic/go-elasticsearch/v7"
@@ -191,15 +190,23 @@ func NewUploadT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch
}

func (ut *UploadT) handleUploadStart(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) error {

// store raw body since we will json-decode twice
// 2MB is a reasonable json payload size. Any more might be an indication of garbage
body, err := ioutil.ReadAll(io.LimitReader(r.Body, 2*1024*1024))
r.Body.Close()
if err != nil {
return fmt.Errorf("error reading request: %w", err)
}

// decode once here to access known fields we need to parse and work with
var fi FileInfo
if err := json.NewDecoder(r.Body).Decode(&fi); err != nil {
r.Body.Close()
if err := json.Unmarshal(body, &fi); err != nil {
if errors.Is(err, io.EOF) {
return fmt.Errorf("file info body is required: %w", err)
}
return err
}
r.Body.Close()

if err := validateUploadPayload(fi); err != nil {
return err
@@ -223,8 +230,17 @@ func (ut *UploadT) handleUploadStart(zlog *zerolog.Logger, w http.ResponseWriter
return err
}

doc := uploadRequestToFileInfo(fi, op.ChunkSize)
ret, err := dl.CreateUploadInfo(r.Context(), ut.bulker, doc, fi.Source, docID)
// second decode here to maintain the arbitrary shape and fields we will just pass through
var reqDoc map[string]interface{}
if err := json.Unmarshal(body, &reqDoc); err != nil {
return fmt.Errorf("error parsing request json: %w", err)
}

doc, err := uploadRequestToFileDoc(reqDoc, op.ChunkSize)
if err != nil {
return fmt.Errorf("unable to convert request to file metadata doc: %w", err)
}
ret, err := upload.CreateFileDoc(r.Context(), ut.bulker, doc, fi.Source, docID)
if err != nil {
return err
}
@@ -261,7 +277,7 @@ func (ut *UploadT) handleUploadChunk(zlog *zerolog.Logger, w http.ResponseWriter
// prevent over-sized chunks
data := http.MaxBytesReader(w, r.Body, upload.MaxChunkSize)
ce := cbor.NewChunkWriter(data, chunkInfo.Final, chunkInfo.Upload.DocID, chunkInfo.Upload.ChunkSize)
if err := dl.UploadChunk(r.Context(), ut.chunkClient, ce, chunkInfo.Upload.Source, chunkInfo.Upload.DocID, chunkInfo.ID); err != nil {
if err := upload.IndexChunk(r.Context(), ut.chunkClient, ce, chunkInfo.Upload.Source, chunkInfo.Upload.DocID, chunkInfo.ID); err != nil {
return err
}
return nil
@@ -287,54 +303,18 @@ func (ut *UploadT) handleUploadComplete(zlog *zerolog.Logger, w http.ResponseWri
return nil
}

func uploadRequestToFileInfo(req FileInfo, chunkSize int64) model.FileInfo {
primaryFile := fileRequestToFileData(req.File)
primaryFile.ChunkSize = chunkSize
primaryFile.Status = string(UploadAwaiting)

contents := make([]model.FileData, len(req.Contents))
for i, f := range req.Contents {
contents[i] = fileRequestToFileData(f)
// takes the arbitrary input document from an upload request and injects
// a few known fields as it passes through
func uploadRequestToFileDoc(req map[string]interface{}, chunkSize int64) ([]byte, error) {
fileObj, ok := req["file"].(map[string]interface{})
if !ok {
return nil, errors.New("invalid upload request. File is not an object")
}

return model.FileInfo{
File: &primaryFile,
Contents: contents,
ActionID: req.ActionID,
AgentID: req.AgentID,
Source: req.Source,
}
}
fileObj["ChunkSize"] = chunkSize
fileObj["Status"] = string(UploadAwaiting)

func fileRequestToFileData(req FileData) model.FileData {
return model.FileData{
Accessed: req.Accessed,
Attributes: req.Attributes,
Compression: req.Compression,
Created: req.Created,
Ctime: req.CTime,
Device: req.Device,
Directory: req.Directory,
DriveLetter: req.DriveLetter,
Extension: req.Extension,
Gid: req.GID,
Group: req.Group,
Hash: &model.Hash{
Sha256: req.Hash.SHA256,
Md5: req.Hash.MD5,
},
Inode: req.INode,
MimeType: req.Mime,
Mode: req.Mode,
Mtime: req.MTime,
Name: req.Name,
Owner: req.Owner,
Path: req.Path,
Size: req.Size,
TargetPath: req.TargetPath,
Type: req.Type,
Uid: req.UID,
}
return json.Marshal(req)
}

func updateUploadStatus(ctx context.Context, bulker bulk.Bulk, info upload.Info, status UploadStatus) error {
@@ -348,7 +328,7 @@ func updateUploadStatus(ctx context.Context, bulker bulk.Bulk, info upload.Info,
if err != nil {
return err
}
return dl.UpdateUpload(ctx, bulker, info.Source, info.DocID, data)
return upload.UpdateFileDoc(ctx, bulker, info.Source, info.DocID, data)
}

func validateUploadPayload(fi FileInfo) error {
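
The handleUploadStart change above reads the request body once and JSON-decodes it twice: first into a typed FileInfo to validate the known fields, then into a generic map so any extra fields the client sent are preserved and passed through to the stored document. A minimal standalone sketch of that pattern, assuming illustrative field names and status values rather than the actual fleet-server types:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// FileMeta is a stand-in for the typed view of the request;
// only the fields the server needs to inspect are declared here.
type FileMeta struct {
	ActionID string `json:"action_id"`
	Source   string `json:"src"`
}

// decodeTwice reads the body once, decodes it into both a typed struct
// (for validation) and a generic map (kept as-is), injects server-side
// fields, and re-marshals the full document.
func decodeTwice(body io.Reader, chunkSize int64) ([]byte, error) {
	raw, err := io.ReadAll(io.LimitReader(body, 2*1024*1024)) // cap payload size
	if err != nil {
		return nil, fmt.Errorf("error reading request: %w", err)
	}

	var meta FileMeta
	if err := json.Unmarshal(raw, &meta); err != nil {
		return nil, fmt.Errorf("invalid file info: %w", err)
	}
	if meta.ActionID == "" || meta.Source == "" {
		return nil, fmt.Errorf("action_id and src are required")
	}

	var doc map[string]interface{}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, fmt.Errorf("error parsing request json: %w", err)
	}
	doc["chunk_size"] = chunkSize // injected field names are illustrative
	doc["status"] = "AWAITING_UPLOAD"

	return json.Marshal(doc)
}

func main() {
	in := `{"action_id":"a1","src":"endpoint","file":{"name":"report.pdf","custom":"kept as-is"}}`
	out, err := decodeTwice(strings.NewReader(in), 4194304)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // unknown fields like file.custom pass through
}

The generic second decode is what supports arbitrary request payloads: fields the server does not model, such as file.custom above, are re-marshaled unchanged into the metadata document.
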
118 changes: 0 additions & 118 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default.

26 changes: 7 additions & 19 deletions internal/pkg/dl/upload.go → internal/pkg/upload/es.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package dl
package upload

import (
"context"
@@ -36,33 +36,21 @@ var (
func prepareFindChunkIDs() *dsl.Tmpl {
tmpl := dsl.NewTmpl()
root := dsl.NewRoot()
root.Param(FieldSource, false)
root.Param("_source", false) // do not return large data payload
root.Query().Term(FieldBaseID, tmpl.Bind(FieldBaseID), nil)
tmpl.MustResolve(root)
return tmpl
}

func CreateUploadInfo(ctx context.Context, bulker bulk.Bulk, fi model.FileInfo, source string, fileID string) (string, error) {
return createUploadInfo(ctx, bulker, fmt.Sprintf(FileHeaderIndexPattern, source), fi, fileID)
func CreateFileDoc(ctx context.Context, bulker bulk.Bulk, doc []byte, source string, fileID string) (string, error) {
return bulker.Create(ctx, fmt.Sprintf(FileHeaderIndexPattern, source), fileID, doc, bulk.WithRefresh())
}

func createUploadInfo(ctx context.Context, bulker bulk.Bulk, index string, fi model.FileInfo, fileID string) (string, error) {
body, err := json.Marshal(fi)
if err != nil {
return "", err
}
return bulker.Create(ctx, index, fileID, body, bulk.WithRefresh())
}

func UpdateUpload(ctx context.Context, bulker bulk.Bulk, source string, fileID string, data []byte) error {
return updateUpload(ctx, bulker, fmt.Sprintf(FileHeaderIndexPattern, source), fileID, data)
}

func updateUpload(ctx context.Context, bulker bulk.Bulk, index string, fileID string, data []byte) error {
return bulker.Update(ctx, index, fileID, data)
func UpdateFileDoc(ctx context.Context, bulker bulk.Bulk, source string, fileID string, data []byte) error {
return bulker.Update(ctx, fmt.Sprintf(FileHeaderIndexPattern, source), fileID, data)
}

func UploadChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.ChunkEncoder, source string, docID string, chunkID int) error {
func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.ChunkEncoder, source string, docID string, chunkID int) error {

/*
// the non-streaming version
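
With the helpers moved from dl into the upload package, persistence no longer goes through model.FileInfo: CreateFileDoc and UpdateFileDoc take pre-marshaled JSON bytes, and the caller decides the document shape. A hedged sketch of a caller, with placeholder source and field names (not an actual fleet-server call site):

package example

import (
	"context"
	"encoding/json"

	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
	"github.com/elastic/fleet-server/v7/internal/pkg/upload"
)

// storeMetadata is a hypothetical caller of the relocated helper.
// CreateFileDoc no longer takes a typed model.FileInfo; any
// pre-marshaled JSON document is accepted as-is.
func storeMetadata(ctx context.Context, bulker bulk.Bulk, docID string) (string, error) {
	doc, err := json.Marshal(map[string]interface{}{
		"file":      map[string]interface{}{"name": "report.pdf", "size": 1024},
		"action_id": "a1", // field names here are illustrative
	})
	if err != nil {
		return "", err
	}
	// "endpoint" is a placeholder source; it selects the target index
	// via FileHeaderIndexPattern.
	return upload.CreateFileDoc(ctx, bulker, doc, "endpoint", docID)
}
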
5 changes: 2 additions & 3 deletions internal/pkg/upload/upload.go
@@ -18,7 +18,6 @@ import (
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/throttle"
"github.com/gofrs/uuid"
"github.com/rs/zerolog/log"
@@ -258,7 +257,7 @@ func (u *Uploader) finalize(uplID string) error {
}

func (u *Uploader) allChunksPresent(info Info, bulker bulk.Bulk) (bool, error) {
hits, err := dl.ListChunkIDs(context.TODO(), bulker, info.Source, info.DocID)
hits, err := ListChunkIDs(context.TODO(), bulker, info.Source, info.DocID)
if err != nil {
log.Warn().Err(err).Msg("error listing chunks")
return false, err
@@ -295,7 +294,7 @@ func (u *Uploader) verifyChunkData(info Info, bulker bulk.Bulk) (bool, error) {
// verify hash

for i := 0; i < info.Count; i++ {
chunk, err := dl.GetChunk(context.TODO(), bulker, info.Source, info.DocID, i)
chunk, err := GetChunk(context.TODO(), bulker, info.Source, info.DocID, i)
if err != nil {
return false, err
}