Skip to content

Commit

Permalink
Merge pull request #293 from prometheus/beorn7/hash
Browse files Browse the repository at this point in the history
Avoid hash collisions in the storage by not hashing grouping labels
  • Loading branch information
beorn7 authored Oct 10, 2019
2 parents ce41261 + f55b64b commit bd99670
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 16 deletions.
3 changes: 2 additions & 1 deletion handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package handler
import (
"bytes"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -580,7 +581,7 @@ func TestWipeMetricStore(t *testing.T) {
metricCount := 5
mgs := storage.GroupingKeyToMetricGroup{}
for i := 0; i < metricCount; i++ {
mgs[uint64(i)] = storage.MetricGroup{}
mgs[fmt.Sprint(i)] = storage.MetricGroup{}
}
mms := MockMetricStore{metricGroups: mgs}

Expand Down
57 changes: 54 additions & 3 deletions storage/diskmetricstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -61,6 +62,10 @@ type mfStat struct {
copied bool // Has the MetricFamily already been copied?
}

// legacyGroupingKeyToMetricGroup is like GroupingKeyToMetricGroup but uses the
// old uint64 grouping key (a hash value) instead of a string grouping key.
type legacyGroupingKeyToMetricGroup map[uint64]MetricGroup

// NewDiskMetricStore returns a DiskMetricStore ready to use. To cleanly shut it
// down and free resources, the Shutdown() method has to be called.
//
Expand Down Expand Up @@ -264,7 +269,7 @@ func (dms *DiskMetricStore) processWriteRequest(wr WriteRequest) {
dms.lock.Lock()
defer dms.lock.Unlock()

key := model.LabelsToSignature(wr.Labels)
key := groupingKeyFor(wr.Labels)

if wr.MetricFamilies == nil {
// No MetricFamilies means delete request. Delete the whole
Expand Down Expand Up @@ -307,7 +312,7 @@ func (dms *DiskMetricStore) setPushFailedTimestamp(wr WriteRequest) {
dms.lock.Lock()
defer dms.lock.Unlock()

key := model.LabelsToSignature(wr.Labels)
key := groupingKeyFor(wr.Labels)

group, ok := dms.metricGroups[key]
if !ok {
Expand Down Expand Up @@ -425,7 +430,25 @@ func (dms *DiskMetricStore) restore() error {
}
defer f.Close()
d := gob.NewDecoder(f)
return d.Decode(&dms.metricGroups)
if err := d.Decode(&dms.metricGroups); err == nil {
return nil
}
// Convert legacy disk format. TODO(beorn7): Remove prior to v1 release.
level.Info(dms.logger).Log("msg", "could not decode persistence file, trying legacy v0.9 format")
legacyMetricGroups := legacyGroupingKeyToMetricGroup{}
// Need to rewind file and create new decoder.
if _, err := f.Seek(0, 0); err != nil {
return err
}
d = gob.NewDecoder(f)
if err := d.Decode(&legacyMetricGroups); err != nil {
return err // That's a real failure then, unrelated to the format change.
}
for _, mg := range legacyMetricGroups {
dms.metricGroups[groupingKeyFor(mg.Labels)] = mg
}
level.Info(dms.logger).Log("msg", "conversion from v0.9 legacy format successful, next persisting will happen in current format")
return nil
}

func copyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily {
Expand All @@ -437,6 +460,34 @@ func copyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily {
}
}

// groupingKeyFor creates a grouping key from the provided map of grouping
// labels. The grouping key is created by joining all label names and values
// together with model.SeparatorByte as a separator. The label names are sorted
// lexicographically before joining. In that way, the grouping key is both
// reproducible and unique.
func groupingKeyFor(labels map[string]string) string {
if len(labels) == 0 { // Super fast path.
return ""
}

labelNames := make([]string, 0, len(labels))
for labelName := range labels {
labelNames = append(labelNames, labelName)
}
sort.Strings(labelNames)

sb := strings.Builder{}
for i, labelName := range labelNames {
sb.WriteString(labelName)
sb.WriteByte(model.SeparatorByte)
sb.WriteString(labels[labelName])
if i+1 < len(labels) { // No separator at the end.
sb.WriteByte(model.SeparatorByte)
}
}
return sb.String()
}

// extractPredefinedHelpStrings extracts all the HELP strings from the provided
// gatherer so that the DiskMetricStore can fix deviations in pushed metrics.
func extractPredefinedHelpStrings(g prometheus.Gatherer) (map[string]string, error) {
Expand Down
49 changes: 38 additions & 11 deletions storage/diskmetricstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func addGroup(
groupingLabels map[string]string,
metrics NameToTimestampedMetricFamilyMap,
) {
mg[model.LabelsToSignature(groupingLabels)] = MetricGroup{
mg[groupingKeyFor(groupingLabels)] = MetricGroup{
Labels: groupingLabels,
Metrics: metrics,
}
Expand Down Expand Up @@ -797,7 +797,7 @@ func TestAddDeletePersistRestore(t *testing.T) {
t.Error(err)
}
// Spot-check timestamp.
tmf := dms.metricGroups[model.LabelsToSignature(map[string]string{
tmf := dms.metricGroups[groupingKeyFor(map[string]string{
"job": "job1",
"instance": "instance2",
})].Metrics["mf1"]
Expand Down Expand Up @@ -897,7 +897,7 @@ func TestAddDeletePersistRestore(t *testing.T) {
t.Error(err)
}
// Check that no empty map entry for job3 was left behind.
if _, stillExists := dms.metricGroups[model.LabelsToSignature(grouping5)]; stillExists {
if _, stillExists := dms.metricGroups[groupingKeyFor(grouping5)]; stillExists {
t.Error("An instance map for 'job3' still exists.")
}

Expand Down Expand Up @@ -1326,8 +1326,8 @@ func TestGetMetricFamiliesMap(t *testing.T) {
"instance": "instance2",
}

ls1 := model.LabelsToSignature(labels1)
ls2 := model.LabelsToSignature(labels2)
gk1 := groupingKeyFor(labels1)
gk2 := groupingKeyFor(labels2)

// Submit a single simple metric family.
ts1 := time.Now()
Expand Down Expand Up @@ -1366,13 +1366,13 @@ func TestGetMetricFamiliesMap(t *testing.T) {
// expectedMFMap is a multi-layered map that maps the labelset
// fingerprints to the corresponding metric family string
// representations. This is for test assertion purposes.
expectedMFMap := map[uint64]map[string]string{
ls1: {
expectedMFMap := map[string]map[string]string{
gk1: {
"mf3": mf3.String(),
pushMetricName: pushTimestamp.String(),
pushFailedMetricName: pushFailedTimestamp.String(),
},
ls2: {
gk2: {
"mf1": mf1b.String(),
"mf2": mf2.String(),
pushMetricName: newPushTimestampGauge(labels2, ts2).String(),
Expand Down Expand Up @@ -1450,6 +1450,33 @@ func TestHelpStringFix(t *testing.T) {

}

func TestGroupingKeyForLabels(t *testing.T) {
sep := string([]byte{model.SeparatorByte})
scenarios := []struct {
in map[string]string
out string
}{
{
in: map[string]string{},
out: "",
},
{
in: map[string]string{"foo": "bar"},
out: "foo" + sep + "bar",
},
{
in: map[string]string{"foo": "bar", "dings": "bums"},
out: "dings" + sep + "bums" + sep + "foo" + sep + "bar",
},
}

for _, s := range scenarios {
if want, got := s.out, groupingKeyFor(s.in); want != got {
t.Errorf("Want grouping key %q for labels %v, got %q.", want, s.in, got)
}
}
}

func checkMetricFamilies(dms *DiskMetricStore, expectedMFs ...*dto.MetricFamily) error {
gotMFs := dms.GetMetricFamilies()
if expected, got := len(expectedMFs), len(gotMFs); expected != got {
Expand Down Expand Up @@ -1479,7 +1506,7 @@ func checkMetricFamilies(dms *DiskMetricStore, expectedMFs ...*dto.MetricFamily)
return nil
}

func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[uint64]map[string]string) error {
func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[string]map[string]string) error {
mfMap := dms.GetMetricFamiliesMap()

if expected, got := len(expectedMFMap), len(mfMap); expected != got {
Expand All @@ -1489,7 +1516,7 @@ func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[uint64]map[
for k, v := range mfMap {
if innerMap, ok := expectedMFMap[k]; ok {
if len(innerMap) != len(v.Metrics) {
return fmt.Errorf("expected %d metric entries for labelSet fingerprint %d in map, but got %d",
return fmt.Errorf("expected %d metric entries for grouping key %s in map, but got %d",
len(innerMap), k, len(v.Metrics))
}
for metricName, metricString := range innerMap {
Expand All @@ -1498,7 +1525,7 @@ func checkMetricFamilyGroups(dms *DiskMetricStore, expectedMFMap map[uint64]map[
}
}
} else {
return fmt.Errorf("expected key value %d to be present in metric families in map", k)
return fmt.Errorf("expected grouping key %s to be present in metric families map", k)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type WriteRequest struct {

// GroupingKeyToMetricGroup is the first level of the metric store, keyed by
// grouping key.
type GroupingKeyToMetricGroup map[uint64]MetricGroup
type GroupingKeyToMetricGroup map[string]MetricGroup

// MetricGroup adds the grouping labels to a NameToTimestampedMetricFamilyMap.
type MetricGroup struct {
Expand Down

0 comments on commit bd99670

Please sign in to comment.