diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go
index 2362fa772f..056f78662b 100644
--- a/pkg/phlaredb/block_querier.go
+++ b/pkg/phlaredb/block_querier.go
@@ -1181,59 +1181,112 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
 		otlog.String("end", model.Time(request.End).Time().String()),
 		otlog.String("selector", request.LabelSelector),
 		otlog.String("profile_id", request.Type.ID),
+		otlog.Object("hints", request.Hints),
 	)
 
-	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), nil)
-	if err != nil {
-		return err
-	}
-
-	iters, err := SelectMatchingProfiles(ctx, request, queriers)
+	queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), request.Hints)
 	if err != nil {
 		return err
 	}
 
-	// send batches of profiles to client and filter via bidi stream.
-	selectedProfiles, err := filterProfiles[
-		BidiServerMerge[*ingestv1.MergeProfilesPprofResponse, *ingestv1.MergeProfilesPprofRequest],
-		*ingestv1.MergeProfilesPprofResponse,
-		*ingestv1.MergeProfilesPprofRequest](ctx, iters, defaultBatchSize, stream)
-	if err != nil {
-		return err
+	deduplicationNeeded := true
+	if request.Hints != nil && request.Hints.Block != nil {
+		deduplicationNeeded = request.Hints.Block.Deduplication
 	}
 
-	result := make([]*profile.Profile, 0, len(queriers))
 	var lock sync.Mutex
+	result := make([]*profile.Profile, 0, len(queriers))
 	g, ctx := errgroup.WithContext(ctx)
-	for i, querier := range queriers {
-		i := i
-		querier := querier
-		if len(selectedProfiles[i]) == 0 {
-			continue
+
+	// depending on whether we need deduplication or not there are two different code paths.
+	if !deduplicationNeeded {
+		// signal the end of the profile streaming by sending an empty response.
+		sp.LogFields(otlog.String("msg", "no profile streaming as no deduplication needed"))
+		if err = stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
+			return err
 		}
-		// Sort profiles for better read locality.
-		// Merge async the result so we can continue streaming profiles.
-		g.Go(util.RecoverPanic(func() error {
-			merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(selectedProfiles[i])))
-			if err != nil {
-				return err
+
+		// in this path we can just merge the profiles from each block and send the result to the client.
+		for _, querier := range queriers {
+			querier := querier
+			g.Go(util.RecoverPanic(func() error {
+
+				iters, err := querier.SelectMatchingProfiles(ctx, request)
+				if err != nil {
+					return err
+				}
+				defer func() {
+					iters.Close()
+				}()
+
+				profiles, err := iter.Slice(iters)
+				if err != nil {
+					return err
+				}
+
+				if len(profiles) == 0 {
+					return nil
+				}
+
+				merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(profiles)))
+				if err != nil {
+					return err
+				}
+
+				lock.Lock()
+				result = append(result, merge)
+				lock.Unlock()
+				return nil
+			}))
+		}
+	} else {
+		// in this path we have to go through every profile and deduplicate them.
+		iters, err := SelectMatchingProfiles(ctx, request, queriers)
+		if err != nil {
+			return err
+		}
+
+		// send batches of profiles to client and filter via bidi stream.
+		selectedProfiles, err := filterProfiles[
+			BidiServerMerge[*ingestv1.MergeProfilesPprofResponse, *ingestv1.MergeProfilesPprofRequest],
+			*ingestv1.MergeProfilesPprofResponse,
+			*ingestv1.MergeProfilesPprofRequest](ctx, iters, defaultBatchSize, stream)
+		if err != nil {
+			return err
+		}
+
+		for i, querier := range queriers {
+			querier := querier
+			i := i
+			if len(selectedProfiles[i]) == 0 {
+				continue
 			}
-			lock.Lock()
-			defer lock.Unlock()
-			result = append(result, merge)
-			return nil
-		}))
-	}
+			// Sort profiles for better read locality.
+			// Merge async the result so we can continue streaming profiles.
+			g.Go(util.RecoverPanic(func() error {
+				merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(selectedProfiles[i])))
+				if err != nil {
+					return err
+				}
+				lock.Lock()
+				result = append(result, merge)
+				lock.Unlock()
+				return nil
+			}))
+		}
 
