Skip to content

Commit

Permalink
[aggregator] Add M3Msg client and server for M3Aggregator (#2171)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Apr 19, 2020
1 parent 6017e79 commit 7c81c68
Show file tree
Hide file tree
Showing 34 changed files with 1,818 additions and 321 deletions.
3 changes: 1 addition & 2 deletions scripts/development/m3_stack/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ scrape_configs:

- job_name: 'aggregator'
static_configs:
- targets: ['m3aggregator01:6002']
- targets: ['m3aggregator02:6002']
- targets: ['m3aggregator01:6002', 'm3aggregator01:6002']

remote_read:
- url: http://m3coordinator01:7201/api/v1/prom/remote/read
Expand Down
140 changes: 36 additions & 104 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,64 +69,24 @@ metrics:
samplingRate: 1.0
extended: none

m3msg:
server:
listenAddress: 0.0.0.0:6000
retry:
maxBackoff: 10s
jitter: true
consumer:
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5

http:
listenAddress: 0.0.0.0:6001
readTimeout: 60s
writeTimeout: 60s

rawtcp:
listenAddress: 0.0.0.0:6000
keepAliveEnabled: true
keepAlivePeriod: 1m
retry:
initialBackoff: 5ms
backoffFactor: 2.0
maxBackoff: 1s
forever: true
jitter: true
readBufferSize: 1440
msgpackIterator:
ignoreHigherVersion: false
readerBufferSize: 1440
largeFloatsSize: 1024
largeFloatsPool:
buckets:
- count: 1024
capacity: 2048
- count: 512
capacity: 4096
- count: 256
capacity: 8192
- count: 128
capacity: 16384
- count: 64
capacity: 32768
- count: 32
capacity: 65536
watermark:
low: 0.001
high: 0.002
protobufIterator:
initBufferSize: 1440
maxMessageSize: 50000000 # max message size is 50MB
bytesPool:
buckets:
- count: 1024
capacity: 2048
- count: 512
capacity: 4096
- count: 256
capacity: 8192
- count: 128
capacity: 16384
- count: 64
capacity: 32768
- count: 32
capacity: 65536
watermark:
low: 0.001
high: 0.002

kvClient:
etcd:
env: override_test_env
Expand Down Expand Up @@ -187,39 +147,24 @@ aggregator:
- count: 1024
capacity: 64
client:
placementKV:
namespace: /placement
zone: embedded
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 15s
hashType: murmur32
shardCutoffLingerDuration: 1m
encoder:
initBufferSize: 100
maxMessageSize: 50000000
bytesPool:
buckets:
- capacity: 16
count: 10
- capacity: 32
count: 20
watermark:
low: 0.001
high: 0.01
flushSize: 1440
maxTimerBatchSize: 140
queueSize: 1000
queueDropType: oldest
connection:
connectionTimeout: 1s
connectionKeepAlive: true
writeTimeout: 1s
initReconnectThreshold: 2
maxReconnectThreshold: 5000
reconnectThresholdMultiplier: 2
maxReconnectDuration: 1m
type: m3msg
m3msg:
producer:
writer:
topicName: aggregator_ingest
topicServiceOverride:
zone: embedded
environment: override_test_env
placement:
isStaged: true
placementServiceOverride:
namespaces:
placement: /placement
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
placementManager:
kvConfig:
namespace: /placement
Expand Down Expand Up @@ -297,29 +242,16 @@ aggregator:
name: m3msg
hashType: murmur32
producer:
buffer:
maxBufferSize: 1000000000 # max buffer before m3msg start dropping data.
writer:
topicName: aggregated_metrics
topicServiceOverride:
zone: embedded
environment: override_test_env
messageRetry:
initialBackoff: 1m
maxBackoff: 2m
messageQueueNewWritesScanInterval: 1s
ackErrorRetry:
initialBackoff: 2s
maxBackoff: 10s
connection:
dialTimeout: 5s
writeTimeout: 5s
retry:
initialBackoff: 1s
maxBackoff: 10s
flushInterval: 1s
writeBufferSize: 16384
readBufferSize: 256
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
passthrough:
enabled: true
forwarding:
Expand Down
44 changes: 18 additions & 26 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,24 @@ downsample:
retention: 6h
remoteAggregator:
client:
placementKV:
namespace: /placement
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 10s
hashType: murmur32
shardCutoffLingerDuration: 1m
flushSize: 1440
maxTimerBatchSize: 1120
queueSize: 10000
queueDropType: oldest
encoder:
initBufferSize: 2048
maxMessageSize: 10485760
bytesPool:
buckets:
- capacity: 2048
count: 4096
- capacity: 4096
count: 4096
watermark:
low: 0.7
high: 1.0
connection:
writeTimeout: 250ms
type: m3msg
m3msg:
producer:
writer:
topicName: aggregator_ingest
topicServiceOverride:
zone: embedded
environment: override_test_env
placement:
isStaged: true
placementServiceOverride:
namespaces:
placement: /placement
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5

ingest:
ingester:
Expand Down
27 changes: 23 additions & 4 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,25 @@ curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:
]
}'

