Skip to content

Commit

Permalink
file upload WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pzl committed Sep 22, 2022
1 parent 46ac14b commit 00d9ec4
Show file tree
Hide file tree
Showing 12 changed files with 703 additions and 15 deletions.
3 changes: 2 additions & 1 deletion cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,9 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
at := api.NewArtifactT(&cfg.Inputs[0].Server, bulker, f.cache)
ack := api.NewAckT(&cfg.Inputs[0].Server, bulker, f.cache)
st := api.NewStatusT(&cfg.Inputs[0].Server, bulker, f.cache)
ut := api.NewUploadT(&cfg.Inputs[0].Server, bulker, f.cache)

router := api.NewRouter(ctx, bulker, ct, et, at, ack, st, sm, tracer, f.bi)
router := api.NewRouter(ctx, bulker, ct, et, at, ack, st, ut, sm, tracer, f.bi)

g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error {
return api.Run(ctx, router, &cfg.Inputs[0].Server)
Expand Down
233 changes: 233 additions & 0 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package api

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/throttle"
"github.com/elastic/fleet-server/v7/internal/pkg/upload"
"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

const (
// TODO: move to a config
maxParallelUploads = 5

// specification-designated maximum
maxChunkSize = 4194304 // 4 MiB
)

func (rt Router) handleUploadStart(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

reqID := r.Header.Get(logger.HeaderRequestID)

zlog := log.With().
Str(ECSHTTPRequestID, reqID).
Logger()

err := rt.ut.handleUploadStart(&zlog, w, r)

if err != nil {
cntUpload.IncError(err)
resp := NewHTTPErrResp(err)

// Log this as warn for visibility that limit has been reached.
// This allows customers to tune the configuration on detection of threshold.
if errors.Is(err, limit.ErrMaxLimit) || errors.Is(err, upload.ErrMaxConcurrentUploads) {
resp.Level = zerolog.WarnLevel
}

zlog.WithLevel(resp.Level).
Err(err).
Int(ECSHTTPResponseCode, resp.StatusCode).
Int64(ECSEventDuration, time.Since(start).Nanoseconds()).
Msg("fail checkin")

if err := resp.Write(w); err != nil {
zlog.Error().Err(err).Msg("fail writing error response")
}
}
}

func (rt Router) handleUploadChunk(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

id := ps.ByName("id")
chunkID := ps.ByName("num")

reqID := r.Header.Get(logger.HeaderRequestID)

zlog := log.With().
Str(LogAgentID, id).
Str(ECSHTTPRequestID, reqID).
Logger()

chunkNum, err := strconv.Atoi(chunkID)
if err != nil {
cntUpload.IncError(err)
resp := NewHTTPErrResp(err)
if err := resp.Write(w); err != nil {
zlog.Error().Err(err).Msg("fail writing error response")
}
return
}
err = rt.ut.handleUploadChunk(&zlog, w, r, id, chunkNum)

if err != nil {
cntUpload.IncError(err)
resp := NewHTTPErrResp(err)

// Log this as warn for visibility that limit has been reached.
// This allows customers to tune the configuration on detection of threshold.
if errors.Is(err, limit.ErrMaxLimit) {
resp.Level = zerolog.WarnLevel
}

zlog.WithLevel(resp.Level).
Err(err).
Int(ECSHTTPResponseCode, resp.StatusCode).
Int64(ECSEventDuration, time.Since(start).Nanoseconds()).
Msg("fail checkin")

if err := resp.Write(w); err != nil {
zlog.Error().Err(err).Msg("fail writing error response")
}
}
}

func (rt Router) handleUploadComplete(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

id := ps.ByName("id")

reqID := r.Header.Get(logger.HeaderRequestID)

zlog := log.With().
Str(LogAgentID, id).
Str(ECSHTTPRequestID, reqID).
Logger()

err := rt.ut.handleUploadComplete(&zlog, w, r, id)

if err != nil {
cntUpload.IncError(err)
resp := NewHTTPErrResp(err)

// Log this as warn for visibility that limit has been reached.
// This allows customers to tune the configuration on detection of threshold.
if errors.Is(err, limit.ErrMaxLimit) {
resp.Level = zerolog.WarnLevel
}

zlog.WithLevel(resp.Level).
Err(err).
Int(ECSHTTPResponseCode, resp.StatusCode).
Int64(ECSEventDuration, time.Since(start).Nanoseconds()).
Msg("fail checkin")

if err := resp.Write(w); err != nil {
zlog.Error().Err(err).Msg("fail writing error response")
}
}
}

type UploadT struct {
bulker bulk.Bulk
cache cache.Cache
esThrottle *throttle.Throttle
limit *limit.Limiter
upl *upload.Uploader
}

func NewUploadT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *UploadT {
log.Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int("maxParallel", defaultMaxParallel).
Msg("Artifact install limits")

return &UploadT{
bulker: bulker,
cache: cache,
limit: limit.NewLimiter(&cfg.Limits.ArtifactLimit),
esThrottle: throttle.NewThrottle(defaultMaxParallel),
upl: upload.New(maxParallelUploads),
}
}

func (ut *UploadT) handleUploadStart(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) error {
var fi FileInfo
if err := json.NewDecoder(r.Body).Decode(&fi); err != nil {
r.Body.Close()
if errors.Is(err, io.EOF) {
return fmt.Errorf("file info body is required: %w", err)
}
return err
}
r.Body.Close()

if strings.TrimSpace(fi.Name) == "" {
return errors.New("file name is required")
}
if fi.Size <= 0 {
return errors.New("invalid file size, size is required")
}

uploadID, err := ut.upl.Begin()
if err != nil {
return err
}

// TODO: write header doc

_, err = w.Write([]byte(uploadID))
if err != nil {
return err
}
return nil
}

func (ut *UploadT) handleUploadChunk(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string, chunkID int) error {
// prevent over-sized chunks
chunk := http.MaxBytesReader(w, r.Body, maxChunkSize)
data, err := ut.upl.Chunk(uplID, chunkID, chunk)
if err != nil {
return err
}

_, err = w.Write([]byte(data))
if err != nil {
return err
}
return nil
}

func (ut *UploadT) handleUploadComplete(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, uplID string) error {
data, err := ut.upl.Complete(uplID)
if err != nil {
return err
}

_, err = w.Write([]byte(data))
if err != nil {
return err
}
return nil
}
8 changes: 8 additions & 0 deletions internal/pkg/api/handleUpload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !integration
// +build !integration

package api
2 changes: 2 additions & 0 deletions internal/pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
cntEnroll routeStats
cntAcks routeStats
cntStatus routeStats
cntUpload routeStats
cntArtifacts artifactStats
)

