Skip to content

Commit

Permalink
feat: Count event amount to verify all messages are pushed
Browse files Browse the repository at this point in the history
TAF subscribe to northbound MQTT broker to count pushed events and query the reading amount from southbound for verifying.

Signed-off-by: weichou <[email protected]>
  • Loading branch information
weichou1229 committed Nov 30, 2020
1 parent 7be2a17 commit 6bdce90
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 59 deletions.
6 changes: 4 additions & 2 deletions TAF/config/modbus_scalability_test/configuration.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import os

WORK_DIR = os.getenv('WORK_DIR')
SERVICE_NAME = "edgex-device-modbus"
DEVICE_PROFILE_NAME = "test-device-profile"
SIMULATOR_HOST = "192.168.x.x"
MQTT_BROKER_IP = "127.0.0.1"
MQTT_BROKER_IP = "192.168.x.x"
SLEEP_INTERVAL = 60
DEVICE_INCREMENT = 10
THRESHOLD_CPU_UTILIZATION = 100
THRESHOLD_MEMORY_UTILIZATION = 80
THRESHOLD_MEMORY_USAGE_GROWTH = 20
THRESHOLD_EVENT_AMOUNT_TOLERANCE = 5
SIMULATOR_NUMBER = 1000
STARTING_PORT = 10000
# PROC_PATH is the path of process information pseudo-filesystem
Expand Down
2 changes: 1 addition & 1 deletion TAF/config/modbus_scalability_test/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Timeout = 5000
ConnectRetries = 3
Labels = []
EnableAsyncReadings = true
AsyncBufferSize = 16
AsyncBufferSize = 2

[Registry]
Host = 'localhost'
Expand Down
84 changes: 55 additions & 29 deletions TAF/testCaseModules/keywords/scalabilityTest/modbus/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from TAF.config.modbus_scalability_test import configuration
from TAF.config import global_variables
from TAF.testCaseModules.keywords.setup import edgex
from TUC.data.SettingsInfo import SettingsInfo

psutil.PROCFS_PATH = configuration.PROC_PATH
accumulated_event_amount = 0
Expand Down Expand Up @@ -46,7 +47,7 @@ def __init__(self, case_no, device_amount,


def on_connect(client, userdata, flags, rc):
logger.info("Connected to MQTT broker with result code " + str(rc), also_console=True)
logger.info("Connected to MQTT broker with result code " + str(rc), also_console=True)
client.subscribe("edgex-events")


Expand All @@ -73,9 +74,9 @@ def initial_mqtt_client(mqtt_client):

# fetch_report_info fetches the system information via psutil (https://pypi.org/project/psutil/)
def fetch_report_info():
logger.info(psutil.cpu_count(), also_console=True)
logger.info(psutil.cpu_freq(), also_console=True)
logger.info(psutil.virtual_memory(), also_console=True)
logger.info("cpu_count: {}".format(psutil.cpu_count()), also_console=True)
logger.info("cpu_freq: {}".format(psutil.cpu_freq()), also_console=True)
logger.info("virtual_memory: {}".format(psutil.virtual_memory()), also_console=True)

cpu_freq = 0
# The cpu_freq is None in some Virtual machine
Expand Down Expand Up @@ -121,29 +122,32 @@ def when_run_scalability_testing():

# Start the device service
logger.info('▶ Start the device service {}'.format(configuration.SERVICE_NAME), also_console=True)
edgex.start_services(configuration.SERVICE_NAME)
update_device_service_admin_state("UNLOCKED")

# Fetch and record ResourceUtilization which is used to generate the report later
logger.info('▶ Fetch the resource utilization with {} devices'.format(device_amount), also_console=True)
resource_utilization = fetch_metric(case_no, device_amount)

# Stop the device service for calculating the event amount
logger.info('▶ Stop the device service {}'.format(configuration.SERVICE_NAME), also_console=True)
edgex.stop_services(configuration.SERVICE_NAME)
time.sleep(10)
update_device_service_admin_state("LOCKED")
time.sleep(20)

resource_utilization.accumulated_event_amount2 = accumulated_event_amount

# Query modbus simulator to get the current reading amount
resource_utilization.expected_accumulated_event_amount2 = int(query_simulator_reading_count()/10)

# Fetch CPU and Memory
records.append(resource_utilization)

logger.info('▶▶ CPU1 {}% with MEM1 {}%'.format(
resource_utilization.cpu1,resource_utilization.mem1), also_console=True)
logger.info('▶▶ CPU2 {}% with MEM2 {}%'.format(
resource_utilization.cpu2, resource_utilization.mem2), also_console=True)

records.append(resource_utilization)
logger.info('▶▶ Expected event amount {}'.format(
resource_utilization.expected_accumulated_event_amount2), also_console=True)
logger.info('▶▶ Actual event amount {}'.format(
resource_utilization.accumulated_event_amount2), also_console=True)

# Check if utilization exceed the threshold, the function will throw an error
check_threshold(resource_utilization)
Expand All @@ -169,6 +173,7 @@ def create_devices(amount, starting_port):
for i in range(amount):
port = starting_port + i
device = create_device_with_port(port)
time.sleep(0.5)
devices.append(device)
return devices

Expand Down Expand Up @@ -222,22 +227,14 @@ def check_threshold(resource_utilization):
resource_utilization.mem2, resource_utilization.mem1,
configuration.THRESHOLD_MEMORY_USAGE_GROWTH))