-	// Signals the end of the profile streaming by sending an empty response.
-	// This allows the client to not block other streaming ingesters.
-	if err := stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
-		return err
+		// Signals the end of the profile streaming by sending an empty response.
+		// This allows the client to not block other streaming ingesters.
+		sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming"))
+		if err = stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
+			return err
+		}
 	}
-	if err := g.Wait(); err != nil {
+	if err = g.Wait(); err != nil {
 		return err
 	}
+
 	if len(result) == 0 {
 		result = append(result, &profile.Profile{})
 	}
diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go
index 30593fb947..a0bb9ca09c 100644
--- a/pkg/querier/ingester_querier.go
+++ b/pkg/querier/ingester_querier.go
@@ -11,6 +11,7 @@ import (
 	"github.com/prometheus/prometheus/promql/parser"
 	"golang.org/x/sync/errgroup"
 
+	googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
 	ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
 	ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
 	querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
@@ -138,6 +139,62 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
 	return selectMergeTree(gCtx, responses)
 }
 
+func (q *Querier) selectProfileFromIngesters(ctx context.Context, req *querierv1.SelectMergeProfileRequest, plan blockPlan) (*googlev1.Profile, error) {
+	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectProfile Ingesters")
+	defer sp.Finish()
+	profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+	}
+	_, err = parser.ParseMetricSelector(req.LabelSelector)
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesPprof]
+	if plan != nil {
+		responses, err = forAllPlannedIngesters(ctx, q.ingesterQuerier, plan, func(ctx context.Context, ic IngesterQueryClient, hints *ingestv1.Hints) (clientpool.BidiClientMergeProfilesPprof, error) {
+			return ic.MergeProfilesPprof(ctx), nil
+		})
+	} else {
+		responses, err = forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesPprof, error) {
+			return ic.MergeProfilesPprof(ctx), nil
+		})
+	}
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInternal, err)
+	}
+	// send the first initial request to all ingesters.
+	g, gCtx := errgroup.WithContext(ctx)
+	for idx := range responses {
+		r := responses[idx]
+		hints, ok := plan[r.addr]
+		if !ok && plan != nil {
+			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("no hints found for replica %s", r.addr))
+		}
+
+		g.Go(util.RecoverPanic(func() error {
+			return r.response.Send(&ingestv1.MergeProfilesPprofRequest{
+				Request: &ingestv1.SelectProfilesRequest{
+					LabelSelector: req.LabelSelector,
+					Start:         req.Start,
+					End:           req.End,
+					Type:          profileType,
+					Hints:         &ingestv1.Hints{Block: hints},
+				},
+			})
+		}))
+	}
+	if err = g.Wait(); err != nil {
+		return nil, connect.NewError(connect.CodeInternal, err)
+	}
+
+	// merge all profiles
+	return selectMergePprofProfile(gCtx, profileType, responses)
+}
+
 func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest, plan map[string]*ingestv1.BlockHints) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters")
 	defer sp.Finish()
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index b49fdb8fc5..0f87fe4bc4 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -33,7 +33,7 @@ import (
 	"github.com/grafana/pyroscope/pkg/clientpool"
 	"github.com/grafana/pyroscope/pkg/iter"
 	phlaremodel "github.com/grafana/pyroscope/pkg/model"
-	"github.com/grafana/pyroscope/pkg/util"
+	"github.com/grafana/pyroscope/pkg/pprof"
 	"github.com/grafana/pyroscope/pkg/util/math"
 	"github.com/grafana/pyroscope/pkg/util/spanlogger"
 )