Expand Down Expand Up @@ -103,6 +104,7 @@ func init() {
cntArtifacts.Register(routesRegistry.NewRegistry("artifacts"))
cntAcks.Register(routesRegistry.NewRegistry("acks"))
cntStatus.Register(routesRegistry.NewRegistry("status"))
cntUpload.Register(routesRegistry.NewRegistry("upload"))
}

func (rt *routeStats) IncError(err error) {
Expand Down
32 changes: 26 additions & 6 deletions internal/pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
)

const (
RouteStatus = "/api/status"
RouteEnroll = "/api/fleet/agents/:id"
RouteCheckin = "/api/fleet/agents/:id/checkin"
RouteAcks = "/api/fleet/agents/:id/acks"
RouteArtifacts = "/api/fleet/artifacts/:id/:sha2"
RouteStatus = "/api/status"
RouteEnroll = "/api/fleet/agents/:id"
RouteCheckin = "/api/fleet/agents/:id/checkin"
RouteAcks = "/api/fleet/agents/:id/acks"
RouteArtifacts = "/api/fleet/artifacts/:id/:sha2"
RouteUploadBegin = "/api/fleet/uploads"
RouteUploadChunk = "/api/fleet/uploads/:id/:num"
RouteUploadComplete = "/api/fleet/uploads/:id"
)

type Router struct {
Expand All @@ -34,11 +37,12 @@ type Router struct {
at *ArtifactT
ack *AckT
st *StatusT
ut *UploadT
sm policy.SelfMonitor
bi build.Info
}

func NewRouter(ctx context.Context, bulker bulk.Bulk, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, sm policy.SelfMonitor, tracer *apm.Tracer, bi build.Info) *httprouter.Router {
func NewRouter(ctx context.Context, bulker bulk.Bulk, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, ut *UploadT, sm policy.SelfMonitor, tracer *apm.Tracer, bi build.Info) *httprouter.Router {
r := Router{
ctx: ctx,
bulker: bulker,
Expand All @@ -48,6 +52,7 @@ func NewRouter(ctx context.Context, bulker bulk.Bulk, ct *CheckinT, et *Enroller
at: at,
ack: ack,
st: st,
ut: ut,
bi: bi,
}

Expand Down Expand Up @@ -81,6 +86,21 @@ func NewRouter(ctx context.Context, bulker bulk.Bulk, ct *CheckinT, et *Enroller
RouteArtifacts,
r.handleArtifacts,
},
{
http.MethodPost,
RouteUploadBegin,
r.handleUploadStart,
},
{
http.MethodPut,
RouteUploadChunk,
r.handleUploadChunk,
},
{
http.MethodPost,
RouteUploadComplete,
r.handleUploadComplete,
},
}

router := httprouter.New()
Expand Down
7 changes: 7 additions & 0 deletions internal/pkg/api/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,10 @@ type StatusResponse struct {
Status string `json:"status"`
Version *StatusResponseVersion `json:"version,omitempty"`
}

type FileInfo struct {
Size int64 `json:"size"`
Name string `json:"name"`
Extension string `json:"ext"`
Mime string `json:"mime_type"`
}
2 changes: 1 addition & 1 deletion internal/pkg/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestRun(t *testing.T) {
et, err := NewEnrollerT(verCon, cfg, nil, c)
require.NoError(t, err)

router := NewRouter(ctx, bulker, ct, et, nil, nil, nil, nil, nil, fbuild.Info{})
router := NewRouter(ctx, bulker, ct, et, nil, nil, nil, nil, nil, nil, fbuild.Info{})
errCh := make(chan error)

var wg sync.WaitGroup
Expand Down
27 changes: 27 additions & 0 deletions internal/pkg/dl/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package dl

import (
"context"
"encoding/json"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
)

func CreateUploadInfo(ctx context.Context, bulker bulk.Bulk, fi model.FileInfo) (string, error) {
return createUploadInfo(ctx, bulker, "files", fi) // @todo: index destination is an input (and different per integration)
}

func createUploadInfo(ctx context.Context, bulker bulk.Bulk, index string, fi model.FileInfo) (string, error) {
body, err := json.Marshal(fi)
if err != nil {
return "", err
}

// @todo: proper doc ID
return bulker.Create(ctx, index, "", body, bulk.WithRefresh())
}
Loading

0 comments on commit 00d9ec4

Please sign in to comment.