File Upload Feature #1902
Conversation
Force-pushed from 518df39 to de75a87.
I did a very quick first look here.
I can see that currently the API handlers are interacting with `upload` and `dl` to add files to ES. However, I think a cleaner implementation would be to have the ES interactions in `upload`, so that the caller can just call something like `upload.Chunk(uplID, chunkID, r.Body) error`. What do you think?
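A minimal sketch of what that package boundary could look like, assuming the reviewer's proposed `upload.Chunk` signature (the `Uploader`/`ChunkWriter` names, the method receiver, and the added `context.Context` parameter are all assumptions for illustration, not the actual implementation):

```go
package upload

import (
	"context"
	"io"
)

// ChunkWriter is a stand-in for the ES-facing dependency (assumed name),
// so handlers never talk to the Elasticsearch layer directly.
type ChunkWriter interface {
	WriteChunk(ctx context.Context, uplID string, chunkID int, body io.Reader) error
}

// Uploader owns all Elasticsearch interaction for file uploads.
type Uploader struct {
	es ChunkWriter
}

// Chunk streams one chunk of an in-progress upload into Elasticsearch.
// The API handler only needs the upload ID, chunk number, and request body.
func (u *Uploader) Chunk(ctx context.Context, uplID string, chunkID int, body io.Reader) error {
	return u.es.WriteChunk(ctx, uplID, chunkID, body)
}
```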
When retry is disabled (which bubbles up to be equivalent to enabling longpolling for us), the elasticsearch.Client does not internally buffer the request (buffering is what it uses to retry and replay a request). Chunks should not be memory-buffered, and should be retried at the integration level, since a retry-and-resume-friendly API is presented to them.
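For illustration, a minimal sketch of the client-side setting being described, assuming the go-elasticsearch client's `DisableRetry` config field (whether fleet-server wires this exactly as shown is an assumption):

```go
package main

import (
	"log"

	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	// With retries disabled, the transport does not buffer request bodies
	// for replay, so large chunk bodies can stream straight through.
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses:    []string{"http://localhost:9200"},
		DisableRetry: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = es // handlers would use this client for chunk writes
}
```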
internal/pkg/api/handleUpload.go (Outdated)

```go
func (ut *UploadT) handleUploadStart(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) error { //nolint:unparam // log is standard first arg for the handlers
```
Another way is to do something like this:

```go
func (ut *UploadT) handleUploadStart(_ *zerolog.Logger, w http.ResponseWriter, r *http.Request) error
```
LGTM, assuming the @todos will be addressed before merging.
Left a few comments/nits; please respond or address.
```go
c.mut.RLock()
defer c.mut.RUnlock()

scopedKey := "upload:" + id
```
Nit: maybe use a function that builds the key string, to avoid doing this in two places; it's easier to make a mistake if anything changes.
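A tiny sketch of the suggested helper (the function name here is hypothetical):

```go
// scopedUploadKey builds the cache key for an upload ID in one place, so a
// future format change only has to happen here. (Hypothetical helper name.)
func scopedUploadKey(id string) string {
	return "upload:" + id
}
```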
```go
// now ensure all positions are accounted for, no gaps, etc
sort.Slice(chunks, func(i, j int) bool {
```
Question/nit: could you avoid sorting if the chunks are always selected "ordered by pos" from elasticsearch?
I'm not sure I can guarantee that they are always sorted correctly by ES. The positional information is not in a sortable field; it is just a portion of the document's `_id`. Considering this is a bounded-size slice (limited maximum file size), sorting was not out of the question.
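To make the tradeoff concrete, here is a self-contained sketch of sorting chunks by a position parsed out of the `_id`. The `<uploadID>.<pos>` id layout and all names here are assumptions for illustration:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

type chunk struct {
	ID  string // assumed layout: "<uploadID>.<pos>"
	Pos int
}

// parsePos extracts the chunk position from the trailing segment of the _id.
func parsePos(id string) (int, error) {
	idx := strings.LastIndexByte(id, '.')
	if idx < 0 {
		return 0, fmt.Errorf("no position in id %q", id)
	}
	return strconv.Atoi(id[idx+1:])
}

func main() {
	chunks := []chunk{{ID: "abc.2"}, {ID: "abc.0"}, {ID: "abc.1"}}
	for i := range chunks {
		pos, err := parsePos(chunks[i].ID)
		if err != nil {
			panic(err)
		}
		chunks[i].Pos = pos
	}
	// Bounded-size slice, so an in-memory sort is cheap.
	sort.Slice(chunks, func(i, j int) bool { return chunks[i].Pos < chunks[j].Pos })
	fmt.Println(chunks) // [{abc.0 0} {abc.1 1} {abc.2 2}]
}
```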
```go
	log.Debug().Int("chunkID", i).Msg("non-final chunk was incorrectly marked last")
	return false
}
if chunk.Size != int(info.ChunkSize) {
```
Should we just define the chunk size as int64 (or uint64) for both structs and avoid the casting? int is int64 anyway; the fleet server is built for 64-bit only at the moment.
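A minimal sketch of that suggestion (struct and field names are assumptions for illustration), so the comparison site needs no conversion:

```go
// Info describes an upload operation; ChunkSize is int64 so callers never
// have to cast. (Struct and field names assumed for this sketch.)
type Info struct {
	ChunkSize int64
}

// Chunk is one uploaded segment; Size matches Info.ChunkSize's type.
type Chunk struct {
	Size int64
}

func sizeMatches(chunk Chunk, info Info) bool {
	return chunk.Size == info.ChunkSize // no int(...) conversion needed
}
```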
@cmacknz @michel-laterman I would like somebody from the agent/fleet-server team to review this as well.
LGTM, other than the missing CHANGELOG and a few TODOs in the code.
rewritten for multiple fleet server
internal/pkg/uploader/upload.go (Outdated)

```go
// Searches for Upload Metadata document in local memory cache if available
// otherwise, fetches from elasticsearch and caches for next use
func (u *Uploader) GetUploadInfo(ctx context.Context, uploadID string) (upload.Info, error) {
```
[Suggestion | Go convention]

```diff
-// Searches for Upload Metadata document in local memory cache if available
-// otherwise, fetches from elasticsearch and caches for next use
+// GetUploadInfo searches for Upload Metadata document in local memory cache if available
+// otherwise, fetches from elasticsearch and caches for next use
 func (u *Uploader) GetUploadInfo(ctx context.Context, uploadID string) (upload.Info, error) {
```
internal/pkg/uploader/upload/info.go (Outdated)

```go
// the only valid values of upload status according to storage spec
type Status string
```
[Suggestion | Go convention]

```diff
-// the only valid values of upload status according to storage spec
+// Status represents the only valid values of upload status according to storage spec
 type Status string
```
Checked out the latest and tested e2e again, LGTM! I'm able to get files from the Endpoint host. This also includes testing with Fleet installation of the relevant indices, which meant running my open PR locally to update the mappings needed by the new fleet server implementation. I'll merge this PR soon.
What is the problem this PR solves?
Adds the ability to upload files from integrations, through fleet server, into Elasticsearch.
Link to Swagger docs for API contract
A followup PR is under way to add this API to the new openapi.yml. Extracting a few heuristics (file size limit, timeout) into fleet server configs will also come as a followup during scale testing.
How does this PR solve the problem?

Adds three HTTP routes:

- /api/fleet/uploads: Begins an "upload operation". Includes the full metadata about a file, with some required fields like name and size.
- /api/fleet/uploads/<uploadID>/<chunkNum>: Uploads a segment (chunk) of the file contents.
- /api/fleet/uploads/<uploadID>: Completes an "upload operation". Fleet server verifies that all contents were uploaded and that the initially-provided checksums match.

How to test this PR locally
These routes are API-key authorized, and also deal with file chunking and checksums. It may be difficult to test the routes in isolation without a nearly-fully-operational client implementation. Both the Endpoint integration and the Agent itself are developing client implementations for this release.

If you disable the API key checking (for development or verification purposes), you may be able to use this gist as a mock file upload client.
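Absent that gist, here is a rough sketch of what such a mock client could look like. The HTTP verbs, header names, JSON body shape, server address, and the placeholder upload ID are all assumptions; the actual contract is in the Swagger docs linked above:

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"strings"
)

const server = "http://localhost:8220" // assumed fleet-server address

// do sends one request and fails fast on errors; the auth header name and
// scheme are assumptions for this sketch.
func do(method, url string, body []byte) *http.Response {
	req, err := http.NewRequest(method, url, bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Authorization", "ApiKey <api-key>")
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	return resp
}

func main() {
	contents := strings.Repeat("x", 10) // stand-in file body

	// 1. Begin the upload operation with file metadata (field names assumed).
	meta := fmt.Sprintf(`{"file":{"name":"test.txt","size":%d}}`, len(contents))
	resp := do(http.MethodPost, server+"/api/fleet/uploads", []byte(meta))
	fmt.Println("start:", resp.Status) // response would carry an upload ID

	uploadID := "<uploadID-from-start-response>" // placeholder, not a real value

	// 2. Send the file contents as chunk 0.
	resp = do(http.MethodPut, server+"/api/fleet/uploads/"+uploadID+"/0", []byte(contents))
	fmt.Println("chunk:", resp.Status)

	// 3. Finalize; fleet-server verifies chunk completeness and checksums.
	resp = do(http.MethodPost, server+"/api/fleet/uploads/"+uploadID, []byte(`{}`))
	fmt.Println("finish:", resp.Status)
}
```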
Checklist

- [ ] CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc

Related issues