@@ -678,6 +678,15 @@ func (sq storeQuery) MergeSpanProfileRequest(req *querierv1.SelectMergeSpanProfi
 	}
 }
 
+func (sq storeQuery) MergeProfileRequest(req *querierv1.SelectMergeProfileRequest) *querierv1.SelectMergeProfileRequest {
+	return &querierv1.SelectMergeProfileRequest{
+		ProfileTypeID: req.ProfileTypeID,
+		LabelSelector: req.LabelSelector,
+		Start:         int64(sq.start),
+		End:           int64(sq.end),
+	}
+}
+
 func (sq storeQuery) SeriesRequest(req *querierv1.SeriesRequest) *ingestv1.SeriesRequest {
 	return &ingestv1.SeriesRequest{
 		Start:         int64(sq.start),
@@ -741,50 +750,75 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q
 		sp.Finish()
 	}()
 
-	profileType, err := phlaremodel.ParseProfileTypeSelector(req.Msg.ProfileTypeID)
+	profile, err := q.selectProfile(ctx, req.Msg)
 	if err != nil {
-		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+		return nil, err
 	}
-	_, err = parser.ParseMetricSelector(req.Msg.LabelSelector)
-	if err != nil {
-		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+	profile.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
+	profile.TimeNanos = model.Time(req.Msg.End).UnixNano()
+	return connect.NewResponse(profile), nil
+}
+
+func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeProfileRequest) (*googlev1.Profile, error) {
+	// determine the block hints
+	plan, err := q.blockSelect(ctx, model.Time(req.Start), model.Time(req.End))
+	if isEndpointNotExistingErr(err) {
+		level.Warn(spanlogger.FromContext(ctx, q.logger)).Log(
+			"msg", "block select not supported on at least one component, fallback to use full dataset",
+			"err", err,
+		)
+		plan = nil
+	} else if err != nil {
+		return nil, fmt.Errorf("error during block select: %w", err)
 	}
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesPprof, error) {
-		return ic.MergeProfilesPprof(ctx), nil
-	})
-	if err != nil {
-		return nil, connect.NewError(connect.CodeInternal, err)
+
+	// no store gateways configured so just query the ingesters
+	if q.storeGatewayQuerier == nil {
+		return q.selectProfileFromIngesters(ctx, req, plan)
 	}
-	// send the first initial request to all ingesters.
-	g, gCtx := errgroup.WithContext(ctx)
-	for _, r := range responses {
-		r := r
-		g.Go(util.RecoverPanic(func() error {
-			return r.response.Send(&ingestv1.MergeProfilesPprofRequest{
-				Request: &ingestv1.SelectProfilesRequest{
-					LabelSelector: req.Msg.LabelSelector,
-					Start:         req.Msg.Start,
-					End:           req.Msg.End,
-					Type:          profileType,
-				},
-			})
-		}))
+
+	storeQueries := splitQueryToStores(model.Time(req.Start), model.Time(req.End), model.Now(), q.cfg.QueryStoreAfter, plan)
+	if !storeQueries.ingester.shouldQuery && !storeQueries.storeGateway.shouldQuery {
+		return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("start and end time are outside of the ingester and store gateway retention"))
 	}
-	if err := g.Wait(); err != nil {
-		return nil, connect.NewError(connect.CodeInternal, err)
+
+	storeQueries.Log(level.Debug(spanlogger.FromContext(ctx, q.logger)))
+
+	if plan == nil && !storeQueries.ingester.shouldQuery {
+		return q.selectProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
+	}
+	if plan == nil && !storeQueries.storeGateway.shouldQuery {
+		return q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
 	}
 
-	// merge all profiles
-	profile, err := selectMergePprofProfile(gCtx, profileType, responses)
-	if err != nil {
+	g, ctx := errgroup.WithContext(ctx)
+	var lock sync.Mutex
+	var merge pprof.ProfileMerge
+	g.Go(func() error {
+		ingesterProfile, err := q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
+		if err != nil {
+			return err
+		}
+		lock.Lock()
+		err = merge.Merge(ingesterProfile)
+		lock.Unlock()
+		return err
+	})
+	g.Go(func() error {
+		storegatewayProfile, err := q.selectProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
+		if err != nil {
+			return err
+		}
+		lock.Lock()
+		err = merge.Merge(storegatewayProfile)
+		lock.Unlock()
+		return err
+	})
+	if err := g.Wait(); err != nil {
 		return nil, err
 	}
-	profile.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
-	profile.TimeNanos = model.Time(req.Msg.End).UnixNano()
-	return connect.NewResponse(profile), nil
+
+	return merge.Profile(), nil
 }
 
 func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index 27b28da546..eaddacb7c6 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -20,12 +20,14 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
+	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
 	ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
 	querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
 	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
 	"github.com/grafana/pyroscope/pkg/clientpool"
 	"github.com/grafana/pyroscope/pkg/iter"
 	phlaremodel "github.com/grafana/pyroscope/pkg/model"
+	"github.com/grafana/pyroscope/pkg/pprof"
 	pprofth "github.com/grafana/pyroscope/pkg/pprof/testhelper"
 	"github.com/grafana/pyroscope/pkg/testhelper"
 )
@@ -321,116 +323,136 @@ func Test_SelectMergeStacktraces(t *testing.T) {
 }
 
 func Test_SelectMergeProfiles(t *testing.T) {
-	req := connect.NewRequest(&querierv1.SelectMergeProfileRequest{
-		LabelSelector: `{app="foo"}`,
-		ProfileTypeID: "memory:inuse_space:bytes:space:byte",
-		Start:         0,
-		End:           2,
-	})
-	bidi1 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
-		{
-			LabelsSets: []*typesv1.Labels{
-				{
-					Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}},
-				},
-				{
-					Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}},
-				},
-			},
-			Profiles: []*ingestv1.SeriesProfile{
-				{Timestamp: 1, LabelIndex: 0},
-				{Timestamp: 2, LabelIndex: 1},
-				{Timestamp: 2, LabelIndex: 0},
-			},
-		},
-	})
-	bidi2 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
-		{
-			LabelsSets: []*typesv1.Labels{
-				{
-					Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}},
-				},
+	for _, tc := range []struct {
+		blockSelect bool
+		name        string
+	}{
+		// This tests the interoperability between older ingesters and new queriers
+		{false, "WithoutBlockHints"},
+		{true, "WithBlockHints"},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+
+			req := connect.NewRequest(&querierv1.SelectMergeProfileRequest{
+				LabelSelector: `{app="foo"}`,
+				ProfileTypeID: "memory:inuse_space:bytes:space:byte",
+				Start:         0,
+				End:           2,
+			})
+			bidi1 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
 				{
-					Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}},
+					LabelsSets: []*typesv1.Labels{
+						{
+							Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}},
+						},
+						{
+							Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}},
+						},
+					},
+					Profiles: []*ingestv1.SeriesProfile{
+						{Timestamp: 1, LabelIndex: 0},
+						{Timestamp: 2, LabelIndex: 1},
+						{Timestamp: 2, LabelIndex: 0},
+					},
 				},
