Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Add M3Msg client and server for M3Aggregator #2171

Merged
merged 17 commits into from
Apr 19, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 34 additions & 102 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,18 @@ http:
readTimeout: 60s
writeTimeout: 60s

rawtcp:
listenAddress: 0.0.0.0:6000
keepAliveEnabled: true
keepAlivePeriod: 1m
retry:
initialBackoff: 5ms
backoffFactor: 2.0
maxBackoff: 1s
forever: true
jitter: true
readBufferSize: 1440
msgpackIterator:
ignoreHigherVersion: false
readerBufferSize: 1440
largeFloatsSize: 1024
largeFloatsPool:
buckets:
- count: 1024
capacity: 2048
- count: 512
capacity: 4096
- count: 256
capacity: 8192
- count: 128
capacity: 16384
- count: 64
capacity: 32768
- count: 32
capacity: 65536
watermark:
low: 0.001
high: 0.002
protobufIterator:
initBufferSize: 1440
maxMessageSize: 50000000 # max message size is 50MB
bytesPool:
buckets:
- count: 1024
capacity: 2048
- count: 512
capacity: 4096
- count: 256
capacity: 8192
- count: 128
capacity: 16384
- count: 64
capacity: 32768
- count: 32
capacity: 65536
m3msg:
robskillington marked this conversation as resolved.
Show resolved Hide resolved
server:
listenAddress: 0.0.0.0:6002
retry:
maxBackoff: 10s
jitter: true
consumer:
messagePool:
size: 16384
watermark:
low: 0.001
high: 0.002
low: 0.2
high: 0.5

kvClient:
etcd:
Expand Down Expand Up @@ -128,39 +88,24 @@ aggregator:
- count: 1024
capacity: 64
client:
placementKV:
namespace: /placement
zone: embedded
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 15s
hashType: murmur32
shardCutoffLingerDuration: 1m
encoder:
initBufferSize: 100
maxMessageSize: 50000000
bytesPool:
buckets:
- capacity: 16
count: 10
- capacity: 32
count: 20
watermark:
low: 0.001
high: 0.01
flushSize: 1440
maxTimerBatchSize: 140
queueSize: 1000
queueDropType: oldest
connection:
connectionTimeout: 1s
connectionKeepAlive: true
writeTimeout: 1s
initReconnectThreshold: 2
maxReconnectThreshold: 5000
reconnectThresholdMultiplier: 2
maxReconnectDuration: 1m
type: m3msg
m3msg:
producer:
writer:
topicName: aggregator_ingest
topicServiceOverride:
zone: embedded
environment: override_test_env
placement:
isStaged: true
placementServiceOverride:
namespaces:
placement: /placement
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
placementManager:
kvConfig:
namespace: /placement
Expand Down Expand Up @@ -237,29 +182,16 @@ aggregator:
name: m3msg
hashType: murmur32
producer:
buffer:
maxBufferSize: 1000000000 # max buffer before m3msg start dropping data.
writer:
topicName: aggregated_metrics
topicServiceOverride:
zone: embedded
environment: override_test_env
messageRetry:
initialBackoff: 1m
maxBackoff: 2m
messageQueueNewWritesScanInterval: 1s
ackErrorRetry:
initialBackoff: 2s
maxBackoff: 10s
connection:
dialTimeout: 5s
writeTimeout: 5s
retry:
initialBackoff: 1s
maxBackoff: 10s
flushInterval: 1s
writeBufferSize: 16384
readBufferSize: 256
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
forwarding:
maxSingleDelay: 5s
entryTTL: 6h
Expand Down
44 changes: 18 additions & 26 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,24 @@ clusters:
downsample:
remoteAggregator:
client:
placementKV:
namespace: /placement
environment: override_test_env
placementWatcher:
key: m3aggregator
initWatchTimeout: 10s
hashType: murmur32
shardCutoffLingerDuration: 1m
flushSize: 1440
maxTimerBatchSize: 1120
queueSize: 10000
queueDropType: oldest
encoder:
initBufferSize: 2048
maxMessageSize: 10485760
bytesPool:
buckets:
- capacity: 2048
count: 4096
- capacity: 4096
count: 4096
watermark:
low: 0.7
high: 1.0
connection:
writeTimeout: 250ms
type: m3msg
m3msg:
producer:
writer:
topicName: aggregator_ingest
topicServiceOverride:
zone: embedded
environment: override_test_env
placement:
isStaged: true
placementServiceOverride:
namespaces:
placement: /placement
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5

