diff --git a/build.gradle.kts b/build.gradle.kts index 10f251c..c9f8eef 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -66,7 +66,7 @@ subprojects { extensions.configure { kotlin { // by default the target is every '.kt' and '.kts' file in the java source sets - ktfmt().dropboxStyle().configure { + ktfmt().kotlinlangStyle().configure { it.setMaxWidth(120) } licenseHeaderFile( diff --git a/kafka-azure-oauth/src/main/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandler.kt b/kafka-azure-oauth/src/main/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandler.kt index a36ee08..12a8bf3 100644 --- a/kafka-azure-oauth/src/main/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandler.kt +++ b/kafka-azure-oauth/src/main/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandler.kt @@ -101,7 +101,8 @@ class OAuthAuthenticateCallbackHandler : AuthenticateCallbackHandler { aadParameters.scopes(), authResult.expiresOnDate().toInstant().toEpochMilli(), aadClient.clientId(), - System.currentTimeMillis()) + System.currentTimeMillis() + ) } catch (e: InterruptedException) { Thread.currentThread().interrupt() throw KafkaOAuthException("Retrieving JWT token was interrupted", e) diff --git a/kafka-azure-oauth/src/test/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandlerTest.kt b/kafka-azure-oauth/src/test/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandlerTest.kt index 71e57f0..cd7d4f4 100644 --- a/kafka-azure-oauth/src/test/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandlerTest.kt +++ b/kafka-azure-oauth/src/test/kotlin/com/gxf/utilities/kafka/oauth/handler/OAuthAuthenticateCallbackHandlerTest.kt @@ -28,7 +28,10 @@ class OAuthAuthenticateCallbackHandlerTest { val appConfig = AppConfigurationEntry( - "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", REQUIRED, options()) + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", + REQUIRED, + options() + ) handler.configure(emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, listOf(appConfig)) assertEquals(clientId, handler.clientId) @@ -69,5 +72,6 @@ class OAuthAuthenticateCallbackHandlerTest { CLIENT_ID_CONFIG to clientId, TOKEN_ENDPOINT_CONFIG to tokenEndpoint, SCOPE_CONFIG to scopes, - TOKEN_FILE_CONFIG to tokenFilePath) + TOKEN_FILE_CONFIG to tokenFilePath + ) } diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt index 595fe5d..2f80449 100644 --- a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt @@ -47,7 +47,7 @@ class MessageSigner(properties: MessageSigningProperties) { } fun canSignMessages(): Boolean { - return this.signingEnabled && this.signingKey != null + return signingEnabled && signingKey != null } /** @@ -62,8 +62,8 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedSecurityException if the signing process throws a SignatureException. */ fun signUsingField(message: SignableMessageWrapper): T { - if (this.signingEnabled) { - val signatureBytes = this.signature(message) + if (signingEnabled) { + val signatureBytes = signature(message) message.setSignature(signatureBytes) } return message.message @@ -82,8 +82,8 @@ class MessageSigner(properties: MessageSigningProperties) { fun signUsingHeader( producerRecord: ProducerRecord ): ProducerRecord { - if (this.signingEnabled) { - val signature = this.signature(producerRecord) + if (signingEnabled) { + val signature = signature(producerRecord) producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature.array()) } return producerRecord @@ -103,18 +103,18 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedSecurityException if the signing process throws a SignatureException. */ private fun signature(message: SignableMessageWrapper<*>): ByteBuffer { - check(this.canSignMessages()) { + check(canSignMessages()) { "This MessageSigner is not configured for signing, it can only be used for verification" } val oldSignature = message.getSignature() - message.setSignature(null) - val byteBuffer = this.toByteBuffer(message) + message.clearSignature() + val byteBuffer = toByteBuffer(message) try { return signature(byteBuffer) } catch (e: SignatureException) { throw UncheckedSecurityException("Unable to sign message", e) } finally { - message.setSignature(oldSignature) + oldSignature?.let { message.setSignature(it) } } } @@ -132,13 +132,13 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedSecurityException if the signing process throws a SignatureException. */ private fun signature(producerRecord: ProducerRecord): ByteBuffer { - check(this.canSignMessages()) { + check(canSignMessages()) { "This MessageSigner is not configured for signing, it can only be used for verification" } val oldSignatureHeader = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE) producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE) val specificRecordBase = producerRecord.value() - val byteBuffer = this.toByteBuffer(specificRecordBase) + val byteBuffer = toByteBuffer(specificRecordBase) try { return signature(byteBuffer) } catch (e: SignatureException) { @@ -152,8 +152,8 @@ class MessageSigner(properties: MessageSigningProperties) { private fun signature(byteBuffer: ByteBuffer): ByteBuffer { val messageBytes: ByteBuffer = - if (this.stripAvroHeader) { - this.stripAvroHeader(byteBuffer) + if (stripAvroHeader) { + stripAvroHeader(byteBuffer) } else { byteBuffer } @@ -163,7 +163,7 @@ class MessageSigner(properties: MessageSigningProperties) { } fun canVerifyMessageSignatures(): Boolean { - return this.signingEnabled && this.verificationKey != null + return signingEnabled && verificationKey != null } /** @@ -173,7 +173,7 @@ class MessageSigner(properties: MessageSigningProperties) { * @return `true` if the signature of the given `message` was verified; `false` if not. */ fun verifyUsingField(message: SignableMessageWrapper): Boolean { - if (!this.canVerifyMessageSignatures()) { + if (!canVerifyMessageSignatures()) { logger.error("This MessageSigner is not configured for verification, it can only be used for signing") return false } @@ -186,8 +186,8 @@ class MessageSigner(properties: MessageSigningProperties) { } try { - message.setSignature(null) - return this.verifySignatureBytes(messageSignature, this.toByteBuffer(message)) + message.clearSignature() + return verifySignatureBytes(messageSignature, toByteBuffer(message)) } catch (e: Exception) { logger.error("Unable to verify message signature", e) return false @@ -203,7 +203,7 @@ class MessageSigner(properties: MessageSigningProperties) { * @return `true` if the signature of the given `consumerRecord` was verified; `false` if not. SignatureException. */ fun verifyUsingHeader(consumerRecord: ConsumerRecord): Boolean { - if (!this.canVerifyMessageSignatures()) { + if (!canVerifyMessageSignatures()) { logger.error("This MessageSigner is not configured for verification, it can only be used for signing") return false } @@ -222,7 +222,7 @@ class MessageSigner(properties: MessageSigningProperties) { try { val specificRecordBase: SpecificRecordBase = consumerRecord.value() - return this.verifySignatureBytes(ByteBuffer.wrap(signatureBytes), this.toByteBuffer(specificRecordBase)) + return verifySignatureBytes(ByteBuffer.wrap(signatureBytes), toByteBuffer(specificRecordBase)) } catch (e: Exception) { logger.error("Unable to verify message signature", e) return false @@ -232,8 +232,8 @@ class MessageSigner(properties: MessageSigningProperties) { @Throws(SignatureException::class) private fun verifySignatureBytes(signatureBytes: ByteBuffer, messageByteBuffer: ByteBuffer): Boolean { val messageBytes: ByteBuffer = - if (this.stripAvroHeader) { - this.stripAvroHeader(messageByteBuffer) + if (stripAvroHeader) { + stripAvroHeader(messageByteBuffer) } else { messageByteBuffer } @@ -249,7 +249,7 @@ class MessageSigner(properties: MessageSigningProperties) { } private fun stripAvroHeader(bytes: ByteBuffer): ByteBuffer { - if (this.hasAvroHeader(bytes)) { + if (hasAvroHeader(bytes)) { return ByteBuffer.wrap(Arrays.copyOfRange(bytes.array(), AVRO_HEADER_LENGTH, bytes.array().size)) } return bytes @@ -274,11 +274,12 @@ class MessageSigner(properties: MessageSigningProperties) { override fun toString(): String { return String.format( "MessageSigner[algorithm=\"%s\"-\"%s\", provider=\"%s\", sign=%b, verify=%b]", - this.signatureAlgorithm, - this.keyAlgorithm, - this.signatureProvider, - this.canSignMessages(), - this.canVerifyMessageSignatures()) + signatureAlgorithm, + keyAlgorithm, + signatureProvider, + canSignMessages(), + canVerifyMessageSignatures() + ) } companion object { diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt index 2a7af0c..6637c16 100644 --- a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt @@ -10,14 +10,25 @@ import java.nio.ByteBuffer * Wrapper for signable messages. Because these messages are generated from Avro schemas, they can't be changed. This * wrapper unifies them for the MessageSigner. */ -abstract class SignableMessageWrapper(val message: T) { +class SignableMessageWrapper( + val message: T, + private val messageGetter: (T) -> ByteBuffer, + private val signatureGetter: (T) -> ByteBuffer?, + private val signatureSetter: (T, ByteBuffer?) -> Unit, +) { /** @return ByteBuffer of the whole message */ - @Throws(IOException::class) abstract fun toByteBuffer(): ByteBuffer + @Throws(IOException::class) internal fun toByteBuffer(): ByteBuffer = messageGetter(message) /** @return ByteBuffer of the signature in the message */ - abstract fun getSignature(): ByteBuffer? + internal fun getSignature(): ByteBuffer? = signatureGetter(message) /** @param signature The signature in ByteBuffer form to be set on the message */ - abstract fun setSignature(signature: ByteBuffer?) + internal fun setSignature(signature: ByteBuffer) { + signatureSetter(message, signature) + } + + internal fun clearSignature() { + signatureSetter(message, null) + } } diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt index c34122b..9bc80bf 100644 --- a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt @@ -5,7 +5,6 @@ package com.gxf.utilities.kafka.message.signing import com.gxf.utilities.kafka.message.wrapper.SignableMessageWrapper import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets import java.security.SecureRandom import java.util.Random import java.util.function.Consumer @@ -28,22 +27,24 @@ class MessageSignerTest { signatureProvider = "SunRsaSign", keyAlgorithm = "RSA", privateKeyFile = ClassPathResource("/rsa-private.pem"), - publicKeyFile = ClassPathResource("/rsa-public.pem")) + publicKeyFile = ClassPathResource("/rsa-public.pem") + ) private val messageSigner = MessageSigner(messageSignerProperties) @Test fun signsMessageWithoutSignature() { - val messageWrapper: SignableMessageWrapper<*> = this.messageWrapper() + val messageWrapper: SignableMessageWrapper = messageWrapper() messageSigner.signUsingField(messageWrapper) assertThat(messageWrapper.getSignature()).isNotNull() + assertThat(messageWrapper.message.signature).isEqualTo(messageWrapper.getSignature()) } @Test fun signsRecordHeaderWithoutSignature() { - val record = this.producerRecord() + val record = producerRecord() messageSigner.signUsingHeader(record) @@ -52,8 +53,8 @@ class MessageSignerTest { @Test fun signsMessageReplacingSignature() { - val randomSignature = this.randomSignature() - val messageWrapper = this.messageWrapper() + val randomSignature = randomSignature() + val messageWrapper = messageWrapper() messageWrapper.setSignature(randomSignature) val actualSignatureBefore = messageWrapper.getSignature() @@ -67,8 +68,8 @@ class MessageSignerTest { @Test fun signsRecordHeaderReplacingSignature() { - val randomSignature = this.randomSignature() - val record = this.producerRecord() + val randomSignature = randomSignature() + val record = producerRecord() record.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature.array()) val actualSignatureBefore = record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE).value() @@ -82,7 +83,7 @@ class MessageSignerTest { @Test fun verifiesMessagesWithValidSignature() { - val message = this.properlySignedMessage() + val message = properlySignedMessage() val signatureWasVerified = messageSigner.verifyUsingField(message) @@ -91,7 +92,7 @@ class MessageSignerTest { @Test fun verifiesRecordsWithValidSignature() { - val signedRecord = this.properlySignedRecord() + val signedRecord = properlySignedRecord() val result = messageSigner.verifyUsingHeader(signedRecord) @@ -100,7 +101,7 @@ class MessageSignerTest { @Test fun doesNotVerifyMessagesWithoutSignature() { - val messageWrapper = this.messageWrapper() + val messageWrapper = messageWrapper() val validSignature = messageSigner.verifyUsingField(messageWrapper) @@ -109,7 +110,7 @@ class MessageSignerTest { @Test fun doesNotVerifyRecordsWithoutSignature() { - val consumerRecord = this.consumerRecord() + val consumerRecord = consumerRecord() val validSignature = messageSigner.verifyUsingHeader(consumerRecord) @@ -118,8 +119,8 @@ class MessageSignerTest { @Test fun doesNotVerifyMessagesWithIncorrectSignature() { - val randomSignature = this.randomSignature() - val messageWrapper = this.messageWrapper(randomSignature) + val randomSignature = randomSignature() + val messageWrapper = messageWrapper(randomSignature) val validSignature = messageSigner.verifyUsingField(messageWrapper) @@ -128,8 +129,8 @@ class MessageSignerTest { @Test fun doesNotVerifyRecordsWithIncorrectSignature() { - val consumerRecord = this.consumerRecord() - val randomSignature = this.randomSignature() + val consumerRecord = consumerRecord() + val randomSignature = randomSignature() consumerRecord.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature.array()) val validSignature = messageSigner.verifyUsingHeader(consumerRecord) @@ -139,7 +140,7 @@ class MessageSignerTest { @Test fun verifiesMessagesPreservingTheSignatureAndItsProperties() { - val message = this.properlySignedMessage() + val message = properlySignedMessage() val originalSignature = message.getSignature() messageSigner.verifyUsingField(message) @@ -157,26 +158,26 @@ class MessageSignerTest { assertThat(messageSignerSigningDisabled.canVerifyMessageSignatures()).isFalse() } - private fun messageWrapper(): TestableWrapper { - return TestableWrapper() + private fun messageWrapper(signature: ByteBuffer? = null): SignableMessageWrapper { + val testableMessage = TestableMessage(signature = signature) + return SignableMessageWrapper( + testableMessage, + TestableMessage::getMsgBytes, + TestableMessage::getSigBytes, + TestableMessage::setSigBytes + ) } - private fun messageWrapper(signature: ByteBuffer): TestableWrapper { - val testableWrapper = TestableWrapper() - testableWrapper.setSignature(signature) - return testableWrapper - } - - private fun properlySignedMessage(): TestableWrapper { - val messageWrapper = this.messageWrapper() + private fun properlySignedMessage(): SignableMessageWrapper { + val messageWrapper = messageWrapper() messageSigner.signUsingField(messageWrapper) return messageWrapper } private fun properlySignedRecord(): ConsumerRecord { - val producerRecord = this.producerRecord() + val producerRecord = producerRecord() messageSigner.signUsingHeader(producerRecord) - return this.producerRecordToConsumerRecord(producerRecord) + return producerRecordToConsumerRecord(producerRecord) } private fun producerRecordToConsumerRecord(producerRecord: ProducerRecord): ConsumerRecord { @@ -197,11 +198,11 @@ class MessageSignerTest { } private fun producerRecord(): ProducerRecord { - return ProducerRecord("topic", this.message()) + return ProducerRecord("topic", message()) } private fun consumerRecord(): ConsumerRecord { - return ConsumerRecord("topic", 0, 123L, null, this.message()) + return ConsumerRecord("topic", 0, 123L, null, message()) } private fun message(): Message { @@ -213,7 +214,8 @@ class MessageSignerTest { override fun getSchema(): Schema { return Schema.Parser() .parse( - """{"type":"record","name":"Message","namespace":"com.alliander.osgp.kafka.message.signing","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}""") + """{"type":"record","name":"Message","namespace":"com.alliander.osgp.kafka.message.signing","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}""" + ) } override fun get(field: Int): Any { @@ -221,23 +223,21 @@ class MessageSignerTest { } override fun put(field: Int, value: Any) { - this.message = value.toString() + message = value.toString() } } - private class TestableWrapper : SignableMessageWrapper("Some test message") { - private var signature: ByteBuffer? = null + /** + * Object to test the wrapper with. Intentionally chose function names that are different from the ones in the + * wrapper class + */ + private class TestableMessage(var message: ByteBuffer = ByteBuffer.allocate(3), var signature: ByteBuffer? = null) { + fun getMsgBytes(): ByteBuffer = ByteBuffer.wrap(message.array()) - override fun toByteBuffer(): ByteBuffer { - return ByteBuffer.wrap(message.toByteArray(StandardCharsets.UTF_8)) - } - - override fun getSignature(): ByteBuffer? { - return this.signature - } + fun getSigBytes(): ByteBuffer? = signature - override fun setSignature(signature: ByteBuffer?) { - this.signature = signature + fun setSigBytes(newSignature: ByteBuffer?) { + signature = newSignature } } } diff --git a/oauth-token-client/src/main/kotlin/com/gxf/utilities/spring/oauth/config/OAuthClientConfig.kt b/oauth-token-client/src/main/kotlin/com/gxf/utilities/spring/oauth/config/OAuthClientConfig.kt index 3f42b5a..23e2637 100644 --- a/oauth-token-client/src/main/kotlin/com/gxf/utilities/spring/oauth/config/OAuthClientConfig.kt +++ b/oauth-token-client/src/main/kotlin/com/gxf/utilities/spring/oauth/config/OAuthClientConfig.kt @@ -42,7 +42,8 @@ class OAuthClientConfig { val credential: IClientCredential = ClientCredentialFactory.createFromCertificate( getPrivateKey(Objects.requireNonNull(clientData.privateKey)), - getCertificate(Objects.requireNonNull(clientData.certificate))) + getCertificate(Objects.requireNonNull(clientData.certificate)) + ) return try { ConfidentialClientApplication.builder(clientData.clientId, credential) .authority(clientData.tokenEndpoint)