From 363806be36ead85358af7583755ab2193bf3eda1 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Mon, 15 May 2023 12:20:12 -0700 Subject: [PATCH] [chore] [exporter/splunkhec] Simplify the batching process Rework how the batches are created and sent to the backend to make it easier to read the code. Also this change fixes an issue introduced with removing of the additional `index` field of the `bufferState` struct: if a hec batch combining several OTel resources gets rejected with a retryable error, only the latests read resource/scope records were sent for retry. The fix is confirmed by additional TestPushLogsRetryableFailureMultipleResources test. This also improves performance by removing a bunch of memory allocations. --- exporter/splunkhecexporter/buffer.go | 7 +- exporter/splunkhecexporter/client.go | 396 ++++++++++------------ exporter/splunkhecexporter/client_test.go | 62 ++-- 3 files changed, 212 insertions(+), 253 deletions(-) diff --git a/exporter/splunkhecexporter/buffer.go b/exporter/splunkhecexporter/buffer.go index 9836bff634d3..e12481ccd52d 100644 --- a/exporter/splunkhecexporter/buffer.go +++ b/exporter/splunkhecexporter/buffer.go @@ -36,9 +36,6 @@ type bufferState struct { maxEventLength uint writer io.Writer buf *bytes.Buffer - resource int // index in ResourceLogs/ResourceMetrics/ResourceSpans list - library int // index in ScopeLogs/ScopeMetrics/ScopeSpans list - record int // index in Logs/Metrics/Spans list rawLength int } @@ -190,9 +187,7 @@ type bufferStatePool struct { // get returns a bufferState from the pool. func (p bufferStatePool) get() *bufferState { - bf := p.pool.Get().(*bufferState) - bf.reset() - return bf + return p.pool.Get().(*bufferState) } // put returns a bufferState to the pool. diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index b2f97be15694..3f17219303a3 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -40,6 +40,14 @@ var getPushLogFn = func(c *client) func(ctx context.Context, ld plog.Logs) error return c.pushLogData } +// iterState captures a state of iteration over the pdata Logs/Metrics/Traces instances. +type iterState struct { + resource int // index in ResourceLogs/ResourceMetrics/ResourceSpans list + library int // index in ScopeLogs/ScopeMetrics/ScopeSpans list + record int // index in Logs/Metrics/Spans list + done bool +} + // client sends the data to the splunk backend. type client struct { config *Config @@ -157,215 +165,173 @@ func isProfilingData(sl plog.ScopeLogs) bool { func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers map[string]string) error { bufState := c.bufferStatePool.get() defer c.bufferStatePool.put(bufState) - + is := iterState{} var permanentErrors []error - var rls = ld.ResourceLogs() - for i := 0; i < rls.Len(); i++ { - ills := rls.At(i).ScopeLogs() - for j := 0; j < ills.Len(); j++ { - var err error - var newPermanentErrors []error - - bufState.resource, bufState.library = i, j - newPermanentErrors, err = c.pushLogRecords(ctx, rls, bufState, headers) - - if err != nil { - return consumererror.NewLogs(err, subLogs(ld, bufState)) + for { + bufState.reset() + latestIterState, batchPermanentErrors := c.fillLogsBuffer(ld, bufState, is) + permanentErrors = append(permanentErrors, batchPermanentErrors...) + if bufState.containsData() { + if err := c.postEvents(ctx, bufState, headers); err != nil { + return consumererror.NewLogs(err, subLogs(ld, is)) } - - permanentErrors = append(permanentErrors, newPermanentErrors...) } - } - - // There's some leftover unsent non-profiling data - if bufState.containsData() { - if err := c.postEvents(ctx, bufState, headers); err != nil { - return consumererror.NewLogs(err, subLogs(ld, bufState)) + if latestIterState.done { + break } + is = latestIterState } return multierr.Combine(permanentErrors...) } -func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice, state *bufferState, headers map[string]string) (permanentErrors []error, sendingError error) { - res := lds.At(state.resource) - logs := res.ScopeLogs().At(state.library).LogRecords() - - state.record = 0 - for k := 0; k < logs.Len(); k++ { - var b []byte - - if c.config.ExportRaw { - b = []byte(logs.At(k).Body().AsString() + "\n") - } else { - // Parsing log record to Splunk event. - event := mapLogRecordToSplunkEvent(res.Resource(), logs.At(k), c.config) - // JSON encoding event and writing to buffer. - var err error - b, err = jsoniter.Marshal(event) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped log event: %v, error: %w", event, err))) - continue - } - - } - - // Continue adding events to buffer up to capacity. - accepted, e := state.accept(b) - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("error writing the event: %w", e))) - continue - } - if accepted { - continue - } +// fillLogsBuffer fills the buffer with Splunk events until the buffer is full or all logs are processed. +func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (iterState, []error) { + var b []byte + var permanentErrors []error - if state.containsData() { - if err := c.postEvents(ctx, state, headers); err != nil { - return permanentErrors, err + for i := is.resource; i < logs.ResourceLogs().Len(); i++ { + rl := logs.ResourceLogs().At(i) + for j := is.library; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + for k := is.record; k < sl.LogRecords().Len(); k++ { + logRecord := sl.LogRecords().At(k) + + if c.config.ExportRaw { + b = []byte(logRecord.Body().AsString() + "\n") + } else { + // Parsing log record to Splunk event. + event := mapLogRecordToSplunkEvent(rl.Resource(), logRecord, c.config) + // JSON encoding event and writing to buffer. + var err error + b, err = jsoniter.Marshal(event) + if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "dropped log event: %v, error: %w", event, err))) + continue + } + } + + // Continue adding events to buffer up to capacity. + accepted, e := bs.accept(b) + if e != nil { + permanentErrors = append(permanentErrors, + consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", e))) + continue + } + if !accepted { + if bs.containsData() { + return iterState{i, j, k, false}, permanentErrors + } + permanentErrors = append(permanentErrors, consumererror.NewPermanent( + fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+ + " content length %d bytes", len(b), bs.bufferMaxLen))) + return iterState{i, j, k + 1, false}, permanentErrors + } } } - state.reset() - - // Writing truncated bytes back to buffer. - accepted, e = state.accept(b) - if accepted { - state.record = k - continue - } - - state.record = k + 1 - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("error writing the event: %w", e))) - } else { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("dropped log event error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen))) - } } - return permanentErrors, nil + return iterState{done: true}, permanentErrors } -func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMetricsSlice, state *bufferState, headers map[string]string) (permanentErrors []error, sendingError error) { - res := mds.At(state.resource) - metrics := res.ScopeMetrics().At(state.library).Metrics() - - state.record = 0 - for k := 0; k < metrics.Len(); k++ { - // Parsing metric record to Splunk event. - events := mapMetricToSplunkEvent(res.Resource(), metrics.At(k), c.config, c.logger) - buf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) - if c.config.UseMultiMetricFormat { - merged, err := mergeEventsToMultiMetricFormat(events) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("error merging events: %w", err))) - } else { - events = merged - } - } - for _, event := range events { - // JSON encoding event and writing to buffer. - b, err := jsoniter.Marshal(event) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) - continue - } - buf.Write(b) - } - - // Continue adding events to buffer up to capacity. - b := buf.Bytes() - accepted, e := state.accept(b) - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("error writing the event: %w", e))) - continue - } - if accepted { - continue - } +func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is iterState) (iterState, []error) { + var permanentErrors []error - if state.containsData() { - if err := c.postEvents(ctx, state, headers); err != nil { - return permanentErrors, err + for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ { + rm := metrics.ResourceMetrics().At(i) + for j := is.library; j < rm.ScopeMetrics().Len(); j++ { + sm := rm.ScopeMetrics().At(j) + for k := is.record; k < sm.Metrics().Len(); k++ { + metric := sm.Metrics().At(k) + + // Parsing metric record to Splunk event. + events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger) + buf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics)) + if c.config.UseMultiMetricFormat { + merged, err := mergeEventsToMultiMetricFormat(events) + if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error merging events: %w", err))) + } else { + events = merged + } + } + for _, event := range events { + // JSON encoding event and writing to buffer. + b, err := jsoniter.Marshal(event) + if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) + continue + } + buf.Write(b) + } + + // Continue adding events to buffer up to capacity. + b := buf.Bytes() + accepted, e := bs.accept(b) + if e != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", e))) + continue + } + if !accepted { + if bs.containsData() { + return iterState{i, j, k, false}, permanentErrors + } + permanentErrors = append(permanentErrors, consumererror.NewPermanent( + fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+ + " content length %d bytes", len(b), bs.bufferMaxLen))) + return iterState{i, j, k + 1, false}, permanentErrors + } } } - state.reset() - - // Writing truncated bytes back to buffer. - accepted, e = state.accept(b) - if accepted { - state.record = k - continue - } - - state.record = k + 1 - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("error writing the event: %w", e))) - } else { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen))) - } } - return permanentErrors, nil + return iterState{done: true}, permanentErrors } -func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSlice, state *bufferState, headers map[string]string) (permanentErrors []error, sendingError error) { - res := tds.At(state.resource) - spans := res.ScopeSpans().At(state.library).Spans() - - state.record = 0 - for k := 0; k < spans.Len(); k++ { - // Parsing span record to Splunk event. - event := mapSpanToSplunkEvent(res.Resource(), spans.At(k), c.config) - // JSON encoding event and writing to buffer. - b, err := jsoniter.Marshal(event) - if err != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err))) - continue - } - - // Continue adding events to buffer up to capacity. - accepted, e := state.accept(b) - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("error writing the event: %w", e))) - continue - } - if accepted { - continue - } +func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iterState) (iterState, []error) { + var permanentErrors []error - if state.containsData() { - if err = c.postEvents(ctx, state, headers); err != nil { - return permanentErrors, err + for i := is.resource; i < traces.ResourceSpans().Len(); i++ { + rs := traces.ResourceSpans().At(i) + for j := is.library; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := is.record; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + + // Parsing span record to Splunk event. + event := mapSpanToSplunkEvent(rs.Resource(), span, c.config) + // JSON encoding event and writing to buffer. + b, err := jsoniter.Marshal(event) + if err != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err))) + continue + } + + // Continue adding events to buffer up to capacity. + accepted, e := bs.accept(b) + if e != nil { + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", e))) + continue + } + if !accepted { + if bs.containsData() { + return iterState{i, j, k, false}, permanentErrors + } + permanentErrors = append(permanentErrors, consumererror.NewPermanent( + fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+ + " content length %d bytes", len(b), bs.bufferMaxLen))) + return iterState{i, j, k + 1, false}, permanentErrors + } } } - state.reset() - - // Writing truncated bytes back to buffer. - accepted, e = state.accept(b) - if accepted { - state.record = k - continue - } - - state.record = k + 1 - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("error writing the event: %w", e))) - } else { - permanentErrors = append(permanentErrors, consumererror.NewPermanent( - fmt.Errorf("dropped trace event error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen))) - } } - return permanentErrors, nil + return iterState{done: true}, permanentErrors } // pushMetricsDataInBatches sends batches of Splunk events in JSON format. @@ -374,32 +340,22 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error { bufState := c.bufferStatePool.get() defer c.bufferStatePool.put(bufState) - + is := iterState{} var permanentErrors []error - var rms = md.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - ilms := rms.At(i).ScopeMetrics() - for j := 0; j < ilms.Len(); j++ { - var err error - var newPermanentErrors []error - - bufState.resource, bufState.library = i, j - newPermanentErrors, err = c.pushMetricsRecords(ctx, rms, bufState, headers) - - if err != nil { - return consumererror.NewMetrics(err, subMetrics(md, bufState)) + for { + bufState.reset() + latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, bufState, is) + permanentErrors = append(permanentErrors, batchPermanentErrors...) + if bufState.containsData() { + if err := c.postEvents(ctx, bufState, headers); err != nil { + return consumererror.NewMetrics(err, subMetrics(md, is)) } - - permanentErrors = append(permanentErrors, newPermanentErrors...) } - } - - // There's some leftover unsent metrics - if bufState.containsData() { - if err := c.postEvents(ctx, bufState, headers); err != nil { - return consumererror.NewMetrics(err, subMetrics(md, bufState)) + if latestIterState.done { + break } + is = latestIterState } return multierr.Combine(permanentErrors...) @@ -411,32 +367,22 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, headers map[string]string) error { bufState := c.bufferStatePool.get() defer c.bufferStatePool.put(bufState) - + is := iterState{} var permanentErrors []error - var rts = td.ResourceSpans() - for i := 0; i < rts.Len(); i++ { - ilts := rts.At(i).ScopeSpans() - for j := 0; j < ilts.Len(); j++ { - var err error - var newPermanentErrors []error - - bufState.resource, bufState.library = i, j - newPermanentErrors, err = c.pushTracesData(ctx, rts, bufState, headers) - - if err != nil { - return consumererror.NewTraces(err, subTraces(td, bufState)) + for { + bufState.reset() + latestIterState, batchPermanentErrors := c.fillTracesBuffer(td, bufState, is) + permanentErrors = append(permanentErrors, batchPermanentErrors...) + if bufState.containsData() { + if err := c.postEvents(ctx, bufState, headers); err != nil { + return consumererror.NewTraces(err, subTraces(td, is)) } - - permanentErrors = append(permanentErrors, newPermanentErrors...) } - } - - // There's some leftover unsent traces - if bufState.containsData() { - if err := c.postEvents(ctx, bufState, headers); err != nil { - return consumererror.NewTraces(err, subTraces(td, bufState)) + if latestIterState.done { + break } + is = latestIterState } return multierr.Combine(permanentErrors...) @@ -450,7 +396,7 @@ func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers } // subLogs returns a subset of logs starting from the state. -func subLogs(src plog.Logs, state *bufferState) plog.Logs { +func subLogs(src plog.Logs, state iterState) plog.Logs { dst := plog.NewLogs() resources := src.ResourceLogs() resourcesSub := dst.ResourceLogs() @@ -492,7 +438,7 @@ func subLogs(src plog.Logs, state *bufferState) plog.Logs { } // subMetrics returns a subset of metrics starting from the state. -func subMetrics(src pmetric.Metrics, state *bufferState) pmetric.Metrics { +func subMetrics(src pmetric.Metrics, state iterState) pmetric.Metrics { dst := pmetric.NewMetrics() resources := src.ResourceMetrics() resourcesSub := dst.ResourceMetrics() @@ -533,7 +479,7 @@ func subMetrics(src pmetric.Metrics, state *bufferState) pmetric.Metrics { return dst } -func subTraces(src ptrace.Traces, state *bufferState) ptrace.Traces { +func subTraces(src ptrace.Traces, state iterState) ptrace.Traces { dst := ptrace.NewTraces() resources := src.ResourceSpans() resourcesSub := dst.ResourceSpans() diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index 5ccf6792f5da..e7bbd0e32101 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -1499,7 +1499,7 @@ func TestSubLogs(t *testing.T) { logs := createLogData(2, 2, 3) // Logs subset from leftmost index (resource 0, library 0, record 0). - _0_0_0 := &bufferState{resource: 0, library: 0, record: 0} //revive:disable-line:var-naming + _0_0_0 := iterState{resource: 0, library: 0, record: 0} //revive:disable-line:var-naming got := subLogs(logs, _0_0_0) // Number of logs in subset should equal original logs. @@ -1513,7 +1513,7 @@ func TestSubLogs(t *testing.T) { assert.Equal(t, "1_1_2", val.AsString()) // Logs subset from some mid index (resource 0, library 1, log 2). - _0_1_2 := &bufferState{resource: 0, library: 1, record: 2} //revive:disable-line:var-naming + _0_1_2 := iterState{resource: 0, library: 1, record: 2} //revive:disable-line:var-naming got = subLogs(logs, _0_1_2) assert.Equal(t, 7, got.LogRecordCount()) @@ -1526,7 +1526,7 @@ func TestSubLogs(t *testing.T) { assert.Equal(t, "1_1_2", val.AsString()) // Logs subset from rightmost index (resource 1, library 1, log 2). - _1_1_2 := &bufferState{resource: 1, library: 1, record: 2} //revive:disable-line:var-naming + _1_1_2 := iterState{resource: 1, library: 1, record: 2} //revive:disable-line:var-naming got = subLogs(logs, _1_1_2) // Number of logs in subset should be 1. @@ -1537,30 +1537,48 @@ func TestSubLogs(t *testing.T) { assert.Equal(t, "1_1_2", val.AsString()) } -func TestPushLogRecordsBufferCounters(t *testing.T) { +func TestPushLogsPartialSuccess(t *testing.T) { cfg := NewFactory().CreateDefaultConfig().(*Config) cfg.ExportRaw = true - c := client{ - config: cfg, - logger: zap.NewNop(), - hecWorker: &mockHecWorker{}, - bufferStatePool: newBufferStatePool(6, false, 10), - } + cfg.MaxContentLengthLogs = 6 + c := newLogsClient(exportertest.NewNopCreateSettings(), cfg) - logs := plog.NewResourceLogsSlice() - logRecords := logs.AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() - logRecords.AppendEmpty().Body().SetStr("12345") // the first log record should be accepted and sent - logRecords.AppendEmpty().Body().SetStr("12345678") // the second log record should be rejected as it's too big - logRecords.AppendEmpty().Body().SetStr("12345") // the third log record should be just accepted - bs := c.bufferStatePool.get() - defer c.bufferStatePool.put(bs) + // The first request succeeds, the second fails. + httpClient, _ := newTestClientWithPresetResponses([]int{200, 503}, []string{"OK", "NOK"}) + url := &url.URL{Scheme: "http", Host: "splunk"} + c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(cfg, component.NewDefaultBuildInfo())} - permErrs, err := c.pushLogRecords(context.Background(), logs, bs, nil) - assert.NoError(t, err) - assert.Len(t, permErrs, 1) - assert.ErrorContains(t, permErrs[0], "bytes larger than configured max content length") + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + logRecords.AppendEmpty().Body().SetStr("log-1") // should be successfully sent + logRecords.AppendEmpty().Body().SetStr("log-2-too-big") // should be permanently rejected as it's too big + logRecords.AppendEmpty().Body().SetStr("log-3") // should be rejected and returned to for retry + + err := c.pushLogData(context.Background(), logs) + expectedErr := consumererror.Logs{} + require.ErrorContains(t, err, "503") + require.ErrorAs(t, err, &expectedErr) + require.Equal(t, 1, expectedErr.Data().LogRecordCount()) + assert.Equal(t, "log-3", expectedErr.Data().ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().Str()) +} - assert.Equal(t, 2, bs.record, "the buffer counter must be at the latest log record") +func TestPushLogsRetryableFailureMultipleResources(t *testing.T) { + c := newLogsClient(exportertest.NewNopCreateSettings(), NewFactory().CreateDefaultConfig().(*Config)) + + httpClient, _ := newTestClientWithPresetResponses([]int{503}, []string{"NOK"}) + url := &url.URL{Scheme: "http", Host: "splunk"} + c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, component.NewDefaultBuildInfo())} + + logs := plog.NewLogs() + logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log-1") + logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log-2") + logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log-3") + + err := c.pushLogData(context.Background(), logs) + expectedErr := consumererror.Logs{} + require.ErrorContains(t, err, "503") + require.ErrorAs(t, err, &expectedErr) + assert.Equal(t, logs, expectedErr.Data()) } // validateCompressedEqual validates that GZipped `got` contains `expected` strings