Skip to content

Commit

Permalink
Fix file delivery order (#3283)
Browse files Browse the repository at this point in the history
(cherry picked from commit 485fad9)
  • Loading branch information
pzl authored and mergify[bot] committed Feb 22, 2024
1 parent 8c2cfe9 commit 9a09d8a
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1708005256-file-order-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: fix file reassembly for large files

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3283

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
4 changes: 4 additions & 0 deletions internal/pkg/file/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"io"
"sort"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/file"
Expand Down Expand Up @@ -77,6 +78,9 @@ func (d *Deliverer) LocateChunks(ctx context.Context, zlog zerolog.Logger, fileI
func (d *Deliverer) SendFile(ctx context.Context, zlog zerolog.Logger, w io.Writer, chunks []file.ChunkInfo, fileID string) error {
span, ctx := apm.StartSpan(ctx, "response", "write")
defer span.End()
sort.SliceStable(chunks, func(i, j int) bool {
return chunks[i].Pos < chunks[j].Pos
})
for _, chunkInfo := range chunks {
body, err := readChunkStream(ctx, d.client, chunkInfo.Index, chunkInfo.ID)
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions internal/pkg/file/delivery/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -253,6 +254,64 @@ func TestSendFileMultipleChunksUsesBackingIndex(t *testing.T) {
require.NoError(t, err)
}

func TestSendFileHandlesDisorderedChunks(t *testing.T) {
buf := bytes.NewBuffer(nil)

fakeBulk := itesting.NewMockBulk()
esClient, esMock := mockESClient(t)

const fileID = "xyz"
idx := fmt.Sprintf(FileDataIndexPattern, "endpoint") + "-0001"
sampleDocBody := hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD")

chunks := []file.ChunkInfo{
{Index: idx, ID: fileID + ".20", Pos: 20},
{Index: idx, ID: fileID + ".21", Pos: 21},
{Index: idx, ID: fileID + ".22", Pos: 22},
{Index: idx, ID: fileID + ".9", Pos: 9},
{Index: idx, ID: fileID + ".10", Pos: 10},
{Index: idx, ID: fileID + ".11", Pos: 11},
{Index: idx, ID: fileID + ".12", Pos: 12},
{Index: idx, ID: fileID + ".13", Pos: 13},
{Index: idx, ID: fileID + ".14", Pos: 14},
{Index: idx, ID: fileID + ".15", Pos: 15},
{Index: idx, ID: fileID + ".16", Pos: 16},
{Index: idx, ID: fileID + ".17", Pos: 17},
{Index: idx, ID: fileID + ".18", Pos: 18},
{Index: idx, ID: fileID + ".19", Pos: 19},
{Index: idx, ID: fileID + ".0", Pos: 0},
{Index: idx, ID: fileID + ".1", Pos: 1},
{Index: idx, ID: fileID + ".2", Pos: 2},
{Index: idx, ID: fileID + ".3", Pos: 3},
{Index: idx, ID: fileID + ".4", Pos: 4},
{Index: idx, ID: fileID + ".5", Pos: 5},
{Index: idx, ID: fileID + ".6", Pos: 6},
{Index: idx, ID: fileID + ".7", Pos: 7},
{Index: idx, ID: fileID + ".8", Pos: 8},
}

expectedIdx := 0

esMock.RoundTripFn = func(req *http.Request) (*http.Response, error) {

// Parse out the chunk number requested
parts := strings.Split(req.URL.Path, "/") // ["", ".fleet-filedelivery-data-endpoint-0001", "_doc", "xyz.1"]
docIdx := strings.TrimPrefix(parts[3], fileID+".")
docnum, err := strconv.Atoi(docIdx)
require.NoError(t, err)

// should be our expected increasing counter
assert.Equal(t, expectedIdx, docnum)
expectedIdx += 1

return sendBodyBytes(sampleDocBody), nil
}

d := New(esClient, fakeBulk, -1)
err := d.SendFile(context.Background(), zerolog.Logger{}, buf, chunks, fileID)
require.NoError(t, err)
}

/*
Setup to convert a *elasticsearch.Client as a harmless mock
by replacing the Transport to nowhere
Expand Down

0 comments on commit 9a09d8a

Please sign in to comment.