Skip to content

Commit

Permalink
fix downsampling issue
Browse files Browse the repository at this point in the history
  • Loading branch information
shuaizhang committed Oct 11, 2019
1 parent 05f6239 commit dbdfb22
Showing 1 changed file with 72 additions and 24 deletions.
96 changes: 72 additions & 24 deletions pkg/compact/dedup/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,40 +131,92 @@ func (ss *SampleSeries) toRawChunkSeries() (*ChunkSeries, error) {
}

func (ss *SampleSeries) toDownsampleChunkSeries() (*ChunkSeries, error) {
all := make([][]chunks.Meta, len(downsampleAggrTypes))
for _, at := range downsampleAggrTypes {
chks, err := ss.toChunks(at)
countChks, err := ss.toChunks(downsample.AggrCount)
if err != nil {
return nil, err
}
result := make([]chunks.Meta, 0, len(countChks))
for _, countChk := range countChks {
var chks [5]chunkenc.Chunk
minTime := countChk.MinTime
maxTime := countChk.MaxTime
chks[downsample.AggrCount] = countChk.Chunk
sumChk, err := ss.toChunk(downsample.AggrSum, minTime, maxTime)
if err != nil {
return nil, err
}
all[at] = chks
}
numChks := len(all[downsample.AggrCount])
if numChks == 0 {
return nil, nil
}
result := make([]chunks.Meta, 0, numChks)
for i := 0; i < numChks; i++ {
var chks [5]chunkenc.Chunk
for _, at := range downsampleAggrTypes {
if all[at] == nil {
continue
}
chks[at] = all[at][i].Chunk
if sumChk != nil {
chks[downsample.AggrSum] = sumChk.Chunk
}
minChk, err := ss.toChunk(downsample.AggrMin, minTime, maxTime)
if err != nil {
return nil, err
}
if minChk != nil {
chks[downsample.AggrMin] = minChk.Chunk
}
maxChk, err := ss.toChunk(downsample.AggrMax, minTime, maxTime)
if err != nil {
return nil, err
}
if maxChk != nil {
chks[downsample.AggrMax] = maxChk.Chunk
}
counterChk, err := ss.toChunk(downsample.AggrCounter, minTime, maxTime)
if err != nil {
return nil, err
}
if counterChk != nil {
chks[downsample.AggrCounter] = counterChk.Chunk
}
result = append(result, chunks.Meta{
MinTime: all[downsample.AggrCount][i].MinTime,
MaxTime: all[downsample.AggrCount][i].MaxTime,
MinTime: minTime,
MaxTime: maxTime,
Chunk: downsample.EncodeAggrChunk(chks),
})
}

return &ChunkSeries{
lset: ss.lset,
chks: result,
}, nil
}

func (ss *SampleSeries) toChunk(at downsample.AggrType, minTime, maxTime int64) (*chunks.Meta, error) {
samples := ss.data[at]
if len(samples) == 0 {
return nil, nil
}
c := chunkenc.NewXORChunk()
appender, err := c.Appender()
if err != nil {
return nil, err
}
var lastSample *Sample
for _, sample := range samples {
if sample.timestamp < minTime {
continue
}
if sample.timestamp > maxTime {
break
}
appender.Append(sample.timestamp, sample.value)
lastSample = sample
}
if lastSample == nil {
return nil, nil
}
// InjectThanosMeta the chunk's counter aggregate with the last true sample.
if at == downsample.AggrCounter {
appender.Append(lastSample.timestamp, lastSample.value)
}
return &chunks.Meta{
MinTime: minTime,
MaxTime: maxTime,
Chunk: c,
}, nil
}


func (ss *SampleSeries) toChunks(at downsample.AggrType) ([]chunks.Meta, error) {
samples := ss.data[at]
if len(samples) == 0 {
Expand All @@ -190,10 +242,6 @@ func (ss *SampleSeries) toChunks(at downsample.AggrType) ([]chunks.Meta, error)
for _, v := range samples[start:end] {
appender.Append(v.timestamp, v.value)
}
// InjectThanosMeta the chunk's counter aggregate with the last true sample.
if at == downsample.AggrCounter {
appender.Append(samples[end-1].timestamp, samples[end-1].value)
}
chks = append(chks, chunks.Meta{
MinTime: samples[start].timestamp,
MaxTime: samples[end-1].timestamp,
Expand Down

0 comments on commit dbdfb22

Please sign in to comment.