diff --git a/docker/m3aggregator/Dockerfile b/docker/m3aggregator/Dockerfile new file mode 100644 index 0000000000..b4f2ad9828 --- /dev/null +++ b/docker/m3aggregator/Dockerfile @@ -0,0 +1,29 @@ +# stage 1: build +FROM golang:1.10-alpine AS builder +LABEL maintainer="The M3DB Authors " + +# Install Glide +RUN apk add --update glide git make bash + +# Add source code +RUN mkdir -p /go/src/github.com/m3db/m3 +ADD . /go/src/github.com/m3db/m3 + +# Build m3aggregator binary +RUN cd /go/src/github.com/m3db/m3/ && \ + git submodule update --init && \ + make m3aggregator-linux-amd64 + +# stage 2: lightweight "release" +FROM alpine:latest +LABEL maintainer="The M3DB Authors " + +EXPOSE 5000/tcp 6000/tcp 60001/tcp 7203/tcp 9000-9004/tcp + +RUN apk add --no-cache curl jq + +COPY --from=builder /go/src/github.com/m3db/m3/bin/m3aggregator /bin/ +COPY --from=builder /go/src/github.com/m3db/m3/src/aggregator/config/m3aggregator.yml /etc/m3aggregator/m3aggregator.yml + +ENTRYPOINT [ "/bin/m3aggregator" ] +CMD [ "-f", "/etc/m3aggregator/m3aggregator.yml" ] diff --git a/docker/m3collector/Dockerfile b/docker/m3collector/Dockerfile new file mode 100644 index 0000000000..44bc4b9663 --- /dev/null +++ b/docker/m3collector/Dockerfile @@ -0,0 +1,29 @@ +# stage 1: build +FROM golang:1.10-alpine AS builder +LABEL maintainer="The M3DB Authors " + +# Install Glide +RUN apk add --update glide git make bash + +# Add source code +RUN mkdir -p /go/src/github.com/m3db/m3 +ADD . /go/src/github.com/m3db/m3 + +# Build m3collector binary +RUN cd /go/src/github.com/m3db/m3/ && \ + git submodule update --init && \ + make m3collector-linux-amd64 + +# stage 2: lightweight "release" +FROM alpine:latest +LABEL maintainer="The M3DB Authors " + +EXPOSE 7206-7207/tcp + +RUN apk add --no-cache curl jq + +COPY --from=builder /go/src/github.com/m3db/m3/bin/m3collector /bin/ +COPY --from=builder /go/src/github.com/m3db/m3/src/collector/config/m3collector.yml /etc/m3collector/m3collector.yml + +ENTRYPOINT [ "/bin/m3collector" ] +CMD [ "-f", "/etc/m3collector/m3collector.yml" ] diff --git a/scripts/development/m3_stack/docker-compose.yml b/scripts/development/m3_stack/docker-compose.yml index 3b59e541d1..65ef32c4d6 100644 --- a/scripts/development/m3_stack/docker-compose.yml +++ b/scripts/development/m3_stack/docker-compose.yml @@ -8,7 +8,7 @@ services: dockerfile: ./docker/m3dbnode/Dockerfile image: m3dbnode01:latest volumes: - - "./dbnode_config.yml:/etc/m3dbnode/m3dbnode.yml" + - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" environment: - M3DB_HOST_ID=m3db_seed m3db_data01: @@ -19,7 +19,7 @@ services: dockerfile: ./docker/m3dbnode/Dockerfile image: m3dbnode02:latest volumes: - - "./dbnode_config.yml:/etc/m3dbnode/m3dbnode.yml" + - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" environment: - M3DB_HOST_ID=m3db_data01 m3db_data02: @@ -30,10 +30,21 @@ services: dockerfile: ./docker/m3dbnode/Dockerfile image: m3dbnode03:latest volumes: - - "./dbnode_config.yml:/etc/m3dbnode/m3dbnode.yml" + - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml" environment: - M3DB_HOST_ID=m3db_data02 - coordinator01: + m3aggregator01: + networks: + - backend + build: + context: ../../../ + dockerfile: ./docker/m3aggregator/Dockerfile + image: m3aggregator01:latest + volumes: + - "./m3aggregator.yml:/etc/m3aggregator/m3aggregator.yml" + environment: + - M3AGGREGATOR_HOST_ID=m3aggregator01 + m3coordinator01: expose: - "7201" - "7203" @@ -49,7 +60,22 @@ services: dockerfile: ./docker/m3coordinator/Dockerfile image: m3coordinator01:latest volumes: - - "./:/etc/m3coordinator/" + -
"./m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml" + m3collector01: + expose: + - "7206" + - "7207" + ports: + - "0.0.0.0:7206:7206" + - "0.0.0.0:7207:7207" + networks: + - backend + build: + context: ../../../ + dockerfile: ./docker/m3collector/Dockerfile + image: m3collector01:latest + volumes: + - "./m3collector.yml:/etc/m3collector/m3collector.yml" prometheus01: expose: - "9090" @@ -60,7 +86,7 @@ services: image: prom/prometheus:latest volumes: - "./:/etc/prometheus/" - grafana2: + grafana: build: context: ../../../ dockerfile: ./docker/grafana/Dockerfile diff --git a/src/aggregator/config/m3aggregator.yaml b/scripts/development/m3_stack/m3aggregator.yml similarity index 71% rename from src/aggregator/config/m3aggregator.yaml rename to scripts/development/m3_stack/m3aggregator.yml index f45601538b..c57f35d2fa 100644 --- a/src/aggregator/config/m3aggregator.yaml +++ b/scripts/development/m3_stack/m3aggregator.yml @@ -1,36 +1,20 @@ -# Copyright (c) 2017 Uber Technologies, Inc. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - logging: level: info metrics: scope: - reportingInterval: 1s - m3: - hostPort: 127.0.0.1:5000 # local collector host port for m3 metrics - service: m3aggregator - env: test - includeHost: true - samplingRate: 0.01 - extended: moderate + prefix: m3aggregator + prometheus: + onError: none + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: none + +http: + listenAddress: 0.0.0.0:6001 + readTimeout: 60s + writeTimeout: 60s rawtcp: listenAddress: 0.0.0.0:6000 @@ -85,15 +69,21 @@ rawtcp: low: 0.001 high: 0.002 -http: - listenAddress: 0.0.0.0:6001 - readTimeout: 45s - writeTimeout: 45s +kvClient: + etcd: + env: default_env + zone: embedded + service: m3aggregator + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - m3db_seed:2379 runtimeOptions: kvConfig: - namespace: /m3aggregator - environment: test + environment: default_env + zone: embedded writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second writeValuesPerMetricLimitPerSecond: 0 writeNewMetricLimitClusterPerSecondKey: write-new-metric-limit-cluster-per-second @@ -101,7 +91,13 @@ runtimeOptions: writeNewMetricNoLimitWarmupDuration: 0 aggregator: - metricPrefix: stats. 
+ hostID: + resolver: environment + envVarName: M3AGGREGATOR_HOST_ID + metricPrefix: "" + counterPrefix: "" + timerPrefix: "" + gaugePrefix: "" aggregationTypes: counterTransformFnType: empty timerTransformFnType: suffix @@ -131,14 +127,13 @@ aggregator: capacity: 64 client: placementKV: - zone: testZone - environment: testEnvironment - namespace: testNamespace + namespace: /placement + zone: embedded + environment: default_env placementWatcher: - key: testWatchKey + key: m3aggregator initWatchTimeout: 15s hashType: murmur32 - shardCutoverWarmupDuration: 10m shardCutoffLingerDuration: 1m encoder: initBufferSize: 100 @@ -166,10 +161,11 @@ aggregator: maxReconnectDuration: 1m placementManager: kvConfig: - namespace: /m3aggregator - environment: test + namespace: /placement + environment: default_env + zone: embedded placementWatcher: - key: placement + key: m3aggregator initWatchTimeout: 10s hashType: murmur32 bufferDurationBeforeShardCutover: 10m @@ -177,8 +173,8 @@ aggregator: resignTimeout: 1m flushTimesManager: kvConfig: - namespace: /m3aggregator - environment: test + environment: default_env + zone: embedded flushTimesKeyFmt: shardset/%d/flush flushTimesPersistRetrier: initialBackoff: 100ms @@ -192,7 +188,8 @@ aggregator: ttlSeconds: 10 serviceID: name: m3aggregator - environment: test + environment: default_env + zone: embedded electionKeyFmt: shardset/%d/lock campaignRetrier: initialBackoff: 100ms @@ -233,11 +230,40 @@ aggregator: maxBufferSize: 5m forcedFlushWindowSize: 10s flush: + writer: + maxBufferSize: 1440 + encodingTimeSamplingRate: 0.01 + bufferedEncoderPool: + size: 16384 handlers: - - staticBackend: - type: logging - - staticBackend: - type: blackhole + - dynamicBackend: + name: m3msg + hashType: murmur32 + totalShards: 64 + producer: + buffer: + maxBufferSize: 1000000000 # max buffer before m3msg start dropping data. 
+ writer: + topicName: aggregated_metrics + topicServiceOverride: + zone: embedded + environment: default_env + messageRetry: + initialBackoff: 1m + maxBackoff: 2m + messageQueueNewWritesScanInterval: 1s + ackErrorRetry: + initialBackoff: 2s + maxBackoff: 10s + connection: + dialTimeout: 5s + writeTimeout: 5s + retry: + initialBackoff: 1s + maxBackoff: 10s + flushInterval: 1s + writeBufferSize: 16384 + readBufferSize: 256 forwarding: maxSingleDelay: 5s entryTTL: 6h @@ -245,7 +271,6 @@ aggregator: maxTimerBatchSizePerWrite: 140 defaultStoragePolicies: - 10s:2d - - 1m:40d maxNumCachedSourceSets: 2 discardNaNAggregatedValues: true entryPool: diff --git a/scripts/development/m3_stack/m3collector.yml b/scripts/development/m3_stack/m3collector.yml new file mode 100644 index 0000000000..ce940de31d --- /dev/null +++ b/scripts/development/m3_stack/m3collector.yml @@ -0,0 +1,92 @@ +listenAddress: + type: config + value: 0.0.0.0:7206 + +metrics: + scope: + prefix: collector + prometheus: + onError: none + handlerPath: /metrics + listenAddress: 0.0.0.0:7207 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +etcd: + env: default_env + zone: embedded + service: m3collector + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - m3db_seed:2379 + +reporter: + cache: + capacity: 200000 + freshDuration: 5m + stutterDuration: 1m + + matcher: + initWatchTimeout: 10s + rulesKVConfig: + namespace: /rules + namespacesKey: namespaces + ruleSetKeyFmt: rulesets/%s + namespaceTag: application + defaultNamespace: global + nameTagKey: __name__ + matchRangePast: 2m + sortedTagIteratorPool: + size: 8192 + watermark: + low: 0.7 + high: 1.0 + + client: + placementKV: + namespace: /placement + placementWatcher: + key: m3aggregator + initWatchTimeout: 10s + hashType: murmur32 + shardCutoffLingerDuration: 1m + flushSize: 1440 + maxTimerBatchSize: 1120 + queueSize: 10000 + queueDropType: oldest + encoder: + initBufferSize: 2048 + maxMessageSize: 10485760 + bytesPool: + buckets: + - capacity: 2048 + count: 4096 + - capacity: 4096 + count: 4096 + watermark: + low: 0.7 + high: 1.0 + connection: + writeTimeout: 250ms + + clock: + maxPositiveSkew: 2m + maxNegativeSkew: 2m + + sortedTagIteratorPool: + size: 8192 + watermark: + low: 0.7 + high: 1.0 + +logging: + level: info + encoding: json + outputPaths: + - stdout + errorOutputPaths: + - stderr + diff --git a/scripts/development/m3_stack/m3coordinator.yml b/scripts/development/m3_stack/m3coordinator.yml index bc79f8b180..e26d89f471 100644 --- a/scripts/development/m3_stack/m3coordinator.yml +++ b/scripts/development/m3_stack/m3coordinator.yml @@ -14,9 +14,13 @@ metrics: clusters: - namespaces: - - namespace: prometheus_metrics + - namespace: metrics_0_30m type: unaggregated + retention: 30m + - namespace: metrics_10s_48h + type: aggregated retention: 48h + resolution: 10s client: config: service: diff --git a/scripts/development/m3_stack/dbnode_config.yml b/scripts/development/m3_stack/m3dbnode.yml similarity index 100% rename from scripts/development/m3_stack/dbnode_config.yml rename to scripts/development/m3_stack/m3dbnode.yml diff --git a/scripts/development/m3_stack/prometheus.yml b/scripts/development/m3_stack/prometheus.yml index a54b09463a..be23cd0b81 100644 --- a/scripts/development/m3_stack/prometheus.yml +++ b/scripts/development/m3_stack/prometheus.yml @@ -30,14 +30,14 @@ scrape_configs: - job_name: 'coordinator' static_configs: - - targets: ['coordinator01:7203'] + - 
targets: ['m3coordinator01:7203'] - job_name: 'dbnode' static_configs: - targets: ['m3db_seed:7203', 'm3db_data01:7203', 'm3db_data02:7203'] remote_read: - - url: http://coordinator01:7201/api/v1/prom/remote/read + - url: http://m3coordinator01:7201/api/v1/prom/remote/read remote_write: - - url: http://coordinator01:7201/api/v1/prom/remote/write + - url: http://m3coordinator01:7201/api/v1/prom/remote/write diff --git a/scripts/development/m3_stack/start_m3.sh b/scripts/development/m3_stack/start_m3.sh index 45b069d015..56d429c0b7 100755 --- a/scripts/development/m3_stack/start_m3.sh +++ b/scripts/development/m3_stack/start_m3.sh @@ -2,13 +2,65 @@ set -xe +DOCKER_ARGS="-d --renew-anon-volumes" +if [[ "$FORCE_BUILD" = true ]] ; then + DOCKER_ARGS="--build -d --renew-anon-volumes" +fi + echo "Bringing up nodes in the backgorund with docker compose, remember to run ./stop.sh when done" -docker-compose -f docker-compose.yml up -d --renew-anon-volumes +docker-compose -f docker-compose.yml up $DOCKER_ARGS m3coordinator01 +docker-compose -f docker-compose.yml up $DOCKER_ARGS m3db_seed +docker-compose -f docker-compose.yml up $DOCKER_ARGS prometheus01 +docker-compose -f docker-compose.yml up $DOCKER_ARGS grafana + +if [[ "$MULTI_DB_NODE" = true ]] ; then + echo "Running multi node" + docker-compose -f docker-compose.yml up $DOCKER_ARGS m3db_data01 + docker-compose -f docker-compose.yml up $DOCKER_ARGS m3db_data02 +else + echo "Running single node" +fi + +if [[ "$AGGREGATOR_PIPELINE" = true ]]; then + echo "Running aggregator pipeline" + docker-compose -f docker-compose.yml up $DOCKER_ARGS m3aggregator01 + docker-compose -f docker-compose.yml up $DOCKER_ARGS m3collector01 +else + echo "Not running aggregator pipeline" +fi + + +echo "Sleeping to wait for nodes to initialize" +sleep 10 + echo "Nodes online" -echo "Initializing namespace" +echo "Initializing namespaces" +curl -vvvsSf -X POST localhost:7201/api/v1/namespace -d '{ + "name": "metrics_0_30m", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "30m", + "blockSizeDuration": "10m", + "bufferFutureDuration": "5m", + "bufferPastDuration": "5m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "indexOptions": { + "enabled": true, + "blockSizeDuration": "10m" + } + } +}' curl -vvvsSf -X POST localhost:7201/api/v1/namespace -d '{ - "name": "prometheus_metrics", + "name": "metrics_10s_48h", "options": { "bootstrapEnabled": true, "flushEnabled": true, @@ -17,103 +69,142 @@ curl -vvvsSf -X POST localhost:7201/api/v1/namespace -d '{ "snapshotEnabled": true, "repairEnabled": false, "retentionOptions": { - "retentionPeriodNanos": 172800000000000, - "blockSizeNanos": 7200000000000, - "bufferFutureNanos": 600000000000, - "bufferPastNanos": 600000000000, + "retentionPeriodDuration": "48h", + "blockSizeDuration": "4h", + "bufferFutureDuration": "10m", + "bufferPastDuration": "10m", "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodNanos": 300000000000 + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" }, "indexOptions": { "enabled": true, - "blockSizeNanos": 7200000000000 + "blockSizeDuration": "4h" } } }' -echo "Done initializing namespace" +echo "Done initializing namespaces" echo "Validating namespace" -[ "$(curl -sSf localhost:7201/api/v1/namespace | jq .registry.namespaces.prometheus_metrics.indexOptions.enabled)" == true ] 
+[ "$(curl -sSf localhost:7201/api/v1/namespace | jq .registry.namespaces.metrics_0_30m.indexOptions.enabled)" == true ] +[ "$(curl -sSf localhost:7201/api/v1/namespace | jq .registry.namespaces.metrics_10s_48h.indexOptions.enabled)" == true ] echo "Done validating namespace" -echo "Initializing M3DB topology" -curl -vvvsSf -X POST localhost:7201/api/v1/placement/init -d '{ - "num_shards": 64, - "replication_factor": 3, - "instances": [ - { - "id": "m3db_seed", - "isolation_group": "rack-a", - "zone": "embedded", - "weight": 1024, - "endpoint": "m3db_seed:9000", - "hostname": "m3db_seed", - "port": 9000 - }, - { - "id": "m3db_data01", - "isolation_group": "rack-b", - "zone": "embedded", - "weight": 1024, - "endpoint": "m3db_data01:9000", - "hostname": "m3db_data01", - "port": 9000 - }, - { - "id": "m3db_data02", - "isolation_group": "rack-c", - "zone": "embedded", - "weight": 1024, - "endpoint": "m3db_data02:9000", - "hostname": "m3db_data02", - "port": 9000 - } - ] -}' -echo "Done initializing M3DB topology" +echo "Initializing topology" +if [[ "$MULTI_DB_NODE" = true ]] ; then + curl -vvvsSf -X POST localhost:7201/api/v1/placement/init -d '{ + "num_shards": 64, + "replication_factor": 3, + "instances": [ + { + "id": "m3db_seed", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "endpoint": "m3db_seed:9000", + "hostname": "m3db_seed", + "port": 9000 + }, + { + "id": "m3db_data01", + "isolation_group": "rack-b", + "zone": "embedded", + "weight": 1024, + "endpoint": "m3db_data01:9000", + "hostname": "m3db_data01", + "port": 9000 + }, + { + "id": "m3db_data02", + "isolation_group": "rack-c", + "zone": "embedded", + "weight": 1024, + "endpoint": "m3db_data02:9000", + "hostname": "m3db_data02", + "port": 9000 + } + ] + }' +else + curl -vvvsSf -X POST localhost:7201/api/v1/placement/init -d '{ + "num_shards": 64, + "replication_factor": 1, + "instances": [ + { + "id": "m3db_seed", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "endpoint": "m3db_seed:9000", + "hostname": "m3db_seed", + "port": 9000 + } + ] + }' +fi echo "Validating topology" [ "$(curl -sSf localhost:7201/api/v1/placement | jq .placement.instances.m3db_seed.id)" == '"m3db_seed"' ] echo "Done validating topology" -echo "Initializing M3Coordinator topology" -curl -vvvsSf -X POST localhost:7201/api/v1/services/m3coordinator/placement/init -d '{ - "instances": [ - { - "id": "coordinator01", - "zone": "embedded", - "endpoint": "coordinator01:7507", - "hostname": "coordinator01", - "port": 7507 - } - ] -}' -echo "Done initializing M3Coordinator topology" +if [[ "$AGGREGATOR_PIPELINE" = true ]]; then + curl -vvvsSf -X POST localhost:7201/api/v1/services/m3aggregator/placement/init -d '{ + "num_shards": 64, + "replication_factor": 1, + "instances": [ + { + "id": "m3aggregator01:6000", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "endpoint": "m3aggregator01:6000", + "hostname": "m3aggregator01", + "port": 6000 + } + ] + }' -echo "Validating M3Coordinator topology" -[ "$(curl -sSf localhost:7201/api/v1/services/m3coordinator/placement | jq .placement.instances.coordinator01.id)" == '"coordinator01"' ] -echo "Done validating topology" + echo "Initializing M3Coordinator topology" + curl -vvvsSf -X POST localhost:7201/api/v1/services/m3coordinator/placement/init -d '{ + "instances": [ + { + "id": "m3coordinator01", + "zone": "embedded", + "endpoint": "m3coordinator01:7507", + "hostname": "m3coordinator01", + "port": 7507 + } + ] + }' + echo "Done initializing 
M3Coordinator topology" -# Do this after placement for m3coordinator is created. -echo "Initializing m3msg topic for ingestion" -curl -vvvsSf -X POST localhost:7201/api/v1/topic/init -d '{ - "numberOfShards": 64 -}' + echo "Validating M3Coordinator topology" + [ "$(curl -sSf localhost:7201/api/v1/services/m3coordinator/placement | jq .placement.instances.m3coordinator01.id)" == '"m3coordinator01"' ] + echo "Done validating topology" -echo "Adding m3coordinator as a consumer to the topic" -curl -vvvsSf -X POST localhost:7201/api/v1/topic -d '{ - "consumerService": { - "serviceId": { - "name": "m3coordinator", - "environment": "default_env", - "zone": "embedded" - }, - "consumptionType": "SHARED", - "messageTtlNanos": "600000000000" - } -}' -# msgs will be discarded after 600000000000ns = 10mins + # Do this after placement for m3coordinator is created. + echo "Initializing m3msg topic for ingestion" + curl -vvvsSf -X POST localhost:7201/api/v1/topic/init -d '{ + "numberOfShards": 64 + }' + + echo "Adding m3coordinator as a consumer to the topic" + curl -vvvsSf -X POST localhost:7201/api/v1/topic -d '{ + "consumerService": { + "serviceId": { + "name": "m3coordinator", + "environment": "default_env", + "zone": "embedded" + }, + "consumptionType": "SHARED", + "messageTtlNanos": "600000000000" + } + }' # msgs will be discarded after 600000000000ns = 10mins + + # May not necessarily flush + echo "Sending unaggregated metric to m3collector" + curl http://localhost:7206/api/v1/json/report -X POST -d '{"metrics":[{"type":"gauge","value":42,"tags":{"__name__":"foo_metric","foo":"bar"}}]}' +fi echo "Prometheus available at localhost:9090" echo "Grafana available at localhost:3000" -echo "Run ./stop.sh to shutdown nodes when done" \ No newline at end of file +echo "Run ./stop.sh to shutdown nodes when done" diff --git a/src/aggregator/config/m3aggregator.yml b/src/aggregator/config/m3aggregator.yml new file mode 100644 index 0000000000..c57f35d2fa --- /dev/null +++ b/src/aggregator/config/m3aggregator.yml @@ -0,0 +1,283 @@ +logging: + level: info + +metrics: + scope: + prefix: m3aggregator + prometheus: + onError: none + handlerPath: /metrics + sanitization: prometheus + samplingRate: 1.0 + extended: none + +http: + listenAddress: 0.0.0.0:6001 + readTimeout: 60s + writeTimeout: 60s + +rawtcp: + listenAddress: 0.0.0.0:6000 + keepAliveEnabled: true + keepAlivePeriod: 1m + retry: + initialBackoff: 5ms + backoffFactor: 2.0 + maxBackoff: 1s + forever: true + jitter: true + readBufferSize: 1440 + msgpackIterator: + ignoreHigherVersion: false + readerBufferSize: 1440 + largeFloatsSize: 1024 + largeFloatsPool: + buckets: + - count: 1024 + capacity: 2048 + - count: 512 + capacity: 4096 + - count: 256 + capacity: 8192 + - count: 128 + capacity: 16384 + - count: 64 + capacity: 32768 + - count: 32 + capacity: 65536 + watermark: + low: 0.001 + high: 0.002 + protobufIterator: + initBufferSize: 1440 + maxMessageSize: 50000000 # max message size is 50MB + bytesPool: + buckets: + - count: 1024 + capacity: 2048 + - count: 512 + capacity: 4096 + - count: 256 + capacity: 8192 + - count: 128 + capacity: 16384 + - count: 64 + capacity: 32768 + - count: 32 + capacity: 65536 + watermark: + low: 0.001 + high: 0.002 + +kvClient: + etcd: + env: default_env + zone: embedded + service: m3aggregator + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - m3db_seed:2379 + +runtimeOptions: + kvConfig: + environment: default_env + zone: embedded + writeValuesPerMetricLimitPerSecondKey: 
write-values-per-metric-limit-per-second + writeValuesPerMetricLimitPerSecond: 0 + writeNewMetricLimitClusterPerSecondKey: write-new-metric-limit-cluster-per-second + writeNewMetricLimitClusterPerSecond: 0 + writeNewMetricNoLimitWarmupDuration: 0 + +aggregator: + hostID: + resolver: environment + envVarName: M3AGGREGATOR_HOST_ID + metricPrefix: "" + counterPrefix: "" + timerPrefix: "" + gaugePrefix: "" + aggregationTypes: + counterTransformFnType: empty + timerTransformFnType: suffix + gaugeTransformFnType: empty + aggregationTypesPool: + size: 1024 + quantilesPool: + buckets: + - count: 256 + capacity: 4 + - count: 128 + capacity: 8 + stream: + eps: 0.001 + capacity: 32 + streamPool: + size: 4096 + samplePool: + size: 4096 + floatsPool: + buckets: + - count: 4096 + capacity: 16 + - count: 2048 + capacity: 32 + - count: 1024 + capacity: 64 + client: + placementKV: + namespace: /placement + zone: embedded + environment: default_env + placementWatcher: + key: m3aggregator + initWatchTimeout: 15s + hashType: murmur32 + shardCutoffLingerDuration: 1m + encoder: + initBufferSize: 100 + maxMessageSize: 50000000 + bytesPool: + buckets: + - capacity: 16 + count: 10 + - capacity: 32 + count: 20 + watermark: + low: 0.001 + high: 0.01 + flushSize: 1440 + maxTimerBatchSize: 140 + queueSize: 1000 + queueDropType: oldest + connection: + connectionTimeout: 1s + connectionKeepAlive: true + writeTimeout: 1s + initReconnectThreshold: 2 + maxReconnectThreshold: 5000 + reconnectThresholdMultiplier: 2 + maxReconnectDuration: 1m + placementManager: + kvConfig: + namespace: /placement + environment: default_env + zone: embedded + placementWatcher: + key: m3aggregator + initWatchTimeout: 10s + hashType: murmur32 + bufferDurationBeforeShardCutover: 10m + bufferDurationAfterShardCutoff: 10m + resignTimeout: 1m + flushTimesManager: + kvConfig: + environment: default_env + zone: embedded + flushTimesKeyFmt: shardset/%d/flush + flushTimesPersistRetrier: + initialBackoff: 100ms + backoffFactor: 2.0 + maxBackoff: 2s + maxRetries: 3 + electionManager: + election: + leaderTimeout: 10s + resignTimeout: 10s + ttlSeconds: 10 + serviceID: + name: m3aggregator + environment: default_env + zone: embedded + electionKeyFmt: shardset/%d/lock + campaignRetrier: + initialBackoff: 100ms + backoffFactor: 2.0 + maxBackoff: 2s + forever: true + jitter: true + changeRetrier: + initialBackoff: 100ms + backoffFactor: 2.0 + maxBackoff: 5s + forever: true + jitter: true + resignRetrier: + initialBackoff: 100ms + backoffFactor: 2.0 + maxBackoff: 5s + forever: true + jitter: true + campaignStateCheckInterval: 1s + shardCutoffCheckOffset: 30s + flushManager: + checkEvery: 1s + jitterEnabled: true + maxJitters: + - flushInterval: 5s + maxJitterPercent: 1.0 + - flushInterval: 10s + maxJitterPercent: 0.5 + - flushInterval: 1m + maxJitterPercent: 0.5 + - flushInterval: 10m + maxJitterPercent: 0.5 + - flushInterval: 1h + maxJitterPercent: 0.25 + numWorkersPerCPU: 0.5 + flushTimesPersistEvery: 10s + maxBufferSize: 5m + forcedFlushWindowSize: 10s + flush: + writer: + maxBufferSize: 1440 + encodingTimeSamplingRate: 0.01 + bufferedEncoderPool: + size: 16384 + handlers: + - dynamicBackend: + name: m3msg + hashType: murmur32 + totalShards: 64 + producer: + buffer: + maxBufferSize: 1000000000 # max buffer before m3msg start dropping data. 
+ writer: + topicName: aggregated_metrics + topicServiceOverride: + zone: embedded + environment: default_env + messageRetry: + initialBackoff: 1m + maxBackoff: 2m + messageQueueNewWritesScanInterval: 1s + ackErrorRetry: + initialBackoff: 2s + maxBackoff: 10s + connection: + dialTimeout: 5s + writeTimeout: 5s + retry: + initialBackoff: 1s + maxBackoff: 10s + flushInterval: 1s + writeBufferSize: 16384 + readBufferSize: 256 + forwarding: + maxSingleDelay: 5s + entryTTL: 6h + entryCheckInterval: 10m + maxTimerBatchSizePerWrite: 140 + defaultStoragePolicies: + - 10s:2d + maxNumCachedSourceSets: 2 + discardNaNAggregatedValues: true + entryPool: + size: 4096 + counterElemPool: + size: 4096 + timerElemPool: + size: 4096 + gaugeElemPool: + size: 4096 diff --git a/src/cmd/services/m3aggregator/config/aggregator.go b/src/cmd/services/m3aggregator/config/aggregator.go index a709d9009f..54b0d50101 100644 --- a/src/cmd/services/m3aggregator/config/aggregator.go +++ b/src/cmd/services/m3aggregator/config/aggregator.go @@ -43,6 +43,7 @@ import ( "github.com/m3db/m3metrics/pipeline/applied" "github.com/m3db/m3metrics/policy" "github.com/m3db/m3x/clock" + "github.com/m3db/m3x/config/hostid" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/pool" "github.com/m3db/m3x/retry" @@ -56,20 +57,23 @@ var ( // AggregatorConfiguration contains aggregator configuration. type AggregatorConfiguration struct { + // HostID is the local host ID configuration. + HostID *hostid.Configuration `yaml:"hostID"` + // AggregationTypes configs the aggregation types. AggregationTypes aggregation.TypesConfiguration `yaml:"aggregationTypes"` // Common metric prefix. - MetricPrefix string `yaml:"metricPrefix"` + MetricPrefix *string `yaml:"metricPrefix"` // Counter metric prefix. - CounterPrefix string `yaml:"counterPrefix"` + CounterPrefix *string `yaml:"counterPrefix"` // Timer metric prefix. - TimerPrefix string `yaml:"timerPrefix"` + TimerPrefix *string `yaml:"timerPrefix"` // Gauge metric prefix. - GaugePrefix string `yaml:"gaugePrefix"` + GaugePrefix *string `yaml:"gaugePrefix"` // Stream configuration for computing quantiles. Stream streamConfiguration `yaml:"stream"` @@ -193,7 +197,7 @@ func (c *AggregatorConfiguration) NewAggregatorOptions( opts = opts.SetAdminClient(adminClient) // Set instance id. 
- instanceID, err := instanceID(address) + instanceID, err := c.newInstanceID(address) if err != nil { return nil, err } @@ -354,6 +358,27 @@ func (c *AggregatorConfiguration) NewAggregatorOptions( return opts, nil } +func (c *AggregatorConfiguration) newInstanceID(address string) (string, error) { + var ( + hostName string + err error + ) + if c.HostID != nil { + hostName, err = c.HostID.Resolve() + } else { + hostName, err = os.Hostname() + } + if err != nil { + return "", fmt.Errorf("error determining host name: %v", err) + } + + _, port, err := net.SplitHostPort(address) + if err != nil { + return "", fmt.Errorf("error parsing server address %s: %v", address, err) + } + return net.JoinHostPort(hostName, port), nil +} + func bufferForPastTimedMetricFn(buffer time.Duration) aggregator.BufferForPastTimedMetricFn { return func(resolution time.Duration) time.Duration { return buffer + resolution @@ -723,23 +748,11 @@ type metricPrefixSetter func(b []byte) aggregator.Options func setMetricPrefix( opts aggregator.Options, - str string, + str *string, fn metricPrefixSetter, ) aggregator.Options { - if str == "" { + if str == nil { return opts } - return fn([]byte(str)) -} - -func instanceID(address string) (string, error) { - hostName, err := os.Hostname() - if err != nil { - return "", fmt.Errorf("error determining host name: %v", err) - } - _, port, err := net.SplitHostPort(address) - if err != nil { - return "", fmt.Errorf("error parsing server address %s: %v", address, err) - } - return net.JoinHostPort(hostName, port), nil + return fn([]byte(*str)) } diff --git a/src/collector/config/m3collector.yml b/src/collector/config/m3collector.yml index 9d7195a69d..15bf496756 100644 --- a/src/collector/config/m3collector.yml +++ b/src/collector/config/m3collector.yml @@ -6,6 +6,7 @@ metrics: scope: prefix: collector prometheus: + onError: none handlerPath: /metrics listenAddress: 0.0.0.0:7207 # until https://github.com/m3db/m3/issues/682 is resolved sanitization: prometheus @@ -51,8 +52,7 @@ reporter: key: m3aggregator initWatchTimeout: 10s hashType: murmur32 - shardCutoverWarmupDuration: 1h - shardCutoffLingerDuration: 10m + shardCutoffLingerDuration: 1m flushSize: 1440 maxTimerBatchSize: 1120 queueSize: 10000 @@ -85,3 +85,7 @@ reporter: logging: level: info encoding: json + outputPaths: + - stdout + errorOutputPaths: + - stderr diff --git a/src/collector/server/server.go b/src/collector/server/server.go index f8721aa06f..73b07fa488 100644 --- a/src/collector/server/server.go +++ b/src/collector/server/server.go @@ -74,8 +74,10 @@ func Run(runOpts RunOptions) { fmt.Fprintf(os.Stderr, "unable to create logger: %v", err) os.Exit(1) } + defer logger.Sync() + logger.Info("creating metrics scope") scope, closer, err := cfg.Metrics.NewRootScope() if err != nil { logger.Fatal("could not connect to metrics", zap.Error(err)) @@ -86,11 +88,13 @@ func Run(runOpts RunOptions) { SetMetricsScope(scope). 
SetZapLogger(logger) + logger.Info("creating etcd client") clusterClient, err := cfg.Etcd.NewClient(instrumentOpts) if err != nil { logger.Fatal("could not create etcd client", zap.Error(err)) } + logger.Info("creating reporter") reporter, err := newReporter(cfg.Reporter, clusterClient, instrumentOpts) if err != nil { logger.Fatal("could not create reporter", zap.Error(err)) @@ -113,6 +117,7 @@ func Run(runOpts RunOptions) { tagDecoderPoolOptions) tagDecoderPool.Init() + logger.Info("creating http handlers and registering routes") handler, err := httpd.NewHandler(reporter, tagEncoderPool, tagDecoderPool, instrumentOpts) if err != nil { @@ -173,27 +178,33 @@ func newReporter( instrumentOpts instrument.Options, ) (reporter.Reporter, error) { scope := instrumentOpts.MetricsScope() + logger := instrumentOpts.ZapLogger() clockOpts := cfg.Clock.NewOptions() + logger.Info("creating metrics matcher cache") cache := cfg.Cache.NewCache(clockOpts, instrumentOpts.SetMetricsScope(scope.SubScope("cache"))) + logger.Info("creating metrics matcher") matcher, err := cfg.Matcher.NewMatcher(cache, clusterClient, clockOpts, instrumentOpts.SetMetricsScope(scope.SubScope("matcher"))) if err != nil { return nil, fmt.Errorf("unable to create matcher: %v", err) } + logger.Info("creating aggregator client") aggClient, err := cfg.Client.NewClient(clusterClient, clockOpts, instrumentOpts.SetMetricsScope(scope.SubScope("backend"))) if err != nil { return nil, fmt.Errorf("unable to create agg tier client: %v", err) } + logger.Info("connecting to aggregator cluster") if err := aggClient.Init(); err != nil { return nil, fmt.Errorf("unable to initialize agg tier client: %v", err) } + logger.Info("creating aggregator reporter") reporterOpts := m3aggregator.NewReporterOptions(). SetClockOptions(clockOpts). SetInstrumentOptions(instrumentOpts) diff --git a/src/query/api/v1/handler/placement/common.go b/src/query/api/v1/handler/placement/common.go index 3ceda534b2..27399604e0 100644 --- a/src/query/api/v1/handler/placement/common.go +++ b/src/query/api/v1/handler/placement/common.go @@ -211,7 +211,6 @@ func ServiceWithAlgo( opts ServiceOptions, now time.Time, ) (placement.Service, placement.Algorithm, error) { - overrides := services.NewOverrideOptions() switch opts.ServiceName { case M3AggregatorServiceName: diff --git a/src/query/server/server.go b/src/query/server/server.go index 48cf68c478..bea005dde5 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -225,21 +225,29 @@ func Run(runOpts RunOptions) { }() if cfg.Ingest != nil { + logger.Info("starting m3msg server ") ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } + server, err := cfg.Ingest.M3Msg.NewServer( ingester.Ingest, instrumentOptions.SetMetricsScope(scope.SubScope("m3msg")), ) + if err != nil { logger.Fatal("unable to create m3msg server", zap.Error(err)) } + if err := server.ListenAndServe(); err != nil { logger.Fatal("unable to listen on ingest server", zap.Error(err)) } + + logger.Info("started m3msg server ") defer server.Close() + } else { + logger.Info("no m3msg server configured") } var interruptCh <-chan error = make(chan error)