# if the actual events is less than expected in one minute
amount_different = resource_utilization.expected_accumulated_event_amount2 - resource_utilization.accumulated_event_amount2
amount_tolerance = (amount_different / resource_utilization.expected_accumulated_event_amount2)*100
if amount_tolerance > configuration.THRESHOLD_EVENT_AMOUNT_TOLERANCE:
# if the actual events is not equal to the expected
if resource_utilization.accumulated_event_amount2 != resource_utilization.expected_accumulated_event_amount2:
raise Exception(
'☉ Stop the testing due to the tolerance {}% of actual events amount is greater than {}%'.format(
amount_tolerance,configuration.THRESHOLD_EVENT_AMOUNT_TOLERANCE))
'☉ Stop the testing due to the actual event amount {} not equal the expected {}'.format(
resource_utilization.accumulated_event_amount2, resource_utilization.expected_accumulated_event_amount2))


def create_device_with_port(port):
url = '{}://{}:{}/api/v1/device'.format(
global_variables.URI_SCHEME,
global_variables.BASE_URL,
global_variables.CORE_METADATA_PORT,
)

device = {
"name": "device-{}".format(port),
"description": "mock device with port {}".format(port),
Expand Down Expand Up @@ -277,8 +274,18 @@ def create_device_with_port(port):
}

logger.debug('▶ Created device {}'.format(json.dumps(device)))
res = requests.post(url, data=json.dumps(device))
res.raise_for_status()

conn = http.client.HTTPConnection(host=SettingsInfo().constant.BASE_URL, port=global_variables.CORE_METADATA_PORT, timeout=5)
conn.request(method="POST", url="/api/v1/device", body=json.dumps(device))
try:
r = conn.getresponse()
except Exception as e:
raise e
if int(r.status) == 200:
logger.debug('▶ Create device {} successfully'.format(device["name"]))
else:
logger.error('▶ Fail to create the device {}. {}'.format(device["name"], r.read()))

logger.debug('▶ Created device with the profile "{}" port "{}" '.format(configuration.DEVICE_PROFILE_NAME, port))

return device
Expand All @@ -292,8 +299,11 @@ def remove_created_devices(created_devices):
global_variables.CORE_METADATA_PORT,
)
for i in range(len(created_devices)):
res = requests.delete('{}/{}'.format(url,created_devices[i]["name"]))
res.raise_for_status()
try:
res = requests.delete('{}/{}'.format(url, created_devices[i]["name"]))
res.raise_for_status()
except Exception as e:
logger.error('▶ Fail to remove device by name {}, {}'.format(created_devices[i]["name"], e))


def reset_simulator_reading_count():
Expand All @@ -308,12 +318,12 @@ def reset_simulator_reading_count():
if int(r.status) == 200:
logger.info('▶ Reset simulator reading count to 0', also_console=True)
else:
raise Exception("Fail to reset simulator reading count.")
logger.error('▶ Fail to reset simulator reading count.')
raise Exception('Fail to reset simulator reading count.')


def query_simulator_reading_count():
try:
logger.info('▶ Query simulator reading count.', also_console=True)
conn = http.client.HTTPConnection(host=configuration.SIMULATOR_HOST, port=1503, timeout=10)
conn.request(method="GET", url="/reading/count")
r = conn.getresponse()
Expand All @@ -325,4 +335,20 @@ def query_simulator_reading_count():
logger.info('▶ Query simulator reading count {}'.format(count), also_console=True)
return count
else:
raise Exception("Fail to reset simulator reading count.")
logger.error('▶ Fail to reset simulator reading count.')
raise Exception("Fail to query simulator reading count.")