-			},
-			Profiles: []*ingestv1.SeriesProfile{
-				{Timestamp: 1, LabelIndex: 1},
-				{Timestamp: 1, LabelIndex: 0},
-				{Timestamp: 2, LabelIndex: 1},
-			},
-		},
-	})
-	bidi3 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
-		{
-			LabelsSets: []*typesv1.Labels{
+			})
+			bidi2 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
 				{
-					Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}},
+					LabelsSets: []*typesv1.Labels{
+						{
+							Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}},
+						},
+						{
+							Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}},
+						},
+					},
+					Profiles: []*ingestv1.SeriesProfile{
+						{Timestamp: 1, LabelIndex: 1},
+						{Timestamp: 1, LabelIndex: 0},
+						{Timestamp: 2, LabelIndex: 1},
+					},
 				},
+			})
+			bidi3 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
 				{
-					Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}},
+					LabelsSets: []*typesv1.Labels{
+						{
+							Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}},
+						},
+						{
+							Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}},
+						},
+					},
+					Profiles: []*ingestv1.SeriesProfile{
+						{Timestamp: 1, LabelIndex: 1},
+						{Timestamp: 1, LabelIndex: 0},
+						{Timestamp: 2, LabelIndex: 0},
+					},
 				},
-			},
-			Profiles: []*ingestv1.SeriesProfile{
-				{Timestamp: 1, LabelIndex: 1},
-				{Timestamp: 1, LabelIndex: 0},
-				{Timestamp: 2, LabelIndex: 0},
-			},
-		},
-	})
-	querier, err := New(Config{
-		PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond},
-	}, testhelper.NewMockRing([]ring.InstanceDesc{
-		{Addr: "1"},
-		{Addr: "2"},
-		{Addr: "3"},
-	}, 3), &poolFactory{f: func(addr string) (client.PoolClient, error) {
-		q := newFakeQuerier()
-		switch addr {
-		case "1":
-			q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi1)
-		case "2":
-			q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi2)
-		case "3":
-			q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi3)
-		}
-		return q, nil
-	}}, nil, nil, log.NewLogfmtLogger(os.Stdout))
-	require.NoError(t, err)
-	res, err := querier.SelectMergeProfile(context.Background(), req)
-	require.NoError(t, err)
-	require.NotNil(t, res)
-	data, err := proto.Marshal(res.Msg)
-	require.NoError(t, err)
-	actual, err := profile.ParseUncompressed(data)
-	require.NoError(t, err)
+			})
+			querier, err := New(Config{
+				PoolConfig: clientpool.PoolConfig{ClientCleanupPeriod: 1 * time.Millisecond},
+			}, testhelper.NewMockRing([]ring.InstanceDesc{
+				{Addr: "1"},
+				{Addr: "2"},
+				{Addr: "3"},
+			}, 3), &poolFactory{f: func(addr string) (client.PoolClient, error) {
+				q := newFakeQuerier()
+				switch addr {
+				case "1":
+					q.mockMergeProfile(bidi1, []string{"a", "d"}, tc.blockSelect)
+				case "2":
+					q.mockMergeProfile(bidi2, []string{"b", "d"}, tc.blockSelect)
+				case "3":
+					q.mockMergeProfile(bidi3, []string{"c", "d"}, tc.blockSelect)
+				}
+				switch addr {
+				case "1":
+					q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi1)
+				case "2":
+					q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi2)
+				case "3":
+					q.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi3)
+				}
+				return q, nil
+			}}, nil, nil, log.NewLogfmtLogger(os.Stdout))
+			require.NoError(t, err)
+			res, err := querier.SelectMergeProfile(context.Background(), req)
+			require.NoError(t, err)
+			require.NotNil(t, res)
+			data, err := proto.Marshal(res.Msg)
+			require.NoError(t, err)
+			actual, err := profile.ParseUncompressed(data)
+			require.NoError(t, err)
 
