From 18f78a07f2082445169fe25dfb7bfe403e466906 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Mon, 23 Oct 2023 15:35:14 -0700 Subject: [PATCH] Add Device Advisor Jobs tests (#491) --- deviceadvisor/README.md | 9 +- deviceadvisor/script/DATestConfig.json | 5 + deviceadvisor/script/DATestRun.py | 33 +- .../tests/DATestUtils/DATestUtils.java | 7 +- deviceadvisor/tests/JobExecution/pom.xml | 46 +++ .../main/java/jobExecution/JobExecution.java | 290 ++++++++++++++++++ 6 files changed, 374 insertions(+), 16 deletions(-) create mode 100644 deviceadvisor/tests/JobExecution/pom.xml create mode 100644 deviceadvisor/tests/JobExecution/src/main/java/jobExecution/JobExecution.java diff --git a/deviceadvisor/README.md b/deviceadvisor/README.md index b493d5a5..f29bc9af 100644 --- a/deviceadvisor/README.md +++ b/deviceadvisor/README.md @@ -104,9 +104,16 @@ for named shadows. ### Named Shadow Update -Validates if your device reads all update messages received and synchronizes the device's state to match the desired +Validates if a device reads all update messages received and synchronizes the device's state to match the desired state properties. This test executes [Device updates reported state to desired state (Happy case)](https://docs.aws.amazon.com/iot/latest/developerguide/device-advisor-tests-shadow.html#update) for named shadows. + +### Job Execution + +Validates if a device is able to receive updates using AWS IoT Jobs, and publish the status of successful updates. + +This test executes +[Device can complete a job execution](https://docs.aws.amazon.com/iot/latest/developerguide/device-advisor-tests-job-execution.html). diff --git a/deviceadvisor/script/DATestConfig.json b/deviceadvisor/script/DATestConfig.json index 94d829b2..ec32555f 100644 --- a/deviceadvisor/script/DATestConfig.json +++ b/deviceadvisor/script/DATestConfig.json @@ -37,6 +37,11 @@ "test_suite_id" : "6s2hyjq6dkmb", "test_exe_path" : "ShadowUpdate", "cmd_args" : "--named-shadow" + }, + { + "test_name" : "Job Execution", + "test_suite_id" : "6y02cc8096vl", + "test_exe_path" : "JobExecution" } ] } diff --git a/deviceadvisor/script/DATestRun.py b/deviceadvisor/script/DATestRun.py index 72727be8..972b2a41 100644 --- a/deviceadvisor/script/DATestRun.py +++ b/deviceadvisor/script/DATestRun.py @@ -41,8 +41,6 @@ def process_logs(log_group, log_stream, thing_name): f.close() try: - secrets_client = boto3.client( - "secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"]) s3_bucket_name = secrets_client.get_secret_value( SecretId="ci/DeviceAdvisor/s3bucket")["SecretString"] s3.Bucket(s3_bucket_name).upload_file(log_file, log_file) @@ -72,6 +70,8 @@ def sleep_with_backoff(base, max): deviceAdvisor = boto3.client( 'iotdeviceadvisor', region_name=os.environ["AWS_DEFAULT_REGION"]) s3 = boto3.resource('s3', region_name=os.environ["AWS_DEFAULT_REGION"]) + secrets_client = boto3.client( + "secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"]) except Exception: print("[Device Advisor] Error: could not create boto3 clients.", file=sys.stderr) exit(-1) @@ -113,35 +113,44 @@ def sleep_with_backoff(base, max): disabled = test_suite.get('disabled', False) if disabled: - print(f"[Device Advisor] Info: " - "{test_name} test suite is disabled, skipping", file=sys.stderr) + print("[Device Advisor] Info: " + f"{test_name} test suite is disabled, skipping", file=sys.stderr) continue ############################################## # create a test thing thing_name = "DATest_" + str(uuid.uuid4()) try: + thing_group = secrets_client.get_secret_value( + SecretId="ci/DeviceAdvisor/thing_group")["SecretString"] # create_thing_response: # { # 'thingName': 'string', # 'thingArn': 'string', # 'thingId': 'string' # } - print("[Device Advisor] Info: Started to create thing...", file=sys.stderr) + print("[Device Advisor] Info: Started to create thing " + f"'{thing_name}'", file=sys.stderr) create_thing_response = client.create_thing( thingName=thing_name ) os.environ["DA_THING_NAME"] = thing_name - except Exception: - print("[Device Advisor] Error: Failed to create thing: " + - thing_name, file=sys.stderr) + # Some tests (e.g. Jobs) require the tested things to be a part of the DA group thing. + client.add_thing_to_thing_group( + thingGroupName=thing_group, + thingName=thing_name, + ) + + except Exception as e: + print(f"[Device Advisor] Error: Failed to create thing '{thing_name}'; " + f"exception: {e}", file=sys.stderr) exit(-1) ############################################## # create certificate and keys used for testing try: - print("[Device Advisor]Info: Started to create certificate...", + print("[Device Advisor] Info: Started to create certificate...", file=sys.stderr) # create_cert_response: # { @@ -187,8 +196,6 @@ def sleep_with_backoff(base, max): ############################################## # attach policy to certificate try: - secrets_client = boto3.client( - "secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"]) policy_name = secrets_client.get_secret_value( SecretId="ci/DeviceAdvisor/policy_name")["SecretString"] client.attach_policy( @@ -352,8 +359,8 @@ def sleep_with_backoff(base, max): except Exception as e: delete_thing_with_certi(thing_name, certificate_id, certificate_arn) - print("[Device Advisor]Error: Failed to test: " + - test_name + ", exception: " + e, file=sys.stderr) + print(f"[Device Advisor] Error: Failed to test: {test_name}; exception: {e}", + file=sys.stderr) did_at_least_one_test_fail = True sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX) diff --git a/deviceadvisor/tests/DATestUtils/DATestUtils.java b/deviceadvisor/tests/DATestUtils/DATestUtils.java index 3dda88fd..813c76d6 100644 --- a/deviceadvisor/tests/DATestUtils/DATestUtils.java +++ b/deviceadvisor/tests/DATestUtils/DATestUtils.java @@ -5,7 +5,7 @@ public class DATestUtils { public enum TestType { - CONNECT, SUB_PUB, SHADOW + CONNECT, SUB_PUB, SHADOW, JOBS } private final static String ENV_ENDPONT = "DA_ENDPOINT"; @@ -47,10 +47,13 @@ public static Boolean init(TestType type) { return false; } - else if (type == TestType.SHADOW && (thing_name.isEmpty() || shadowProperty.isEmpty() || shadowValue.isEmpty())) + if (type == TestType.SHADOW && (thing_name.isEmpty() || shadowProperty.isEmpty() || shadowValue.isEmpty())) { return false; } + if (type == TestType.JOBS && thing_name.isEmpty()) { + return false; + } return true; } } diff --git a/deviceadvisor/tests/JobExecution/pom.xml b/deviceadvisor/tests/JobExecution/pom.xml new file mode 100644 index 00000000..98b834d1 --- /dev/null +++ b/deviceadvisor/tests/JobExecution/pom.xml @@ -0,0 +1,46 @@ + + 4.0.0 + software.amazon.awssdk.iotdevicesdk + JobExecution + jar + 1.0-SNAPSHOT + ${project.groupId}:${project.artifactId} + Java bindings for the AWS IoT Core Service + https://github.com/awslabs/aws-iot-device-sdk-java-v2 + + 1.8 + 1.8 + UTF-8 + + + + software.amazon.awssdk.iotdevicesdk + aws-iot-device-sdk + 1.0.0-SNAPSHOT + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.2.0 + + + add-source + generate-sources + + add-source + + + + ../DATestUtils + + + + + + + + diff --git a/deviceadvisor/tests/JobExecution/src/main/java/jobExecution/JobExecution.java b/deviceadvisor/tests/JobExecution/src/main/java/jobExecution/JobExecution.java new file mode 100644 index 00000000..2e23703a --- /dev/null +++ b/deviceadvisor/tests/JobExecution/src/main/java/jobExecution/JobExecution.java @@ -0,0 +1,290 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package JobExecution; + +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt.QualityOfService; +import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder; +import software.amazon.awssdk.iot.iotjobs.IotJobsClient; +import software.amazon.awssdk.iot.iotjobs.model.DescribeJobExecutionRequest; +import software.amazon.awssdk.iot.iotjobs.model.DescribeJobExecutionResponse; +import software.amazon.awssdk.iot.iotjobs.model.DescribeJobExecutionSubscriptionRequest; +import software.amazon.awssdk.iot.iotjobs.model.GetPendingJobExecutionsRequest; +import software.amazon.awssdk.iot.iotjobs.model.GetPendingJobExecutionsResponse; +import software.amazon.awssdk.iot.iotjobs.model.GetPendingJobExecutionsSubscriptionRequest; +import software.amazon.awssdk.iot.iotjobs.model.JobExecutionSummary; +import software.amazon.awssdk.iot.iotjobs.model.JobStatus; +import software.amazon.awssdk.iot.iotjobs.model.RejectedError; +import software.amazon.awssdk.iot.iotjobs.model.StartNextJobExecutionResponse; +import software.amazon.awssdk.iot.iotjobs.model.StartNextPendingJobExecutionRequest; +import software.amazon.awssdk.iot.iotjobs.model.StartNextPendingJobExecutionSubscriptionRequest; +import software.amazon.awssdk.iot.iotjobs.model.UpdateJobExecutionRequest; +import software.amazon.awssdk.iot.iotjobs.model.UpdateJobExecutionSubscriptionRequest; +import software.amazon.awssdk.iot.iotjobs.model.NextJobExecutionChangedSubscriptionRequest; +import software.amazon.awssdk.iot.iotjobs.model.NextJobExecutionChangedEvent; +import software.amazon.awssdk.iot.iotjobs.model.JobExecutionsChangedSubscriptionRequest; +import software.amazon.awssdk.iot.iotjobs.model.JobExecutionsChangedEvent; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.UUID; + +import DATestUtils.DATestUtils; + +public class JobExecution { + static String clientId = "test-" + UUID.randomUUID().toString(); + static short port = 8883; + + static MqttClientConnection connection; + static IotJobsClient jobs; + static CompletableFuture gotResponse; + static List availableJobs = new LinkedList<>(); + static String currentJobId; + static long currentExecutionNumber = 0; + static int currentVersionNumber = 0; + + static void onRejectedError(RejectedError error) { + System.out.println("Request rejected: " + error.code.toString() + ": " + error.message); + System.exit(1); + } + + static void onGetPendingJobExecutionsAccepted(GetPendingJobExecutionsResponse response) { + System.out.println( + "Pending Jobs: " + (response.queuedJobs.size() + response.inProgressJobs.size() == 0 ? "none" : "")); + for (JobExecutionSummary job : response.inProgressJobs) { + availableJobs.add(job.jobId); + System.out.println(" In Progress: " + job.jobId + " @ " + job.lastUpdatedAt.toString()); + } + for (JobExecutionSummary job : response.queuedJobs) { + availableJobs.add(job.jobId); + System.out.println(" " + job.jobId + " @ " + job.lastUpdatedAt.toString()); + } + gotResponse.complete(null); + } + + static void onDescribeJobExecutionAccepted(DescribeJobExecutionResponse response) { + System.out + .println("Describe Job: " + response.execution.jobId + " version: " + response.execution.versionNumber); + if (response.execution.jobDocument != null) { + response.execution.jobDocument.forEach((key, value) -> { + System.out.println(" " + key + ": " + value); + }); + } + gotResponse.complete(null); + } + + static void onStartNextPendingJobExecutionAccepted(StartNextJobExecutionResponse response) { + System.out.println("Start Job: " + response.execution.jobId); + currentJobId = response.execution.jobId; + currentExecutionNumber = response.execution.executionNumber; + currentVersionNumber = response.execution.versionNumber; + gotResponse.complete(null); + } + + static MqttClientConnection createConnection() { + try (AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder + .newMtlsBuilderFromPath(DATestUtils.certificatePath, DATestUtils.keyPath)) { + builder.withClientId(clientId) + .withEndpoint(DATestUtils.endpoint) + .withPort(port) + .withCleanSession(true) + .withProtocolOperationTimeoutMs(60000); + + MqttClientConnection connection = builder.build(); + return connection; + } catch (Exception ex) { + throw new RuntimeException("Failed to create connection", ex); + } + } + + static void getPendingJobs() throws RuntimeException { + gotResponse = new CompletableFuture<>(); + GetPendingJobExecutionsSubscriptionRequest subscriptionRequest = new GetPendingJobExecutionsSubscriptionRequest(); + subscriptionRequest.thingName = DATestUtils.thing_name; + System.out.println("Subscribing to GetPendingJobExecutionsAccepted for thing '" + + DATestUtils.thing_name + "'"); + CompletableFuture subscribed = jobs.SubscribeToGetPendingJobExecutionsAccepted( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobExecution::onGetPendingJobExecutionsAccepted); + try { + subscribed.get(); + System.out.println("Subscribed to GetPendingJobExecutionsAccepted"); + } catch (Exception ex) { + throw new RuntimeException("Failed to subscribe to GetPendingJobExecutionsAccepted", ex); + } + + subscribed = jobs.SubscribeToGetPendingJobExecutionsRejected( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobExecution::onRejectedError); + try { + subscribed.get(); + System.out.println("Subscribed to GetPendingJobExecutionsRejected"); + } catch (Exception ex) { + throw new RuntimeException("Failed to subscribe to GetPendingJobExecutionsRejected", ex); + } + + GetPendingJobExecutionsRequest publishRequest = new GetPendingJobExecutionsRequest(); + publishRequest.thingName = DATestUtils.thing_name; + CompletableFuture published = jobs.PublishGetPendingJobExecutions( + publishRequest, + QualityOfService.AT_LEAST_ONCE); + try { + published.get(); + System.out.println("Published to GetPendingJobExecutions"); + } catch (Exception ex) { + throw new RuntimeException("Failed to publish to GetPendingJobExecutions", ex); + } + + // Waiting for either onGetPendingJobExecutionsAccepted or onRejectedError to be + // called. + try { + gotResponse.get(); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred while waiting for pending Jobs", ex); + } + } + + static void getJobDescriptions() throws RuntimeException { + for (String jobId : availableJobs) { + gotResponse = new CompletableFuture<>(); + DescribeJobExecutionSubscriptionRequest subscriptionRequest = new DescribeJobExecutionSubscriptionRequest(); + subscriptionRequest.thingName = DATestUtils.thing_name; + subscriptionRequest.jobId = jobId; + jobs.SubscribeToDescribeJobExecutionAccepted( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobExecution::onDescribeJobExecutionAccepted); + jobs.SubscribeToDescribeJobExecutionRejected( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobExecution::onRejectedError); + + DescribeJobExecutionRequest publishRequest = new DescribeJobExecutionRequest(); + publishRequest.thingName = DATestUtils.thing_name; + publishRequest.jobId = jobId; + publishRequest.includeJobDocument = true; + publishRequest.executionNumber = 1L; + jobs.PublishDescribeJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); + + // Waiting for either onDescribeJobExecutionAccepted or onRejectedError to be + // called. + try { + gotResponse.get(); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred while waiting for Job descriptions", ex); + } + } + } + + static void startNextPendingJob() throws RuntimeException { + gotResponse = new CompletableFuture<>(); + + StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest = new StartNextPendingJobExecutionSubscriptionRequest(); + subscriptionRequest.thingName = DATestUtils.thing_name; + + jobs.SubscribeToStartNextPendingJobExecutionAccepted( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobExecution::onStartNextPendingJobExecutionAccepted); + jobs.SubscribeToStartNextPendingJobExecutionRejected( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobExecution::onRejectedError); + + StartNextPendingJobExecutionRequest publishRequest = new StartNextPendingJobExecutionRequest(); + publishRequest.thingName = DATestUtils.thing_name; + publishRequest.stepTimeoutInMinutes = 15L; + jobs.PublishStartNextPendingJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); + + // Waiting for either onStartNextPendingJobExecutionAccepted or onRejectedError + // to be called. + try { + gotResponse.get(); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred while waiting for starting next pending Job", ex); + } + } + + static void updateCurrentJobStatus(JobStatus jobStatus) throws RuntimeException { + gotResponse = new CompletableFuture<>(); + + UpdateJobExecutionSubscriptionRequest subscriptionRequest = new UpdateJobExecutionSubscriptionRequest(); + subscriptionRequest.thingName = DATestUtils.thing_name; + subscriptionRequest.jobId = currentJobId; + jobs.SubscribeToUpdateJobExecutionAccepted( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + (response) -> { + System.out.println("Marked job " + currentJobId + " " + jobStatus.toString()); + gotResponse.complete(null); + }); + jobs.SubscribeToUpdateJobExecutionRejected( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobExecution::onRejectedError); + + UpdateJobExecutionRequest publishRequest = new UpdateJobExecutionRequest(); + publishRequest.thingName = DATestUtils.thing_name; + publishRequest.jobId = currentJobId; + publishRequest.executionNumber = currentExecutionNumber; + publishRequest.status = jobStatus; + publishRequest.expectedVersion = currentVersionNumber++; + jobs.PublishUpdateJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); + + // Waiting for a response to our update. + try { + gotResponse.get(); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred while waiting for updating Job", ex); + } + } + + public static void main(String[] args) { + // Set vars + if (!DATestUtils.init(DATestUtils.TestType.JOBS)) { + throw new RuntimeException("Failed to initialize environment variables."); + } + + try (MqttClientConnection connection = createConnection()) { + jobs = new IotJobsClient(connection); + CompletableFuture connected = connection.connect(); + try { + boolean sessionPresent = connected.get(); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred during connect", ex); + } + + getPendingJobs(); + + // This step is optional for the DA Job test, but perform it anyway to follow a + // supposed flow. + getJobDescriptions(); + + for (int jobIdx = 0; jobIdx < availableJobs.size(); ++jobIdx) { + startNextPendingJob(); + updateCurrentJobStatus(JobStatus.IN_PROGRESS); + // Fake doing something + Thread.sleep(1000); + updateCurrentJobStatus(JobStatus.SUCCEEDED); + } + + CompletableFuture disconnected = connection.disconnect(); + try { + disconnected.get(); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred during disconnect", ex); + } + } catch (Exception ex) { + throw new RuntimeException("Job execution failed", ex); + } + System.exit(0); + } +}