From c2598c50e675ef0dfb37f4b381a4eff3bde42934 Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 30 Jan 2024 08:15:09 -0800 Subject: [PATCH] Integration tests for using the Kafka buffer with KMS encryption (#3982) Adds a new integration test for using the Kafka buffer with KMS encryption. Includes a new KMS CDK stack for any projects that need to use KMS. Some improvements to the CDK stack. Resolves #3980 Change the GitHub tests to include only the Kafka buffer tests which currently run in GitHub. Signed-off-by: David Venable --- .../kafka-plugin-integration-tests.yml | 2 +- data-prepper-plugins/kafka-plugins/README.md | 9 +- .../kafka-plugins/build.gradle | 1 + .../kafka/buffer/KafkaBuffer_KmsIT.java | 241 ++++++++++++++++++ .../aws-testing-cdk/bin/aws-testing-cdk.ts | 7 +- .../lib/common/GitHubAccessStack.ts | 35 ++- .../aws-testing-cdk/lib/common/KmsStack.ts | 32 +++ 7 files changed, 315 insertions(+), 12 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java create mode 100644 testing/aws-testing-cdk/lib/common/KmsStack.ts diff --git a/.github/workflows/kafka-plugin-integration-tests.yml b/.github/workflows/kafka-plugin-integration-tests.yml index 1f93ccecfe..28fb5bdc05 100644 --- a/.github/workflows/kafka-plugin-integration-tests.yml +++ b/.github/workflows/kafka-plugin-integration-tests.yml @@ -43,7 +43,7 @@ jobs: ./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaStartIT - name: Run Kafka integration tests run: | - ./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests '*kafka.buffer*' + ./gradlew 
data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests KafkaBufferIT --tests KafkaBufferOTelIT - name: Upload Unit Test Results if: always() diff --git a/data-prepper-plugins/kafka-plugins/README.md b/data-prepper-plugins/kafka-plugins/README.md index c96629e74a..291a96e129 100644 --- a/data-prepper-plugins/kafka-plugins/README.md +++ b/data-prepper-plugins/kafka-plugins/README.md @@ -30,9 +30,16 @@ docker compose --project-directory data-prepper-plugins/kafka-plugins/src/integr Not all integration tests currently work with Docker. But, you can run the following. ``` -./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT +./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin -Dtests.kafka.kms_key=alias/DataPrepperTesting --tests KafkaSourceJsonTypeIT --tests '*kafka.buffer*' ``` +If you do not have a KMS key, you can skip the KMS tests. + +``` +./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests KafkaBufferIT --tests KafkaBufferOTelIT +``` + + See the Old integration tests section to run other tests. However, these are more involved. 
### Old integration tests diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 24f2fa62e0..7cd1d9bad6 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -127,6 +127,7 @@ task integrationTest(type: Test) { systemProperty 'tests.kafka.confluent.registry_url', System.getProperty('tests.kafka.confluent.registry_url') systemProperty 'tests.kafka.authconfig.username', System.getProperty('tests.kafka.authconfig.username') systemProperty 'tests.kafka.authconfig.password', System.getProperty('tests.kafka.authconfig.password') + systemProperty 'tests.kafka.kms_key', System.getProperty('tests.kafka.kms_key') filter { includeTestsMatching '*IT' diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java new file mode 100644 index 0000000000..2b07936065 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java @@ -0,0 +1,241 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.buffer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.protobuf.ByteString; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.CheckpointState; +import 
org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kafka.util.TestProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.model.DataKeySpec; +import software.amazon.awssdk.services.kms.model.GenerateDataKeyRequest; +import software.amazon.awssdk.services.kms.model.GenerateDataKeyResponse; + +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.IllegalBlockSizeException; +import javax.crypto.NoSuchPaddingException; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.util.Base64; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class KafkaBuffer_KmsIT { + private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBuffer_KmsIT.class); + @Mock + private PluginSetting pluginSetting; + + private KafkaBufferConfig kafkaBufferConfig; + @Mock + private PluginFactory pluginFactory; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private AcknowledgementSet acknowledgementSet; + + private Random random; + + private BufferTopicConfig topicConfig; + + private ByteDecoder byteDecoder; + + private String bootstrapServersCommaDelimited; + private String topicName; + private ObjectMapper objectMapper; + private String kmsKey; + private KmsClient kmsClient; + + @BeforeEach + void setUp() { + System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.apache.ApacheSdkHttpService"); + random = new Random(); + acknowledgementSetManager = mock(AcknowledgementSetManager.class); + acknowledgementSet = mock(AcknowledgementSet.class); + lenient().doAnswer((a) -> null).when(acknowledgementSet).complete(); + lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); + objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + + when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); + + topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5); + + final Map topicConfigMap = Map.of( + "name", topicName, + "group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6), + "create_topic", true + ); + + topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class); + + bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers"); + + LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited); + + final Map bufferConfigMap = Map.of( + "topics", List.of(topicConfigMap), + "bootstrap_servers", List.of(bootstrapServersCommaDelimited), + "encryption", Map.of("type", "none") + ); + kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, 
KafkaBufferConfig.class); + + kmsKey = System.getProperty("tests.kafka.kms_key"); + kmsClient = KmsClient.create(); + + byteDecoder = null; + } + + private KafkaBuffer createObjectUnderTest() { + return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, null, ignored -> DefaultCredentialsProvider.create(), null); + } + + @Nested + class Encrypted { + private Cipher encryptCipher; + private String aesKey; + + @BeforeEach + void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException { + final GenerateDataKeyRequest request = GenerateDataKeyRequest + .builder() + .keyId(kmsKey) + .keySpec(DataKeySpec.AES_256) + .build(); + + final GenerateDataKeyResponse response = kmsClient.generateDataKey(request); + + byte[] plaintextDataKey = response.plaintext().asByteArray(); + byte[] encryptedDataKey = response.ciphertextBlob().asByteArray(); + aesKey = Base64.getEncoder() + .withoutPadding() + .encodeToString(encryptedDataKey); + + final SecretKey secretKey = new SecretKeySpec(plaintextDataKey, 0, plaintextDataKey.length, "AES"); + encryptCipher = Cipher.getInstance("AES"); + encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey); + + + final Map topicConfigMap = objectMapper.convertValue(topicConfig, Map.class); + topicConfigMap.put("encryption_key", aesKey); + topicConfigMap.put("kms", Map.of( + "key_id", kmsKey + )); + final Map bufferConfigMap = objectMapper.convertValue(kafkaBufferConfig, Map.class); + bufferConfigMap.put("topics", List.of(topicConfigMap)); + kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class); + } + + @Test + void write_and_read_encrypted() throws TimeoutException { + KafkaBuffer objectUnderTest = createObjectUnderTest(); + + Record record = createRecord(); + objectUnderTest.write(record, 1_000); + + Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), 
notNullValue()); + assertThat(readResult.getKey().size(), equalTo(1)); + + Record onlyResult = readResult.getKey().stream().iterator().next(); + + assertThat(onlyResult, notNullValue()); + assertThat(onlyResult.getData(), notNullValue()); + // TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not. + //assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata())); + assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap())); + } + + @Test + void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeException, BadPaddingException { + final KafkaBuffer objectUnderTest = createObjectUnderTest(); + final TestProducer testProducer = new TestProducer(bootstrapServersCommaDelimited, topicName); + + final Record record = createRecord(); + final byte[] unencryptedBytes = record.getData().toJsonString().getBytes(); + final byte[] encryptedBytes = encryptBytes(unencryptedBytes); + + final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder() + .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES) + .setData(ByteString.copyFrom(encryptedBytes)) + .build(); + + final byte[] unencryptedKeyBytes = createRandomBytes(); + final byte[] encryptedKeyBytes = encryptBytes(unencryptedKeyBytes); + + final KafkaBufferMessage.BufferData keyData = KafkaBufferMessage.BufferData.newBuilder() + .setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES) + .setData(ByteString.copyFrom(encryptedKeyBytes)) + .build(); + + testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray()); + + final Map.Entry>, CheckpointState> readResult = objectUnderTest.read(10_000); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), 
equalTo(1)); + + final Record onlyResult = readResult.getKey().stream().iterator().next(); + + assertThat(onlyResult, notNullValue()); + assertThat(onlyResult.getData(), notNullValue()); + assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap())); + } + + private byte[] encryptBytes(final byte[] plaintextBytes) throws IllegalBlockSizeException, BadPaddingException { + return encryptCipher.doFinal(plaintextBytes); + } + } + + private byte[] createRandomBytes() { + final byte[] writtenBytes = new byte[128]; + random.nextBytes(writtenBytes); + return writtenBytes; + } + + private Record createRecord() { + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + return new Record<>(event); + } + +} diff --git a/testing/aws-testing-cdk/bin/aws-testing-cdk.ts b/testing/aws-testing-cdk/bin/aws-testing-cdk.ts index e2e81811bf..d6405efdec 100644 --- a/testing/aws-testing-cdk/bin/aws-testing-cdk.ts +++ b/testing/aws-testing-cdk/bin/aws-testing-cdk.ts @@ -3,11 +3,16 @@ import 'source-map-support/register'; import * as cdk from 'aws-cdk-lib'; import {GitHubAccessStack} from '../lib/common/GitHubAccessStack'; import {SecretsManagerStack} from '../lib/aws-secrets-manager/SecretsManagerStack'; +import {KmsStack} from "../lib/common/KmsStack"; const app = new cdk.App(); const githubStack = new GitHubAccessStack(app, 'GitHubStack', {}); +new KmsStack(app, 'CommonKmsStack', { + testingRole: githubStack.gitHubActionsTestingRole +}) + new SecretsManagerStack(app, 'SecretsManagerStack', { testingRole: githubStack.gitHubActionsTestingRole -}); \ No newline at end of file +}); diff --git a/testing/aws-testing-cdk/lib/common/GitHubAccessStack.ts b/testing/aws-testing-cdk/lib/common/GitHubAccessStack.ts index eb4626d614..7b66b9a3ff 100644 --- a/testing/aws-testing-cdk/lib/common/GitHubAccessStack.ts +++ b/testing/aws-testing-cdk/lib/common/GitHubAccessStack.ts @@ -5,9 +5,16 @@ import {Stack, StackProps} from 'aws-cdk-lib'; import {Construct} from 
'constructs'; -import {OpenIdConnectPrincipal, OpenIdConnectProvider, Role} from 'aws-cdk-lib/aws-iam' +import { + AccountPrincipal, + CompositePrincipal, + OpenIdConnectPrincipal, + OpenIdConnectProvider, + Role +} from 'aws-cdk-lib/aws-iam' const DEFAULT_ORGANIZATION = 'opensearch-project' +const GITHUB_TOKEN_URL = 'token.actions.githubusercontent.com' /** * Creates the IAM resources necessary for GitHub to access roles within @@ -19,13 +26,18 @@ export class GitHubAccessStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); - const gitHubOidcProvider = new OpenIdConnectProvider(this, 'GitHubOidcProvider', { - url: 'https://token.actions.githubusercontent.com', - thumbprints: [ - '6938fd4d98bab03faadb97b34396831e3780aea1' - ], - clientIds: ['sts.amazonaws.com'] - }); + const oidcProviderExists: boolean = scope.node.tryGetContext('gitHubOidcProviderExists'); + + const gitHubOidcProvider = + oidcProviderExists ? + OpenIdConnectProvider.fromOpenIdConnectProviderArn(this, 'GitHubOidcProvider', `arn:aws:iam::${this.account}:oidc-provider/${GITHUB_TOKEN_URL}`) : + new OpenIdConnectProvider(this, 'GitHubOidcProvider', { + url: `https://${GITHUB_TOKEN_URL}`, + thumbprints: [ + '6938fd4d98bab03faadb97b34396831e3780aea1' + ], + clientIds: ['sts.amazonaws.com'] + }); const dataPrepperOrganization: string = scope.node.tryGetContext('dataPrepperOrganization') || DEFAULT_ORGANIZATION; @@ -36,9 +48,14 @@ export class GitHubAccessStack extends Stack { }, }); + const currentAccountPrincipal = new AccountPrincipal(this.account); + this.gitHubActionsTestingRole = new Role(this, 'GitHubActionsTestingRole', { roleName: 'GitHubActionsTesting', - assumedBy: gitHubPrincipal + assumedBy: new CompositePrincipal( + gitHubPrincipal, + currentAccountPrincipal + ) }); } } diff --git a/testing/aws-testing-cdk/lib/common/KmsStack.ts b/testing/aws-testing-cdk/lib/common/KmsStack.ts new file mode 100644 index 0000000000..0b6a098d3a --- 
/dev/null +++ b/testing/aws-testing-cdk/lib/common/KmsStack.ts @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import {Stack, StackProps} from 'aws-cdk-lib'; +import {Construct} from 'constructs'; +import {Role} from "aws-cdk-lib/aws-iam"; +import {Key} from "aws-cdk-lib/aws-kms"; + +export interface KmsStackProps extends StackProps { + readonly testingRole: Role; +} + +/** + * CDK stack that creates a common KMS key. + */ +export class KmsStack extends Stack { + readonly kmsKey: Key; + + constructor(scope: Construct, id: string, props: KmsStackProps) { + super(scope, id, props); + + this.kmsKey = new Key(this, 'DataPrepperTestingKey', { + alias: 'DataPrepperTesting', + description: 'Shared KMS key for testing any Data Prepper features that use KMS.' + }); + + this.kmsKey.grantEncryptDecrypt(props.testingRole) + } +} +