Integration tests for using the Kafka buffer with KMS encryption (opensearch-project#3982)

Adds a new integration test for using the Kafka buffer with KMS encryption. Includes a new KMS CDK stack for any projects that need to use KMS, along with some improvements to the CDK stack. Resolves opensearch-project#3980

Changes the GitHub tests to include only the Kafka buffer tests which currently run in GitHub.
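
For context, the topic options the new test exercises correspond to a buffer configuration along the lines of the sketch below. This is illustrative only: the option names (`encryption_key`, `kms.key_id`, `bootstrap_servers`, `topics`) come from the test's configuration maps, while the `kafka` plugin name, pipeline name, and values are placeholders.

```
# Sketch only: option names taken from the new integration test; values are placeholders.
kms-buffer-pipeline:
  buffer:
    kafka:
      bootstrap_servers: ["localhost:9092"]
      topics:
        - name: buffer-topic
          group_id: buffer-group
          create_topic: true
          # Base64-encoded data key, wrapped (encrypted) by the KMS key referenced below
          encryption_key: "<wrapped-data-key>"
          kms:
            key_id: "alias/DataPrepperTesting"
```

The test generates such a wrapped key at runtime by calling KMS GenerateDataKey with the key passed in via -Dtests.kafka.kms_key.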

Signed-off-by: David Venable <[email protected]>
dlvenable authored Jan 30, 2024
1 parent d5a9b0b commit c2598c5
Showing 7 changed files with 315 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/kafka-plugin-integration-tests.yml
@@ -43,7 +43,7 @@ jobs:
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaStartIT
- name: Run Kafka integration tests
run: |
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests '*kafka.buffer*'
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests KafkaBufferIT --tests KafkaBufferOTelIT
- name: Upload Unit Test Results
if: always()
9 changes: 8 additions & 1 deletion data-prepper-plugins/kafka-plugins/README.md
@@ -30,9 +30,16 @@ docker compose --project-directory data-prepper-plugins/kafka-plugins/src/integr
Not all integration tests currently work with Docker, but you can run the following.

```
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin -Dtests.kafka.kms_key=alias/DataPrepperTesting --tests KafkaSourceJsonTypeIT --tests '*kafka.buffer*'
```

If you do not have a KMS key, you can skip the KMS tests.

```
./gradlew data-prepper-plugins:kafka-plugins:integrationTest -Dtests.kafka.bootstrap_servers=localhost:9092 -Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin --tests KafkaSourceJsonTypeIT --tests KafkaBufferIT --tests KafkaBufferOTelIT
```


See the Old integration tests section to run other tests. However, these are more involved.

### Old integration tests
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -127,6 +127,7 @@ task integrationTest(type: Test) {
systemProperty 'tests.kafka.confluent.registry_url', System.getProperty('tests.kafka.confluent.registry_url')
systemProperty 'tests.kafka.authconfig.username', System.getProperty('tests.kafka.authconfig.username')
systemProperty 'tests.kafka.authconfig.password', System.getProperty('tests.kafka.authconfig.password')
systemProperty 'tests.kafka.kms_key', System.getProperty('tests.kafka.kms_key')

filter {
includeTestsMatching '*IT'
241 changes: 241 additions & 0 deletions data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java
@@ -0,0 +1,241 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.buffer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
import software.amazon.awssdk.services.kms.model.GenerateDataKeyRequest;
import software.amazon.awssdk.services.kms.model.GenerateDataKeyResponse;

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

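/**
 * Integration test for the Kafka buffer with a KMS-encrypted data key.
 * Requires -Dtests.kafka.bootstrap_servers and -Dtests.kafka.kms_key (see the plugin README).
 */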
@ExtendWith(MockitoExtension.class)
public class KafkaBuffer_KmsIT {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer_KmsIT.class);
@Mock
private PluginSetting pluginSetting;

private KafkaBufferConfig kafkaBufferConfig;
@Mock
private PluginFactory pluginFactory;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private AcknowledgementSet acknowledgementSet;

private Random random;

private BufferTopicConfig topicConfig;

private ByteDecoder byteDecoder;

private String bootstrapServersCommaDelimited;
private String topicName;
private ObjectMapper objectMapper;
private String kmsKey;
private KmsClient kmsClient;

@BeforeEach
void setUp() {
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.apache.ApacheSdkHttpService");
random = new Random();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
acknowledgementSet = mock(AcknowledgementSet.class);
lenient().doAnswer((a) -> null).when(acknowledgementSet).complete();
lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString());

topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5);

final Map<String, Object> topicConfigMap = Map.of(
"name", topicName,
"group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6),
"create_topic", true
);

topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class);

bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers");

LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited);

final Map<String, Object> bufferConfigMap = Map.of(
"topics", List.of(topicConfigMap),
"bootstrap_servers", List.of(bootstrapServersCommaDelimited),
"encryption", Map.of("type", "none")
);
kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);

kmsKey = System.getProperty("tests.kafka.kms_key");
kmsClient = KmsClient.create();

byteDecoder = null;
}

private KafkaBuffer createObjectUnderTest() {
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, null, ignored -> DefaultCredentialsProvider.create(), null);
}

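/**
 * Tests that configure the buffer topic with a KMS-wrapped encryption key.
 * The setup requests a data key from KMS, keeps the plaintext form to encrypt test data
 * locally, and places the Base64-encoded encrypted form in the topic's encryption_key.
 */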
@Nested
class Encrypted {
private Cipher encryptCipher;
private String aesKey;

@BeforeEach
void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException {
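// Generate a data key with KMS: the plaintext key backs a local AES cipher used to
// pre-encrypt records, and the wrapped (encrypted) key is what the buffer is configured with.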
final GenerateDataKeyRequest request = GenerateDataKeyRequest
.builder()
.keyId(kmsKey)
.keySpec(DataKeySpec.AES_256)
.build();

final GenerateDataKeyResponse response = kmsClient.generateDataKey(request);

byte[] plaintextDataKey = response.plaintext().asByteArray();
byte[] encryptedDataKey = response.ciphertextBlob().asByteArray();
aesKey = Base64.getEncoder()
.withoutPadding()
.encodeToString(encryptedDataKey);

final SecretKey secretKey = new SecretKeySpec(plaintextDataKey, 0, plaintextDataKey.length, "AES");
encryptCipher = Cipher.getInstance("AES");
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);


final Map<String, Object> topicConfigMap = objectMapper.convertValue(topicConfig, Map.class);
topicConfigMap.put("encryption_key", aesKey);
topicConfigMap.put("kms", Map.of(
"key_id", kmsKey
));
final Map<String, Object> bufferConfigMap = objectMapper.convertValue(kafkaBufferConfig, Map.class);
bufferConfigMap.put("topics", List.of(topicConfigMap));
kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);
}

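// Round trip: write a record through the buffer and read it back with encryption enabled.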
@Test
void write_and_read_encrypted() throws TimeoutException {
KafkaBuffer objectUnderTest = createObjectUnderTest();

Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
// TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
//assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}

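// Publish a record that was encrypted outside the buffer using the plaintext data key,
// then verify that read() decrypts it using the KMS-wrapped key from the topic configuration.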
@Test
void read_decrypts_data_from_the_predefined_key() throws IllegalBlockSizeException, BadPaddingException {
final KafkaBuffer objectUnderTest = createObjectUnderTest();
final TestProducer testProducer = new TestProducer(bootstrapServersCommaDelimited, topicName);

final Record<Event> record = createRecord();
final byte[] unencryptedBytes = record.getData().toJsonString().getBytes();
final byte[] encryptedBytes = encryptBytes(unencryptedBytes);

final KafkaBufferMessage.BufferData bufferedData = KafkaBufferMessage.BufferData.newBuilder()
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
.setData(ByteString.copyFrom(encryptedBytes))
.build();

final byte[] unencryptedKeyBytes = createRandomBytes();
final byte[] encryptedKeyBytes = encryptBytes(unencryptedKeyBytes);

final KafkaBufferMessage.BufferData keyData = KafkaBufferMessage.BufferData.newBuilder()
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
.setData(ByteString.copyFrom(encryptedKeyBytes))
.build();

testProducer.publishRecord(keyData.toByteArray(), bufferedData.toByteArray());

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

final Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}

private byte[] encryptBytes(final byte[] plaintextBytes) throws IllegalBlockSizeException, BadPaddingException {
return encryptCipher.doFinal(plaintextBytes);
}
}

private byte[] createRandomBytes() {
final byte[] writtenBytes = new byte[128];
random.nextBytes(writtenBytes);
return writtenBytes;
}

private Record<Event> createRecord() {
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
return new Record<>(event);
}

}
7 changes: 6 additions & 1 deletion testing/aws-testing-cdk/bin/aws-testing-cdk.ts
@@ -3,11 +3,16 @@ import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import {GitHubAccessStack} from '../lib/common/GitHubAccessStack';
import {SecretsManagerStack} from '../lib/aws-secrets-manager/SecretsManagerStack';
import {KmsStack} from "../lib/common/KmsStack";

const app = new cdk.App();

const githubStack = new GitHubAccessStack(app, 'GitHubStack', {});

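// Shared KMS key (alias: DataPrepperTesting) used by tests that need KMS,
// such as the Kafka buffer KMS integration test.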
new KmsStack(app, 'CommonKmsStack', {
testingRole: githubStack.gitHubActionsTestingRole
})

new SecretsManagerStack(app, 'SecretsManagerStack', {
testingRole: githubStack.gitHubActionsTestingRole
});
35 changes: 26 additions & 9 deletions testing/aws-testing-cdk/lib/common/GitHubAccessStack.ts
@@ -5,9 +5,16 @@

import {Stack, StackProps} from 'aws-cdk-lib';
import {Construct} from 'constructs';
import {OpenIdConnectPrincipal, OpenIdConnectProvider, Role} from 'aws-cdk-lib/aws-iam'
import {
AccountPrincipal,
CompositePrincipal,
OpenIdConnectPrincipal,
OpenIdConnectProvider,
Role
} from 'aws-cdk-lib/aws-iam'

const DEFAULT_ORGANIZATION = 'opensearch-project'
const GITHUB_TOKEN_URL = 'token.actions.githubusercontent.com'

/**
* Creates the IAM resources necessary for GitHub to access roles within
@@ -19,13 +26,18 @@ export class GitHubAccessStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);

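// If the account already has the GitHub OIDC provider, set the gitHubOidcProviderExists
// context value so the stack reuses it instead of creating it.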
const gitHubOidcProvider = new OpenIdConnectProvider(this, 'GitHubOidcProvider', {
url: 'https://token.actions.githubusercontent.com',
thumbprints: [
'6938fd4d98bab03faadb97b34396831e3780aea1'
],
clientIds: ['sts.amazonaws.com']
});
const oidcProviderExists: boolean = scope.node.tryGetContext('gitHubOidcProviderExists');

const gitHubOidcProvider =
oidcProviderExists ?
OpenIdConnectProvider.fromOpenIdConnectProviderArn(this, 'GitHubOidcProvider', `arn:aws:iam::${this.account}:oidc-provider/${GITHUB_TOKEN_URL}`) :
new OpenIdConnectProvider(this, 'GitHubOidcProvider', {
url: `https://${GITHUB_TOKEN_URL}`,
thumbprints: [
'6938fd4d98bab03faadb97b34396831e3780aea1'
],
clientIds: ['sts.amazonaws.com']
});

const dataPrepperOrganization: string = scope.node.tryGetContext('dataPrepperOrganization') || DEFAULT_ORGANIZATION;

@@ -36,9 +48,14 @@
},
});

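// Also allow principals in the current account to assume the testing role,
// in addition to GitHub Actions assuming it through OIDC.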
const currentAccountPrincipal = new AccountPrincipal(this.account);

this.gitHubActionsTestingRole = new Role(this, 'GitHubActionsTestingRole', {
roleName: 'GitHubActionsTesting',
assumedBy: gitHubPrincipal
assumedBy: new CompositePrincipal(
gitHubPrincipal,
currentAccountPrincipal
)
});
}
}
32 changes: 32 additions & 0 deletions testing/aws-testing-cdk/lib/common/KmsStack.ts
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import {Stack, StackProps} from 'aws-cdk-lib';
import {Construct} from 'constructs';
import {Role} from "aws-cdk-lib/aws-iam";
import {Key} from "aws-cdk-lib/aws-kms";

export interface KmsStackProps extends StackProps {
readonly testingRole: Role;
}

/**
* CDK stack that creates a common KMS key.
*/
export class KmsStack extends Stack {
readonly kmsKey: Key;

constructor(scope: Construct, id: string, props: KmsStackProps) {
super(scope, id, props);

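// The alias below is the value integration tests pass via -Dtests.kafka.kms_key=alias/DataPrepperTesting.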
this.kmsKey = new Key(this, 'DataPrepperTestingKey', {
alias: 'DataPrepperTesting',
description: 'Shared KMS key for testing any Data Prepper features that use KMS.'
});

this.kmsKey.grantEncryptDecrypt(props.testingRole)
}
}
