Skip to content

Commit

Permalink
Avoid hash collisions in the storage by not hashing grouping labels
Browse files Browse the repository at this point in the history
Instead, simply put into a string what was hashed before and use that
string as the key in the map instead of the uint64 hash value used before.

Hashing was essentially done for convenience, ignoring the ill effect
of a hash collision, should it ever occur. In contrast to other uses
of hashing of label maps, the hashing isn't needed here for
performance reasons. Neither CPU cycles nor memory usage is critical
here.

The downside is that the on-disk format changes. This commit includes
code to convert the old format if encountered. This code is planned to
be removed prior to the v1 release of the PGW.

Note about testing the conversion: I have verified manually that it
works and was planning to add a test. However, that test would require
adding a fixture in the old format. As this code will be removed in
the next release, I decided it's not worth the
complication. (Automated tests are there to make sure things keep
working after code changes. In this case, the next change will be to
remove the code to be tested.)

Signed-off-by: beorn7 <[email protected]>
  • Loading branch information
beorn7 committed Oct 10, 2019
1 parent ce41261 commit f55b64b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 16 deletions.
3 changes: 2 additions & 1 deletion handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package handler
import (
"bytes"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -580,7 +581,7 @@ func TestWipeMetricStore(t *testing.T) {
metricCount := 5
mgs := storage.GroupingKeyToMetricGroup{}
for i := 0; i < metricCount; i++ {
mgs[uint64(i)] = storage.MetricGroup{}
mgs[fmt.Sprint(i)] = storage.MetricGroup{}
}
mms := MockMetricStore{metricGroups: mgs}

Expand Down
57 changes: 54 additions & 3 deletions storage/diskmetricstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -61,6 +62,10 @@ type mfStat struct {
copied bool // Has the MetricFamily already been copied?
}

// legacyGroupingKeyToMetricGroup is like GroupingKeyToMetricGroup but uses the
// old uint64 grouping key (a hash value) instead of a string grouping key. It
// serves as the decode target when a persistence file written in the legacy
// format is restored, after which the entries are re-keyed with string
// grouping keys.
type legacyGroupingKeyToMetricGroup map[uint64]MetricGroup

// NewDiskMetricStore returns a DiskMetricStore ready to use. To cleanly shut it
// down and free resources, the Shutdown() method has to be called.
//
Expand Down Expand Up @@ -264,7 +269,7 @@ func (dms *DiskMetricStore) processWriteRequest(wr WriteRequest) {
dms.lock.Lock()
defer dms.lock.Unlock()

key := model.LabelsToSignature(wr.Labels)
key := groupingKeyFor(wr.Labels)

if wr.MetricFamilies == nil {
// No MetricFamilies means delete request. Delete the whole
Expand Down Expand Up @@ -307,7 +312,7 @@ func (dms *DiskMetricStore) setPushFailedTimestamp(wr WriteRequest) {
dms.lock.Lock()
defer dms.lock.Unlock()

key := model.LabelsToSignature(wr.Labels)
key := groupingKeyFor(wr.Labels)

group, ok := dms.metricGroups[key]
if !ok {
Expand Down Expand Up @@ -425,7 +430,25 @@ func (dms *DiskMetricStore) restore() error {
}
defer f.Close()
d := gob.NewDecoder(f)
return d.Decode(&dms.metricGroups)
if err := d.Decode(&dms.metricGroups); err == nil {
return nil
}
// Convert legacy disk format. TODO(beorn7): Remove prior to v1 release.
level.Info(dms.logger).Log("msg", "could not decode persistence file, trying legacy v0.9 format")
legacyMetricGroups := legacyGroupingKeyToMetricGroup{}
// Need to rewind file and create new decoder.
if _, err := f.Seek(0, 0); err != nil {
return err
}
d = gob.NewDecoder(f)
if err := d.Decode(&legacyMetricGroups); err != nil {
return err // That's a real failure then, unrelated to the format change.
}
for _, mg := range legacyMetricGroups {
dms.metricGroups[groupingKeyFor(mg.Labels)] = mg
}
level.Info(dms.logger).Log("msg", "conversion from v0.9 legacy format successful, next persisting will happen in current format")
return nil
}

func copyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily {
Expand All @@ -437,6 +460,34 @@ func copyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily {
}
}

// groupingKeyFor builds the string grouping key for a set of grouping labels.
//
// The label names are sorted lexicographically and then each name and its
// value are written out in that order, with model.SeparatorByte placed between
// a name and its value as well as between consecutive name/value pairs. There
// is no leading or trailing separator. Sorting makes the key independent of
// map iteration order, so equal label sets always yield the same key.
//
// An empty (or nil) label map yields the empty string.
//
// NOTE(review): the key is only unambiguous if neither label names nor label
// values can contain model.SeparatorByte; confirm that callers validate this.
func groupingKeyFor(labels map[string]string) string {
	if len(labels) == 0 { // Nothing to join.
		return ""
	}

	// Collect and order the names so the output is deterministic.
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)

	var key strings.Builder
	for pos, name := range names {
		if pos > 0 { // Separate this pair from the previous one.
			key.WriteByte(model.SeparatorByte)
		}
		key.WriteString(name)
		key.WriteByte(model.SeparatorByte)
		key.WriteString(labels[name])
	}
	return key.String()
}

// extractPredefinedHelpStrings extracts all the HELP strings from the provided
// gatherer so that the DiskMetricStore can fix deviations in pushed metrics.
func extractPredefinedHelpStrings(g prometheus.Gatherer) (map[string]string, error) {
Expand Down
49 changes: 38 additions & 11 deletions storage/diskmetricstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func addGroup(
groupingLabels map[string]string,
metrics NameToTimestampedMetricFamilyMap,
) {
mg[model.LabelsToSignature(groupingLabels)] = MetricGroup{
mg[groupingKeyFor(groupingLabels)] = MetricGroup{
Labels: groupingLabels,
Metrics: metrics,
}
Expand Down Expand Up @@ -797,7 +797,7 @@ func TestAddDeletePersistRestore(t *testing.T) {
t.Error(err)
}
// Spot-check timestamp.
tmf := dms.metricGroups[model.LabelsToSignature(map[string]string{
tmf := dms.metricGroups[groupingKeyFor(map[string]string{
"job": "job1",
"instance": "instance2",
})].Metrics["mf1"]
Expand Down Expand Up @@ -897,7 +897,7 @@ func TestAddDeletePersistRestore(t *testing.T) {
t.Error(err)
}
// Check that no empty map entry for job3 was left behind.
if _, stillExists := dms.metricGroups[model.LabelsToSignature(grouping5)]; stillExists {
if _, stillExists := dms.metricGroups[groupingKeyFor(grouping5)]; stillExists {
t.Error("An instance map for 'job3' still exists.")
}

Expand Down Expand Up @@ -1326,8 +1326,8 @@ func TestGetMetricFamiliesMap(t *testing.T) {
"instance": "instance2",
}

ls1 := model.LabelsToSignature(labels1)
ls2 := model.LabelsToSignature(labels2)
gk1 := groupingKeyFor(labels1)
gk2 := groupingKeyFor(labels2)

// Submit a single simple metric family.
ts1 := time.Now()
Expand Down Expand Up @@ -1366,13 +1366,13 @@ func TestGetMetricFamiliesMap(t *testing.T) {
// expectedMFMap is a multi-layered map that maps the grouping keys
// to the corresponding metric family string representations.
// This is for test assertion purposes.
expectedMFMap := map[uint64]map[string]string{
ls1: {
expectedMFMap := map[string]map[string]string{
gk1: {
"mf3": mf3.String(),
pushMetricName: pushTimestamp.String(),
pushFailedMetricName: pushFailedTimestamp.String(),
},
ls2: {
gk2: {
"mf1": mf1b.String(),
"mf2": mf2.String(),
pushMetricName: newPushTimestampGauge(labels2, ts2).String(),
Expand Down Expand Up @@ -1450,6 +1450,33 @@ func TestHelpStringFix(t *testing.T) {

}

// TestGroupingKeyForLabels checks the grouping keys produced by groupingKeyFor
// for an empty label set, a single label, and two labels that have to be
// reordered by name.
func TestGroupingKeyForLabels(t *testing.T) {
	// Go through a byte slice so the separator is exactly one byte; a direct
	// integer-to-string conversion would yield the UTF-8 encoding of the code
	// point instead.
	sep := string([]byte{model.SeparatorByte})

	testCases := []struct {
		in  map[string]string
		out string
	}{
		{
			in:  map[string]string{},
			out: "",
		},
		{
			in:  map[string]string{"foo": "bar"},
			out: "foo" + sep + "bar",
		},
		{
			// "dings" sorts before "foo", so it has to come first in the key.
			in:  map[string]string{"foo": "bar", "dings": "bums"},
			out: "dings" + sep + "bums" + sep + "foo" + sep + "bar",
		},
	}

	for _, tc := range testCases {
		got := groupingKeyFor(tc.in)
		if got == tc.out {
			continue
		}
		t.Errorf("Want grouping key %q for labels %v, got %q.", tc.out, tc.in, got)
	}
}

func checkMetricFamilies(dms *DiskMetricStore, expectedMFs ...*dto.MetricFamily) error {
gotMFs := dms.GetMetricFamilies()
if expected, got := len(expectedMFs), len(gotMFs); expected != got {
Expand Down Expand Up @@ -1479,7 +1506,7 @@ func checkMetricFamilies(dms *DiskMetricStore, expectedMFs ...*dto.MetricFamily)
return nil
}

func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[uint64]map[string]string) error {
func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[string]map[string]string) error {
mfMap := dms.GetMetricFamiliesMap()

if expected, got := len(expectedMFMap), len(mfMap); expected != got {
Expand All @@ -1489,7 +1516,7 @@ func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[uint64]map[
for k, v := range mfMap {
if innerMap, ok := expectedMFMap[k]; ok {
if len(innerMap) != len(v.Metrics) {
return fmt.Errorf("expected %d metric entries for labelSet fingerprint %d in map, but got %d",
return fmt.Errorf("expected %d metric entries for grouping key %s in map, but got %d",
len(innerMap), k, len(v.Metrics))
}
for metricName, metricString := range innerMap {
Expand All @@ -1498,7 +1525,7 @@ func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[uint64]map[
}
}
} else {
return fmt.Errorf("expected key value %d to be present in metric families in map", k)
return fmt.Errorf("expected grouping key %s to be present in metric families map", k)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type WriteRequest struct {

// GroupingKeyToMetricGroup is the first level of the metric store, keyed by
// grouping key.
type GroupingKeyToMetricGroup map[uint64]MetricGroup
type GroupingKeyToMetricGroup map[string]MetricGroup

// MetricGroup adds the grouping labels to a NameToTimestampedMetricFamilyMap.
type MetricGroup struct {
Expand Down

0 comments on commit f55b64b

Please sign in to comment.