Skip to content

Commit

Permalink
[db] Add an option to use db writes in index mode (#1596)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored May 2, 2019
1 parent e50a400 commit def07b4
Show file tree
Hide file tree
Showing 17 changed files with 561 additions and 44 deletions.
44 changes: 43 additions & 1 deletion src/cmd/services/m3dbnode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/environment"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/x/config/hostid"
"github.com/m3db/m3/src/x/instrument"
xlog "github.com/m3db/m3/src/x/log"
Expand Down Expand Up @@ -70,6 +71,9 @@ type DBConfiguration struct {
// Index configuration.
Index IndexConfiguration `yaml:"index"`

// Transforms configuration.
Transforms TransformConfiguration `yaml:"transforms"`

// Logging configuration.
Logging xlog.Configuration `yaml:"logging"`

Expand Down Expand Up @@ -165,6 +169,10 @@ func (c *DBConfiguration) InitDefaultsAndValidate() error {
return err
}

if err := c.Transforms.Validate(); err != nil {
return err
}

return nil
}

Expand All @@ -173,8 +181,42 @@ type IndexConfiguration struct {
// MaxQueryIDsConcurrency controls the maximum number of outstanding QueryID
// requests that can be serviced concurrently. Limiting the concurrency is
// important to prevent index queries from overloading the database entirely
// as they are very CPU-intensive (regex and FST matching.)
// as they are very CPU-intensive (regex and FST matching).
MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"`

// ForwardIndexProbability determines the likelihood that an incoming write is
// written to the next block, when arriving close to the block boundary.
//
// NB: this is an optimization which lessens pressure on the index around
// block boundaries by eagerly writing the series to the next block
// preemptively.
ForwardIndexProbability float64 `yaml:"forwardIndexProbability" validate:"min=0.0,max=1.0"`

// ForwardIndexThreshold determines the threshold for forward writes, as a
// fraction of the given namespace's bufferFuture.
//
// NB: this is an optimization which lessens pressure on the index around
// block boundaries by eagerly writing the series to the next block
// preemptively.
ForwardIndexThreshold float64 `yaml:"forwardIndexThreshold" validate:"min=0.0,max=1.0"`
}

// TransformConfiguration contains configuration options that can transform
// incoming writes.
type TransformConfiguration struct {
// TruncateBy determines what type of truncatation is applied to incoming
// writes.
TruncateBy series.TruncateType `yaml:"truncateBy"`
// ForcedValue determines what to set all incoming write values to.
ForcedValue *float64 `yaml:"forceValue"`
}

func (c *TransformConfiguration) Validate() error {
if c == nil {
return nil
}

return c.TruncateBy.Validate()
}

// TickConfiguration is the tick configuration for background processing of
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ func TestConfiguration(t *testing.T) {
expected := `db:
index:
maxQueryIDsConcurrency: 0
forwardIndexProbability: 0
forwardIndexThreshold: 0
transforms:
truncateBy: 0
forceValue: null
logging:
file: /var/log/m3dbnode.log
level: info
Expand Down
15 changes: 14 additions & 1 deletion src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,17 @@ func withEncodingAndPoolingOptions(
aggregateQueryResultsPool := index.NewAggregateResultsPool(
poolOptions(policy.IndexResultsPool, scope.SubScope("index-aggregate-results-pool")))

// Set value transformation options.
opts = opts.SetTruncateType(cfg.Transforms.TruncateBy)
forcedValue := cfg.Transforms.ForcedValue
if forcedValue != nil {
opts = opts.SetWriteTransformOptions(series.WriteTransformOptions{
ForceValueEnabled: true,
ForceValue: *forcedValue,
})
}

// Set index options.
indexOpts := opts.IndexOptions().
SetInstrumentOptions(iopts).
SetMemSegmentOptions(
Expand All @@ -1294,7 +1305,9 @@ func withEncodingAndPoolingOptions(
SetIdentifierPool(identifierPool).
SetCheckedBytesPool(bytesPool).
SetQueryResultsPool(queryResultsPool).
SetAggregateResultsPool(aggregateQueryResultsPool)
SetAggregateResultsPool(aggregateQueryResultsPool).
SetForwardIndexProbability(cfg.Index.ForwardIndexProbability).
SetForwardIndexThreshold(cfg.Index.ForwardIndexThreshold)

queryResultsPool.Init(func() index.QueryResults {
// NB(r): Need to initialize after setting the index opts so
Expand Down
56 changes: 56 additions & 0 deletions src/dbnode/storage/index/index_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions src/dbnode/storage/index/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func init() {

// nolint: maligned
type opts struct {
forwardIndexThreshold float64
forwardIndexProbability float64
insertMode InsertMode
clockOpts clock.Options
instrumentOpts instrument.Options
Expand Down Expand Up @@ -379,3 +381,23 @@ func (o *opts) SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Opt
func (o *opts) ReadThroughSegmentOptions() ReadThroughSegmentOptions {
return o.readThroughSegmentOptions
}

func (o *opts) SetForwardIndexProbability(value float64) Options {
opts := *o
opts.forwardIndexProbability = value
return &opts
}

func (o *opts) ForwardIndexProbability() float64 {
return o.forwardIndexProbability
}

func (o *opts) SetForwardIndexThreshold(value float64) Options {
opts := *o
opts.forwardIndexThreshold = value
return &opts
}

func (o *opts) ForwardIndexThreshold() float64 {
return o.forwardIndexThreshold
}
13 changes: 13 additions & 0 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,4 +850,17 @@ type Options interface {

// ReadThroughSegmentOptions returns the read through segment cache options.
ReadThroughSegmentOptions() ReadThroughSegmentOptions

// SetForwardIndexProbability sets the probability chance for forward writes.
SetForwardIndexProbability(value float64) Options

// ForwardIndexProbability returns the probability chance for forward writes.
ForwardIndexProbability() float64

// SetForwardIndexProbability sets the threshold for forward writes as a
// fraction of the bufferFuture.
SetForwardIndexThreshold(value float64) Options

// ForwardIndexProbability returns the threshold for forward writes.
ForwardIndexThreshold() float64
}
10 changes: 8 additions & 2 deletions src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,11 @@ func (n *dbNamespace) Write(
n.metrics.write.ReportError(n.nowFn().Sub(callStart))
return ts.Series{}, false, err
}
opts := series.WriteOptions{
TruncateType: n.opts.TruncateType(),
}
series, wasWritten, err := shard.Write(ctx, id, timestamp,
value, unit, annotation, series.WriteOptions{})
value, unit, annotation, opts)
n.metrics.write.ReportSuccessOrError(err, n.nowFn().Sub(callStart))
return series, wasWritten, err
}
Expand All @@ -616,8 +619,11 @@ func (n *dbNamespace) WriteTagged(
n.metrics.writeTagged.ReportError(n.nowFn().Sub(callStart))
return ts.Series{}, false, err
}
opts := series.WriteOptions{
TruncateType: n.opts.TruncateType(),
}
series, wasWritten, err := shard.WriteTagged(ctx, id, tags, timestamp,
value, unit, annotation, series.WriteOptions{})
value, unit, annotation, opts)
n.metrics.writeTagged.ReportSuccessOrError(err, n.nowFn().Sub(callStart))
return series, wasWritten, err
}
Expand Down
Loading

0 comments on commit def07b4

Please sign in to comment.