-	expected := pprofth.FooBarProfile.Copy()
-	expected.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
-	expected.TimeNanos = model.Time(req.Msg.End).UnixNano()
-	for _, s := range expected.Sample {
-		s.Value[0] = s.Value[0] * 2
+			expected := pprofth.FooBarProfile.Copy()
+			expected.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
+			expected.TimeNanos = model.Time(req.Msg.End).UnixNano()
+			for _, s := range expected.Sample {
+				s.Value[0] = s.Value[0] * 2
+			}
+			require.Equal(t, actual, expected)
+
+			var selected []testProfile
+			selected = append(selected, bidi1.kept...)
+			selected = append(selected, bidi2.kept...)
+			selected = append(selected, bidi3.kept...)
+			sort.Slice(selected, func(i, j int) bool {
+				if selected[i].Ts == selected[j].Ts {
+					return phlaremodel.CompareLabelPairs(selected[i].Labels.Labels, selected[j].Labels.Labels) < 0
+				}
+				return selected[i].Ts < selected[j].Ts
+			})
+			require.Len(t, selected, 4)
+			require.Equal(t,
+				[]testProfile{
+					{Ts: 1, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}}}},
+					{Ts: 1, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}}}},
+					{Ts: 2, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}}}},
+					{Ts: 2, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}}}},
+				}, selected)
+		})
 	}
