From 6bdce9093d00bdbf5c68467bdbf7186b8d6d4cd6 Mon Sep 17 00:00:00 2001 From: weichou Date: Thu, 19 Nov 2020 09:58:58 +0800 Subject: [PATCH] feat: Count event amount to verify all messages are pushed TAF subscribe to northbound MQTT broker to count pushed events and query the reading amount from southbound for verifying. Signed-off-by: weichou --- .../modbus_scalability_test/configuration.py | 6 +- .../configuration.toml | 2 +- .../keywords/scalabilityTest/modbus/run.py | 84 ++++++++++++------- TAF/testCaseModules/keywords/setup/edgex.py | 18 ---- .../scalabilityTest/modbus/modbus.robot | 2 + .../docker/docker-compose-end-to-end.yaml | 3 + TAF/utils/scripts/docker/start-services.sh | 6 -- docs/modbus-scalability-test.md | 4 +- 8 files changed, 66 insertions(+), 59 deletions(-) delete mode 100644 TAF/utils/scripts/docker/start-services.sh diff --git a/TAF/config/modbus_scalability_test/configuration.py b/TAF/config/modbus_scalability_test/configuration.py index 2bd714068..63fad1e0e 100644 --- a/TAF/config/modbus_scalability_test/configuration.py +++ b/TAF/config/modbus_scalability_test/configuration.py @@ -1,13 +1,15 @@ +import os + +WORK_DIR = os.getenv('WORK_DIR') SERVICE_NAME = "edgex-device-modbus" DEVICE_PROFILE_NAME = "test-device-profile" SIMULATOR_HOST = "192.168.x.x" -MQTT_BROKER_IP = "127.0.0.1" +MQTT_BROKER_IP = "192.168.x.x" SLEEP_INTERVAL = 60 DEVICE_INCREMENT = 10 THRESHOLD_CPU_UTILIZATION = 100 THRESHOLD_MEMORY_UTILIZATION = 80 THRESHOLD_MEMORY_USAGE_GROWTH = 20 -THRESHOLD_EVENT_AMOUNT_TOLERANCE = 5 SIMULATOR_NUMBER = 1000 STARTING_PORT = 10000 # PROC_PATH is the path of process information pseudo-filesystem diff --git a/TAF/config/modbus_scalability_test/configuration.toml b/TAF/config/modbus_scalability_test/configuration.toml index 2dd483d52..90b3373fd 100644 --- a/TAF/config/modbus_scalability_test/configuration.toml +++ b/TAF/config/modbus_scalability_test/configuration.toml @@ -13,7 +13,7 @@ Timeout = 5000 ConnectRetries = 3 Labels = [] EnableAsyncReadings = true -AsyncBufferSize = 16 +AsyncBufferSize = 2 [Registry] Host = 'localhost' diff --git a/TAF/testCaseModules/keywords/scalabilityTest/modbus/run.py b/TAF/testCaseModules/keywords/scalabilityTest/modbus/run.py index 69042d2ed..8a195aa79 100644 --- a/TAF/testCaseModules/keywords/scalabilityTest/modbus/run.py +++ b/TAF/testCaseModules/keywords/scalabilityTest/modbus/run.py @@ -11,6 +11,7 @@ from TAF.config.modbus_scalability_test import configuration from TAF.config import global_variables from TAF.testCaseModules.keywords.setup import edgex +from TUC.data.SettingsInfo import SettingsInfo psutil.PROCFS_PATH = configuration.PROC_PATH accumulated_event_amount = 0 @@ -46,7 +47,7 @@ def __init__(self, case_no, device_amount, def on_connect(client, userdata, flags, rc): - logger.info("Connected to MQTT broker with result code " + str(rc), also_console=True) + logger.info("▶ Connected to MQTT broker with result code " + str(rc), also_console=True) client.subscribe("edgex-events") @@ -73,9 +74,9 @@ def initial_mqtt_client(mqtt_client): # fetch_report_info fetches the system information via psutil (https://pypi.org/project/psutil/) def fetch_report_info(): - logger.info(psutil.cpu_count(), also_console=True) - logger.info(psutil.cpu_freq(), also_console=True) - logger.info(psutil.virtual_memory(), also_console=True) + logger.info("cpu_count: {}".format(psutil.cpu_count()), also_console=True) + logger.info("cpu_freq: {}".format(psutil.cpu_freq()), also_console=True) + logger.info("virtual_memory: {}".format(psutil.virtual_memory()), also_console=True) cpu_freq = 0 # The cpu_freq is None in some Virtual machine @@ -121,7 +122,7 @@ def when_run_scalability_testing(): # Start the device service logger.info('▶ Start the device service {}'.format(configuration.SERVICE_NAME), also_console=True) - edgex.start_services(configuration.SERVICE_NAME) + update_device_service_admin_state("UNLOCKED") # Fetch and record ResourceUtilization which is used to generate the report later logger.info('▶ Fetch the resource utilization with {} devices'.format(device_amount), also_console=True) @@ -129,21 +130,24 @@ def when_run_scalability_testing(): # Stop the device service for calculating the event amount logger.info('▶ Stop the device service {}'.format(configuration.SERVICE_NAME), also_console=True) - edgex.stop_services(configuration.SERVICE_NAME) - time.sleep(10) + update_device_service_admin_state("LOCKED") + time.sleep(20) resource_utilization.accumulated_event_amount2 = accumulated_event_amount # Query modbus simulator to get the current reading amount resource_utilization.expected_accumulated_event_amount2 = int(query_simulator_reading_count()/10) - # Fetch CPU and Memory + records.append(resource_utilization) + logger.info('▶▶ CPU1 {}% with MEM1 {}%'.format( resource_utilization.cpu1,resource_utilization.mem1), also_console=True) logger.info('▶▶ CPU2 {}% with MEM2 {}%'.format( resource_utilization.cpu2, resource_utilization.mem2), also_console=True) - - records.append(resource_utilization) + logger.info('▶▶ Expected event amount {}'.format( + resource_utilization.expected_accumulated_event_amount2), also_console=True) + logger.info('▶▶ Actual event amount {}'.format( + resource_utilization.accumulated_event_amount2), also_console=True) # Check if utilization exceed the threshold, the function will throw an error check_threshold(resource_utilization) @@ -169,6 +173,7 @@ def create_devices(amount, starting_port): for i in range(amount): port = starting_port + i device = create_device_with_port(port) + time.sleep(0.5) devices.append(device) return devices @@ -222,22 +227,14 @@ def check_threshold(resource_utilization): resource_utilization.mem2, resource_utilization.mem1, configuration.THRESHOLD_MEMORY_USAGE_GROWTH)) - # if the actual events is less than expected in one minute - amount_different = resource_utilization.expected_accumulated_event_amount2 - resource_utilization.accumulated_event_amount2 - amount_tolerance = (amount_different / resource_utilization.expected_accumulated_event_amount2)*100 - if amount_tolerance > configuration.THRESHOLD_EVENT_AMOUNT_TOLERANCE: + # if the actual events is not equal to the expected + if resource_utilization.accumulated_event_amount2 != resource_utilization.expected_accumulated_event_amount2: raise Exception( - '☉ Stop the testing due to the tolerance {}% of actual events amount is greater than {}%'.format( - amount_tolerance,configuration.THRESHOLD_EVENT_AMOUNT_TOLERANCE)) + '☉ Stop the testing due to the actual event amount {} not equal the expected {}'.format( + resource_utilization.accumulated_event_amount2, resource_utilization.expected_accumulated_event_amount2)) def create_device_with_port(port): - url = '{}://{}:{}/api/v1/device'.format( - global_variables.URI_SCHEME, - global_variables.BASE_URL, - global_variables.CORE_METADATA_PORT, - ) - device = { "name": "device-{}".format(port), "description": "mock device with port {}".format(port), @@ -277,8 +274,18 @@ def create_device_with_port(port): } logger.debug('▶ Created device {}'.format(json.dumps(device))) - res = requests.post(url, data=json.dumps(device)) - res.raise_for_status() + + conn = http.client.HTTPConnection(host=SettingsInfo().constant.BASE_URL, port=global_variables.CORE_METADATA_PORT, timeout=5) + conn.request(method="POST", url="/api/v1/device", body=json.dumps(device)) + try: + r = conn.getresponse() + except Exception as e: + raise e + if int(r.status) == 200: + logger.debug('▶ Create device {} successfully'.format(device["name"])) + else: + logger.error('▶ Fail to create the device {}. {}'.format(device["name"], r.read())) + logger.debug('▶ Created device with the profile "{}" port "{}" '.format(configuration.DEVICE_PROFILE_NAME, port)) return device @@ -292,8 +299,11 @@ def remove_created_devices(created_devices): global_variables.CORE_METADATA_PORT, ) for i in range(len(created_devices)): - res = requests.delete('{}/{}'.format(url,created_devices[i]["name"])) - res.raise_for_status() + try: + res = requests.delete('{}/{}'.format(url, created_devices[i]["name"])) + res.raise_for_status() + except Exception as e: + logger.error('▶ Fail to remove device by name {}, {}'.format(created_devices[i]["name"], e)) def reset_simulator_reading_count(): @@ -308,12 +318,12 @@ def reset_simulator_reading_count(): if int(r.status) == 200: logger.info('▶ Reset simulator reading count to 0', also_console=True) else: - raise Exception("Fail to reset simulator reading count.") + logger.error('▶ Fail to reset simulator reading count.') + raise Exception('Fail to reset simulator reading count.') def query_simulator_reading_count(): try: - logger.info('▶ Query simulator reading count.', also_console=True) conn = http.client.HTTPConnection(host=configuration.SIMULATOR_HOST, port=1503, timeout=10) conn.request(method="GET", url="/reading/count") r = conn.getresponse() @@ -325,4 +335,20 @@ def query_simulator_reading_count(): logger.info('▶ Query simulator reading count {}'.format(count), also_console=True) return count else: - raise Exception("Fail to reset simulator reading count.") + logger.error('▶ Fail to reset simulator reading count.') + raise Exception("Fail to query simulator reading count.") + + +def update_device_service_admin_state(admin_state): + conn = http.client.HTTPConnection(host=SettingsInfo().constant.BASE_URL, port=48081, timeout=5) + conn.request(method="PUT", + url="/api/v1/deviceservice/name/{}/adminstate/{}".format(configuration.SERVICE_NAME, admin_state)) + try: + r1 = conn.getresponse() + except Exception as e: + raise e + if int(r1.status) == 200: + logger.debug('▶ Update device service admin state to {}'.format(admin_state)) + else: + logger.error('▶ Fail to update the admin state.') + raise Exception('Fail to update the admin state') diff --git a/TAF/testCaseModules/keywords/setup/edgex.py b/TAF/testCaseModules/keywords/setup/edgex.py index 7dd1d43a9..9620bd0b8 100644 --- a/TAF/testCaseModules/keywords/setup/edgex.py +++ b/TAF/testCaseModules/keywords/setup/edgex.py @@ -33,24 +33,6 @@ def shutdown_services(*args): run_command(cmd) -def stop_services(*args): - SettingsInfo().TestLog.info("Stop services {}".format(args)) - script_path = "{}/TAF/utils/scripts/{}/stop-services.sh".format( - SettingsInfo().workDir, - SettingsInfo().constant.DEPLOY_TYPE) - cmd = ["sh", script_path, *args] - run_command(cmd) - - -def start_services(*args): - SettingsInfo().TestLog.info("Start services {}".format(args)) - script_path = "{}/TAF/utils/scripts/{}/start-services.sh".format( - SettingsInfo().workDir, - SettingsInfo().constant.DEPLOY_TYPE) - cmd = ["sh", script_path, *args] - run_command(cmd) - - def restart_services(*args): SettingsInfo().TestLog.info("Restart services {}".format(args)) script_path = "{}/TAF/utils/scripts/{}/restart-services.sh".format( diff --git a/TAF/testScenarios/scalabilityTest/modbus/modbus.robot b/TAF/testScenarios/scalabilityTest/modbus/modbus.robot index 312eb42e8..f617592e4 100644 --- a/TAF/testScenarios/scalabilityTest/modbus/modbus.robot +++ b/TAF/testScenarios/scalabilityTest/modbus/modbus.robot @@ -4,6 +4,7 @@ Library TAF/testCaseModules/keywords/scalabilityTest/modbus/report.py Library TAF/testCaseModules/keywords/common/consul.py Library TAF/testCaseModules/keywords/setup/edgex.py Resource TAF/testCaseModules/keywords/common/commonKeywords.robot +Variables TAF/config/modbus_scalability_test/configuration.py Suite Setup Run Keywords Setup Suite ... AND Run Keyword if $SECURITY_SERVICE_NEEDED == 'true' Get Token @@ -19,6 +20,7 @@ ${LOG_FILE_PATH} ${WORK_DIR}/TAF/testArtifacts/logs/modbus_scalability_test Test Modbus scalability Given Modify consul config /v1/kv/edgex/core/1.0/edgex-core-data/Writable/PersistData false And Deploy services scalability-test-mqtt-export + sleep 30 ${report_info} ${records} = When run scalability testing Then generate report ${report_info} ${records} diff --git a/TAF/utils/scripts/docker/docker-compose-end-to-end.yaml b/TAF/utils/scripts/docker/docker-compose-end-to-end.yaml index 67edf485b..6a531c219 100644 --- a/TAF/utils/scripts/docker/docker-compose-end-to-end.yaml +++ b/TAF/utils/scripts/docker/docker-compose-end-to-end.yaml @@ -142,3 +142,6 @@ Writable_LogLevel: DEBUG networks: - edgex-network + depends_on: + - consul + - data diff --git a/TAF/utils/scripts/docker/start-services.sh b/TAF/utils/scripts/docker/start-services.sh deleted file mode 100644 index 8533b30ac..000000000 --- a/TAF/utils/scripts/docker/start-services.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh - -CONF_DIR=/custom-config -docker run --rm -v ${WORK_DIR}:${WORK_DIR} -w ${WORK_DIR} -v /var/run/docker.sock:/var/run/docker.sock \ - --env WORK_DIR=${WORK_DIR} --env PROFILE=${PROFILE} --security-opt label:disable \ - ${COMPOSE_IMAGE} -f "${WORK_DIR}/TAF/utils/scripts/docker/docker-compose.yaml" start $* \ No newline at end of file diff --git a/docs/modbus-scalability-test.md b/docs/modbus-scalability-test.md index 7806c2886..6d286434b 100644 --- a/docs/modbus-scalability-test.md +++ b/docs/modbus-scalability-test.md @@ -48,7 +48,6 @@ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto THRESHOLD_CPU_UTILIZATION = 100 THRESHOLD_MEMORY_UTILIZATION = 80 THRESHOLD_MEMORY_USAGE_GROWTH = 20 - THRESHOLD_EVENT_AMOUNT_TOLERANCE = 5 SIMULATOR_NUMBER = 1000 STARTING_PORT = 10000 # PROC_PATH is the path of process information pseudo-filesystem @@ -60,8 +59,7 @@ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto - DEVICE_INCREMENT: The device increment for each test case - THRESHOLD_CPU_UTILIZATION: Stop the testing when the CPU utilization reach the specified percentage - THRESHOLD_MEMORY_UTILIZATION = Stop the testing when the memory utilization reach the specified percentage - - THRESHOLD_MEMORY_USAGE_GROWTH = Stop the testing when the second memory usage growth than first - - THRESHOLD_EVENT_AMOUNT_TOLERANCE = Stop the testing when the event amount tolerance + - THRESHOLD_MEMORY_USAGE_GROWTH = Stop the testing when the second memory usage growth than first - SIMULATOR_NUMBER: This value should equal with the simulator's SIMULATOR_NUMBER env - STARTING_PORT: This value should equal with the simulator's STARTING_PORT env, then the test script create the device according to the STARTING_PORT and SIMULATOR_NUMBER - PROC_PATH: This path is used for psutil python lib to collect the system metrics