ingest:
ingester:
Expand Down
35 changes: 27 additions & 8 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,41 @@ curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:
"isolation_group": "availability-zone-a",
"zone": "embedded",
"weight": 100,
"endpoint": "m3aggregator01:6000",
"endpoint": "m3aggregator01:6002",
"hostname": "m3aggregator01",
"port": 6000
"port": 6002
},
{
"id": "m3aggregator02",
"isolation_group": "availability-zone-b",
"zone": "embedded",
"weight": 100,
"endpoint": "m3aggregator02:6000",
"endpoint": "m3aggregator02:6002",
"hostname": "m3aggregator02",
"port": 6000
"port": 6002
}
]
}'

echo "Initializing m3msg topic for m3coordinator ingestion from m3aggregators"
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
echo "Initializing m3msg inbound topic for m3aggregator ingestion from m3coordinators"
curl -vvvsSf -X POST -H "Topic-Name: aggregator_ingest" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'

# Do this after placement and topic for m3aggregator is created.
echo "Adding m3aggregator as a consumer to the aggregator ingest topic"
curl -vvvsSf -X POST -H "Topic-Name: aggregator_ingest" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3aggregator",
"environment": "override_test_env",
"zone": "embedded"
},
"consumptionType": "REPLICATED",
"messageTtlNanos": "600000000000"
}
}' # msgs will be discarded after 600000000000ns = 10mins

echo "Initializing m3coordinator topology"
curl -vvvsSf -X POST localhost:7201/api/v1/services/m3coordinator/placement/init -d '{
"instances": [
Expand All @@ -74,8 +88,13 @@ echo "Validating m3coordinator topology"
echo "Done validating topology"

# Do this after placement for m3coordinator is created.
echo "Adding m3coordinator as a consumer to the aggregator topic"
curl -vvvsSf -X POST -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
echo "Initializing m3msg outbound topic for m3coordinator ingestion from m3aggregators"
curl -vvvsSf -X POST -H "Topic-Name: aggregated_metrics" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'

echo "Adding m3coordinator as a consumer to the aggregator publish topic"
curl -vvvsSf -X POST -H "Topic-Name: aggregated_metrics" -H "Cluster-Environment-Name: override_test_env" localhost:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3coordinator",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
version: "3.5"
services:
dbnode01:
expose:
- "9000-9004"
- "2379-2380"
- "7201"
ports:
- "0.0.0.0:9000-9004:9000-9004"
- "0.0.0.0:2379-2380:2379-2380"
- "0.0.0.0:7201:7201"
networks:
- backend
image: "m3dbnode_integration:${REVISION}"
m3coordinator01:
expose:
- "7202"
- "7203"
- "7204"
ports:
- "0.0.0.0:7202:7202"
- "0.0.0.0:7203:7203"
- "0.0.0.0:7204:7204"
networks:
- backend
image: "m3coordinator_integration:${REVISION}"
volumes:
- "./m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml"
m3aggregator01:
expose:
- "6001"
ports:
- "127.0.0.1:6001:6001"
networks:
- backend
environment:
- M3AGGREGATOR_HOST_ID=m3aggregator01
image: "m3aggregator_integration:${REVISION}"
volumes:
- "./m3aggregator.yml:/etc/m3aggregator/m3aggregator.yml"
m3aggregator02:
networks:
- backend
environment:
- M3AGGREGATOR_HOST_ID=m3aggregator02
image: "m3aggregator_integration:${REVISION}"
volumes:
- "./m3aggregator.yml:/etc/m3aggregator/m3aggregator.yml"
networks:
backend:
Loading