Skip to content

Commit

Permalink
Merge pull request #122 from grafana/20220726_correct-profiles-schema
Browse files Browse the repository at this point in the history
Iterate on profiles schema
  • Loading branch information
simonswine authored Jul 27, 2022
2 parents c481323 + 5219159 commit bb64ba7
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pkg/firedb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, e
KeepFrames: p.KeepFrames,
TimeNanos: p.TimeNanos,
DurationNanos: p.DurationNanos,
Comment: copySlice(p.Comment),
Comments: copySlice(p.Comment),
DefaultSampleType: p.DefaultSampleType,
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/firedb/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ func (*profilesHelper) addToRewriter(r *rewriter, elemRewriter idConversionTable
}

func (*profilesHelper) rewrite(r *rewriter, s *schemav1.Profile) error {
for pos := range s.Comment {
r.strings.rewrite(&s.Comment[pos])
for pos := range s.Comments {
r.strings.rewrite(&s.Comments[pos])
}

r.strings.rewrite(&s.DropFrames)
Expand All @@ -279,7 +279,7 @@ func (*profilesHelper) size(p *schemav1.Profile) uint64 {
var size = profileSize

size += uint64(len(p.SeriesRefs) * 8)
size += uint64(len(p.Comment) * 8)
size += uint64(len(p.Comments) * 8)

for _, s := range p.Samples {
size += sizeOfSample(s)
Expand Down
40 changes: 20 additions & 20 deletions pkg/firedb/schemas/v1/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,64 +11,64 @@ import (

var (
stringRef = parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked)
pprofLabels = parquet.Repeated(fireparquet.Group{
pprofLabels = parquet.List(fireparquet.Group{
fireparquet.NewGroupField("Key", stringRef),
fireparquet.NewGroupField("Str", parquet.Optional(stringRef)),
fireparquet.NewGroupField("Num", parquet.Optional(parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked))),
fireparquet.NewGroupField("NumUnit", parquet.Optional(stringRef)),
})
sampleField = fireparquet.Group{
fireparquet.NewGroupField("StacktraceID", parquet.Encoded(parquet.Uint(64), &parquet.DeltaBinaryPacked)),
fireparquet.NewGroupField("Values", parquet.Repeated(parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked))),
fireparquet.NewGroupField("Values", parquet.List(parquet.Encoded(parquet.Int(64), &parquet.DeltaBinaryPacked))),
fireparquet.NewGroupField("Labels", pprofLabels),
}
profilesSchema = parquet.NewSchema("Profile", fireparquet.Group{
fireparquet.NewGroupField("ID", parquet.UUID()),
fireparquet.NewGroupField("SeriesRefs", parquet.Repeated(parquet.Encoded(parquet.Uint(64), &parquet.DeltaBinaryPacked))),
fireparquet.NewGroupField("Samples", parquet.Repeated(sampleField)),
fireparquet.NewGroupField("DropFrames", stringRef),
fireparquet.NewGroupField("KeepFrames", stringRef),
fireparquet.NewGroupField("SeriesRefs", parquet.List(parquet.Encoded(parquet.Uint(64), &parquet.DeltaBinaryPacked))),
fireparquet.NewGroupField("Samples", parquet.List(sampleField)),
fireparquet.NewGroupField("DropFrames", parquet.Optional(stringRef)),
fireparquet.NewGroupField("KeepFrames", parquet.Optional(stringRef)),
fireparquet.NewGroupField("TimeNanos", parquet.Timestamp(parquet.Nanosecond)),
fireparquet.NewGroupField("DurationNanos", parquet.Int(64)),
fireparquet.NewGroupField("Period", parquet.Int(64)),
fireparquet.NewGroupField("Comments", parquet.Repeated(stringRef)),
fireparquet.NewGroupField("DefaultSampleType", parquet.Int(64)),
fireparquet.NewGroupField("DurationNanos", parquet.Optional(parquet.Int(64))),
fireparquet.NewGroupField("Period", parquet.Optional(parquet.Int(64))),
fireparquet.NewGroupField("Comments", parquet.List(stringRef)),
fireparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))),
})
)

type Sample struct {
StacktraceID uint64 `parquet:",delta"`
Values []int64 `parquet:","`
Labels []*profilev1.Label `parquet:","`
Values []int64 `parquet:",list"`
Labels []*profilev1.Label `parquet:",list"`
}

type Profile struct {
// A unique UUID per ingested profile
ID uuid.UUID `parquet:",uuid"`

// SeriesRefs reference the underlying series in the TSDB index
SeriesRefs []model.Fingerprint `parquet:","`
SeriesRefs []model.Fingerprint `parquet:",list"`

// The set of samples recorded in this profile.
Samples []*Sample `parquet:","`
Samples []*Sample `parquet:",list"`

// frames with Function.function_name fully matching the following
// regexp will be dropped from the samples, along with their successors.
DropFrames int64 `parquet:","` // Index into string table.
DropFrames int64 `parquet:",optional"` // Index into string table.
// frames with Function.function_name fully matching the following
// regexp will be kept, even if it matches drop_frames.
KeepFrames int64 `parquet:","` // Index into string table.
KeepFrames int64 `parquet:",optional"` // Index into string table.
// Time of collection (UTC) represented as nanoseconds past the epoch.
TimeNanos int64 `parquet:",delta,timestamp(nanosecond)"`
// Duration of the profile, if a duration makes sense.
DurationNanos int64 `parquet:",delta"`
DurationNanos int64 `parquet:",delta,optional"`
// The number of events between sampled occurrences.
Period int64 `parquet:","`
Period int64 `parquet:",optional"`
// Freeform text associated to the profile.
Comment []int64 `parquet:"Comments,"` // Indices into string table.
Comments []int64 `parquet:",list"` // Indices into string table.
// Index into the string table of the type of the preferred sample
// value. If unset, clients should default to the last sample value.
DefaultSampleType int64 `parquet:","`
DefaultSampleType int64 `parquet:",optional"`
}

type ProfilePersister struct{}
Expand Down
70 changes: 68 additions & 2 deletions pkg/firedb/schemas/v1/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,29 @@ import (
"strings"
"testing"

"github.com/google/uuid"
"github.com/prometheus/common/model"
"github.com/segmentio/parquet-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

profilev1 "github.com/grafana/fire/pkg/gen/google/v1"
)

// This test ensures that the structs that are stored and the used schema matches
func TestSchemaMatch(t *testing.T) {
profilesStructSchema := parquet.SchemaOf(&Profile{})
require.Equal(t, profilesStructSchema.String(), profilesSchema.String())

// TODO: Unfortunately the upstream schema doesn't correctly produce a
// schema of a List of a struct pointer. This replaces this in the schema
// comparison, because this has no affect to our construct/reconstruct code
// we can simply replace the string in the schema.
profilesStructSchema := strings.ReplaceAll(
parquet.SchemaOf(&Profile{}).String(),
"optional group element",
"required group element",
)

require.Equal(t, profilesStructSchema, profilesSchema.String())

stacktracesStructSchema := parquet.SchemaOf(&storedStacktrace{})
require.Equal(t, strings.Replace(stacktracesStructSchema.String(), "message storedStacktrace", "message Stacktrace", 1), stacktracesSchema.String())
Expand Down Expand Up @@ -69,3 +83,55 @@ func TestStringsRoundTrip(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, newStrings(), sRead)
}

func newProfiles() []*Profile {
return []*Profile{
{
ID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
TimeNanos: 1001,
SeriesRefs: []model.Fingerprint{0xaa, 0xab},
Samples: []*Sample{
{
StacktraceID: 0xba,
Values: []int64{0xca, 0xcc},
Labels: []*profilev1.Label{},
},
{
StacktraceID: 0xbb,
Values: []int64{0xca, 0xcc},
Labels: []*profilev1.Label{
{Key: 0xda, Str: 0xea},
},
},
},
Comments: []int64{},
},
{
ID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
SeriesRefs: []model.Fingerprint{0xab, 0xac},
TimeNanos: 1002,
Samples: []*Sample{
{
StacktraceID: 0xbc,
Values: []int64{0xcd, 0xce},
Labels: []*profilev1.Label{},
},
},
Comments: []int64{},
},
}
}

func TestProfilesRoundTrip(t *testing.T) {
var (
p = newProfiles()
w = &ReadWriter[*Profile, *ProfilePersister]{}
buf bytes.Buffer
)

require.NoError(t, w.WriteParquetFile(&buf, p))

sRead, err := w.ReadParquetFile(bytes.NewReader(buf.Bytes()))
require.NoError(t, err)
assert.Equal(t, newProfiles(), sRead)
}
5 changes: 4 additions & 1 deletion pkg/parquet/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ func (groupType) ConvertedType() *deprecated.ConvertedType { return nil }
func (f *groupField) Name() string { return f.name }

func (f *groupField) Value(base reflect.Value) reflect.Value {
if base.Kind() == reflect.Pointer {
if base.Kind() == reflect.Ptr {
if base.IsNil() {
base.Set(reflect.New(base.Type().Elem()))
}
return f.Value(base.Elem())
}
return base.FieldByName(f.name)
Expand Down

0 comments on commit bb64ba7

Please sign in to comment.