def update_device_service_admin_state(admin_state):
conn = http.client.HTTPConnection(host=SettingsInfo().constant.BASE_URL, port=48081, timeout=5)
conn.request(method="PUT",
url="/api/v1/deviceservice/name/{}/adminstate/{}".format(configuration.SERVICE_NAME, admin_state))
try:
r1 = conn.getresponse()
except Exception as e:
raise e
if int(r1.status) == 200:
logger.debug('▶ Update device service admin state to {}'.format(admin_state))
else:
logger.error('▶ Fail to update the admin state.')
raise Exception('Fail to update the admin state')
18 changes: 0 additions & 18 deletions TAF/testCaseModules/keywords/setup/edgex.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,6 @@ def shutdown_services(*args):
run_command(cmd)


def stop_services(*args):
SettingsInfo().TestLog.info("Stop services {}".format(args))
script_path = "{}/TAF/utils/scripts/{}/stop-services.sh".format(
SettingsInfo().workDir,
SettingsInfo().constant.DEPLOY_TYPE)
cmd = ["sh", script_path, *args]
run_command(cmd)


def start_services(*args):
SettingsInfo().TestLog.info("Start services {}".format(args))
script_path = "{}/TAF/utils/scripts/{}/start-services.sh".format(
SettingsInfo().workDir,
SettingsInfo().constant.DEPLOY_TYPE)
cmd = ["sh", script_path, *args]
run_command(cmd)


def restart_services(*args):
SettingsInfo().TestLog.info("Restart services {}".format(args))
script_path = "{}/TAF/utils/scripts/{}/restart-services.sh".format(
Expand Down
2 changes: 2 additions & 0 deletions TAF/testScenarios/scalabilityTest/modbus/modbus.robot
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Library TAF/testCaseModules/keywords/scalabilityTest/modbus/report.py
Library TAF/testCaseModules/keywords/common/consul.py
Library TAF/testCaseModules/keywords/setup/edgex.py
Resource TAF/testCaseModules/keywords/common/commonKeywords.robot
Variables TAF/config/modbus_scalability_test/configuration.py

Suite Setup Run Keywords Setup Suite
... AND Run Keyword if $SECURITY_SERVICE_NEEDED == 'true' Get Token
Expand All @@ -19,6 +20,7 @@ ${LOG_FILE_PATH} ${WORK_DIR}/TAF/testArtifacts/logs/modbus_scalability_test
Test Modbus scalability
Given Modify consul config /v1/kv/edgex/core/1.0/edgex-core-data/Writable/PersistData false
And Deploy services scalability-test-mqtt-export
sleep 30
${report_info} ${records} = When run scalability testing
Then generate report ${report_info} ${records}

3 changes: 3 additions & 0 deletions TAF/utils/scripts/docker/docker-compose-end-to-end.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,6 @@
Writable_LogLevel: DEBUG
networks:
- edgex-network
depends_on:
- consul
- data
6 changes: 0 additions & 6 deletions TAF/utils/scripts/docker/start-services.sh

This file was deleted.

4 changes: 1 addition & 3 deletions docs/modbus-scalability-test.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
THRESHOLD_CPU_UTILIZATION = 100
THRESHOLD_MEMORY_UTILIZATION = 80
THRESHOLD_MEMORY_USAGE_GROWTH = 20
THRESHOLD_EVENT_AMOUNT_TOLERANCE = 5
SIMULATOR_NUMBER = 1000
STARTING_PORT = 10000
# PROC_PATH is the path of process information pseudo-filesystem
Expand All @@ -60,8 +59,7 @@ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
- DEVICE_INCREMENT: The device increment for each test case
- THRESHOLD_CPU_UTILIZATION: Stop the testing when the CPU utilization reach the specified percentage
- THRESHOLD_MEMORY_UTILIZATION = Stop the testing when the memory utilization reach the specified percentage
- THRESHOLD_MEMORY_USAGE_GROWTH = Stop the testing when the second memory usage growth than first
- THRESHOLD_EVENT_AMOUNT_TOLERANCE = Stop the testing when the event amount tolerance
- THRESHOLD_MEMORY_USAGE_GROWTH = Stop the testing when the second memory usage growth than first
- SIMULATOR_NUMBER: This value should equal with the simulator's SIMULATOR_NUMBER env
- STARTING_PORT: This value should equal with the simulator's STARTING_PORT env, then the test script create the device according to the STARTING_PORT and SIMULATOR_NUMBER
- PROC_PATH: This path is used for psutil python lib to collect the system metrics
Expand Down

0 comments on commit 6bdce90

Please sign in to comment.