[dbnode] Refactoring dbShard #2848

Merged · 20 commits · Nov 12, 2020
Changes from 19 commits

6 changes: 1 addition & 5 deletions src/dbnode/storage/namespace.go
@@ -1732,10 +1732,6 @@ func (n *dbNamespace) aggregateTiles(
return 0, errNamespaceNotBootstrapped
}

- n.RLock()
- nsCtx := n.nsContextWithRLock()
- n.RUnlock()
-
var (
processedShards = opts.InsOptions.MetricsScope().Counter("processed-shards")
targetShards = n.OwnedShards()
@@ -1778,7 +1774,7 @@ func (n *dbNamespace) aggregateTiles(
}

shardProcessedTileCount, err := targetShard.AggregateTiles(
- sourceNs.ID(), sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, opts, nsCtx.Schema)
+ sourceNs.ID(), n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes, opts)
Collaborator:
Should this be done under RLock if we're passing in n?

linasm (Author) — Nov 12, 2020:
The method called accepts n as a storage.Namespace interface, which only exposes the public methods of dbNamespace. Those public methods take locks when they access mutable fields of dbNamespace (e.g. dbNamespace.Schema()). So I guess this should be fine?
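
As a minimal standalone sketch of the argument above (illustrative names, not the actual m3 types): when an interface only exposes accessors that lock internally, the caller never needs to hold the namespace lock itself.

```go
package main

import (
	"fmt"
	"sync"
)

// Namespace mirrors the idea behind storage.Namespace: only public
// methods are exposed, so callers cannot reach mutable state directly.
type Namespace interface {
	Schema() string // stand-in for the real SchemaDescr type
}

// dbNamespace guards its mutable fields with an embedded RWMutex.
type dbNamespace struct {
	sync.RWMutex
	schema string
}

// Schema takes the read lock internally, so a caller holding only the
// Namespace interface needs no external locking.
func (n *dbNamespace) Schema() string {
	n.RLock()
	defer n.RUnlock()
	return n.schema
}

func main() {
	var ns Namespace = &dbNamespace{schema: "v1"}
	fmt.Println(ns.Schema()) // safe without taking any lock here
}
```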


processedTileCount += shardProcessedTileCount
processedShards.Inc(1)
22 changes: 16 additions & 6 deletions src/dbnode/storage/namespace_test.go
@@ -1415,8 +1415,8 @@ func TestNamespaceAggregateTiles(t *testing.T) {
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
secondSourceBlockStart = start.Add(sourceBlockSize)
- sourceShard0ID uint32 = 10
- sourceShard1ID uint32 = 20
+ shard0ID uint32 = 10
+ shard1ID uint32 = 20
insOpts = instrument.NewOptions()
)

@@ -1440,12 +1440,12 @@
sourceNs.shards[0] = sourceShard0
sourceNs.shards[1] = sourceShard1

- sourceShard0.EXPECT().ID().Return(sourceShard0ID)
+ sourceShard0.EXPECT().ID().Return(shard0ID)
sourceShard0.EXPECT().IsBootstrapped().Return(true)
sourceShard0.EXPECT().LatestVolume(start).Return(5, nil)
sourceShard0.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(15, nil)

- sourceShard1.EXPECT().ID().Return(sourceShard1ID)
+ sourceShard1.EXPECT().ID().Return(shard1ID)
sourceShard1.EXPECT().IsBootstrapped().Return(true)
sourceShard1.EXPECT().LatestVolume(start).Return(7, nil)
sourceShard1.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(17, nil)
@@ -1462,8 +1462,18 @@
sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}}

sourceNsIDMatcher := ident.NewIDMatcher(sourceNsID.String())
- targetShard0.EXPECT().AggregateTiles(sourceNsIDMatcher, sourceShard0ID, gomock.Any(), gomock.Any(), sourceBlockVolumes0, opts, targetNs.Schema()).Return(int64(3), nil)
- targetShard1.EXPECT().AggregateTiles(sourceNsIDMatcher, sourceShard1ID, gomock.Any(), gomock.Any(), sourceBlockVolumes1, opts, targetNs.Schema()).Return(int64(2), nil)
+
+ targetShard0.EXPECT().
+ AggregateTiles(
+ sourceNsIDMatcher, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
+ sourceBlockVolumes0, opts).
+ Return(int64(3), nil)
+
+ targetShard1.EXPECT().
+ AggregateTiles(
+ sourceNsIDMatcher, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
+ sourceBlockVolumes1, opts).
+ Return(int64(2), nil)

processedTileCount, err := targetNs.AggregateTiles(sourceNs, opts)

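
A side note on the matcher change in the test above: gomock.Len(2) pins down only the length of the blockReaders argument, where the previous expectation matched it with gomock.Any(). A self-contained sketch of how these matchers behave — they can be evaluated directly, and EXPECT() uses them the same way under the hood:

```go
package main

import (
	"fmt"

	"github.com/golang/mock/gomock"
)

func main() {
	// gomock.Len matches any value whose reflect length equals the argument.
	twoElems := gomock.Len(2)

	fmt.Println(twoElems.Matches([]string{"a", "b"}))      // true
	fmt.Println(twoElems.Matches([]string{"a", "b", "c"})) // false
	fmt.Println(gomock.Any().Matches(nil))                 // true: matches anything
}
```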
235 changes: 31 additions & 204 deletions src/dbnode/storage/shard.go
@@ -29,9 +29,6 @@ import (
"sync"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/tile"
"github.com/m3db/m3/src/dbnode/generated/proto/annotation"
"github.com/m3db/m3/src/dbnode/generated/proto/pagetoken"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
@@ -47,7 +44,6 @@ import (
"github.com/m3db/m3/src/dbnode/storage/series/lookup"
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/ts/downsample"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/m3ninx/doc"
@@ -72,18 +68,16 @@ const (
)

var (
- errShardEntryNotFound = errors.New("shard entry not found")
- errShardNotOpen = errors.New("shard is not open")
- errShardAlreadyTicking = errors.New("shard is already ticking")
- errShardClosingTickTerminated = errors.New("shard is closing, terminating tick")
- errShardInvalidPageToken = errors.New("shard could not unmarshal page token")
- errNewShardEntryTagsTypeInvalid = errors.New("new shard entry options error: tags type invalid")
- errNewShardEntryTagsIterNotAtIndexZero = errors.New("new shard entry options error: tags iter not at index zero")
- errShardIsNotBootstrapped = errors.New("shard is not bootstrapped")
- errShardAlreadyBootstrapped = errors.New("shard is already bootstrapped")
- errFlushStateIsNotInitialized = errors.New("shard flush state is not initialized")
- errFlushStateAlreadyInitialized = errors.New("shard flush state is already initialized")
- errTriedToLoadNilSeries = errors.New("tried to load nil series into shard")
+ errShardEntryNotFound = errors.New("shard entry not found")
+ errShardNotOpen = errors.New("shard is not open")
+ errShardAlreadyTicking = errors.New("shard is already ticking")
+ errShardClosingTickTerminated = errors.New("shard is closing, terminating tick")
+ errShardInvalidPageToken = errors.New("shard could not unmarshal page token")
+ errNewShardEntryTagsTypeInvalid = errors.New("new shard entry options error: tags type invalid")
+ errShardIsNotBootstrapped = errors.New("shard is not bootstrapped")
+ errShardAlreadyBootstrapped = errors.New("shard is already bootstrapped")
+ errFlushStateIsNotInitialized = errors.New("shard flush state is not initialized")
+ errTriedToLoadNilSeries = errors.New("tried to load nil series into shard")

// ErrDatabaseLoadLimitHit is the error returned when the database load limit
// is hit or exceeded.
@@ -188,6 +182,7 @@ type dbShard struct {
currRuntimeOptions dbShardRuntimeOptions
logger *zap.Logger
metrics dbShardMetrics
+ tileAggregator TileAggregator
ticking bool
shard uint32
coldWritesEnabled bool
@@ -328,6 +323,7 @@ func newDatabaseShard(
coldWritesEnabled: namespaceMetadata.Options().ColdWritesEnabled(),
logger: opts.InstrumentOptions().Logger(),
metrics: newDatabaseShardMetrics(shard, scope),
+ tileAggregator: opts.TileAggregator(),
}
s.insertQueue = newDatabaseShardInsertQueue(s.insertSeriesBatch,
s.nowFn, scope, opts.InstrumentOptions().Logger())
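
The aggregator reaches the shard through its options rather than a direct constructor argument, which keeps dbShard decoupled from the tile-encoding machinery. A rough, self-contained sketch of the immutable-options pattern this relies on (the setter name and the TileAggregator method set here are assumptions for illustration, not copied from the repo):

```go
package main

import "fmt"

// TileAggregator is a stand-in; the real interface is sketched later
// from the call site in shard.go.
type TileAggregator interface {
	Name() string
}

// Options follows the m3 convention: setters copy the receiver and
// return the copy, so shared option values stay immutable.
type Options interface {
	SetTileAggregator(value TileAggregator) Options
	TileAggregator() TileAggregator
}

type options struct {
	tileAggregator TileAggregator
}

func (o *options) SetTileAggregator(value TileAggregator) Options {
	opts := *o // shallow copy keeps the original unchanged
	opts.tileAggregator = value
	return &opts
}

func (o *options) TileAggregator() TileAggregator {
	return o.tileAggregator
}

type noopAggregator struct{}

func (noopAggregator) Name() string { return "noop" }

func main() {
	var opts Options = &options{}
	opts = opts.SetTileAggregator(noopAggregator{})
	fmt.Println(opts.TileAggregator().Name()) // noop
}
```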
@@ -2663,21 +2659,26 @@ func (s *dbShard) Repair(

func (s *dbShard) AggregateTiles(
sourceNsID ident.ID,
- sourceShardID uint32,
+ targetNs Namespace,
+ shardID uint32,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
sourceBlockVolumes []shardBlockVolume,
opts AggregateTilesOptions,
- targetSchemaDescr namespace.SchemaDescr,
) (int64, error) {
if len(blockReaders) != len(sourceBlockVolumes) {
return 0, fmt.Errorf("blockReaders and sourceBlockVolumes length mismatch (%d != %d)", len(blockReaders), len(sourceBlockVolumes))
return 0, fmt.Errorf(
"blockReaders and sourceBlockVolumes length mismatch (%d != %d)",
len(blockReaders),
len(sourceBlockVolumes))
}

openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
defer func() {
for _, reader := range openBlockReaders {
- reader.Close()
+ if err := reader.Close(); err != nil {
+ s.logger.Error("error closing DataFileSetReader", zap.Error(err))
Collaborator:
nit: Go error messages typically don't include the word "error"; change this to something like "could not close DataFileSetReader".

linasm (Author):
👍
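
For context on the nit: the usual Go convention (see "Go Code Review Comments") is that error and log messages describe the failed action without the word "error", since the level or an "error:" prefix is added when the message is surfaced. A tiny runnable illustration with zap:

```go
package main

import "go.uber.org/zap"

func main() {
	logger := zap.NewExample()

	// Stutters once the level is printed:
	// {"level":"error","msg":"error closing DataFileSetReader"}.
	logger.Error("error closing DataFileSetReader")

	// Reads cleanly next to the level, as the review suggests.
	logger.Error("could not close DataFileSetReader")
}
```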

+ }
}
}()

@@ -2687,7 +2688,7 @@
openOpts := fs.DataReaderOpenOptions{
Identifier: fs.FileSetFileIdentifier{
Namespace: sourceNsID,
- Shard: sourceShardID,
+ Shard: shardID,
BlockStart: sourceBlockVolume.blockStart,
VolumeIndex: sourceBlockVolume.latestVolume,
},
@@ -2706,46 +2707,15 @@
zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
return 0, err
}
- if blockReader.Entries() > maxEntries {
- maxEntries = blockReader.Entries()
+
+ entries := blockReader.Entries()
+ if entries > maxEntries {
+ maxEntries = entries
}

openBlockReaders = append(openBlockReaders, blockReader)
}

- crossBlockReader, err := fs.NewCrossBlockReader(openBlockReaders, s.opts.InstrumentOptions())
- if err != nil {
- s.logger.Error("NewCrossBlockReader", zap.Error(err))
- return 0, err
- }
- defer crossBlockReader.Close()
-
- tileOpts := tile.Options{
- FrameSize: opts.Step,
- Start: xtime.ToUnixNano(opts.Start),
- ReaderIteratorPool: s.opts.ReaderIteratorPool(),
- }
-
- readerIter, err := tile.NewSeriesBlockIterator(crossBlockReader, tileOpts)
- if err != nil {
- s.logger.Error("error when creating new series block iterator", zap.Error(err))
- return 0, err
- }
-
- closed := false
- defer func() {
- if !closed {
- if err := readerIter.Close(); err != nil {
- // NB: log the error on ungraceful exit.
- s.logger.Error("could not close read iterator on error", zap.Error(err))
- }
- }
- }()
-
- encoder := s.opts.EncoderPool().Get()
- defer encoder.Close()
- encoder.Reset(opts.Start, 0, targetSchemaDescr)

latestTargetVolume, err := s.LatestVolume(opts.Start)
if err != nil {
return 0, err
@@ -2764,54 +2734,12 @@
return 0, err
}

- var (
- annotationPayload annotation.Payload
- // NB: there is a maximum of 4 datapoints per frame for counters.
- downsampledValues = make([]downsample.Value, 0, 4)
- processedTileCount int64
- segmentCapacity int
- writerData = make([][]byte, 2)
- multiErr xerrors.MultiError
- )

- for readerIter.Next() {
- seriesIter, id, encodedTags := readerIter.Current()
-
- seriesTileCount, err := encodeAggregatedSeries(seriesIter, annotationPayload, downsampledValues, encoder)
- if err != nil {
- s.metrics.largeTilesWriteErrors.Inc(1)
- multiErr = multiErr.Add(err)
- break
- }
-
- if seriesTileCount == 0 {
- break
- }
-
- processedTileCount += seriesTileCount
- segment := encoder.DiscardReset(opts.Start, segmentCapacity, targetSchemaDescr)
-
- segmentLen := segment.Len()
- if segmentLen > segmentCapacity {
- // Will use the same capacity for the next series.
- segmentCapacity = segmentLen
- }
-
- writerData[0] = segment.Head.Bytes()
- writerData[1] = segment.Tail.Bytes()
- checksum := segment.CalculateChecksum()
-
- if err := writer.WriteAll(id, encodedTags, writerData, checksum); err != nil {
- s.metrics.largeTilesWriteErrors.Inc(1)
- multiErr = multiErr.Add(err)
- } else {
- s.metrics.largeTilesWrites.Inc(1)
- }
-
- segment.Finalize()
- }
+ var multiErr xerrors.MultiError

- if err := readerIter.Err(); err != nil {
+ processedTileCount, err := s.tileAggregator.AggregateTiles(
+ opts, targetNs, s.ID(), openBlockReaders, writer)
+ if err != nil {
Collaborator:
nit: add a comment here noting that we still need to run cleanup and should log any errors from it.

linasm (Author):
👍
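
The pattern being asked for — record the aggregation error but fall through so the writer still gets closed or aborted, collecting any further errors along the way — looks roughly like this in isolation (a sketch with a toy multi-error type standing in for m3's xerrors.MultiError):

```go
package main

import (
	"errors"
	"fmt"
)

// multiError is a toy stand-in for xerrors.MultiError.
type multiError struct{ errs []error }

func (m *multiError) add(err error) {
	if err != nil {
		m.errs = append(m.errs, err)
	}
}

func (m *multiError) finalError() error {
	if len(m.errs) == 0 {
		return nil
	}
	return errors.Join(m.errs...) // requires Go 1.20+
}

func aggregate() error   { return errors.New("aggregation failed") }
func closeWriter() error { return errors.New("writer close failed") }

func main() {
	var errs multiError

	// NB: cannot return on the error here; cleanup must still run,
	// and its failures are worth surfacing alongside the first one.
	errs.add(aggregate())
	errs.add(closeWriter())

	fmt.Println(errs.finalError())
	// aggregation failed
	// writer close failed
}
```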

+ // NB: cannot return on the error here, must finish writing.
multiErr = multiErr.Add(err)
}

@@ -2833,11 +2761,6 @@
}
}

- closed = true
- if err := readerIter.Close(); err != nil {
- multiErr = multiErr.Add(err)
- }
-
if err := multiErr.FinalError(); err != nil {
return 0, err
}
@@ -2849,102 +2772,6 @@
return processedTileCount, nil
}
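
Everything that used to live inline here — the cross-block reader, the tile iterator, the encoder loop, and the two helpers removed below — now sits behind the tileAggregator dependency. Judging only from the call site above, the interface would have roughly this shape; the parameter names are inferred, not copied from the PR:

```go
// Inferred from s.tileAggregator.AggregateTiles(opts, targetNs, s.ID(),
// openBlockReaders, writer) — a sketch, not the repo's declaration.
type TileAggregator interface {
	AggregateTiles(
		opts AggregateTilesOptions,
		ns Namespace,
		shardID uint32,
		readers []fs.DataFileSetReader,
		writer fs.StreamingWriter,
	) (processedTileCount int64, err error)
}
```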

- func encodeAggregatedSeries(
- seriesIter tile.SeriesFrameIterator,
- annotationPayload annotation.Payload,
- downsampledValues []downsample.Value,
- encoder encoding.Encoder,
- ) (int64, error) {
- var (
- prevFrameLastValue = math.NaN()
- processedTileCount int64
- handleValueResets bool
- firstUnit xtime.Unit
- firstAnnotation ts.Annotation
- err error
- )
-
- for seriesIter.Next() {
- frame := seriesIter.Current()
-
- frameValues := frame.Values()
- if len(frameValues) == 0 {
- continue
- }
-
- if processedTileCount == 0 {
- firstUnit, err = frame.Units().Value(0)
- if err != nil {
- return 0, err
- }
-
- firstAnnotation, err = frame.Annotations().Value(0)
- if err != nil {
- return 0, err
- }
-
- annotationPayload.Reset()
- if annotationPayload.Unmarshal(firstAnnotation) == nil {
- // NB: an unmarshal error might be a result of some historical annotation data
- // which is not compatible with the protobuf payload struct. This would generally mean
- // that the metrics type is unknown, so we should ignore the error here.
- handleValueResets = annotationPayload.HandleValueResets
- }
- }
-
- downsampledValues = downsampledValues[:0]
- lastIdx := len(frameValues) - 1
-
- if handleValueResets {
- // Last value plus possibly a few more datapoints to preserve counter semantics.
- downsampledValues = downsample.DownsampleCounterResets(prevFrameLastValue, frameValues, downsampledValues)
- } else {
- // Plain last value per frame.
- downsampledValue := downsample.Value{
- FrameIndex: lastIdx,
- Value: frameValues[lastIdx],
- }
- downsampledValues = append(downsampledValues, downsampledValue)
- }
-
- if err = encodeDownsampledValues(downsampledValues, frame, firstUnit, firstAnnotation, encoder); err != nil {
- return 0, err
- }
-
- prevFrameLastValue = frameValues[lastIdx]
- processedTileCount++
- }
-
- if err := seriesIter.Err(); err != nil {
- return 0, err
- }
-
- return processedTileCount, nil
- }
-
- func encodeDownsampledValues(
- downsampledValues []downsample.Value,
- frame tile.SeriesBlockFrame,
- unit xtime.Unit,
- annotation ts.Annotation,
- encoder encoding.Encoder,
- ) error {
- for _, downsampledValue := range downsampledValues {
- timestamp := frame.Timestamps()[downsampledValue.FrameIndex]
- dp := ts.Datapoint{
- Timestamp: timestamp,
- TimestampNanos: xtime.ToUnixNano(timestamp),
- Value: downsampledValue.Value,
- }
-
- if err := encoder.Encode(dp, unit, annotation); err != nil {
- return err
- }
- }
-
- return nil
- }

func (s *dbShard) BootstrapState() BootstrapState {
s.RLock()
bs := s.bootstrapState
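
A closing note on the two helpers removed from shard.go above: the counter-reset handling they delegated to downsample.DownsampleCounterResets keeps more than just a frame's last value whenever the counter dropped, so downsampling does not erase resets. A simplified, self-contained sketch of that idea (not the real m3 implementation, which also caps the output at four points per frame):

```go
package main

import "fmt"

// value pairs a frame index with a sample, mirroring downsample.Value.
type value struct {
	frameIndex int
	v          float64
}

// downsampleCounterResets keeps the last value of the frame plus just
// enough earlier points to preserve any counter resets (drops in an
// otherwise monotonically increasing series).
func downsampleCounterResets(prevFrameLast float64, frame []float64) []value {
	if len(frame) == 0 {
		return nil
	}
	var out []value
	prev := prevFrameLast
	for i, v := range frame {
		if v < prev {
			// A reset: also keep the point before the drop so the
			// aggregated series still shows the counter falling.
			if i > 0 {
				out = append(out, value{i - 1, frame[i-1]})
			}
			out = append(out, value{i, v})
		}
		prev = v
	}
	last := len(frame) - 1
	if len(out) == 0 || out[len(out)-1].frameIndex != last {
		out = append(out, value{last, frame[last]})
	}
	return out
}

func main() {
	// One frame of counter samples with a reset after 30; the previous
	// frame ended at 25, so index 0 is also a cross-frame reset.
	frame := []float64{10, 20, 30, 5, 15}
	fmt.Println(downsampleCounterResets(25, frame))
	// [{0 10} {2 30} {3 5} {4 15}]: the cross-frame reset at index 0,
	// the in-frame reset around index 3, and the frame's last value.
}
```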