diff --git a/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt index 3eb88b3..f97f832 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt @@ -1,18 +1,22 @@ package gg.beemo.latte.broker.rabbitmq -import com.rabbitmq.client.Address -import com.rabbitmq.client.Channel -import com.rabbitmq.client.Connection -import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.* import gg.beemo.latte.broker.BrokerConnection import gg.beemo.latte.broker.BrokerMessageHeaders import gg.beemo.latte.broker.MessageId import gg.beemo.latte.logging.Log -import java.util.Collections +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import java.util.* +import java.util.concurrent.atomic.AtomicBoolean -// TODO Implementation considerations: -// - Channels are not thread-safe (for sending); need to create a coroutine-wrapper around them -// - "Consuming in one thread and publishing in another thread on a shared channel can be safe." + +private class ChannelData( + val channel: Channel, + val sendMutex: Mutex = Mutex(), + var isConsuming: AtomicBoolean = AtomicBoolean(false), + var consumerTag: String? = null, +) class RabbitConnection( rabbitHosts: Array, @@ -26,7 +30,7 @@ class RabbitConnection( private val log by Log private var connection: Connection? = null - private val channels = Collections.synchronizedMap(HashMap()) + private val channels = Collections.synchronizedMap(HashMap()) override suspend fun start() { connection = ConnectionFactory().apply { @@ -34,8 +38,8 @@ class RabbitConnection( // TODO This will trust every cert, even self-signed ones useSslProtocol() } + useNio() }.newConnection(rabbitAddresses, instanceId) - // TODO Create exchange } override suspend fun abstractSend( @@ -44,7 +48,25 @@ class RabbitConnection( value: String, headers: BrokerMessageHeaders ): MessageId { - TODO() + if (shouldDispatchExternallyAfterShortCircuit(topic, key, value, headers)) { + + val channelData = getOrCreateChannel(topic) + // RabbitMQ's channels are not thread-safe for sending. Consuming and sending + // through the same channel at the same time is fine though. + channelData.sendMutex.withLock { + val properties = AMQP.BasicProperties.Builder().apply { + headers(headers.headers) // lol + // TODO Investigate other properties + }.build() + // TODO Later, we can set mandatory=true here and set up a dead letter exchange. + // This would be especially useful for raid bans, so that they don't get lost. + // Though iirc they should be queued, no? Investigate queueing behavior, again. + channelData.channel.basicPublish(topic, key, properties, value.toByteArray()) + } + + } + + return headers.messageId } override fun destroy() { @@ -55,16 +77,57 @@ class RabbitConnection( } override fun createTopic(topic: String) { - val channel = channels.computeIfAbsent(topic) { - val connection = checkNotNull(connection) { "Connection not open" } - connection.createChannel() + val channelData = getOrCreateChannel(topic) + if (channelData.isConsuming.getAndSet(true)) { + return + } + val consumer = object : DefaultConsumer(channelData.channel) { + + override fun handleDelivery( + consumerTag: String, + envelope: Envelope, + properties: AMQP.BasicProperties, + body: ByteArray + ) { + val key = envelope.routingKey ?: "" + val value = String(body) + val headers = BrokerMessageHeaders(properties.headers.mapValues { it.value.toString() }) + dispatchIncomingMessage(topic, key, value, headers) + channel.basicAck(envelope.deliveryTag, false) + } + + override fun handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) { + if (sig.isInitiatedByApplication) { + return + } + log.error("RabbitMQ consumer for topic $topic has shut down unexpectedly", sig) + // TODO Automatic Reconnect? + } } - // TODO Consume + channelData.consumerTag = channelData.channel.basicConsume(createQueueName(topic), false, consumer) } override fun removeTopic(topic: String) { val channel = channels.remove(topic) - channel?.close() + channel?.channel?.queueDelete(createQueueName(topic)) + channel?.channel?.close() } + private fun getOrCreateChannel(topic: String): ChannelData { + return channels.computeIfAbsent(topic) { + val connection = checkNotNull(connection) { "Connection not open" } + val channel = connection.createChannel().apply { + val exchangeName = topic + val queueName = createQueueName(topic) + val routingKey = "#" + exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true) + queueDeclare(queueName, true, false, false, null) + queueBind(queueName, exchangeName, routingKey) + } + ChannelData(channel) + } + } + + private fun createQueueName(topic: String) = "$serviceName.$instanceId.$topic" + }