Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FDP-2758: New SignableMessageWrapper that doesn't need subclassing #31

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ subprojects {
extensions.configure<SpotlessExtension> {
kotlin {
// by default the target is every '.kt' and '.kts' file in the java source sets
ktfmt().dropboxStyle().configure {
ktfmt().kotlinlangStyle().configure {
it.setMaxWidth(120)
}
licenseHeaderFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class OAuthAuthenticateCallbackHandler : AuthenticateCallbackHandler {
aadParameters.scopes(),
authResult.expiresOnDate().toInstant().toEpochMilli(),
aadClient.clientId(),
System.currentTimeMillis())
System.currentTimeMillis()
)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
throw KafkaOAuthException("Retrieving JWT token was interrupted", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class OAuthAuthenticateCallbackHandlerTest {

val appConfig =
AppConfigurationEntry(
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule", REQUIRED, options())
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule",
REQUIRED,
options()
)
handler.configure(emptyMap<String?, Any>(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, listOf(appConfig))

assertEquals(clientId, handler.clientId)
Expand Down Expand Up @@ -69,5 +72,6 @@ class OAuthAuthenticateCallbackHandlerTest {
CLIENT_ID_CONFIG to clientId,
TOKEN_ENDPOINT_CONFIG to tokenEndpoint,
SCOPE_CONFIG to scopes,
TOKEN_FILE_CONFIG to tokenFilePath)
TOKEN_FILE_CONFIG to tokenFilePath
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class MessageSigner(properties: MessageSigningProperties) {
}

fun canSignMessages(): Boolean {
return this.signingEnabled && this.signingKey != null
return signingEnabled && signingKey != null
}

/**
Expand All @@ -62,8 +62,8 @@ class MessageSigner(properties: MessageSigningProperties) {
* @throws UncheckedSecurityException if the signing process throws a SignatureException.
*/
fun <T> signUsingField(message: SignableMessageWrapper<T>): T {
if (this.signingEnabled) {
val signatureBytes = this.signature(message)
if (signingEnabled) {
val signatureBytes = signature(message)
message.setSignature(signatureBytes)
}
return message.message
Expand All @@ -82,8 +82,8 @@ class MessageSigner(properties: MessageSigningProperties) {
fun signUsingHeader(
producerRecord: ProducerRecord<String, out SpecificRecordBase>
): ProducerRecord<String, out SpecificRecordBase> {
if (this.signingEnabled) {
val signature = this.signature(producerRecord)
if (signingEnabled) {
val signature = signature(producerRecord)
producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature.array())
}
return producerRecord
Expand All @@ -103,18 +103,18 @@ class MessageSigner(properties: MessageSigningProperties) {
* @throws UncheckedSecurityException if the signing process throws a SignatureException.
*/
private fun signature(message: SignableMessageWrapper<*>): ByteBuffer {
check(this.canSignMessages()) {
check(canSignMessages()) {
"This MessageSigner is not configured for signing, it can only be used for verification"
}
val oldSignature = message.getSignature()
message.setSignature(null)
val byteBuffer = this.toByteBuffer(message)
message.clearSignature()
val byteBuffer = toByteBuffer(message)
try {
return signature(byteBuffer)
} catch (e: SignatureException) {
throw UncheckedSecurityException("Unable to sign message", e)
} finally {
message.setSignature(oldSignature)
oldSignature?.let { message.setSignature(it) }
}
}

Expand All @@ -132,13 +132,13 @@ class MessageSigner(properties: MessageSigningProperties) {
* @throws UncheckedSecurityException if the signing process throws a SignatureException.
*/
private fun signature(producerRecord: ProducerRecord<String, out SpecificRecordBase>): ByteBuffer {
check(this.canSignMessages()) {
check(canSignMessages()) {
"This MessageSigner is not configured for signing, it can only be used for verification"
}
val oldSignatureHeader = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE)
producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE)
val specificRecordBase = producerRecord.value()
val byteBuffer = this.toByteBuffer(specificRecordBase)
val byteBuffer = toByteBuffer(specificRecordBase)
try {
return signature(byteBuffer)
} catch (e: SignatureException) {
Expand All @@ -152,8 +152,8 @@ class MessageSigner(properties: MessageSigningProperties) {

private fun signature(byteBuffer: ByteBuffer): ByteBuffer {
val messageBytes: ByteBuffer =
if (this.stripAvroHeader) {
this.stripAvroHeader(byteBuffer)
if (stripAvroHeader) {
stripAvroHeader(byteBuffer)
} else {
byteBuffer
}
Expand All @@ -163,7 +163,7 @@ class MessageSigner(properties: MessageSigningProperties) {
}

fun canVerifyMessageSignatures(): Boolean {
return this.signingEnabled && this.verificationKey != null
return signingEnabled && verificationKey != null
}

/**
Expand All @@ -173,7 +173,7 @@ class MessageSigner(properties: MessageSigningProperties) {
* @return `true` if the signature of the given `message` was verified; `false` if not.
*/
fun <T> verifyUsingField(message: SignableMessageWrapper<T>): Boolean {
if (!this.canVerifyMessageSignatures()) {
if (!canVerifyMessageSignatures()) {
logger.error("This MessageSigner is not configured for verification, it can only be used for signing")
return false
}
Expand All @@ -186,8 +186,8 @@ class MessageSigner(properties: MessageSigningProperties) {
}

try {
message.setSignature(null)
return this.verifySignatureBytes(messageSignature, this.toByteBuffer(message))
message.clearSignature()
return verifySignatureBytes(messageSignature, toByteBuffer(message))
} catch (e: Exception) {
logger.error("Unable to verify message signature", e)
return false
Expand All @@ -203,7 +203,7 @@ class MessageSigner(properties: MessageSigningProperties) {
* @return `true` if the signature of the given `consumerRecord` was verified; `false` if not. SignatureException.
*/
fun verifyUsingHeader(consumerRecord: ConsumerRecord<String, out SpecificRecordBase>): Boolean {
if (!this.canVerifyMessageSignatures()) {
if (!canVerifyMessageSignatures()) {
logger.error("This MessageSigner is not configured for verification, it can only be used for signing")
return false
}
Expand All @@ -222,7 +222,7 @@ class MessageSigner(properties: MessageSigningProperties) {

try {
val specificRecordBase: SpecificRecordBase = consumerRecord.value()
return this.verifySignatureBytes(ByteBuffer.wrap(signatureBytes), this.toByteBuffer(specificRecordBase))
return verifySignatureBytes(ByteBuffer.wrap(signatureBytes), toByteBuffer(specificRecordBase))
} catch (e: Exception) {
logger.error("Unable to verify message signature", e)
return false
Expand All @@ -232,8 +232,8 @@ class MessageSigner(properties: MessageSigningProperties) {
@Throws(SignatureException::class)
private fun verifySignatureBytes(signatureBytes: ByteBuffer, messageByteBuffer: ByteBuffer): Boolean {
val messageBytes: ByteBuffer =
if (this.stripAvroHeader) {
this.stripAvroHeader(messageByteBuffer)
if (stripAvroHeader) {
stripAvroHeader(messageByteBuffer)
} else {
messageByteBuffer
}
Expand All @@ -249,7 +249,7 @@ class MessageSigner(properties: MessageSigningProperties) {
}

private fun stripAvroHeader(bytes: ByteBuffer): ByteBuffer {
if (this.hasAvroHeader(bytes)) {
if (hasAvroHeader(bytes)) {
return ByteBuffer.wrap(Arrays.copyOfRange(bytes.array(), AVRO_HEADER_LENGTH, bytes.array().size))
}
return bytes
Expand All @@ -274,11 +274,12 @@ class MessageSigner(properties: MessageSigningProperties) {
override fun toString(): String {
return String.format(
"MessageSigner[algorithm=\"%s\"-\"%s\", provider=\"%s\", sign=%b, verify=%b]",
this.signatureAlgorithm,
this.keyAlgorithm,
this.signatureProvider,
this.canSignMessages(),
this.canVerifyMessageSignatures())
signatureAlgorithm,
keyAlgorithm,
signatureProvider,
canSignMessages(),
canVerifyMessageSignatures()
)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,25 @@ import java.nio.ByteBuffer
* Wrapper for signable messages. Because these messages are generated from Avro schemas, they can't be changed. This
* wrapper unifies them for the MessageSigner.
*/
abstract class SignableMessageWrapper<T>(val message: T) {
class SignableMessageWrapper<T>(
val message: T,
private val messageGetter: (T) -> ByteBuffer,
private val signatureGetter: (T) -> ByteBuffer?,
private val signatureSetter: (T, ByteBuffer?) -> Unit,
) {

/** @return ByteBuffer of the whole message */
@Throws(IOException::class) abstract fun toByteBuffer(): ByteBuffer
@Throws(IOException::class) internal fun toByteBuffer(): ByteBuffer = messageGetter(message)

/** @return ByteBuffer of the signature in the message */
abstract fun getSignature(): ByteBuffer?
internal fun getSignature(): ByteBuffer? = signatureGetter(message)

/** @param signature The signature in ByteBuffer form to be set on the message */
abstract fun setSignature(signature: ByteBuffer?)
internal fun setSignature(signature: ByteBuffer) {
signatureSetter(message, signature)
}

internal fun clearSignature() {
signatureSetter(message, null)
}
}
Loading
Loading