Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: query pprof from store-gateway #2694

Merged
merged 4 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 90 additions & 36 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile

for i, querier := range queriers {
if skipBlock(querier.BlockID()) {
iters[i] = iter.NewEmptyIterator[Profile]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^ smart

continue
}
i := i
Expand Down Expand Up @@ -1138,59 +1139,112 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
otlog.Object("hints", request.Hints),
)

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), nil)
if err != nil {
return err
}

iters, err := SelectMatchingProfiles(ctx, request, queriers)
queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End), request.Hints)
if err != nil {
return err
}

// send batches of profiles to client and filter via bidi stream.
selectedProfiles, err := filterProfiles[
BidiServerMerge[*ingestv1.MergeProfilesPprofResponse, *ingestv1.MergeProfilesPprofRequest],
*ingestv1.MergeProfilesPprofResponse,
*ingestv1.MergeProfilesPprofRequest](ctx, iters, defaultBatchSize, stream)
if err != nil {
return err
deduplicationNeeded := true
if request.Hints != nil && request.Hints.Block != nil {
deduplicationNeeded = request.Hints.Block.Deduplication
}

result := make([]*profile.Profile, 0, len(queriers))
var lock sync.Mutex
result := make([]*profile.Profile, 0, len(queriers))
g, ctx := errgroup.WithContext(ctx)
for i, querier := range queriers {
i := i
querier := querier
if len(selectedProfiles[i]) == 0 {
continue

// depending on if we need deduplication or not there are two different code paths.
if !deduplicationNeeded {
// signal the end of the profile streaming by sending an empty response.
sp.LogFields(otlog.String("msg", "no profile streaming as no deduplication needed"))
if err = stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
return err
}
// Sort profiles for better read locality.
// Merge async the result so we can continue streaming profiles.
g.Go(util.RecoverPanic(func() error {
merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(selectedProfiles[i])))
if err != nil {
return err

// in this path we can just merge the profiles from each block and send the result to the client.
for _, querier := range queriers {
querier := querier
g.Go(util.RecoverPanic(func() error {

iters, err := querier.SelectMatchingProfiles(ctx, request)
if err != nil {
return err
}
defer func() {
iters.Close()
}()

profiles, err := iter.Slice(iters)
if err != nil {
return err
}

if len(profiles) == 0 {
return nil
}

merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(profiles)))
if err != nil {
return err
}

lock.Lock()
result = append(result, merge)
lock.Unlock()
return nil
}))
}
} else {
// in this path we have to go through every profile and deduplicate them.
iters, err := SelectMatchingProfiles(ctx, request, queriers)
if err != nil {
return err
}

// send batches of profiles to client and filter via bidi stream.
selectedProfiles, err := filterProfiles[
BidiServerMerge[*ingestv1.MergeProfilesPprofResponse, *ingestv1.MergeProfilesPprofRequest],
*ingestv1.MergeProfilesPprofResponse,
*ingestv1.MergeProfilesPprofRequest](ctx, iters, defaultBatchSize, stream)
if err != nil {
return err
}

for i, querier := range queriers {
querier := querier
i := i
if len(selectedProfiles[i]) == 0 {
continue
}
lock.Lock()
defer lock.Unlock()
result = append(result, merge)
return nil
}))
}
// Sort profiles for better read locality.
// Merge async the result so we can continue streaming profiles.
g.Go(util.RecoverPanic(func() error {
merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(selectedProfiles[i])))
if err != nil {
return err
}
lock.Lock()
result = append(result, merge)
lock.Unlock()
return nil
}))
}

// Signals the end of the profile streaming by sending an empty response.
// This allows the client to not block other streaming ingesters.
if err := stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
return err
// Signals the end of the profile streaming by sending an empty response.
// This allows the client to not block other streaming ingesters.
sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming"))
if err = stream.Send(&ingestv1.MergeProfilesPprofResponse{}); err != nil {
return err
}
}

if err := g.Wait(); err != nil {
if err = g.Wait(); err != nil {
return err
}

if len(result) == 0 {
result = append(result, &profile.Profile{})
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"golang.org/x/sync/errgroup"

googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
Expand Down Expand Up @@ -138,6 +139,62 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se
return selectMergeTree(gCtx, responses)
}

// selectProfileFromIngesters merges pprof profiles for the given request
// across the relevant ingesters. When plan is non-nil, only the replicas
// named by the plan are queried and their per-replica block hints are
// forwarded; otherwise the request fans out to all ingesters.
//
// Returns CodeInvalidArgument for a bad profile type or label selector,
// CodeInternal for fan-out/stream failures.
func (q *Querier) selectProfileFromIngesters(ctx context.Context, req *querierv1.SelectMergeProfileRequest, plan blockPlan) (*googlev1.Profile, error) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectProfile Ingesters")
	defer sp.Finish()
	profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}
	// Validate the selector up front so we fail fast before opening streams.
	_, err = parser.ParseMetricSelector(req.LabelSelector)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesPprof]
	if plan != nil {
		responses, err = forAllPlannedIngesters(ctx, q.ingesterQuerier, plan, func(ctx context.Context, ic IngesterQueryClient, hints *ingestv1.Hints) (clientpool.BidiClientMergeProfilesPprof, error) {
			return ic.MergeProfilesPprof(ctx), nil
		})
	} else {
		responses, err = forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesPprof, error) {
			return ic.MergeProfilesPprof(ctx), nil
		})
	}
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	// Send the initial request on every stream concurrently. The goroutines
	// never consult a group context, so a plain errgroup.Group suffices.
	var g errgroup.Group
	for idx := range responses {
		r := responses[idx]
		// Lookup on a nil plan legitimately misses (hints stays nil); a
		// non-nil plan that lacks a replica we hold a stream for is an
		// internal inconsistency.
		hints, ok := plan[r.addr]
		if !ok && plan != nil {
			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("no hints found for replica %s", r.addr))
		}

		g.Go(util.RecoverPanic(func() error {
			return r.response.Send(&ingestv1.MergeProfilesPprofRequest{
				Request: &ingestv1.SelectProfilesRequest{
					LabelSelector: req.LabelSelector,
					Start:         req.Start,
					End:           req.End,
					Type:          profileType,
					Hints:         &ingestv1.Hints{Block: hints},
				},
			})
		}))
	}
	if err = g.Wait(); err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	// Merge all profiles. Use the parent ctx here: the previous version
	// passed an errgroup-derived context, which is canceled as soon as
	// Wait returns, so the merge would run under a canceled context.
	return selectMergePprofProfile(ctx, profileType, responses)
}