echo "Initializing m3msg topic for m3coordinator ingestion from m3aggregators"
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
echo "Initializing m3msg inbound topic for m3aggregator ingestion from m3coordinators"
curl -vvvsSf -X POST -H "Topic-Name: aggregator_ingest" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'

# Do this after placement and topic for m3aggregator is created.
echo "Adding m3aggregator as a consumer to the aggregator ingest topic"
curl -vvvsSf -X POST -H "Topic-Name: aggregator_ingest" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3aggregator",
"environment": "override_test_env",
"zone": "embedded"
},
"consumptionType": "REPLICATED",
"messageTtlNanos": "600000000000"
}
}' # msgs will be discarded after 600000000000ns = 10mins

echo "Initializing m3coordinator topology"
curl -vvvsSf -X POST localhost:7201/api/v1/services/m3coordinator/placement/init -d '{
"instances": [
Expand All @@ -81,8 +95,13 @@ echo "Validating m3coordinator topology"
echo "Done validating topology"

# Do this after placement for m3coordinator is created.
echo "Adding m3coordinator as a consumer to the aggregator topic"
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
echo "Initializing m3msg outbound topic for m3coordinator ingestion from m3aggregators"
curl -vvvsSf -X POST -H "Topic-Name: aggregated_metrics" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'

echo "Adding m3coordinator as a consumer to the aggregator publish topic"
curl -vvvsSf -X POST -H "Topic-Name: aggregated_metrics" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3coordinator",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
version: "3.5"
services:
dbnode01:
expose:
- "9000-9004"
- "2379-2380"
- "7201"
ports:
- "0.0.0.0:9000-9004:9000-9004"
- "0.0.0.0:2379-2380:2379-2380"
- "0.0.0.0:7201:7201"
networks:
- backend
image: "m3dbnode_integration:${REVISION}"
m3coordinator01:
expose:
- "7202"
- "7203"
- "7204"
ports:
- "0.0.0.0:7202:7202"
- "0.0.0.0:7203:7203"
- "0.0.0.0:7204:7204"
networks:
- backend
image: "m3coordinator_integration:${REVISION}"
volumes:
- "./m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml"
m3aggregator01:
expose:
- "6001"
ports:
- "127.0.0.1:6001:6001"
networks:
- backend
environment:
- M3AGGREGATOR_HOST_ID=m3aggregator01
image: "m3aggregator_integration:${REVISION}"
volumes:
- "./m3aggregator.yml:/etc/m3aggregator/m3aggregator.yml"
m3aggregator02:
networks:
- backend
environment:
- M3AGGREGATOR_HOST_ID=m3aggregator02
image: "m3aggregator_integration:${REVISION}"
volumes:
- "./m3aggregator.yml:/etc/m3aggregator/m3aggregator.yml"
networks:
backend:
Loading

0 comments on commit 7c81c68

Please sign in to comment.