[dbnode] Refactoring dbShard (#2848)
linasm authored and robskillington committed Nov 12, 2020
1 parent b9547d8 commit 9509c82
Showing 6 changed files with 84 additions and 366 deletions.
6 changes: 1 addition & 5 deletions src/dbnode/storage/namespace.go
@@ -1732,10 +1732,6 @@ func (n *dbNamespace) aggregateTiles(
return 0, errNamespaceNotBootstrapped
}

n.RLock()
nsCtx := n.nsContextWithRLock()
n.RUnlock()

var (
processedShards = opts.InsOptions.MetricsScope().Counter("processed-shards")
targetShards = n.OwnedShards()
@@ -1778,7 +1774,7 @@ func (n *dbNamespace) aggregateTiles(
}

shardProcessedTileCount, err := targetShard.AggregateTiles(
sourceNs.ID(), sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, opts, nsCtx.Schema)
sourceNs.ID(), n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, opts)

processedTileCount += shardProcessedTileCount
processedShards.Inc(1)
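
For context, a minimal sketch of the databaseShard.AggregateTiles signature change that this call site reflects. The stand-in type definitions below are illustrative only (the real interface and types live elsewhere in the storage package and are not part of this diff); only the parameter list mirrors what the diff shows.

package sketch

// Illustrative stand-ins for the real storage/fs types (not from the repo).
type (
	ID                interface{ String() string }
	Namespace         interface{ ID() ID }
	DataFileSetReader interface{ Entries() int }
	StreamingWriter   interface{}

	shardBlockVolume struct {
		blockStart   int64
		latestVolume int
	}

	AggregateTilesOptions struct{}
)

// databaseShard captures only the method this commit changes. Before the
// commit, the method additionally took a namespace.SchemaDescr resolved by
// the caller under the namespace read lock; now the target Namespace is
// passed instead and the schema handling moves out of the caller.
type databaseShard interface {
	AggregateTiles(
		sourceNsID ID,
		targetNs Namespace,
		shardID uint32,
		blockReaders []DataFileSetReader,
		writer StreamingWriter,
		sourceBlockVolumes []shardBlockVolume,
		opts AggregateTilesOptions,
	) (int64, error)
}
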
22 changes: 16 additions & 6 deletions src/dbnode/storage/namespace_test.go
@@ -1415,8 +1415,8 @@ func TestNamespaceAggregateTiles(t *testing.T) {
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
secondSourceBlockStart = start.Add(sourceBlockSize)
sourceShard0ID uint32 = 10
sourceShard1ID uint32 = 20
shard0ID uint32 = 10
shard1ID uint32 = 20
insOpts = instrument.NewOptions()
)

@@ -1440,12 +1440,12 @@ func TestNamespaceAggregateTiles(t *testing.T) {
sourceNs.shards[0] = sourceShard0
sourceNs.shards[1] = sourceShard1

sourceShard0.EXPECT().ID().Return(sourceShard0ID)
sourceShard0.EXPECT().ID().Return(shard0ID)
sourceShard0.EXPECT().IsBootstrapped().Return(true)
sourceShard0.EXPECT().LatestVolume(start).Return(5, nil)
sourceShard0.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(15, nil)

sourceShard1.EXPECT().ID().Return(sourceShard1ID)
sourceShard1.EXPECT().ID().Return(shard1ID)
sourceShard1.EXPECT().IsBootstrapped().Return(true)
sourceShard1.EXPECT().LatestVolume(start).Return(7, nil)
sourceShard1.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(17, nil)
@@ -1462,8 +1462,18 @@ func TestNamespaceAggregateTiles(t *testing.T) {
sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}}

sourceNsIDMatcher := ident.NewIDMatcher(sourceNsID.String())
targetShard0.EXPECT().AggregateTiles(sourceNsIDMatcher, sourceShard0ID, gomock.Any(), gomock.Any(), sourceBlockVolumes0, opts, targetNs.Schema()).Return(int64(3), nil)
targetShard1.EXPECT().AggregateTiles(sourceNsIDMatcher, sourceShard1ID, gomock.Any(), gomock.Any(), sourceBlockVolumes1, opts, targetNs.Schema()).Return(int64(2), nil)

targetShard0.EXPECT().
AggregateTiles(
sourceNsIDMatcher, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes0, opts).
Return(int64(3), nil)

targetShard1.EXPECT().
AggregateTiles(
sourceNsIDMatcher, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes1, opts).
Return(int64(2), nil)

processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts)

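
A side note on the matcher change above: the expectation now uses gomock.Len(2) for the blockReaders argument, i.e. each target shard is expected to receive exactly two open block readers (one per source block volume) without asserting which concrete readers they are. A small standalone illustration of the matcher, not taken from the repo:

package main

import (
	"fmt"

	"github.com/golang/mock/gomock"
)

func main() {
	// Stand-ins for the two fs.DataFileSetReader values opened per shard.
	readers := []string{"reader-volume-5", "reader-volume-15"}

	fmt.Println(gomock.Len(2).Matches(readers))     // true: exactly two readers
	fmt.Println(gomock.Len(2).Matches(readers[:1])) // false: only one reader
}
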
235 changes: 31 additions & 204 deletions src/dbnode/storage/shard.go
@@ -29,9 +29,6 @@ import (
"sync"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/tile"
"github.com/m3db/m3/src/dbnode/generated/proto/annotation"
"github.com/m3db/m3/src/dbnode/generated/proto/pagetoken"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
@@ -47,7 +44,6 @@ import (
"github.com/m3db/m3/src/dbnode/storage/series/lookup"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/downsample"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/m3ninx/doc"
@@ -72,18 +68,16 @@ const (
)

var (
errShardEntryNotFound = errors.New("shard entry not found")
errShardNotOpen = errors.New("shard is not open")
errShardAlreadyTicking = errors.New("shard is already ticking")
errShardClosingTickTerminated = errors.New("shard is closing, terminating tick")
errShardInvalidPageToken = errors.New("shard could not unmarshal page token")
errNewShardEntryTagsTypeInvalid = errors.New("new shard entry options error: tags type invalid")
errNewShardEntryTagsIterNotAtIndexZero = errors.New("new shard entry options error: tags iter not at index zero")
errShardIsNotBootstrapped = errors.New("shard is not bootstrapped")
errShardAlreadyBootstrapped = errors.New("shard is already bootstrapped")
errFlushStateIsNotInitialized = errors.New("shard flush state is not initialized")
errFlushStateAlreadyInitialized = errors.New("shard flush state is already initialized")
errTriedToLoadNilSeries = errors.New("tried to load nil series into shard")
errShardEntryNotFound = errors.New("shard entry not found")
errShardNotOpen = errors.New("shard is not open")
errShardAlreadyTicking = errors.New("shard is already ticking")
errShardClosingTickTerminated = errors.New("shard is closing, terminating tick")
errShardInvalidPageToken = errors.New("shard could not unmarshal page token")
errNewShardEntryTagsTypeInvalid = errors.New("new shard entry options error: tags type invalid")
errShardIsNotBootstrapped = errors.New("shard is not bootstrapped")
errShardAlreadyBootstrapped = errors.New("shard is already bootstrapped")
errFlushStateIsNotInitialized = errors.New("shard flush state is not initialized")
errTriedToLoadNilSeries = errors.New("tried to load nil series into shard")

// ErrDatabaseLoadLimitHit is the error returned when the database load limit
// is hit or exceeded.
@@ -188,6 +182,7 @@ type dbShard struct {
currRuntimeOptions dbShardRuntimeOptions
logger *zap.Logger
metrics dbShardMetrics
tileAggregator TileAggregator
ticking bool
shard uint32
coldWritesEnabled bool
@@ -328,6 +323,7 @@ func newDatabaseShard(
coldWritesEnabled: namespaceMetadata.Options().ColdWritesEnabled(),
logger: opts.InstrumentOptions().Logger(),
metrics: newDatabaseShardMetrics(shard, scope),
tileAggregator: opts.TileAggregator(),
}
s.insertQueue = newDatabaseShardInsertQueue(s.insertSeriesBatch,
s.nowFn, scope, opts.InstrumentOptions().Logger())
@@ -2663,21 +2659,26 @@ func (s *dbShard) Repair(

func (s *dbShard) AggregateTiles(
sourceNsID ident.ID,
sourceShardID uint32,
targetNs Namespace,
shardID uint32,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
sourceBlockVolumes []shardBlockVolume,
opts AggregateTilesOptions,
targetSchemaDescr namespace.SchemaDescr,
) (int64, error) {
if len(blockReaders) != len(sourceBlockVolumes) {
return 0, fmt.Errorf("blockReaders and sourceBlockVolumes length mismatch (%d != %d)", len(blockReaders), len(sourceBlockVolumes))
return 0, fmt.Errorf(
"blockReaders and sourceBlockVolumes length mismatch (%d != %d)",
len(blockReaders),
len(sourceBlockVolumes))
}

openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
defer func() {
for _, reader := range openBlockReaders {
reader.Close()
if err := reader.Close(); err != nil {
s.logger.Error("could not close DataFileSetReader", zap.Error(err))
}
}
}()

@@ -2687,7 +2688,7 @@ func (s *dbShard) AggregateTiles(
openOpts := fs.DataReaderOpenOptions{
Identifier: fs.FileSetFileIdentifier{
Namespace: sourceNsID,
Shard: sourceShardID,
Shard: shardID,
BlockStart: sourceBlockVolume.blockStart,
VolumeIndex: sourceBlockVolume.latestVolume,
},
@@ -2706,46 +2707,15 @@ func (s *dbShard) AggregateTiles(
zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
return 0, err
}
if blockReader.Entries() > maxEntries {
maxEntries = blockReader.Entries()

entries := blockReader.Entries()
if entries > maxEntries {
maxEntries = entries
}

openBlockReaders = append(openBlockReaders, blockReader)
}

crossBlockReader, err := fs.NewCrossBlockReader(openBlockReaders, s.opts.InstrumentOptions())
if err != nil {
s.logger.Error("NewCrossBlockReader", zap.Error(err))
return 0, err
}
defer crossBlockReader.Close()

tileOpts := tile.Options{
FrameSize: opts.Step,
Start: xtime.ToUnixNano(opts.Start),
ReaderIteratorPool: s.opts.ReaderIteratorPool(),
}

readerIter, err := tile.NewSeriesBlockIterator(crossBlockReader, tileOpts)
if err != nil {
s.logger.Error("error when creating new series block iterator", zap.Error(err))
return 0, err
}

closed := false
defer func() {
if !closed {
if err := readerIter.Close(); err != nil {
// NB: log the error on ungraceful exit.
s.logger.Error("could not close read iterator on error", zap.Error(err))
}
}
}()

encoder := s.opts.EncoderPool().Get()
defer encoder.Close()
encoder.Reset(opts.Start, 0, targetSchemaDescr)

latestTargetVolume, err := s.LatestVolume(opts.Start)
if err != nil {
return 0, err
@@ -2764,54 +2734,12 @@ func (s *dbShard) AggregateTiles(
return 0, err
}

var (
annotationPayload annotation.Payload
// NB: there is a maximum of 4 datapoints per frame for counters.
downsampledValues = make([]downsample.Value, 0, 4)
processedTileCount int64
segmentCapacity int
writerData = make([][]byte, 2)
multiErr xerrors.MultiError
)

for readerIter.Next() {
seriesIter, id, encodedTags := readerIter.Current()

seriesTileCount, err := encodeAggregatedSeries(seriesIter, annotationPayload, downsampledValues, encoder)
if err != nil {
s.metrics.largeTilesWriteErrors.Inc(1)
multiErr = multiErr.Add(err)
break
}

if seriesTileCount == 0 {
break
}

processedTileCount += seriesTileCount
segment := encoder.DiscardReset(opts.Start, segmentCapacity, targetSchemaDescr)

segmentLen := segment.Len()
if segmentLen > segmentCapacity {
// Will use the same capacity for the next series.
segmentCapacity = segmentLen
}

writerData[0] = segment.Head.Bytes()
writerData[1] = segment.Tail.Bytes()
checksum := segment.CalculateChecksum()

if err := writer.WriteAll(id, encodedTags, writerData, checksum); err != nil {
s.metrics.largeTilesWriteErrors.Inc(1)
multiErr = multiErr.Add(err)
} else {
s.metrics.largeTilesWrites.Inc(1)
}

segment.Finalize()
}
var multiErr xerrors.MultiError

if err := readerIter.Err(); err != nil {
processedTileCount, err := s.tileAggregator.AggregateTiles(
opts, targetNs, s.ID(), openBlockReaders, writer)
if err != nil {
// NB: cannot return on the error here, must finish writing.
multiErr = multiErr.Add(err)
}

@@ -2833,11 +2761,6 @@ func (s *dbShard) AggregateTiles(
}
}

closed = true
if err := readerIter.Close(); err != nil {
multiErr = multiErr.Add(err)
}

if err := multiErr.FinalError(); err != nil {
return 0, err
}
@@ -2849,102 +2772,6 @@ func (s *dbShard) AggregateTiles(
return processedTileCount, nil
}

func encodeAggregatedSeries(
seriesIter tile.SeriesFrameIterator,
annotationPayload annotation.Payload,
downsampledValues []downsample.Value,
encoder encoding.Encoder,
) (int64, error) {
var (
prevFrameLastValue = math.NaN()
processedTileCount int64
handleValueResets bool
firstUnit xtime.Unit
firstAnnotation ts.Annotation
err error
)

for seriesIter.Next() {
frame := seriesIter.Current()

frameValues := frame.Values()
if len(frameValues) == 0 {
continue
}

if processedTileCount == 0 {
firstUnit, err = frame.Units().Value(0)
if err != nil {
return 0, err
}

firstAnnotation, err = frame.Annotations().Value(0)
if err != nil {
return 0, err
}

annotationPayload.Reset()
if annotationPayload.Unmarshal(firstAnnotation) == nil {
// NB: unmarshall error might be a result of some historical annotation data
// which is not compatible with protobuf payload struct. This would generally mean
// that metrics type is unknown, so we should ignore the error here.
handleValueResets = annotationPayload.HandleValueResets
}
}

downsampledValues = downsampledValues[:0]
lastIdx := len(frameValues) - 1

if handleValueResets {
// Last value plus possible few more datapoints to preserve counter semantics.
downsampledValues = downsample.DownsampleCounterResets(prevFrameLastValue, frameValues, downsampledValues)
} else {
// Plain last value per frame.
downsampledValue := downsample.Value{
FrameIndex: lastIdx,
Value: frameValues[lastIdx],
}
downsampledValues = append(downsampledValues, downsampledValue)
}

if err = encodeDownsampledValues(downsampledValues, frame, firstUnit, firstAnnotation, encoder); err != nil {
return 0, err
}

prevFrameLastValue = frameValues[lastIdx]
processedTileCount++
}

if err := seriesIter.Err(); err != nil {
return 0, err
}

return processedTileCount, nil
}

func encodeDownsampledValues(
downsampledValues []downsample.Value,
frame tile.SeriesBlockFrame,
unit xtime.Unit,
annotation ts.Annotation,
encoder encoding.Encoder,
) error {
for _, downsampledValue := range downsampledValues {
timestamp := frame.Timestamps()[downsampledValue.FrameIndex]
dp := ts.Datapoint{
Timestamp: timestamp,
TimestampNanos: xtime.ToUnixNano(timestamp),
Value: downsampledValue.Value,
}

if err := encoder.Encode(dp, unit, annotation); err != nil {
return err
}
}

return nil
}

func (s *dbShard) BootstrapState() BootstrapState {
s.RLock()
bs := s.bootstrapState
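
The refactoring moves the tile iteration and encoding out of dbShard into a TileAggregator obtained from opts.TileAggregator(). The interface itself is defined outside this diff, so the sketch below is an assumption about its shape, inferred only from the new call s.tileAggregator.AggregateTiles(opts, targetNs, s.ID(), openBlockReaders, writer) and its (int64, error) return; the stand-in types are illustrative, not the repository's declarations.

package sketch

// Illustrative stand-ins for the storage/fs types referenced above.
type (
	Namespace             interface{}
	AggregateTilesOptions struct{}
	DataFileSetReader     interface{}
	StreamingWriter       interface{}
)

// TileAggregator is the component the shard now delegates to: it reads the
// source series from the already-opened block readers, downsamples them into
// tiles, and streams the result through the writer, returning the number of
// processed tiles.
type TileAggregator interface {
	AggregateTiles(
		opts AggregateTilesOptions,
		targetNs Namespace,
		shardID uint32,
		readers []DataFileSetReader,
		writer StreamingWriter,
	) (int64, error)
}
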
(Diffs for the remaining 3 changed files are not shown here.)
