Merge branch 'r/trace-decode-time' of ssh://github.com/m3db/m3 into r/trace-decode-time
rallen090 committed Feb 24, 2020
2 parents ebb5aca + c434c92 commit 75e4f7d
Showing 30 changed files with 2,395 additions and 108 deletions.
2,011 changes: 2,011 additions & 0 deletions integrations/grafana/m3aggregator_end_to_end_details.json

Large diffs are not rendered by default.

@@ -19,6 +19,7 @@ services:
- "7204"
ports:
- "0.0.0.0:7202:7202"
- "0.0.0.0:7203:7203"
- "0.0.0.0:7204:7204"
networks:
- backend
14 changes: 7 additions & 7 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
@@ -71,7 +71,7 @@ rawtcp:

kvClient:
etcd:
env: default_env
env: override_test_env
zone: embedded
service: m3aggregator
cacheDir: /var/lib/m3kv
@@ -82,7 +82,7 @@ kvClient:

runtimeOptions:
kvConfig:
environment: default_env
environment: override_test_env
zone: embedded
writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second
writeValuesPerMetricLimitPerSecond: 0
@@ -131,7 +131,7 @@ aggregator:
placementKV:
namespace: /placement
zone: embedded
environment: default_env
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 15s
@@ -164,7 +164,7 @@ aggregator:
placementManager:
kvConfig:
namespace: /placement
environment: default_env
environment: override_test_env
zone: embedded
placementWatcher:
key: m3aggregator
@@ -175,7 +175,7 @@ aggregator:
resignTimeout: 1m
flushTimesManager:
kvConfig:
environment: default_env
environment: override_test_env
zone: embedded
flushTimesKeyFmt: shardset/%d/flush
flushTimesPersistRetrier:
@@ -190,7 +190,7 @@ aggregator:
ttlSeconds: 10
serviceID:
name: m3aggregator
environment: default_env
environment: override_test_env
zone: embedded
electionKeyFmt: shardset/%d/lock
campaignRetrier:
@@ -243,7 +243,7 @@ aggregator:
topicName: aggregated_metrics
topicServiceOverride:
zone: embedded
environment: default_env
environment: override_test_env
messageRetry:
initialBackoff: 1m
maxBackoff: 2m
@@ -56,6 +56,7 @@ downsample:
client:
placementKV:
namespace: /placement
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 10s
6 changes: 3 additions & 3 deletions scripts/docker-integration-tests/aggregator/test.sh
@@ -25,7 +25,7 @@ echo "Setup DB node"
setup_single_m3db_node

echo "Initializing aggregator topology"
curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init -d '{
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/services/m3aggregator/placement/init -d '{
"num_shards": 64,
"replication_factor": 2,
"instances": [
@@ -51,7 +51,7 @@ curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init
}'

echo "Initializing m3msg topic for m3coordinator ingestion from m3aggregators"
curl -vvvsSf -X POST localhost:7201/api/v1/topic/init -d '{
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'

@@ -75,7 +75,7 @@ echo "Done validating topology"

# Do this after placement for m3coordinator is created.
echo "Adding m3coordinator as a consumer to the aggregator topic"
curl -vvvsSf -X POST localhost:7201/api/v1/topic -d '{
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3coordinator",
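Every coordinator API call in this script that touches the overridden environment now carries the Cluster-Environment-Name: override_test_env header. As a rough verification sketch (assuming the corresponding GET endpoints honor the same header; these calls are not part of the commit), the placement and topic created above could be read back with:

# Hypothetical read-back of the placement and topic under the overridden environment.
curl -sSf -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/services/m3aggregator/placement
curl -sSf -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic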
23 changes: 23 additions & 0 deletions scripts/docker-integration-tests/run.sh
@@ -17,6 +17,29 @@ TESTS=(
scripts/docker-integration-tests/coordinator_config_rules/test.sh
)

# Some systems, including our default Buildkite hosts, don't come with netcat
# installed and we may not have perms to install it. "Install" it in the worst
# possible way.
if ! command -v nc && [[ "$BUILDKITE" == "true" ]]; then
echo "installing netcat"
NCDIR="$(mktemp -d)"

yumdownloader --destdir "$NCDIR" --resolve nc
(
cd "$NCDIR"
RPM=$(find . -maxdepth 1 -name '*.rpm' | tail -n1)
rpm2cpio "$RPM" | cpio -id
)

export PATH="$PATH:$NCDIR/usr/bin"

function cleanup_nc() {
rm -rf "$NCDIR"
}

trap cleanup_nc EXIT
fi

scripts/docker-integration-tests/setup.sh

NUM_TESTS=${#TESTS[@]}
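The netcat workaround above extracts the rpm payload into a temporary directory and appends its usr/bin to PATH rather than installing anything system-wide. A hypothetical sanity check (not part of the commit) before the test suite starts could look like:

# Confirm the rpm2cpio-extracted netcat is actually resolvable; fail fast otherwise.
if ! command -v nc >/dev/null 2>&1; then
  echo "netcat is still not on PATH after extraction" >&2
  exit 1
fi
echo "using netcat at: $(command -v nc)"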
4 changes: 4 additions & 0 deletions src/aggregator/client/config.go
@@ -45,6 +45,7 @@ type Configuration struct {
ShardCutoffLingerDuration *time.Duration `yaml:"shardCutoffLingerDuration"`
Encoder EncoderConfiguration `yaml:"encoder"`
FlushSize int `yaml:"flushSize"`
MaxBatchSize int `yaml:"maxBatchSize"`
MaxTimerBatchSize int `yaml:"maxTimerBatchSize"`
QueueSize int `yaml:"queueSize"`
QueueDropType *DropType `yaml:"queueDropType"`
@@ -127,6 +128,9 @@ func (c *Configuration) newClientOptions(
if c.FlushSize != 0 {
opts = opts.SetFlushSize(c.FlushSize)
}
if c.MaxBatchSize != 0 {
opts = opts.SetMaxBatchSize(c.MaxBatchSize)
}
if c.MaxTimerBatchSize != 0 {
opts = opts.SetMaxTimerBatchSize(c.MaxTimerBatchSize)
}
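The new MaxBatchSize field plugs into the existing yaml-driven client options and falls back to the package default when left at zero. A minimal sketch of an aggregator client config fragment using the struct tags above (the values here are illustrative only, not taken from this commit):

# Hypothetical aggregator client config fragment; only maxBatchSize is new in this commit.
flushSize: 1440
maxBatchSize: 8388608    # 0 or omitted falls back to the package default
maxTimerBatchSize: 140
queueSize: 4096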
10 changes: 6 additions & 4 deletions src/aggregator/client/options.go
@@ -37,7 +37,9 @@ const (
// By default there is no limit on the timer batch size.
defaultMaxTimerBatchSize = 0

defaultInstanceQueueSize = 4096
// defaultInstanceQueueSize determines how many metrics can be buffered
// before it must wait for an existing batch to be flushed to an instance.
defaultInstanceQueueSize = 2 << 15 // ~65k

// By default traffic is cut over to shards 10 minutes before the designated
// cutover time in case there are issues with the instances owning the shards.
@@ -51,10 +53,10 @@ const (
// By default the oldest metrics in the queue are dropped when it is full.
defaultDropType = DropOldest

// By default set maximum batch size to 32k
defaultMaxBatchSize = 2 << 14
// By default set maximum batch size to 8mb.
defaultMaxBatchSize = 2 << 22

// By default write at least every 100ms
// By default write at least every 100ms.
defaultBatchFlushDeadline = 100 * time.Millisecond
)

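For reference, the shifted constants above expand as follows; this is a small standalone snippet restating the arithmetic, not code from the commit:

package main

import "fmt"

func main() {
	// New defaults introduced by this change.
	fmt.Println(2 << 15) // 65536 entries (~65k), defaultInstanceQueueSize
	fmt.Println(2 << 22) // 8388608 bytes (~8 MiB), defaultMaxBatchSize
	// Previous defaults, for comparison.
	fmt.Println(4096)    // old defaultInstanceQueueSize
	fmt.Println(2 << 14) // 32768 bytes (32k), old defaultMaxBatchSize
}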
28 changes: 27 additions & 1 deletion src/dbnode/integration/admin_session_fetch_blocks_test.go
@@ -27,8 +27,8 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
@@ -123,6 +123,32 @@ func testSetupMetadatas(
return metadatasByShard
}

func filterSeriesByShard(
testSetup *testSetup,
seriesMap map[xtime.UnixNano]generate.SeriesBlock,
desiredShards []uint32,
) map[xtime.UnixNano]generate.SeriesBlock {
filteredMap := make(map[xtime.UnixNano]generate.SeriesBlock)
for blockStart, series := range seriesMap {
filteredSeries := make([]generate.Series, 0, len(series))
for _, serie := range series {
shard := testSetup.shardSet.Lookup(serie.ID)
for _, ss := range desiredShards {
if ss == shard {
filteredSeries = append(filteredSeries, serie)
break
}
}
}

if len(filteredSeries) > 0 {
filteredMap[blockStart] = filteredSeries
}
}

return filteredMap
}

func verifySeriesMapsEqual(
t *testing.T,
expectedSeriesMap map[xtime.UnixNano]generate.SeriesBlock,
145 changes: 145 additions & 0 deletions src/dbnode/integration/commitlog_bootstrap_unowned_shard_test.go
@@ -0,0 +1,145 @@
// +build integration

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"fmt"
"testing"
"time"

"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/integration/fake"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/topology"
xtest "github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/require"
)

func TestCommitLogBootstrapUnownedShard(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

log := xtest.NewLogger(t)
retentionOpts := retention.NewOptions().
SetRetentionPeriod(20 * time.Hour).
SetBlockSize(2 * time.Hour).
SetBufferPast(10 * time.Minute).
SetBufferFuture(10 * time.Minute)
blockSize := retentionOpts.BlockSize()

ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().
SetRetentionOptions(retentionOpts))
require.NoError(t, err)
numShards := 6

// Helper function to create node instances for fake cluster service.
node := func(index int, shards []uint32) services.ServiceInstance {
id := fmt.Sprintf("testhost%d", index)
endpoint := fmt.Sprintf("127.0.0.1:%d", multiAddrPortStart+(index*multiAddrPortEach))

result := services.NewServiceInstance().
SetInstanceID(id).
SetEndpoint(endpoint)
resultShards := make([]shard.Shard, len(shards))
for i, id := range shards {
resultShards[i] = shard.NewShard(id).SetState(shard.Available)
}
return result.SetShards(shard.NewShards(resultShards))
}

// Pretend there are two nodes sharing 6 shards (RF1).
node0OwnedShards := []uint32{0, 1, 2}
svc := fake.NewM3ClusterService().
SetInstances([]services.ServiceInstance{
node(0, node0OwnedShards),
node(1, []uint32{3, 4, 5}),
}).
SetReplication(services.NewServiceReplication().SetReplicas(1)).
SetSharding(services.NewServiceSharding().SetNumShards(numShards))
svcs := fake.NewM3ClusterServices()
svcs.RegisterService("m3db", svc)
topoOpts := topology.NewDynamicOptions().
SetConfigServiceClient(fake.NewM3ClusterClient(svcs, nil))
topoInit := topology.NewDynamicInitializer(topoOpts)

opts := newTestOptions(t).
SetNamespaces([]namespace.Metadata{ns1}).
SetNumShards(numShards)
setupOpts := []bootstrappableTestSetupOptions{
{disablePeersBootstrapper: true, topologyInitializer: topoInit},
{disablePeersBootstrapper: true, topologyInitializer: topoInit},
}

setups, closeFn := newDefaultBootstrappableTestSetups(t, opts, setupOpts)
defer closeFn()

// Only set this up for the first setup because we're only writing commit
// logs for the first server.
setup := setups[0]
commitLogOpts := setup.storageOpts.CommitLogOptions().
SetFlushInterval(defaultIntegrationTestFlushInterval)
setup.storageOpts = setup.storageOpts.SetCommitLogOptions(commitLogOpts)

log.Info("generating data")
now := setup.getNowFn()
seriesMaps := generateSeriesMaps(30, nil, now.Add(-2*blockSize), now.Add(-blockSize))
log.Info("writing data")
// Write commit log with generated data that spreads across all shards
// (including shards that this node should not own). This node should still
// be able to bootstrap successfully with commit log entries from shards
// that it does not own.
writeCommitLogData(t, setup, commitLogOpts, seriesMaps, ns1, false)
log.Info("finished writing data")

// Setup bootstrapper after writing data so filesystem inspection can find it.
setupCommitLogBootstrapperWithFSInspection(t, setup, commitLogOpts)

// Start the servers.
for _, setup := range setups {
require.NoError(t, setup.startServer())
}

// Defer stop the servers.
defer func() {
setups.parallel(func(s *testSetup) {
require.NoError(t, s.stopServer())
})
log.Debug("servers are now down")
}()

// Only fetch blocks for shards owned by node 0.
metadatasByShard, err := m3dbClientFetchBlocksMetadata(
setup.m3dbVerificationAdminClient, testNamespaces[0], node0OwnedShards,
now.Add(-2*blockSize), now, topology.ReadConsistencyLevelMajority)
require.NoError(t, err)

observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns1, metadatasByShard)
// Filter out the written series that node 0 does not own.
filteredSeriesMaps := filterSeriesByShard(setup, seriesMaps, node0OwnedShards)
// Expect to only see data that node 0 owns.
verifySeriesMapsEqual(t, filteredSeriesMaps, observedSeriesMaps)
}
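Like the other tests in this package, the new file is guarded by the integration build tag, so it only compiles when that tag is passed explicitly. A typical invocation (standard go test flags, nothing specific to this commit) would be:

go test -tags integration -run TestCommitLogBootstrapUnownedShard ./src/dbnode/integration/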

0 comments on commit 75e4f7d