func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters")
defer sp.Finish()
Expand Down
104 changes: 69 additions & 35 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/grafana/pyroscope/pkg/clientpool"
"github.com/grafana/pyroscope/pkg/iter"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/util"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/util/math"
"github.com/grafana/pyroscope/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -689,6 +689,15 @@ func (sq storeQuery) MergeSpanProfileRequest(req *querierv1.SelectMergeSpanProfi
}
}

// MergeProfileRequest clones req with the time window narrowed to this
// store query's [start, end] range; the profile type and label selector
// are carried over unchanged.
func (sq storeQuery) MergeProfileRequest(req *querierv1.SelectMergeProfileRequest) *querierv1.SelectMergeProfileRequest {
	clone := &querierv1.SelectMergeProfileRequest{
		ProfileTypeID: req.ProfileTypeID,
		LabelSelector: req.LabelSelector,
	}
	clone.Start = int64(sq.start)
	clone.End = int64(sq.end)
	return clone
}

type storeQueries struct {
ingester, storeGateway storeQuery
queryStoreAfter time.Duration
Expand Down Expand Up @@ -743,50 +752,75 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q
sp.Finish()
}()

profileType, err := phlaremodel.ParseProfileTypeSelector(req.Msg.ProfileTypeID)
profile, err := q.selectProfile(ctx, req.Msg)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
return nil, err
}
_, err = parser.ParseMetricSelector(req.Msg.LabelSelector)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
profile.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
profile.TimeNanos = model.Time(req.Msg.End).UnixNano()
return connect.NewResponse(profile), nil
}

func (q *Querier) selectProfile(ctx context.Context, req *querierv1.SelectMergeProfileRequest) (*googlev1.Profile, error) {
// determine the block hints
plan, err := q.blockSelect(ctx, model.Time(req.Start), model.Time(req.End))
if isEndpointNotExistingErr(err) {
level.Warn(spanlogger.FromContext(ctx, q.logger)).Log(
"msg", "block select not supported on at least one component, fallback to use full dataset",
"err", err,
)
plan = nil
} else if err != nil {
return nil, fmt.Errorf("error during block select: %w", err)
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()

responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesPprof, error) {
return ic.MergeProfilesPprof(ctx), nil
})
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
// no store gateways configured so just query the ingesters
if q.storeGatewayQuerier == nil {
return q.selectProfileFromIngesters(ctx, req, plan)
}
// send the first initial request to all ingesters.
g, gCtx := errgroup.WithContext(ctx)
for _, r := range responses {
r := r
g.Go(util.RecoverPanic(func() error {
return r.response.Send(&ingestv1.MergeProfilesPprofRequest{
Request: &ingestv1.SelectProfilesRequest{
LabelSelector: req.Msg.LabelSelector,
Start: req.Msg.Start,
End: req.Msg.End,
Type: profileType,
},
})
}))

storeQueries := splitQueryToStores(model.Time(req.Start), model.Time(req.End), model.Now(), q.cfg.QueryStoreAfter, plan)
if !storeQueries.ingester.shouldQuery && !storeQueries.storeGateway.shouldQuery {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("start and end time are outside of the ingester and store gateway retention"))
}
if err := g.Wait(); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)

storeQueries.Log(level.Debug(spanlogger.FromContext(ctx, q.logger)))

if plan == nil && !storeQueries.ingester.shouldQuery {
return q.selectProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
}
if plan == nil && !storeQueries.storeGateway.shouldQuery {
return q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
}

// merge all profiles
profile, err := selectMergePprofProfile(gCtx, profileType, responses)
if err != nil {
g, ctx := errgroup.WithContext(ctx)
var lock sync.Mutex
var merge pprof.ProfileMerge
g.Go(func() error {
ingesterProfile, err := q.selectProfileFromIngesters(ctx, storeQueries.ingester.MergeProfileRequest(req), plan)
if err != nil {
return err
}
lock.Lock()
err = merge.Merge(ingesterProfile)
lock.Unlock()
return err
})
g.Go(func() error {
storegatewayProfile, err := q.selectProfileFromStoreGateway(ctx, storeQueries.storeGateway.MergeProfileRequest(req), plan)
if err != nil {
return err
}
lock.Lock()
err = merge.Merge(storegatewayProfile)
lock.Unlock()
return err
})
if err := g.Wait(); err != nil {
return nil, err
}
profile.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
profile.TimeNanos = model.Time(req.Msg.End).UnixNano()
return connect.NewResponse(profile), nil

return merge.Profile(), nil
}

func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) {
Expand Down
Loading
Loading