diff --git a/.github/workflows/library_concurrency_tests.yml b/.github/workflows/library_concurrency_tests.yml new file mode 100644 index 000000000..f0a39317f --- /dev/null +++ b/.github/workflows/library_concurrency_tests.yml @@ -0,0 +1,123 @@ +# This workflow performs interoperability tests across the supported runtimes of the MPL. +name: Library Interoperability Tests + +on: + workflow_call: + inputs: + dafny: + description: "The Dafny version to run" + required: true + type: string + regenerate-code: + description: "Regenerate code using smithy-dafny" + required: false + default: false + type: boolean + +jobs: + generateEncryptVectors: + strategy: + matrix: + library: [AwsCryptographicMaterialProviders] + os: [ + # https://taskei.amazon.dev/tasks/CrypTool-5283 + # windows-latest, + ubuntu-latest, + macos-13, + ] + language: [ + java, + # net, + # python, + # rust + ] + # https://taskei.amazon.dev/tasks/CrypTool-5284 + dotnet-version: ["6.0.x"] + java-versions: [8, 11, 16, 17] + runs-on: ${{ matrix.os }} + permissions: + id-token: write + contents: read + steps: + - name: Support longpaths on Git checkout + run: | + git config --global core.longpaths true + + # Test Vectors need to call KMS + - name: Configure AWS Credentials for Tests + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-region: us-west-2 + role-to-assume: arn:aws:iam::370957321024:role/GitHub-CI-MPL-Dafny-Role-us-west-2 + role-session-name: InterOpTests + + - uses: actions/checkout@v3 + # Not all submodules are needed. + # We manually pull the submodule we DO need. + - run: git submodule update --init libraries + - run: git submodule update --init --recursive smithy-dafny + + # Setup Java in Rust is needed for running polymorph + - name: Setup Java 17 + if: matrix.language == 'java' || matrix.language == 'rust' + uses: actions/setup-java@v3 + with: + distribution: "corretto" + java-version: 17 + + - name: Setup .NET Core SDK '6.0.x' + uses: actions/setup-dotnet@v3 + with: + dotnet-version: "6.0.x" + + - name: Setup Dafny + uses: dafny-lang/setup-dafny-action@v1.7.0 + with: + dafny-version: ${{ inputs.dafny }} + + - name: Regenerate code using smithy-dafny if necessary + if: ${{ inputs.regenerate-code }} + uses: ./.github/actions/polymorph_codegen + with: + dafny: ${{ inputs.dafny }} + library: ${{ matrix.library }} + diff-generated-code: false + + # Build implementation for each runtime + - name: Build ${{ matrix.library }} implementation in Java + shell: bash + working-directory: ./${{ matrix.library }} + run: | + # This works because `node` is installed by default on GHA runners + CORES=$(node -e 'console.log(os.cpus().length)') + make build_java CORES=$CORES + + - name: Setup gradle + if: matrix.language == 'java' + uses: gradle/gradle-build-action@v2 + with: + gradle-version: 7.2 + + - name: Setup Java ${{matrix.java-versions}} + uses: actions/setup-java@v3 + with: + distribution: "corretto" + java-version: ${{matrix.java-versions}} + + - name: Clean for next Java + uses: gradle/gradle-build-action@v3 + with: + arguments: clean + build-root-directory: ./${{ matrix.library }}/runtimes/java + + - name: Compile Java 8 + uses: gradle/gradle-build-action@v3 + with: + arguments: build + build-root-directory: ./${{ matrix.library }}/runtimes/java + + - name: Test Java 8 + uses: gradle/gradle-build-action@v3 + with: + arguments: testConcurrentExamples + build-root-directory: ./${{ matrix.library }}/runtimes/java diff --git a/AwsCryptographicMaterialProviders/runtimes/java/build.gradle.kts b/AwsCryptographicMaterialProviders/runtimes/java/build.gradle.kts index 4ae605172..872ad742d 100644 --- a/AwsCryptographicMaterialProviders/runtimes/java/build.gradle.kts +++ b/AwsCryptographicMaterialProviders/runtimes/java/build.gradle.kts @@ -315,6 +315,30 @@ val testExamples = task("testExamples") { testLogging { events("passed") } + filter { + excludeTestsMatching("software.amazon.cryptography.example.hierarchy.concurrent.*") + } +} + +val testConcurrentExamples = task("testConcurrentExamples") { + description = "Runs examples tests." + group = "verification" + + testClassesDirs = sourceSets["testExamples"].output.classesDirs + classpath = sourceSets["testExamples"].runtimeClasspath + sourceSets["examples"].output + sourceSets.main.get().output + // This will show System.out.println statements + testLogging.showStandardStreams = true + useTestNG() { + suites("src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/testng-parallel.xml") + maxParallelForks = 2 + } + + testLogging { + events("passed") + } + filter { + includeTestsMatching("software.amazon.cryptography.example.hierarchy.concurrent.*") + } } fun buildPom(mavenPublication: MavenPublication) { diff --git a/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/ConcurrentConditionCheckWriteTest.java b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/ConcurrentConditionCheckWriteTest.java new file mode 100644 index 000000000..479d5d7b2 --- /dev/null +++ b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/ConcurrentConditionCheckWriteTest.java @@ -0,0 +1,156 @@ +package software.amazon.cryptography.example.hierarchy.concurrent; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest; +import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsResponse; +import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; +import software.amazon.awssdk.utils.ImmutableMap; +import software.amazon.cryptography.example.Constants; +import software.amazon.cryptography.example.DdbHelper; +import software.amazon.cryptography.example.Fixtures; + +// These concurrent tests check that the +public class ConcurrentConditionCheckWriteTest { + + private static final Integer threadCount = 5; + private static final String mLockedId = "concurrency-test-write-key"; + private static final Map INDEX_EXPR_ATT_NAMES = + ImmutableMap.of("#pk", "branch-key-id"); + + private static final List identifiers = Collections.unmodifiableList( + Arrays.asList("1", "2", "3", "4", "5") + ); + private Map threadIdToDdbClient; + private static Map indexToThreadId; + private ConcurrentLinkedDeque unpickedIndices; + + @BeforeClass + public void setup() { + threadIdToDdbClient = new ConcurrentHashMap<>(6, 1, threadCount); + identifiers.forEach(id -> + threadIdToDdbClient.put(id, DynamoDbClient.create()) + ); + indexToThreadId = new ConcurrentHashMap<>(6, 1, threadCount); + unpickedIndices = new ConcurrentLinkedDeque<>(identifiers); + } + + @AfterClass + public void teardown() { + DynamoDbClient _ddbClient = DynamoDbClient.create(); + identifiers.forEach(id -> + DdbHelper.deleteKeyStoreDdbItem( + mLockedId, + "branch:ACTIVE", + Fixtures.TEST_KEYSTORE_NAME, + _ddbClient, + true + ) + ); + } + + public static Map indexItem( + final AttributeValue value, + final String timestamp + ) { + Map item = new HashMap<>(); + + item.put("branch-key-id", AttributeValue.builder().s(mLockedId).build()); + item.put("type", AttributeValue.builder().s(indexType()).build()); + item.put("value", value); + item.put("timestamp", AttributeValue.builder().s(timestamp).build()); + return item; + } + + private static String indexType() { + return "branch:ACTIVE"; + } + + public static TransactWriteItem conditionalWrite( + final AttributeValue value, + final String timestamp + ) { + return TransactWriteItem + .builder() + .put(putBuilder -> + putBuilder + .tableName(Fixtures.TEST_KEYSTORE_NAME) + .item(indexItem(value, timestamp)) + .conditionExpression("attribute_not_exists(#pk)") + .expressionAttributeNames(INDEX_EXPR_ATT_NAMES) + ) + .build(); + } + + private DynamoDbClient clientForThread(final String threadIdToIndex) { + return threadIdToDdbClient.computeIfAbsent( + threadIdToIndex, + ddbClient -> DynamoDbClient.create() + ); + } + + @Test(threadPoolSize = 5, invocationCount = 30, timeOut = 1000) + public void TestConcurrentWriteCheck() { + String threadId = String.valueOf(Thread.currentThread().getId()); + String threadIdToIndex = indexToThreadId.computeIfAbsent( + threadId, + str -> unpickedIndices.pop() + ); + AttributeValue value = AttributeValue.builder().s(threadIdToIndex).build(); + TimeZone tz = TimeZone.getTimeZone("UTC"); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSS'Z'"); // Quoted "Z" to indicate UTC, no timezone offset + df.setTimeZone(tz); + String timestamp = df.format(new Date()); + + System.out.println( + "Thread ID: " + + Thread.currentThread().getId() + + " ThreadIndex: " + + threadIdToIndex + + " Timestamp: " + + timestamp + ); + + try { + DynamoDbClient client = clientForThread(threadIdToIndex); + TransactWriteItemsResponse transactWriteItemsResponse = + client.transactWriteItems( + TransactWriteItemsRequest + .builder() + .transactItems(conditionalWrite(value, timestamp)) + .build() + ); + Assert.assertTrue( + transactWriteItemsResponse.sdkHttpResponse().isSuccessful() + ); + } catch (TransactionCanceledException exception) { + // We can fail for two reasons, either there's already a transact write in flight + // 0r we have failed the condition check. + exception + .cancellationReasons() + .forEach(cancellationReason -> { + Assert.assertTrue( + (cancellationReason.code().equals("TransactionConflict") || + cancellationReason.code().equals("ConditionalCheckFailed")) + ); + }); + } + } +} diff --git a/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/README.md b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/README.md new file mode 100644 index 000000000..796613ff8 --- /dev/null +++ b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/README.md @@ -0,0 +1,27 @@ +[//]: # "Copyright Amazon.com Inc. or its affiliates. All Rights Reserved." +[//]: # "SPDX-License-Identifier: CC-BY-SA-4.0" + +# AWS Cryptographic Material Providers Library Concurrency Testing Framework + +Welcome to the AWS Cryptographic Material Providers Library Concurrency and Parallelization +Testing Framework 🎉! + +This framework helps set up scenarios that we would like to run in a parallel or multithreaded environment. + +Some things to keep in mind when you add tests. Think about how you will be creating resources per +thread and what kind of state you need to keep between tests. + +Examples: + +- [Test regular DynamoDB Client TransactWrites](./ConcurrentConditionCheckWriteTest.java) +- [Test ACTIVE branch key reads while branch key creation is inflight](./StorageWriteReadConcurrencyTests.java) +- [Test branch key reads while branch key versioning is inflight](./StorageVersionReadConcurrencyTests.java) + +[Security issue notifications](./CONTRIBUTING.md#security-issue-notifications) + +## Security + +If you discover a potential security issue in this project +we ask that you notify AWS/Amazon Security via our +[vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). +Please **do not** create a public GitHub issue. diff --git a/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/StorageVersionReadConcurrencyTests.java b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/StorageVersionReadConcurrencyTests.java new file mode 100644 index 000000000..81e53e77a --- /dev/null +++ b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/StorageVersionReadConcurrencyTests.java @@ -0,0 +1,302 @@ +package software.amazon.cryptography.example.hierarchy.concurrent; + +import static software.amazon.cryptography.example.hierarchy.concurrent.StorageWriteReadConcurrencyTests.createKeyStore; +import static software.amazon.cryptography.example.hierarchy.concurrent.StorageWriteReadConcurrencyTests.createStorageLayer; + +import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.QueryRequest; +import software.amazon.awssdk.services.dynamodb.model.QueryResponse; +import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; +import software.amazon.cryptography.example.DdbHelper; +import software.amazon.cryptography.example.Fixtures; +import software.amazon.cryptography.keystore.KeyStore; +import software.amazon.cryptography.keystore.model.DynamoDBTable; +import software.amazon.cryptography.keystore.model.GetActiveBranchKeyInput; +import software.amazon.cryptography.keystore.model.GetActiveBranchKeyOutput; +import software.amazon.cryptography.keystore.model.Storage; +import software.amazon.cryptography.keystoreadmin.KeyStoreAdmin; +import software.amazon.cryptography.keystoreadmin.model.CreateKeyInput; +import software.amazon.cryptography.keystoreadmin.model.KeyStoreAdminConfig; +import software.amazon.cryptography.keystoreadmin.model.KmsSymmetricKeyArn; +import software.amazon.cryptography.keystoreadmin.model.VersionKeyInput; + +// This class contains a suite of tests that check for behavior of reading +// a branch key while a version operation is in flight +public class StorageVersionReadConcurrencyTests { + + private static final String branchKeyId = + "concurrency-test-version-key-" + UUID.randomUUID(); + private static final Integer threadCount = 10; + private static final List identifiers = Collections.unmodifiableList( + Arrays.asList( + IntStream + .rangeClosed(1, 10) + .mapToObj(String::valueOf) + .toArray(String[]::new) + ) + ); + + private Map threadIndexToStorage; + private Map threadIndexToKeyStore; + private static Map< + String, + String + > storageIndexToThreadId, keyStoreIndexToThreadId; + private static Map versionKeyOutputMap; + private ConcurrentLinkedDeque< + String + > unpickedIndicesForStorage, unpickedIndicesForKeyStore; + private static Map encryptionContext; + private static final AtomicInteger counter = new AtomicInteger(0); + private static final TimeZone timeZone = TimeZone.getTimeZone("UTC"); + private static final DateFormat dateFormat = new SimpleDateFormat( + "yyyy-MM-dd'T'HH:mm:ss.SSSSS'Z'" + ); + private static final QueryRequest queryRequestForCleanUp = QueryRequest + .builder() + .tableName(Fixtures.TEST_KEYSTORE_NAME) + .keyConditionExpression("#pk = :pkval") + .expressionAttributeNames( + new HashMap() { + { + put("#pk", "branch-key-id"); + } + } + ) + .expressionAttributeValues( + new HashMap() { + { + put(":pkval", AttributeValue.builder().s(branchKeyId).build()); + } + } + ) + .build(); + + @BeforeClass + public void setup() { + dateFormat.setTimeZone(timeZone); + threadIndexToStorage = new ConcurrentHashMap<>(16, 1, threadCount); + threadIndexToKeyStore = new ConcurrentHashMap<>(16, 1, threadCount); + storageIndexToThreadId = new ConcurrentHashMap<>(16, 1, threadCount); + keyStoreIndexToThreadId = new ConcurrentHashMap<>(16, 1, threadCount); + versionKeyOutputMap = new ConcurrentHashMap<>(16, 1, threadCount); + + unpickedIndicesForStorage = new ConcurrentLinkedDeque<>(identifiers); + unpickedIndicesForKeyStore = new ConcurrentLinkedDeque<>(identifiers); + // For every identifier which will ultimately map to one thread, we will create a unique + // storage layer per thread with a unique ddb client. This will make it so that + // we isolate resources even further and prevent resource reuse. + identifiers.forEach(id -> { + threadIndexToStorage.put(id, createStorageLayer()); + threadIndexToKeyStore.put(id, createKeyStore()); + }); + + encryptionContext = new HashMap<>(); + encryptionContext.put("custom", "ec"); + + final DynamoDbClient _ddbClient = DynamoDbClient.create(); + DynamoDBTable table = DynamoDBTable + .builder() + .ddbClient(_ddbClient) + .ddbTableName(Fixtures.TEST_KEYSTORE_NAME) + .build(); + Storage tmp = Storage.builder().ddb(table).build(); + KeyStoreAdmin admin = KeyStoreAdmin + .builder() + .KeyStoreAdminConfig( + KeyStoreAdminConfig + .builder() + .storage(tmp) + .logicalKeyStoreName(Fixtures.TEST_KEYSTORE_NAME) + .build() + ) + .build(); + CreateKeyInput createKeyInput = CreateKeyInput + .builder() + .Identifier(branchKeyId) + .EncryptionContext(encryptionContext) + .KmsArn( + KmsSymmetricKeyArn + .builder() + .KmsKeyArn(Fixtures.KEYSTORE_KMS_ARN) + .build() + ) + .build(); + admin.CreateKey(createKeyInput); + System.out.println( + "Successfully set up test with branch key: " + branchKeyId + ); + } + + @AfterClass + public void teardown() { + final DynamoDbClient _ddbClient = DynamoDbClient.create(); + DdbHelper.deleteKeyStoreDdbItem( + branchKeyId, + "branch:ACTIVE", + Fixtures.TEST_KEYSTORE_NAME, + DynamoDbClient.create(), + true + ); + DdbHelper.deleteKeyStoreDdbItem( + branchKeyId, + "beacon:ACTIVE", + Fixtures.TEST_KEYSTORE_NAME, + DynamoDbClient.create(), + true + ); + QueryResponse res = _ddbClient.query(queryRequestForCleanUp); + res + .items() + .forEach(response -> { + DdbHelper.deleteKeyStoreDdbItem( + branchKeyId, + response.get("type").s(), + Fixtures.TEST_KEYSTORE_NAME, + DynamoDbClient.create(), + true + ); + }); + } + + private Storage storageForThread(final String threadIdToIndex) { + return threadIndexToStorage.computeIfAbsent( + threadIdToIndex, + k -> createStorageLayer() + ); + } + + private KeyStore keyStoreForThread(String threadIdToIndex) { + return threadIndexToKeyStore.computeIfAbsent( + threadIdToIndex, + k -> createKeyStore() + ); + } + + private void raceToVersionWithStorage(KeyStoreAdmin admin) { + VersionKeyInput input = VersionKeyInput + .builder() + .Identifier(branchKeyId) + .KmsArn( + KmsSymmetricKeyArn + .builder() + .KmsKeyArn(Fixtures.KEYSTORE_KMS_ARN) + .build() + ) + .build(); + admin.VersionKey(input); + } + + private GetActiveBranchKeyOutput raceToReadActiveWithKeyStore( + KeyStore keyStore + ) { + GetActiveBranchKeyInput input = GetActiveBranchKeyInput + .builder() + .branchKeyIdentifier(branchKeyId) + .build(); + return keyStore.GetActiveBranchKey(input); + } + + @Test(threadPoolSize = 10, invocationCount = 100, timeOut = 10000) + public void testConcurrentVersionWithStorage() { + String threadId = String.valueOf(Thread.currentThread().getId()); + String threadIdToIndex = storageIndexToThreadId.computeIfAbsent( + threadId, + str -> unpickedIndicesForStorage.pop() + ); + + String timestamp = dateFormat.format(new Date()); + try { + Storage threadStorage = storageForThread(threadIdToIndex); + KeyStoreAdminConfig keyStoreAdminConfig = KeyStoreAdminConfig + .builder() + .storage(threadStorage) + .logicalKeyStoreName(Fixtures.TEST_KEYSTORE_NAME) + .build(); + KeyStoreAdmin admin = KeyStoreAdmin + .builder() + .KeyStoreAdminConfig(keyStoreAdminConfig) + .build(); + raceToVersionWithStorage(admin); + System.out.println( + "Successfully versioned branch key! Thread ID: " + + Thread.currentThread().getId() + + " ThreadIndex: " + + threadIdToIndex + + " Timestamp: " + + timestamp + + " BranchKeyId: " + + branchKeyId + ); + } catch (TransactionCanceledException exception) { + System.out.println("Failed to write branch key: " + branchKeyId); + // Exceptions that get thrown when you write keys using the Storage interface + exception + .cancellationReasons() + .forEach(cancellationReason -> { + Assert.assertTrue( + (cancellationReason.code().equals("TransactionConflict") || + cancellationReason.code().equals("None") || + cancellationReason.code().equals("ConditionalCheckFailed")) + ); + }); + } + } + + @Test(threadPoolSize = 10, invocationCount = 100, timeOut = 10000) + public void testConcurrentActiveReadWhileVersionInFlight() { + // Since on set up we create a branch key, we should always be able to read. + String threadId = String.valueOf(Thread.currentThread().getId()); + String threadIdToIndex = keyStoreIndexToThreadId.computeIfAbsent( + threadId, + str -> unpickedIndicesForKeyStore.pop() + ); + KeyStore keyStore = keyStoreForThread(threadIdToIndex); + GetActiveBranchKeyOutput output = raceToReadActiveWithKeyStore(keyStore); + versionKeyOutputMap.put( + output.branchKeyMaterials().branchKeyVersion(), + output.branchKeyMaterials().branchKey() + ); + System.out.println( + "Successfully read branch key: " + + branchKeyId + + " with version: " + + output.branchKeyMaterials().branchKeyVersion() + ); + } + + @Test( + dependsOnMethods = { + "testConcurrentActiveReadWhileVersionInFlight", + "testConcurrentVersionWithStorage", + } + ) + public void testVersionReads() { + Assert.assertFalse(versionKeyOutputMap.isEmpty()); + for (String key : versionKeyOutputMap.keySet()) { + System.out.println( + "key: " + key + " value " + versionKeyOutputMap.get(key) + ); + } + } +} diff --git a/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/StorageWriteReadConcurrencyTests.java b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/StorageWriteReadConcurrencyTests.java new file mode 100644 index 000000000..48ebd6b5b --- /dev/null +++ b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/StorageWriteReadConcurrencyTests.java @@ -0,0 +1,264 @@ +package software.amazon.cryptography.example.hierarchy.concurrent; + +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; +import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.cryptography.example.DdbHelper; +import software.amazon.cryptography.example.Fixtures; +import software.amazon.cryptography.keystore.KeyStore; +import software.amazon.cryptography.keystore.model.DynamoDBTable; +import software.amazon.cryptography.keystore.model.GetActiveBranchKeyInput; +import software.amazon.cryptography.keystore.model.GetActiveBranchKeyOutput; +import software.amazon.cryptography.keystore.model.KMSConfiguration; +import software.amazon.cryptography.keystore.model.KeyStorageException; +import software.amazon.cryptography.keystore.model.KeyStoreConfig; +import software.amazon.cryptography.keystore.model.KeyStoreException; +import software.amazon.cryptography.keystore.model.Storage; +import software.amazon.cryptography.keystoreadmin.KeyStoreAdmin; +import software.amazon.cryptography.keystoreadmin.model.CreateKeyInput; +import software.amazon.cryptography.keystoreadmin.model.CreateKeyOutput; +import software.amazon.cryptography.keystoreadmin.model.KeyStoreAdminConfig; +import software.amazon.cryptography.keystoreadmin.model.KmsSymmetricKeyArn; + +// This class contains a suite of tests to check behavior in the storage layer +// of the library's APIs. These APIs write using the storage layer and . +public class StorageWriteReadConcurrencyTests { + + private static final String branchKeyId = + "concurrency-test-write-key-" + UUID.randomUUID(); + private static final Integer threadCount = 15; + private static final List identifiers = Collections.unmodifiableList( + Arrays.asList( + IntStream + .rangeClosed(1, 15) + .mapToObj(String::valueOf) + .toArray(String[]::new) + ) + ); + + private Map threadIndexToStorage; + private Map threadIndexToKeyStore; + private static Map indexToThreadId; + private static Map< + String, + GetActiveBranchKeyOutput + > getActiveBranchKeyOutputs; + private ConcurrentLinkedDeque< + String + > unpickedIndicesForStorage, unpickedIndicesForKeyStore; + private static Map encryptionContext; + private static final AtomicInteger counter = new AtomicInteger(0); + private static final TimeZone timeZone = TimeZone.getTimeZone("UTC"); + private static final DateFormat dateFormat = new SimpleDateFormat( + "yyyy-MM-dd'T'HH:mm:ss.SSSSS'Z'" + ); + + @BeforeClass + public void setup() { + dateFormat.setTimeZone(timeZone); + threadIndexToStorage = new ConcurrentHashMap<>(16, 1, threadCount); + threadIndexToKeyStore = new ConcurrentHashMap<>(16, 1, threadCount); + indexToThreadId = new ConcurrentHashMap<>(16, 1, threadCount); + getActiveBranchKeyOutputs = new ConcurrentHashMap<>(16, 1, threadCount); + + unpickedIndicesForStorage = new ConcurrentLinkedDeque<>(identifiers); + unpickedIndicesForKeyStore = new ConcurrentLinkedDeque<>(identifiers); + // For every identifier which will ultimately map to one thread, we will create a unique + // storage layer per thread with a unique ddb client. This will make it so that + // we isolate resources even further and prevent resource reuse. + identifiers.forEach(id -> { + threadIndexToStorage.put(id, createStorageLayer()); + threadIndexToKeyStore.put(id, createKeyStore()); + }); + + encryptionContext = new HashMap<>(); + encryptionContext.put("custom", "ec"); + } + + @AfterClass + public void teardown() { + GetItemResponse res = DdbHelper.getKeyStoreDdbItem( + branchKeyId, + "branch:ACTIVE", + Fixtures.TEST_KEYSTORE_NAME, + DynamoDbClient.create() + ); + DdbHelper.deleteKeyStoreDdbItem( + branchKeyId, + "branch:ACTIVE", + Fixtures.TEST_KEYSTORE_NAME, + DynamoDbClient.create(), + true + ); + DdbHelper.deleteKeyStoreDdbItem( + branchKeyId, + "beacon:ACTIVE", + Fixtures.TEST_KEYSTORE_NAME, + DynamoDbClient.create(), + true + ); + DdbHelper.deleteKeyStoreDdbItem( + branchKeyId, + res.item().get("version").s(), + Fixtures.TEST_KEYSTORE_NAME, + DynamoDbClient.create(), + true + ); + } + + public static KeyStore createKeyStore() { + final DynamoDbClient _ddbClient = DynamoDbClient.create(); + final KmsClient _kmsClient = KmsClient.create(); + final KeyStoreConfig config = KeyStoreConfig + .builder() + .ddbClient(_ddbClient) + .ddbTableName(Fixtures.TEST_KEYSTORE_NAME) + .logicalKeyStoreName(Fixtures.TEST_KEYSTORE_NAME) + .kmsClient(_kmsClient) + .kmsConfiguration( + KMSConfiguration.builder().kmsKeyArn(Fixtures.KEYSTORE_KMS_ARN).build() + ) + .build(); + return KeyStore.builder().KeyStoreConfig(config).build(); + } + + public static Storage createStorageLayer() { + final DynamoDbClient _ddbClient = DynamoDbClient.create(); + DynamoDBTable table = DynamoDBTable + .builder() + .ddbClient(_ddbClient) + .ddbTableName(Fixtures.TEST_KEYSTORE_NAME) + .build(); + return Storage.builder().ddb(table).build(); + } + + private Storage storageForThread(final String threadIdToIndex) { + return threadIndexToStorage.computeIfAbsent( + threadIdToIndex, + k -> createStorageLayer() + ); + } + + private KeyStore keyStoreForThread(String threadIdToIndex) { + return threadIndexToKeyStore.computeIfAbsent( + threadIdToIndex, + k -> createKeyStore() + ); + } + + private CreateKeyOutput raceToWriteWithStorage(KeyStoreAdmin admin) { + CreateKeyInput createKeyInput = CreateKeyInput + .builder() + .Identifier(branchKeyId) + .EncryptionContext(encryptionContext) + .KmsArn( + KmsSymmetricKeyArn + .builder() + .KmsKeyArn(Fixtures.KEYSTORE_KMS_ARN) + .build() + ) + .build(); + return admin.CreateKey(createKeyInput); + } + + private GetActiveBranchKeyOutput raceToReadWithKeyStore(KeyStore keyStore) { + GetActiveBranchKeyInput input = GetActiveBranchKeyInput + .builder() + .branchKeyIdentifier(branchKeyId) + .build(); + return keyStore.GetActiveBranchKey(input); + } + + @Test(threadPoolSize = 15, invocationCount = 150, timeOut = 10000) + public void testConcurrentStorage() { + String threadId = String.valueOf(Thread.currentThread().getId()); + String threadIdToIndex = indexToThreadId.computeIfAbsent( + threadId, + str -> unpickedIndicesForStorage.pop() + ); + + String timestamp = dateFormat.format(new Date()); + + try { + if (Integer.parseInt(threadIdToIndex) % 2 == 0) { + Storage threadStorage = storageForThread(threadIdToIndex); + KeyStoreAdminConfig keyStoreAdminConfig = KeyStoreAdminConfig + .builder() + .storage(threadStorage) + .logicalKeyStoreName(Fixtures.TEST_KEYSTORE_NAME) + .build(); + KeyStoreAdmin admin = KeyStoreAdmin + .builder() + .KeyStoreAdminConfig(keyStoreAdminConfig) + .build(); + raceToWriteWithStorage(admin); + System.out.println( + "Successfully wrote! Thread ID: " + + Thread.currentThread().getId() + + " ThreadIndex: " + + threadIdToIndex + + " Timestamp: " + + timestamp + + " BranchKeyId: " + + branchKeyId + ); + } else { + String iteration = String.valueOf(counter.incrementAndGet()); + KeyStore keyStore = keyStoreForThread(threadIdToIndex); + GetActiveBranchKeyOutput output = raceToReadWithKeyStore(keyStore); + getActiveBranchKeyOutputs.put(iteration, output); + System.out.println("Successfully read branch key: " + branchKeyId); + } + } catch (TransactionCanceledException exception) { + System.out.println("Failed to write branch key: " + branchKeyId); + // Exceptions that get thrown when you write keys using the Storage interface + exception + .cancellationReasons() + .forEach(cancellationReason -> { + Assert.assertTrue( + (cancellationReason.code().equals("TransactionConflict") || + cancellationReason.code().equals("None") || + cancellationReason.code().equals("ConditionalCheckFailed")) + ); + }); + } catch (KeyStorageException | KeyStoreException e) { + System.out.println("Failed to read branch key: " + branchKeyId); + // Exceptions that get thrown when you read keys using the KeyStore interface. + Assert.assertEquals( + e.getMessage(), + "No item found for corresponding branch key identifier." + ); + } + } + + @Test(dependsOnMethods = { "testConcurrentStorage" }) + public void testReadAfterWriteCheck() { + // Iterate through the values and check that it equals the first item in the map, + // if there are any difference the test will fail. + System.out.println(getActiveBranchKeyOutputs.size()); + GetActiveBranchKeyOutput first = getActiveBranchKeyOutputs + .values() + .iterator() + .next(); + for (GetActiveBranchKeyOutput value : getActiveBranchKeyOutputs.values()) { + Assert.assertEquals( + value.branchKeyMaterials().branchKey(), + first.branchKeyMaterials().branchKey() + ); + } + } +} diff --git a/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/testng-parallel.xml b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/testng-parallel.xml new file mode 100644 index 000000000..43f0afa4d --- /dev/null +++ b/AwsCryptographicMaterialProviders/runtimes/java/src/testExamples/java/software/amazon/cryptography/example/hierarchy/concurrent/testng-parallel.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file