Skip to content

Commit

Permalink
Merge branch 'master' into linasn/dbnode-metrics
Browse files Browse the repository at this point in the history
* master:
  [dtest] Run tests on an existing cluster launched with docker compose (#3067)
  [coordinator] Disable downsampler matcher cache by default (#3080)
  [dbnode][m3ninx] Use new doc.Document in query results to reduce slice allocations (#3057)
  [matcher/coordinator] Add latency metrics to rule matching (#3083)
  [coordinator] Rollout augmentM3Tags flag to true by default (#3082)
  [aggregator] Add ActivePlacementVersion to tcp client (#3071)
  • Loading branch information
soundvibe committed Jan 13, 2021
2 parents 8f0abf5 + 482c510 commit 4097249
Show file tree
Hide file tree
Showing 73 changed files with 1,058 additions and 533 deletions.
24 changes: 24 additions & 0 deletions scripts/dtest/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
version: "3.5"
services:
dbnode01:
networks:
- dtest
image: m3dbnode:dev
container_name: dbnode01
ports:
- "0.0.0.0:2379:2379"
- "0.0.0.0:9000:9000"
volumes:
- "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml"
coord01:
networks:
- dtest
image: m3coordinator:dev
container_name: coord01
ports:
- "0.0.0.0:7201:7201"
- "0.0.0.0:7204:7204"
volumes:
- "./m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml"
networks:
dtest:
File renamed without changes.
File renamed without changes.
16 changes: 16 additions & 0 deletions scripts/dtest/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

set -ex
set -o pipefail

COMPOSE_FILE=./scripts/dtest/docker-compose.yml

function defer {
docker-compose -f "${COMPOSE_FILE}" down
}

trap defer EXIT

docker-compose -f "${COMPOSE_FILE}" up --detach

go test -v -tags=dtest ./src/cmd/tools/dtest/docker/harness
2 changes: 1 addition & 1 deletion site/content/m3query/architecture/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ chapter: true

## Overview

M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.github.io/m3/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
M3 Query and M3 Coordinator are written entirely in Go, M3 Query is as a query engine for [M3DB](https://m3db.io/) and M3 Coordinator is a remote read/write endpoint for Prometheus and M3DB. To learn more about Prometheus's remote endpoints and storage, [see here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
2 changes: 1 addition & 1 deletion site/content/m3query/config/annotated_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ writeWorkerPoolPolicy:
size: <int>

tagOptions:
# See here for more information: http://m3db.github.io/m3/how_to/query/#id-generation
# See here for more information under ID generation: https://m3db.io/docs/how_to/query/
idScheme: <id_scheme>

# lookbackDuration defines, at each step, how long we lookback until we see a non-NaN value.
Expand Down
2 changes: 1 addition & 1 deletion site/content/operator/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Namespace defines an M3DB namespace or points to a preset M3DB namespace.

## NamespaceOptions

NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.github.io/m3/operational_guide/namespace_configuration/ for more details.
NamespaceOptions defines parameters for an M3DB namespace. See https://m3db.io/docs/operational_guide/namespace_configuration/ for more details.

| Field | Description | Scheme | Required |
| ----- | ----------- | ------ | -------- |
Expand Down
12 changes: 12 additions & 0 deletions src/aggregator/client/tcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,18 @@ func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) {
return placement.Clone(), stagedPlacement.Version(), nil
}

// ActivePlacementVersion returns a copy of the currently active placement version. It is a far less expensive call
// than ActivePlacement, as it does not clone the placement.
func (c *TCPClient) ActivePlacementVersion() (int, error) {
stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement()
if err != nil {
return 0, err
}
defer onStagedPlacementDoneFn()

return stagedPlacement.Version(), nil
}

// Flush flushes any remaining data buffered by the client.
func (c *TCPClient) Flush() error {
c.metrics.flush.Inc(1)
Expand Down
9 changes: 7 additions & 2 deletions src/aggregator/client/tcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,8 +799,8 @@ func TestTCPClientActivePlacement(t *testing.T) {
)

c.placementWatcher = watcher
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil)
stagedPlacement.EXPECT().Version().Return(42)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil).Times(2)
stagedPlacement.EXPECT().Version().Return(42).Times(2)
stagedPlacement.EXPECT().ActivePlacement().Return(mockPl, func() { doneCalls++ }, nil)
mockPl.EXPECT().Clone().Return(emptyPl)

Expand All @@ -809,6 +809,11 @@ func TestTCPClientActivePlacement(t *testing.T) {
assert.Equal(t, 42, v)
assert.Equal(t, 2, doneCalls)
assert.Equal(t, emptyPl, pl)

v, err = c.ActivePlacementVersion()
assert.NoError(t, err)
assert.Equal(t, 42, v)
assert.Equal(t, 3, doneCalls)
}

func TestTCPClientInitAndClose(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.augmentM3Tags,
}
}

Expand Down
61 changes: 15 additions & 46 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ type metricsAppenderOptions struct {
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool
augmentM3Tags bool

clockOpts clock.Options
debugLogging bool
Expand Down Expand Up @@ -149,19 +148,16 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
}
tags := a.originalTags

// Augment tags if necessary.
if a.augmentM3Tags {
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}

// Sort tags
Expand Down Expand Up @@ -190,11 +186,8 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
matchResult := a.matcher.ForwardMatch(id, fromNanos, toNanos)
id.Close()

// If we augmented metrics tags before running the forward match, then
// filter them out.
if a.augmentM3Tags {
tags.filterPrefix(metric.M3MetricsPrefix)
}
// filter out augmented metrics tags
tags.filterPrefix(metric.M3MetricsPrefix)

var dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult
if opts.Override {
Expand All @@ -215,7 +208,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
append(a.curr.Pipelines, pipelines.Pipelines...)
}

if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil {
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
} else {
Expand Down Expand Up @@ -358,7 +351,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
a.debugLogMatch("downsampler using built mapping staged metadatas",
debugLogMatchOptions{Meta: []metadata.StagedMetadata{a.curr}})

if err := a.addSamplesAppenders(tags, a.curr, unownedID); err != nil {
if err := a.addSamplesAppenders(tags, a.curr); err != nil {
return SamplesAppenderResult{}, err
}
}
Expand Down Expand Up @@ -472,31 +465,7 @@ func (a *metricsAppender) resetTags() {
a.originalTags = nil
}

func (a *metricsAppender) addSamplesAppenders(
originalTags *tags,
stagedMetadata metadata.StagedMetadata,
unownedID []byte,
) error {
// Check if any of the pipelines have tags or a graphite prefix to set.
var tagsExist bool
for _, pipeline := range stagedMetadata.Pipelines {
if len(pipeline.Tags) > 0 || len(pipeline.GraphitePrefix) > 0 {
tagsExist = true
break
}
}

// If we do not need to do any tag augmentation then just return.
if !a.augmentM3Tags && !tagsExist {
a.multiSamplesAppender.addSamplesAppender(samplesAppender{
agg: a.agg,
clientRemote: a.clientRemote,
unownedID: unownedID,
stagedMetadatas: []metadata.StagedMetadata{stagedMetadata},
})
return nil
}

func (a *metricsAppender) addSamplesAppenders(originalTags *tags, stagedMetadata metadata.StagedMetadata) error {
var (
pipelines []metadata.PipelineMetadata
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func TestSamplesAppenderPoolResetsTagsAcrossSamples(t *testing.T) {
}

// NB: expected ID is generated into human-readable form
// from tags in ForwardMatch mock above.
expected := fmt.Sprintf("foo%d-bar%d", i, i)
// from tags in ForwardMatch mock above. Also include the m3 type, which is included when matching.
// nolint:scopelint
expected := fmt.Sprintf("__m3_type__-gauge,foo%d-bar%d", i, i)
if expected != u.ID.String() {
// NB: if this fails, appender is holding state after Finalize.
return fmt.Errorf("expected ID %s, got %s", expected, u.ID.String())
Expand Down
47 changes: 17 additions & 30 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"runtime"
"strings"
"time"

"github.com/m3db/m3/src/aggregator/aggregator"
Expand Down Expand Up @@ -82,7 +81,13 @@ const (
defaultOpenTimeout = 10 * time.Second
defaultBufferFutureTimedMetric = time.Minute
defaultVerboseErrors = true
defaultMatcherCacheCapacity = 100000
// defaultMatcherCacheCapacity sets the default matcher cache
// capacity to zero so that the cache is turned off.
// This is due to discovering that there is a lot of contention
// used by the cache and the fact that most coordinators are used
// in a stateless manner with a central deployment which in turn
// leads to an extremely low cache hit ratio anyway.
defaultMatcherCacheCapacity = 0
)

var (
Expand Down Expand Up @@ -225,10 +230,9 @@ type agg struct {
aggregator aggregator.Aggregator
clientRemote client.Client

clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
augmentM3Tags bool
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
}

// Configuration configurates a downsampler.
Expand Down Expand Up @@ -262,14 +266,6 @@ type Configuration struct {

// EntryTTL determines how long an entry remains alive before it may be expired due to inactivity.
EntryTTL time.Duration `yaml:"entryTTL"`

// AugmentM3Tags will augment the metric type to aggregated metrics
// to be used within the filter for rules. If enabled, for example,
// your filter can specify '__m3_type__:gauge' to filter by gauges.
// This is particularly useful for Graphite metrics today.
// Furthermore, the option is automatically enabled if static rules are
// used and any filter contain an __m3_type__ tag.
AugmentM3Tags bool `yaml:"augmentM3Tags"`
}

// MatcherConfiguration is the configuration for the rule matcher.
Expand Down Expand Up @@ -537,7 +533,7 @@ func (r RollupRuleConfiguration) Rule() (view.RollupRule, error) {
targetPipeline := pipeline.NewPipeline(ops)

targets := []view.RollupTarget{
view.RollupTarget{
{
Pipeline: targetPipeline,
StoragePolicies: storagePolicies,
},
Expand Down Expand Up @@ -658,7 +654,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
scope = instrumentOpts.MetricsScope()
logger = instrumentOpts.Logger()
openTimeout = defaultOpenTimeout
augmentM3Tags = cfg.AugmentM3Tags
namespaceTag = defaultNamespaceTag
)
if o.StorageFlushConcurrency > 0 {
Expand Down Expand Up @@ -717,9 +712,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
rs := rules.NewEmptyRuleSet(defaultConfigInMemoryNamespace,
updateMetadata)
for _, mappingRule := range cfg.Rules.MappingRules {
if strings.Contains(mappingRule.Filter, metric.M3MetricsPrefixString) {
augmentM3Tags = true
}
rule, err := mappingRule.Rule()
if err != nil {
return agg{}, err
Expand All @@ -732,9 +724,6 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

for _, rollupRule := range cfg.Rules.RollupRules {
if strings.Contains(rollupRule.Filter, metric.M3MetricsPrefixString) {
augmentM3Tags = true
}
rule, err := rollupRule.Rule()
if err != nil {
return agg{}, err
Expand Down Expand Up @@ -788,10 +777,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
clientRemote: client,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
clientRemote: client,
matcher: matcher,
pools: pools,
}, nil
}

Expand Down Expand Up @@ -953,10 +941,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ func (bsc BootstrapConfiguration) New(
adminClient client.AdminClient,
) (bootstrap.ProcessProvider, error) {
idxOpts := opts.IndexOptions()
compactor, err := compaction.NewCompactor(idxOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(idxOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
idxOpts.SegmentBuilderOptions(),
idxOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/tools/dtest/docker/harness/harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ var singleDBNodeDockerResources resources.DockerResources

func TestMain(m *testing.M) {
var err error
singleDBNodeDockerResources, err = resources.SetupSingleM3DBNode()
singleDBNodeDockerResources, err = resources.SetupSingleM3DBNode(
resources.WithExistingCluster("dbnode01", "coord01"),
)

if err != nil {
fmt.Println("could not set up db docker containers", err)
os.Exit(1)
}

if l := len(singleDBNodeDockerResources.Nodes()); l != 1 {
singleDBNodeDockerResources.Cleanup() //nolint:errcheck
fmt.Println("should only have a single node, have", l)
os.Exit(1)
}

code := m.Run()
singleDBNodeDockerResources.Cleanup() //nolint:errcheck
os.Exit(code)
}
Loading

0 comments on commit 4097249

Please sign in to comment.