From 00f381f4ca9f3cf3004c7dc2a4b786e207b72378 Mon Sep 17 00:00:00 2001 From: pzl Date: Tue, 6 Feb 2024 16:54:26 -0500 Subject: [PATCH 1/3] add failing test reproducing the bug --- internal/pkg/file/delivery/delivery_test.go | 59 +++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/internal/pkg/file/delivery/delivery_test.go b/internal/pkg/file/delivery/delivery_test.go index e1dc2626a..6a65428ce 100644 --- a/internal/pkg/file/delivery/delivery_test.go +++ b/internal/pkg/file/delivery/delivery_test.go @@ -12,6 +12,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "testing" @@ -252,6 +253,64 @@ func TestSendFileMultipleChunksUsesBackingIndex(t *testing.T) { require.NoError(t, err) } +func TestSendFileHandlesDisorderedChunks(t *testing.T) { + buf := bytes.NewBuffer(nil) + + fakeBulk := itesting.NewMockBulk() + esClient, esMock := mockESClient(t) + + const fileID = "xyz" + idx := fmt.Sprintf(FileDataIndexPattern, "endpoint") + "-0001" + sampleDocBody := hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD") + + chunks := []file.ChunkInfo{ + {Index: idx, ID: fileID + ".20"}, + {Index: idx, ID: fileID + ".21"}, + {Index: idx, ID: fileID + ".22"}, + {Index: idx, ID: fileID + ".9"}, + {Index: idx, ID: fileID + ".10"}, + {Index: idx, ID: fileID + ".11"}, + {Index: idx, ID: fileID + ".12"}, + {Index: idx, ID: fileID + ".13"}, + {Index: idx, ID: fileID + ".14"}, + {Index: idx, ID: fileID + ".15"}, + {Index: idx, ID: fileID + ".16"}, + {Index: idx, ID: fileID + ".17"}, + {Index: idx, ID: fileID + ".18"}, + {Index: idx, ID: fileID + ".19"}, + {Index: idx, ID: fileID + ".0"}, + {Index: idx, ID: fileID + ".1"}, + {Index: idx, ID: fileID + ".2"}, + {Index: idx, ID: fileID + ".3"}, + {Index: idx, ID: fileID + ".4"}, + {Index: idx, ID: fileID + ".5"}, + {Index: idx, ID: fileID + 
".6"}, + {Index: idx, ID: fileID + ".7"}, + {Index: idx, ID: fileID + ".8"}, + } + + expectedIdx := 0 + + esMock.RoundTripFn = func(req *http.Request) (*http.Response, error) { + + // Parse out the chunk number requested + parts := strings.Split(req.URL.Path, "/") // ["", ".fleet-filedelivery-data-endpoint-0001", "_doc", "xyz.1"] + docIdx := strings.TrimPrefix(parts[3], fileID+".") + docnum, err := strconv.Atoi(docIdx) + require.NoError(t, err) + + // should be our expected increasing counter + assert.Equal(t, expectedIdx, docnum) + expectedIdx += 1 + + return sendBodyBytes(sampleDocBody), nil + } + + d := New(esClient, fakeBulk, -1) + err := d.SendFile(context.Background(), zerolog.Logger{}, buf, chunks, fileID) + require.NoError(t, err) +} + /* Setup to convert a *elasticsearch.Client as a harmless mock by replacing the Transport to nowhere From a72cfef25d6e8c19d7b8ae25e052ee3f58b5ab1a Mon Sep 17 00:00:00 2001 From: pzl Date: Tue, 6 Feb 2024 17:00:56 -0500 Subject: [PATCH 2/3] make sure file data is fetched in-order --- internal/pkg/file/delivery/delivery.go | 4 ++ internal/pkg/file/delivery/delivery_test.go | 46 ++++++++++----------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/internal/pkg/file/delivery/delivery.go b/internal/pkg/file/delivery/delivery.go index a4485e780..f6d3c1ccc 100644 --- a/internal/pkg/file/delivery/delivery.go +++ b/internal/pkg/file/delivery/delivery.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "io" + "sort" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/file" @@ -77,6 +78,9 @@ func (d *Deliverer) LocateChunks(ctx context.Context, zlog zerolog.Logger, fileI func (d *Deliverer) SendFile(ctx context.Context, zlog zerolog.Logger, w io.Writer, chunks []file.ChunkInfo, fileID string) error { span, ctx := apm.StartSpan(ctx, "response", "write") defer span.End() + sort.SliceStable(chunks, func(i, j int) bool { + return chunks[i].Pos < chunks[j].Pos + }) for _, chunkInfo 
:= range chunks { body, err := readChunkStream(ctx, d.client, chunkInfo.Index, chunkInfo.ID) if err != nil { diff --git a/internal/pkg/file/delivery/delivery_test.go b/internal/pkg/file/delivery/delivery_test.go index 6a65428ce..895cb36ca 100644 --- a/internal/pkg/file/delivery/delivery_test.go +++ b/internal/pkg/file/delivery/delivery_test.go @@ -264,29 +264,29 @@ func TestSendFileHandlesDisorderedChunks(t *testing.T) { sampleDocBody := hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD") chunks := []file.ChunkInfo{ - {Index: idx, ID: fileID + ".20"}, - {Index: idx, ID: fileID + ".21"}, - {Index: idx, ID: fileID + ".22"}, - {Index: idx, ID: fileID + ".9"}, - {Index: idx, ID: fileID + ".10"}, - {Index: idx, ID: fileID + ".11"}, - {Index: idx, ID: fileID + ".12"}, - {Index: idx, ID: fileID + ".13"}, - {Index: idx, ID: fileID + ".14"}, - {Index: idx, ID: fileID + ".15"}, - {Index: idx, ID: fileID + ".16"}, - {Index: idx, ID: fileID + ".17"}, - {Index: idx, ID: fileID + ".18"}, - {Index: idx, ID: fileID + ".19"}, - {Index: idx, ID: fileID + ".0"}, - {Index: idx, ID: fileID + ".1"}, - {Index: idx, ID: fileID + ".2"}, - {Index: idx, ID: fileID + ".3"}, - {Index: idx, ID: fileID + ".4"}, - {Index: idx, ID: fileID + ".5"}, - {Index: idx, ID: fileID + ".6"}, - {Index: idx, ID: fileID + ".7"}, - {Index: idx, ID: fileID + ".8"}, + {Index: idx, ID: fileID + ".20", Pos: 20}, + {Index: idx, ID: fileID + ".21", Pos: 21}, + {Index: idx, ID: fileID + ".22", Pos: 22}, + {Index: idx, ID: fileID + ".9", Pos: 9}, + {Index: idx, ID: fileID + ".10", Pos: 10}, + {Index: idx, ID: fileID + ".11", Pos: 11}, + {Index: idx, ID: fileID + ".12", Pos: 12}, + {Index: idx, ID: fileID + ".13", Pos: 13}, + {Index: idx, ID: fileID + ".14", Pos: 14}, + {Index: idx, ID: fileID + ".15", Pos: 15}, + {Index: idx, ID: fileID + 
".16", Pos: 16}, + {Index: idx, ID: fileID + ".17", Pos: 17}, + {Index: idx, ID: fileID + ".18", Pos: 18}, + {Index: idx, ID: fileID + ".19", Pos: 19}, + {Index: idx, ID: fileID + ".0", Pos: 0}, + {Index: idx, ID: fileID + ".1", Pos: 1}, + {Index: idx, ID: fileID + ".2", Pos: 2}, + {Index: idx, ID: fileID + ".3", Pos: 3}, + {Index: idx, ID: fileID + ".4", Pos: 4}, + {Index: idx, ID: fileID + ".5", Pos: 5}, + {Index: idx, ID: fileID + ".6", Pos: 6}, + {Index: idx, ID: fileID + ".7", Pos: 7}, + {Index: idx, ID: fileID + ".8", Pos: 8}, } expectedIdx := 0 From cded2923f4d53ee6d45c3e88c2af94c42ce10565 Mon Sep 17 00:00:00 2001 From: pzl Date: Thu, 15 Feb 2024 08:56:47 -0500 Subject: [PATCH 3/3] add changelog --- .../fragments/1708005256-file-order-fix.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1708005256-file-order-fix.yaml diff --git a/changelog/fragments/1708005256-file-order-fix.yaml b/changelog/fragments/1708005256-file-order-fix.yaml new file mode 100644 index 000000000..07fa0d524 --- /dev/null +++ b/changelog/fragments/1708005256-file-order-fix.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. 
+summary: fix file reassembly for large files + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 3283 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present, it is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234