Skip to content

Commit

Permalink
Add deferred topic creation mode
Browse files Browse the repository at this point in the history
Kafka needs topics to exist before startup, RabbitMQ can only initialize the channels after startup.
  • Loading branch information
wasdennnoch committed May 4, 2024
1 parent 2fa7b3e commit a63deaa
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 10 deletions.
27 changes: 24 additions & 3 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package gg.beemo.latte.broker

import gg.beemo.latte.logging.Log
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.collections.HashSet
import kotlin.math.abs

fun interface TopicListener {
fun onMessage(topic: String, key: String, value: String, headers: BrokerMessageHeaders)
Expand All @@ -14,12 +17,25 @@ abstract class BrokerConnection {
abstract val serviceName: String
abstract val instanceId: String
abstract val supportsTopicHotSwap: Boolean
abstract val deferInitialTopicCreation: Boolean

protected val topicListeners: MutableMap<String, MutableSet<TopicListener>> = Collections.synchronizedMap(HashMap())
private val deferredTopicsToCreate: MutableSet<String> = Collections.synchronizedSet(HashSet())
private val hasStarted = AtomicBoolean(false)

private val log by Log

abstract suspend fun start()
suspend fun start() {
abstractStart()
hasStarted.set(true)
for (topic in deferredTopicsToCreate) {
createTopic(topic)
}
deferredTopicsToCreate.clear()
}

internal abstract suspend fun abstractStart()

open fun destroy() {
log.debug("Destroying BrokerConnection")
topicListeners.clear()
Expand Down Expand Up @@ -53,8 +69,13 @@ abstract class BrokerConnection {

internal fun on(topic: String, cb: TopicListener) {
topicListeners.computeIfAbsent(topic) {
log.debug("Creating new topic '{}'", topic)
createTopic(topic)
if (!hasStarted.get() && deferInitialTopicCreation) {
log.debug("Deferring creation of topic '{}' to after connected", topic)
deferredTopicsToCreate.add(topic)
} else {
log.debug("Creating new topic '{}'", topic)
createTopic(topic)
}
Collections.synchronizedSet(HashSet())
}.add(cb)
}
Expand Down
3 changes: 2 additions & 1 deletion latte/src/main/java/gg/beemo/latte/broker/LocalConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class LocalConnection(
) : BrokerConnection() {

override val supportsTopicHotSwap = true
override val deferInitialTopicCreation = false

override suspend fun abstractSend(
topic: String,
Expand All @@ -30,7 +31,7 @@ class LocalConnection(
return headers.messageId
}

override suspend fun start() {
override suspend fun abstractStart() {
// Nothing to start :)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class KafkaConnection(
) : BrokerConnection() {

override val supportsTopicHotSwap = false
override val deferInitialTopicCreation = false
private val kafkaHostsString = kafkaHosts.joinToString(",")
private val log by Log

Expand Down Expand Up @@ -81,7 +82,7 @@ class KafkaConnection(
return headers.messageId
}

override suspend fun start() {
override suspend fun abstractStart() {
check(!isRunning) { "KafkaConnection is already running!" }
log.debug("Starting Kafka Connection")
createTopics()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,36 @@ private class ChannelData(
var consumerTag: String? = null,
)

// TODO Temporary keys/topics created by RPC clients should ideally be cached and re-used,
// instead of being destroyed and recreated every time.

class RabbitConnection(
rabbitHosts: Array<String>,
override val serviceName: String,
override val instanceId: String,
private val useTls: Boolean = false,
private val username: String = "guest",
private val password: String = "guest",
) : BrokerConnection() {

override val supportsTopicHotSwap = true
override val deferInitialTopicCreation = true
private val rabbitAddresses = rabbitHosts.map(Address::parseAddress)
private val log by Log

private var connection: Connection? = null
private val channels = Collections.synchronizedMap(HashMap<String, ChannelData>())

override suspend fun start() {
connection = ConnectionFactory().apply {
override suspend fun abstractStart() {
connection = ConnectionFactory().also {
if (useTls) {
// TODO This will trust every cert, even self-signed ones
useSslProtocol()
it.useSslProtocol()
}
useNio()
it.useNio()
it.username = username
it.password = password
// TODO Investogate more properties, such as client-provided name
}.newConnection(rabbitAddresses, instanceId)
}

Expand All @@ -55,8 +64,9 @@ class RabbitConnection(
// through the same channel at the same time is fine though.
channelData.sendMutex.withLock {
val properties = AMQP.BasicProperties.Builder().apply {
// https://www.rabbitmq.com/docs/publishers#message-properties
deliveryMode(2) // Persistent
headers(headers.headers) // lol
// TODO Investigate other properties
}.build()
// TODO Later, we can set mandatory=true here and set up a dead letter exchange.
// This would be especially useful for raid bans, so that they don't get lost.
Expand Down

0 comments on commit a63deaa

Please sign in to comment.