-	require.Equal(t, actual, expected)
-
-	var selected []testProfile
-	selected = append(selected, bidi1.kept...)
-	selected = append(selected, bidi2.kept...)
-	selected = append(selected, bidi3.kept...)
-	sort.Slice(selected, func(i, j int) bool {
-		if selected[i].Ts == selected[j].Ts {
-			return phlaremodel.CompareLabelPairs(selected[i].Labels.Labels, selected[j].Labels.Labels) < 0
-		}
-		return selected[i].Ts < selected[j].Ts
-	})
-	require.Len(t, selected, 4)
-	require.Equal(t,
-		[]testProfile{
-			{Ts: 1, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}}}},
-			{Ts: 1, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}}}},
-			{Ts: 2, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "bar"}}}},
-			{Ts: 2, Labels: &typesv1.Labels{Labels: []*typesv1.LabelPair{{Name: "app", Value: "foo"}}}},
-		}, selected)
 }
 
 func TestSelectSeries(t *testing.T) {
@@ -576,6 +598,15 @@ func (f *fakeQuerierIngester) mockMergeLabels(bidi *fakeBidiClientSeries, blocks
 	f.On("MergeProfilesLabels", mock.Anything).Once().Return(bidi)
 }
 
+func (f *fakeQuerierIngester) mockMergeProfile(bidi *fakeBidiClientProfiles, blocks []string, blockSelect bool) {
+	if blockSelect {
+		f.On("BlockMetadata", mock.Anything, mock.Anything).Once().Return(newBlockMeta(blocks...), nil)
+	} else {
+		f.On("BlockMetadata", mock.Anything, mock.Anything).Once().Return(nil, endpointNotExistingErr)
+	}
+	f.On("MergeProfilesPprof", mock.Anything).Once().Return(bidi)
+}
+
 func (f *fakeQuerierIngester) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
 	var (
 		args = f.Called(ctx, req)
@@ -781,6 +812,16 @@ func (f *fakeBidiClientProfiles) Receive() (*ingestv1.MergeProfilesPprofResponse
 func (f *fakeBidiClientProfiles) CloseRequest() error  { return nil }
 func (f *fakeBidiClientProfiles) CloseResponse() error { return nil }
 
+func requireFakeMergeProfilesPprof(t *testing.T, n int64, r *profilev1.Profile) {
+	x, err := pprof.FromProfile(pprofth.FooBarProfile)
+	for _, s := range x.Sample {
+		s.Value[0] *= n
+	}
+	x.DurationNanos *= n
+	require.NoError(t, err)
+	require.Equal(t, x, r)
+}
+
 type fakeBidiClientSeries struct {
 	profiles chan *ingestv1.ProfileSets
 	batches  []*ingestv1.ProfileSets
diff --git a/pkg/querier/select_merge.go b/pkg/querier/select_merge.go
index ca071944df..4cabb3cc46 100644
--- a/pkg/querier/select_merge.go
+++ b/pkg/querier/select_merge.go
@@ -389,6 +389,7 @@ func selectMergePprofProfile(ctx context.Context, ty *typesv1.ProfileType, respo
 		if err != nil || result == nil {
 			return err
 		}
+		// TODO(kolesnikovae): Use pprof proto.
 		p, err := profile.ParseUncompressed(result)
 		if err != nil {
 			return err
diff --git a/pkg/querier/select_merge_test.go b/pkg/querier/select_merge_test.go
index 13fcd40bb1..53808734ea 100644
--- a/pkg/querier/select_merge_test.go
+++ b/pkg/querier/select_merge_test.go
@@ -212,6 +212,101 @@ func TestSelectMergeByLabels(t *testing.T) {
 	}, values)
 }
 
+func TestSelectMergePprof(t *testing.T) {
+	resp1 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
+		{
+			LabelsSets: []*typesv1.Labels{{Labels: foobarlabels}},
+			Profiles: []*ingestv1.SeriesProfile{
+				{LabelIndex: 0, Timestamp: 1},
+				{LabelIndex: 0, Timestamp: 2},
+				{LabelIndex: 0, Timestamp: 4},
+			},
+		},
+		{
+			LabelsSets: []*typesv1.Labels{{Labels: foobarlabels}},
+			Profiles: []*ingestv1.SeriesProfile{
+				{LabelIndex: 0, Timestamp: 5},
+				{LabelIndex: 0, Timestamp: 6},
+			},
+		},
+	})
+	resp2 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
+		{
+			LabelsSets: []*typesv1.Labels{{Labels: foobarlabels}},
+			Profiles: []*ingestv1.SeriesProfile{
+				{LabelIndex: 0, Timestamp: 2},
+				{LabelIndex: 0, Timestamp: 3},
+				{LabelIndex: 0, Timestamp: 4},
+			},
+		},
+		{
+			LabelsSets: []*typesv1.Labels{{Labels: foobarlabels}},
+			Profiles: []*ingestv1.SeriesProfile{
+				{LabelIndex: 0, Timestamp: 5},
+				{LabelIndex: 0, Timestamp: 6},
+			},
+		},
+	})
+	resp3 := newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
+		{
+			LabelsSets: []*typesv1.Labels{{Labels: foobarlabels}},
+			Profiles: []*ingestv1.SeriesProfile{
+				{LabelIndex: 0, Timestamp: 3},
+				{LabelIndex: 0, Timestamp: 5},
+			},
+		},
+	})
+	res, err := selectMergePprofProfile(context.Background(), &typesv1.ProfileType{}, []ResponseFromReplica[clientpool.BidiClientMergeProfilesPprof]{
+		{
+			response: resp1,
+		},
+		{
+			response: resp2,
+		},
+		{
+			response: resp3,
+		},
+	})
+	require.NoError(t, err)
+	requireFakeMergeProfilesPprof(t, 3, res)
+	all := []testProfile{}
+	all = append(all, resp1.kept...)
+	all = append(all, resp2.kept...)
+	all = append(all, resp3.kept...)
+	sort.Slice(all, func(i, j int) bool { return all[i].Ts < all[j].Ts })
+	testhelper.EqualProto(t, all, []testProfile{
+		{Ts: 1, Labels: &typesv1.Labels{Labels: foobarlabels}},
+		{Ts: 2, Labels: &typesv1.Labels{Labels: foobarlabels}},
+		{Ts: 3, Labels: &typesv1.Labels{Labels: foobarlabels}},
+		{Ts: 4, Labels: &typesv1.Labels{Labels: foobarlabels}},
+		{Ts: 5, Labels: &typesv1.Labels{Labels: foobarlabels}},
+		{Ts: 6, Labels: &typesv1.Labels{Labels: foobarlabels}},
+	})
+	res, err = selectMergePprofProfile(context.Background(), &typesv1.ProfileType{}, []ResponseFromReplica[clientpool.BidiClientMergeProfilesPprof]{
+		{
+			response: newFakeBidiClientProfiles([]*ingestv1.ProfileSets{
+				{
+					LabelsSets: []*typesv1.Labels{{Labels: foobarlabels}},
+					Profiles: []*ingestv1.SeriesProfile{
+						{LabelIndex: 0, Timestamp: 1},
+						{LabelIndex: 0, Timestamp: 2},
+						{LabelIndex: 0, Timestamp: 4},
+					},
+				},
+				{
+					LabelsSets: []*typesv1.Labels{{Labels: foobarlabels}},
+					Profiles: []*ingestv1.SeriesProfile{
+						{LabelIndex: 0, Timestamp: 5},
+						{LabelIndex: 0, Timestamp: 6},
+					},
+				},
+			}),
+		},
+	})
+	require.NoError(t, err)
+	requireFakeMergeProfilesPprof(t, 1, res)
+}
+
 func BenchmarkSelectMergeStacktraces(b *testing.B) {
 	rf := 3
 	clientsCount := 20
diff --git a/pkg/querier/store_gateway_querier.go b/pkg/querier/store_gateway_querier.go
index aa6057ee62..e3e9add2ef 100644
--- a/pkg/querier/store_gateway_querier.go
+++ b/pkg/querier/store_gateway_querier.go
@@ -17,6 +17,7 @@ import (
 	"github.com/prometheus/prometheus/promql/parser"
 	"golang.org/x/sync/errgroup"
 
+	googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
 	ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
 	ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
 	querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
@@ -235,6 +236,65 @@ func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1
 	return selectMergeTree(gCtx, responses)
 }
 
+func (q *Querier) selectProfileFromStoreGateway(ctx context.Context, req *querierv1.SelectMergeProfileRequest, plan map[string]*ingestv1.BlockHints) (*googlev1.Profile, error) {
+	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectProfile StoreGateway")
+	defer sp.Finish()
+	profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+	}
+	_, err = parser.ParseMetricSelector(req.LabelSelector)
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+	}
+	tenantID, err := tenant.ExtractTenantIDFromContext(ctx)
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInvalidArgument, err)
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesPprof]
+	if plan != nil {
+		responses, err = forAllPlannedStoreGateways(ctx, tenantID, q.storeGatewayQuerier, plan, func(ctx context.Context, ic StoreGatewayQueryClient, hints *ingestv1.Hints) (clientpool.BidiClientMergeProfilesPprof, error) {
+			return ic.MergeProfilesPprof(ctx), nil
+		})
+	} else {
+		responses, err = forAllStoreGateways(ctx, tenantID, q.storeGatewayQuerier, func(ctx context.Context, ic StoreGatewayQueryClient) (clientpool.BidiClientMergeProfilesPprof, error) {
+			return ic.MergeProfilesPprof(ctx), nil
+		})
+	}
+	if err != nil {
+		return nil, connect.NewError(connect.CodeInternal, err)
+	}
+	// send the first initial request to all store gateways.
+	g, gCtx := errgroup.WithContext(ctx)
+	for _, r := range responses {
+		r := r
+		hints, ok := plan[r.addr]
+		if !ok && plan != nil {
+			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("no hints found for replica %s", r.addr))
+		}
+		g.Go(util.RecoverPanic(func() error {
+			return r.response.Send(&ingestv1.MergeProfilesPprofRequest{
+				Request: &ingestv1.SelectProfilesRequest{
+					LabelSelector: req.LabelSelector,
+					Start:         req.Start,
+					End:           req.End,
+					Type:          profileType,
+					Hints:         &ingestv1.Hints{Block: hints},
+				},
+			})
+		}))
+	}
+	if err = g.Wait(); err != nil {
+		return nil, connect.NewError(connect.CodeInternal, err)
+	}
+
+	// merge all profiles
+	return selectMergePprofProfile(gCtx, profileType, responses)
+}
+
 func (q *Querier) selectSeriesFromStoreGateway(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest, plan map[string]*ingestv1.BlockHints) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries StoreGateway")
 	defer sp.Finish()