
pkg/query: Group by arbitrary labels #5007

Merged
merged 12 commits on Oct 14, 2024
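This PR lets queries group samples by arbitrary label names rather than only the fixed pprof label columns. Below is a minimal sketch of how a single-profile query grouped by a pprof label might be issued through the query API, mirroring the GroupBy usage added to TestLabels further down; the import paths, the client construction, the timestamp, and the profile selector string are assumptions for illustration, not part of this PR.

```go
package example

import (
	"context"

	// Import paths are assumptions based on Parca's generated proto layout.
	querypb "github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// queryGroupedByAPI issues a single-profile query grouped by the pprof
// label "api". Label columns are addressed as "labels.<name>" in
// GroupBy.Fields, as in the TestLabels change in this PR.
func queryGroupedByAPI(ctx context.Context, client querypb.QueryServiceClient, ts *timestamppb.Timestamp) (*querypb.QueryResponse, error) {
	return client.Query(ctx, &querypb.QueryRequest{
		GroupBy: &querypb.GroupBy{
			Fields: []string{"labels.api"},
		},
		ReportType: querypb.QueryRequest_REPORT_TYPE_PPROF,
		Options: &querypb.QueryRequest_Single{
			Single: &querypb.SingleProfile{
				Time:  ts,                                       // assumed profile timestamp
				Query: "memory:alloc_objects:count:space:bytes", // assumed profile selector
			},
		},
	})
}
```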
18,736 changes: 9,368 additions & 9,368 deletions pkg/ingester/testdata/ingest_arrow.json
100644 → 100755

Large diffs are not rendered by default.

18,736 changes: 9,368 additions & 9,368 deletions pkg/ingester/testdata/ingest_uncompressed_arrow.json
100644 → 100755

Large diffs are not rendered by default.

103 changes: 11 additions & 92 deletions pkg/normalizer/normalizer.go
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"slices"
"sort"
"strings"
"time"
@@ -66,10 +67,8 @@ type Series struct {
}

type NormalizedWriteRawRequest struct {
Series []Series
AllLabelNames []string
AllPprofLabelNames []string
AllPprofNumLabelNames []string
Series []Series
AllLabelNames []string
}

func MetaFromPprof(p *pprofpb.Profile, name string, sampleIndex int) profile.Meta {
@@ -129,9 +128,7 @@ func WriteRawRequestToArrowRecord(
}

ps, err := schema.GetDynamicParquetSchema(map[string][]string{
profile.ColumnLabels: normalizedRequest.AllLabelNames,
profile.ColumnPprofLabels: normalizedRequest.AllPprofLabelNames,
profile.ColumnPprofNumLabels: normalizedRequest.AllPprofNumLabelNames,
profile.ColumnLabels: normalizedRequest.AllLabelNames,
})
if err != nil {
return nil, err
@@ -310,42 +307,6 @@
}
}
}
case profile.ColumnPprofLabels:
for _, name := range normalizedRequest.AllPprofLabelNames {
cBuilder := b.Field(b.Schema().FieldIndices(col.Name + "." + name)[0]).(*array.BinaryDictionaryBuilder)
for _, series := range normalizedRequest.Series {
for _, sample := range series.Samples {
for _, p := range sample {
for _, ns := range p.Samples {
if val, ok := ns.Label[name]; ok {
if err := cBuilder.AppendString(val); err != nil {
return nil, err
}
} else {
cBuilder.AppendNull()
}
}
}
}
}
}
case profile.ColumnPprofNumLabels:
for _, name := range normalizedRequest.AllPprofNumLabelNames {
cBuilder := b.Field(b.Schema().FieldIndices(col.Name + "." + name)[0]).(*array.Int64Builder)
for _, series := range normalizedRequest.Series {
for _, sample := range series.Samples {
for _, p := range sample {
for _, ns := range p.Samples {
if val, ok := ns.NumLabel[name]; ok {
cBuilder.Append(val)
} else {
cBuilder.AppendNull()
}
}
}
}
}
}
default:
panic(fmt.Sprintf("unknown column: %s", col.Name))
}
@@ -527,8 +488,6 @@ func serializePprofStacktrace(

func NormalizeWriteRawRequest(ctx context.Context, req *profilestorepb.WriteRawRequest) (NormalizedWriteRawRequest, error) {
allLabelNames := make(map[string]struct{})
allPprofLabelNames := make(map[string]struct{})
allPprofNumLabelNames := make(map[string]struct{})

series := make([]Series, 0, len(req.Series))
for _, rawSeries := range req.Series {
@@ -581,12 +540,12 @@ func NormalizeWriteRawRequest(ctx context.Context, req *profilestorepb.WriteRawR
return NormalizedWriteRawRequest{}, status.Errorf(codes.InvalidArgument, "invalid profile: %v", err)
}

// Find all pprof label names and add them to the list of (infrastructure) label names
LabelNamesFromSamples(
ls,
p.StringTable,
p.Sample,
allPprofLabelNames,
allPprofNumLabelNames,
allLabelNames,
)

normalizedProfiles, err := NormalizePprof(ctx, name, ls, p, req.Normalized, sample.ExecutableInfo)
@@ -603,11 +562,12 @@ func NormalizeWriteRawRequest(ctx context.Context, req *profilestorepb.WriteRawR
})
}

allLabelNamesKeys := maps.Keys(allLabelNames)
slices.Sort(allLabelNamesKeys)

return NormalizedWriteRawRequest{
Series: series,
AllLabelNames: sortedKeys(allLabelNames),
AllPprofLabelNames: sortedKeys(allPprofLabelNames),
AllPprofNumLabelNames: sortedKeys(allPprofNumLabelNames),
Series: series,
AllLabelNames: allLabelNamesKeys,
}, nil
}

@@ -616,7 +576,6 @@ func LabelNamesFromSamples(
stringTable []string,
samples []*pprofpb.Sample,
allLabels map[string]struct{},
allNumLabels map[string]struct{},
) {
labels := map[string]struct{}{}
for _, sample := range samples {
@@ -648,28 +607,6 @@
for labelName := range resLabels {
allLabels[labelName] = struct{}{}
}

for _, sample := range samples {
for _, label := range sample.Label {
key := stringTable[label.Key]
if label.Num != 0 {
key = strutil.SanitizeLabelName(key)
if _, ok := allNumLabels[key]; !ok {
allNumLabels[key] = struct{}{}
}
}
}
}
}

func sortedKeys(m map[string]struct{}) []string {
if len(m) == 0 {
return nil
}

out := maps.Keys(m)
sort.Strings(out)
return out
}

// SampleToParquetRow converts a sample to a Parquet row. The passed labels
@@ -739,24 +676,6 @@ func SampleToParquetRow(
}
columnIndex++
}
case profile.ColumnPprofLabels:
for _, name := range profileLabelNames {
if value, ok := s.Label[name]; ok {
row = append(row, parquet.ValueOf(value).Level(0, 1, columnIndex))
} else {
row = append(row, parquet.ValueOf(nil).Level(0, 0, columnIndex))
}
columnIndex++
}
case profile.ColumnPprofNumLabels:
for _, name := range profileNumLabelNames {
if value, ok := s.NumLabel[name]; ok {
row = append(row, parquet.ValueOf(value).Level(0, 1, columnIndex))
} else {
row = append(row, parquet.ValueOf(nil).Level(0, 0, columnIndex))
}
columnIndex++
}
default:
panic(fmt.Errorf("conversion not implement for column: %s", column.Name))
}
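The normalizer change above folds the previously separate pprof string and numeric label names into the single AllLabelNames set and sorts the result with the slices package, replacing the removed sortedKeys helper. A minimal sketch of that collect-and-sort pattern follows; the label names are made up, and it uses the Go 1.23 stdlib iterator form of maps.Keys, whereas the maps.Keys in the PR appears to return a slice directly (e.g. golang.org/x/exp/maps).

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

func main() {
	// One set now holds infrastructure label names and pprof
	// (string and numeric) label names alike.
	allLabelNames := map[string]struct{}{}
	for _, name := range []string{"node", "api", "thread_id"} { // made-up names
		allLabelNames[name] = struct{}{}
	}

	// Produce a sorted, deterministic column order for the dynamic schema.
	keys := slices.Collect(maps.Keys(allLabelNames))
	slices.Sort(keys)
	fmt.Println(keys) // [api node thread_id]
}
```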
9 changes: 7 additions & 2 deletions pkg/parca/parca_test.go
@@ -171,6 +171,8 @@ func MustReadAllGzip(t require.TestingT, filename string) []byte {
}

func TestConsistency(t *testing.T) {
t.Skipf("skipped, need to think how we want to bring back consistency exports without pprof_labels")
Member Author:
Since we no longer store the pprof_labels, it is impossible to export the exact same profile as a pprof download from now on.


t.Parallel()

ctx := context.Background()
@@ -469,6 +471,9 @@ func TestLabels(t *testing.T) {

ts := timestamppb.New(timestamp.Time(1677488315039)) // time_nanos of the profile divided by 1e6
res, err := api.Query(ctx, &querypb.QueryRequest{
GroupBy: &querypb.GroupBy{
Fields: []string{"labels.api"},
},
ReportType: querypb.QueryRequest_REPORT_TYPE_PPROF,
Options: &querypb.QueryRequest_Single{
Single: &querypb.SingleProfile{
@@ -488,6 +493,6 @@
got[l] = struct{}{}
}
}
want := map[string]struct{}{"api": {}}
require.Equal(t, want, got, "profile should contain pprof_labels from the original profile only")
want := map[string]struct{}{}
require.Equal(t, want, got, "profile should contain labels from the original profile only")
}
4 changes: 2 additions & 2 deletions pkg/parcacol/arrow.go
@@ -107,8 +107,8 @@ func (c *ArrowToProfileConverter) Convert(

labelIndexes := make(map[string]int)
for i, field := range schema.Fields() {
if strings.HasPrefix(field.Name, profile.ColumnPprofLabelsPrefix) {
labelIndexes[strings.TrimPrefix(field.Name, profile.ColumnPprofLabelsPrefix)] = i
if strings.HasPrefix(field.Name, profile.ColumnLabelsPrefix) {
labelIndexes[strings.TrimPrefix(field.Name, profile.ColumnLabelsPrefix)] = i
}
}

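In arrow.go the converter now discovers label columns by the generic labels prefix instead of the pprof-specific one. A small self-contained sketch of that prefix-trim lookup follows; the literal "labels." prefix and the field names are assumptions standing in for profile.ColumnLabelsPrefix and a real Arrow schema.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	const labelsPrefix = "labels." // stand-in for profile.ColumnLabelsPrefix
	fieldNames := []string{"duration", "labels.api", "labels.node", "value"}

	// Map each label name to its column index in the schema.
	labelIndexes := make(map[string]int)
	for i, name := range fieldNames {
		if strings.HasPrefix(name, labelsPrefix) {
			labelIndexes[strings.TrimPrefix(name, labelsPrefix)] = i
		}
	}
	fmt.Println(labelIndexes) // map[api:1 node:2]
}
```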