diff --git a/CHANGELOG.md b/CHANGELOG.md index c0aa5e5579..980959d992 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use _breaking :warning:_ to mark changes that are not backward compatible (relates only to v0.y.z releases.) -## [v0.19.0-rc.1](https://github.com/thanos-io/thanos/releases/tag/v0.19.0-rc.1) - 2021.03.09 +## [v0.19.0-rc.1](https://github.com/thanos-io/thanos/releases/tag/v0.19.0-rc.2) - 2021.03.22 ### Added @@ -36,6 +36,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3815](https://github.com/thanos-io/thanos/pull/3815) Receive: Improve handling of empty time series from clients - [#3795](https://github.com/thanos-io/thanos/pull/3795) s3: A truncated "get object" response is reported as error. - [#3899](https://github.com/thanos-io/thanos/pull/3899) Receive: Correct the inference of client gRPC configuration. +- [#3943](https://github.com/thanos-io/thanos/pull/3943): Receive: Fixed memory regression introduced in v0.17.0. ### Changed diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 0337fe495f..693bd74336 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -277,6 +277,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { span, ctx := tracing.StartSpan(r.Context(), "receive_http") defer span.Finish() + // TODO(bwplotka): Optimize readAll https://github.com/thanos-io/thanos/pull/3334/files. compressed, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -290,6 +291,9 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } + // NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory + // from the whole request. Ensure that we always copy those when we want to + // store them for longer time. var wreq prompb.WriteRequest if err := proto.Unmarshal(reqBuf, &wreq); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -310,7 +314,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { tenant = h.options.DefaultTenantID } - // exit early if the request contained no data + // Exit early if the request contained no data. if len(wreq.Timeseries) == 0 { level.Info(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant) return diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 70637d49fd..8e4287ffd5 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -7,9 +7,16 @@ import ( "bytes" "context" "fmt" + "io/ioutil" + "math" "math/rand" "net/http" "net/http/httptest" + "os" + "path/filepath" + "runtime" + "runtime/pprof" + "strings" "sync" "testing" "time" @@ -18,9 +25,12 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/runutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -175,13 +185,11 @@ func TestDetermineWriteErrorCause(t *testing.T) { } } -func newHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { - cfg := []HashringConfig{ - { - Hashring: "test", - }, - } - var handlers []*Handler +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { + var ( + cfg = []HashringConfig{{Hashring: "test"}} + handlers []*Handler + ) // create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers // This removes the network from the tests and creates a more consistent testing harness. peers := &peerGroup{ @@ -511,7 +519,7 @@ func TestReceiveQuorum(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -850,7 +858,7 @@ func TestReceiveWithConsistencyDelay(t *testing.T) { // to see all requests completing all the time, since we're using local // network we are not expecting anything to go wrong with these. t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -957,3 +965,233 @@ type fakeRemoteWriteGRPCServer struct { func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { return f.h.RemoteWrite(ctx, in) } + +func BenchmarkHandlerReceiveHTTP(b *testing.B) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b)) +} + +func TestHandlerReceiveHTTP(t *testing.T) { + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t)) +} + +// tsOverrideTenantStorage is storage that overrides timestamp to make it have consistent interval. +type tsOverrideTenantStorage struct { + TenantStorage + + interval int64 +} + +func (s *tsOverrideTenantStorage) TenantAppendable(tenant string) (Appendable, error) { + a, err := s.TenantStorage.TenantAppendable(tenant) + return &tsOverrideAppendable{Appendable: a, interval: s.interval}, err +} + +type tsOverrideAppendable struct { + Appendable + + interval int64 +} + +func (a *tsOverrideAppendable) Appender(ctx context.Context) (storage.Appender, error) { + ret, err := a.Appendable.Appender(ctx) + return &tsOverrideAppender{Appender: ret, interval: a.interval}, err +} + +type tsOverrideAppender struct { + storage.Appender + + interval int64 +} + +var cnt int64 + +func (a *tsOverrideAppender) Add(l labels.Labels, _ int64, v float64) (uint64, error) { + cnt += a.interval + return a.Appender.Add(l, cnt, v) +} + +func (a *tsOverrideAppender) AddFast(ref uint64, _ int64, v float64) error { + cnt += a.interval + return a.Appender.AddFast(ref, cnt, v) +} + +// serializeSeriesWithOneSample returns marshaled and compressed remote write requests like it would +// be send to Thanos receive. +// It has one sample and allow passing multiple series, in same manner as typical Prometheus would batch it. +func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byte { + r := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(series))} + + for _, s := range series { + r.Timeseries = append(r.Timeseries, prompb.TimeSeries{ + Labels: s, + // Timestamp does not matter, it will be overridden. + Samples: []prompb.Sample{{Value: math.MaxFloat64, Timestamp: math.MinInt64}}, + }) + } + body, err := proto.Marshal(r) + testutil.Ok(t, err) + return snappy.Encode(nil, body) +} + +func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { + dir, err := ioutil.TempDir("", "test_receive") + testutil.Ok(b, err) + defer func() { testutil.Ok(b, os.RemoveAll(dir)) }() + + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handler := handlers[0] + + reg := prometheus.NewRegistry() + + logger := log.NewNopLogger() + m := NewMultiTSDB( + dir, logger, reg, &tsdb.Options{ + MinBlockDuration: int64(2 * time.Hour / time.Millisecond), + MaxBlockDuration: int64(2 * time.Hour / time.Millisecond), + RetentionDuration: int64(6 * time.Hour / time.Millisecond), + NoLockfile: true, + StripeSize: 1, // Disable stripe pre allocation so we can have clear profiles. + }, + labels.FromStrings("replica", "01"), + "tenant_id", + nil, + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(b, m.Close()) }() + handler.writer = NewWriter(logger, m) + + testutil.Ok(b, m.Flush()) + testutil.Ok(b, m.Open()) + + for _, tcase := range []struct { + name string + writeRequest []byte + }{ + { + name: "typical labels under 1KB, 500 of them", + writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + series := make([][]labelpb.ZLabel, 500) + for s := 0; s < len(series); s++ { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. + lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + series[s] = lbls + } + return series + }()), + }, + { + name: "typical labels under 1KB, 5000 of them", + writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + series := make([][]labelpb.ZLabel, 5000) + for s := 0; s < len(series); s++ { + lbls := make([]labelpb.ZLabel, 10) + for i := 0; i < len(lbls); i++ { + // Label ~20B name, 50B value. + lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)} + } + series[s] = lbls + } + return series + }()), + }, + { + name: "extremely large label value 10MB, 10 of them", + writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + series := make([][]labelpb.ZLabel, 10) + for s := 0; s < len(series); s++ { + lbl := &strings.Builder{} + lbl.Grow(1024 * 1024 * 10) // 10MB. + word := "abcdefghij" + for i := 0; i < lbl.Cap()/len(word); i++ { + _, _ = lbl.WriteString(word) + } + series[s] = []labelpb.ZLabel{{Name: "__name__", Value: lbl.String()}} + } + return series + }()), + }, + } { + b.Run(tcase.name, func(b testutil.TB) { + handler.options.DefaultTenantID = fmt.Sprintf("%v-ok", tcase.name) + handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: 1} + + // It takes time to create new tenant, wait for it. + { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + + b.Run("OK", func(b testutil.TB) { + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + } + }) + + handler.options.DefaultTenantID = fmt.Sprintf("%v-conflicting", tcase.name) + handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: -1} // Timestamp can't go down, which will cause conflict error. + + // It takes time to create new tenant, wait for it. + { + app, err := m.TenantAppendable(handler.options.DefaultTenantID) + testutil.Ok(b, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error { + _, err = app.Appender(ctx) + return err + })) + } + + // First request should be fine, since we don't change timestamp, rest is wrong. + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) + + b.Run("conflict errors", func(b testutil.TB) { + n := b.N() + b.ResetTimer() + for i := 0; i < n; i++ { + r := httptest.NewRecorder() + handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: ioutil.NopCloser(bytes.NewReader(tcase.writeRequest))}) + testutil.Equals(b, http.StatusConflict, r.Code, "%v", i) + } + }) + }) + } + + runtime.GC() + // Take snapshot at the end to reveal how much memory we keep in TSDB. + testutil.Ok(b, Heap("../../")) + +} + +func Heap(dir string) (err error) { + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return err + } + + f, err := os.Create(filepath.Join(dir, "mem.pprof")) + if err != nil { + return err + } + defer runutil.CloseWithErrCapture(&err, f, "close") + return pprof.WriteHeapProfile(f) +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 8fe1cfe68c..0256040262 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -61,13 +62,12 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR var errs errutil.MultiError for _, t := range wreq.Timeseries { - lset := make(labels.Labels, len(t.Labels)) - for j := range t.Labels { - lset[j] = labels.Label{ - Name: t.Labels[j].Name, - Value: t.Labels[j].Value, - } - } + // Copy labels so we allocate memory only for labels, nothing else. + labelpb.ReAllocZLabelsStrings(&t.Labels) + + // TODO(bwplotka): Use improvement https://github.com/prometheus/prometheus/pull/8600, so we do that only when + // we need it (when we store labels for longer). + lset := labelpb.ZLabelsToPromLabels(t.Labels) // Append as many valid samples as possible, but keep track of the errors. for _, s := range t.Samples { diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 5712cd912f..99083f82ab 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -34,10 +34,21 @@ func ZLabelsFromPromLabels(lset labels.Labels) []ZLabel { // ZLabelsToPromLabels convert slice of labelpb.ZLabel to Prometheus labels in type unsafe manner. // It reuses the same memory. Caller should abort using passed []ZLabel. +// NOTE: Use with care. ZLabels holds memory from the whole protobuf unmarshal, so the returned +// Prometheus Labels will hold this memory as well. func ZLabelsToPromLabels(lset []ZLabel) labels.Labels { return *(*labels.Labels)(unsafe.Pointer(&lset)) } +// ReAllocZLabelsStrings re-allocates all underlying bytes for string, detaching it from bigger memory pool. +func ReAllocZLabelsStrings(lset *[]ZLabel) { + for j, l := range *lset { + // NOTE: This trick converts from string to byte without copy, but copy when creating string. + (*lset)[j].Name = string(noAllocBytes(l.Name)) + (*lset)[j].Value = string(noAllocBytes(l.Value)) + } +} + // LabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner. // It reuses the same memory. Caller should abort using passed labels.Labels. func LabelsFromPromLabels(lset labels.Labels) []Label { @@ -61,7 +72,7 @@ func ZLabelSetsToPromLabelSets(lss ...ZLabelSet) []labels.Labels { // ZLabel is a Label (also easily transformable to Prometheus labels.Labels) that can be unmarshalled from protobuf // reusing the same memory address for string bytes. -// NOTE: While unmarshal use exactly same bytes that were allocated for protobuf, this will mean that *whole* protobuf +// NOTE: While unmarshalling it uses exactly same bytes that were allocated for protobuf. This mean that *whole* protobuf // bytes will be not GC-ed as long as ZLabels are referenced somewhere. Use it carefully, only for short living // protobuf message processing. type ZLabel Label diff --git a/pkg/store/labelpb/label_test.go b/pkg/store/labelpb/label_test.go index a6227dfd27..ee80b5ad47 100644 --- a/pkg/store/labelpb/label_test.go +++ b/pkg/store/labelpb/label_test.go @@ -174,6 +174,41 @@ func BenchmarkZLabelsMarshalUnmarshal(b *testing.B) { }) } +var ret labels.Labels + +func BenchmarkTransformWithAndWithoutCopy(b *testing.B) { + const ( + fmtLbl = "%07daaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" + num = 1000000 + ) + + b.Run("ZLabelsToPromLabels", func(b *testing.B) { + b.ReportAllocs() + lbls := make([]ZLabel, num) + for i := 0; i < num; i++ { + lbls[i] = ZLabel{Name: fmt.Sprintf(fmtLbl, i), Value: fmt.Sprintf(fmtLbl, i)} + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + ret = ZLabelsToPromLabels(lbls) + } + }) + b.Run("ZLabelsToPromLabelsWithRealloc", func(b *testing.B) { + b.ReportAllocs() + lbls := make([]ZLabel, num) + for i := 0; i < num; i++ { + lbls[i] = ZLabel{Name: fmt.Sprintf(fmtLbl, i), Value: fmt.Sprintf(fmtLbl, i)} + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + ReAllocZLabelsStrings(&lbls) + ret = ZLabelsToPromLabels(lbls) + } + }) +} + func TestSortZLabelSets(t *testing.T) { expectedResult := ZLabelSets{ {