Skip to content

Commit

Permalink
Continue implementing RabbitMQ connection
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Mar 18, 2024
1 parent 994da22 commit 2fa7b3e
Showing 1 changed file with 79 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package gg.beemo.latte.broker.rabbitmq

import com.rabbitmq.client.Address
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.*
import gg.beemo.latte.broker.BrokerConnection
import gg.beemo.latte.broker.BrokerMessageHeaders
import gg.beemo.latte.broker.MessageId
import gg.beemo.latte.logging.Log
import java.util.Collections
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean

// TODO Implementation considerations:
// - Channels are not thread-safe (for sending); need to create a coroutine-wrapper around them
// - "Consuming in one thread and publishing in another thread on a shared channel can be safe."

private class ChannelData(
val channel: Channel,
val sendMutex: Mutex = Mutex(),
var isConsuming: AtomicBoolean = AtomicBoolean(false),
var consumerTag: String? = null,
)

class RabbitConnection(
rabbitHosts: Array<String>,
Expand All @@ -26,16 +30,16 @@ class RabbitConnection(
private val log by Log

private var connection: Connection? = null
private val channels = Collections.synchronizedMap(HashMap<String, Channel>())
private val channels = Collections.synchronizedMap(HashMap<String, ChannelData>())

override suspend fun start() {
connection = ConnectionFactory().apply {
if (useTls) {
// TODO This will trust every cert, even self-signed ones
useSslProtocol()
}
useNio()
}.newConnection(rabbitAddresses, instanceId)
// TODO Create exchange
}

override suspend fun abstractSend(
Expand All @@ -44,7 +48,25 @@ class RabbitConnection(
value: String,
headers: BrokerMessageHeaders
): MessageId {
TODO()
if (shouldDispatchExternallyAfterShortCircuit(topic, key, value, headers)) {

val channelData = getOrCreateChannel(topic)
// RabbitMQ's channels are not thread-safe for sending. Consuming and sending
// through the same channel at the same time is fine though.
channelData.sendMutex.withLock {
val properties = AMQP.BasicProperties.Builder().apply {
headers(headers.headers) // lol
// TODO Investigate other properties
}.build()
// TODO Later, we can set mandatory=true here and set up a dead letter exchange.
// This would be especially useful for raid bans, so that they don't get lost.
// Though iirc they should be queued, no? Investigate queueing behavior, again.
channelData.channel.basicPublish(topic, key, properties, value.toByteArray())
}

}

return headers.messageId
}

override fun destroy() {
Expand All @@ -55,16 +77,57 @@ class RabbitConnection(
}

override fun createTopic(topic: String) {
val channel = channels.computeIfAbsent(topic) {
val connection = checkNotNull(connection) { "Connection not open" }
connection.createChannel()
val channelData = getOrCreateChannel(topic)
if (channelData.isConsuming.getAndSet(true)) {
return
}
val consumer = object : DefaultConsumer(channelData.channel) {

override fun handleDelivery(
consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: ByteArray
) {
val key = envelope.routingKey ?: ""
val value = String(body)
val headers = BrokerMessageHeaders(properties.headers.mapValues { it.value.toString() })
dispatchIncomingMessage(topic, key, value, headers)
channel.basicAck(envelope.deliveryTag, false)
}

override fun handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) {
if (sig.isInitiatedByApplication) {
return
}
log.error("RabbitMQ consumer for topic $topic has shut down unexpectedly", sig)
// TODO Automatic Reconnect?
}
}
// TODO Consume
channelData.consumerTag = channelData.channel.basicConsume(createQueueName(topic), false, consumer)
}

override fun removeTopic(topic: String) {
val channel = channels.remove(topic)
channel?.close()
channel?.channel?.queueDelete(createQueueName(topic))
channel?.channel?.close()
}

private fun getOrCreateChannel(topic: String): ChannelData {
return channels.computeIfAbsent(topic) {
val connection = checkNotNull(connection) { "Connection not open" }
val channel = connection.createChannel().apply {
val exchangeName = topic
val queueName = createQueueName(topic)
val routingKey = "#"
exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true)
queueDeclare(queueName, true, false, false, null)
queueBind(queueName, exchangeName, routingKey)
}
ChannelData(channel)
}
}

private fun createQueueName(topic: String) = "$serviceName.$instanceId.$topic"

}

0 comments on commit 2fa7b3e

Please sign in to comment.