Skip to content

Commit

Permalink
feat: Initial journal implementation (#2)
Browse files Browse the repository at this point in the history
* write and replay events
* client provider
* config structure
  • Loading branch information
patriknw authored May 27, 2024
1 parent e6b93f2 commit e89efb0
Show file tree
Hide file tree
Showing 18 changed files with 892 additions and 3 deletions.
11 changes: 11 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ akka.persistence.dynamodb {
journal {
class = "akka.persistence.dynamodb.journal.DynamoDBJournal"

# name of the table to use for events
table = "event_journal"

# replay filter not needed for this plugin
replay-filter.mode = off

Expand All @@ -29,3 +32,11 @@ akka.persistence.dynamodb {
}
}
// #query-settings
akka.persistence.dynamodb {
client {
host = "localhost"
port = 8000
}
}
// #client-settings

71 changes: 71 additions & 0 deletions core/src/main/scala/akka/persistence/dynamodb/ClientProvider.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb

import java.net.URI
import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Future
import scala.jdk.CollectionConverters._

import akka.Done
import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

object ClientProvider extends ExtensionId[ClientProvider] {
def createExtension(system: ActorSystem[_]): ClientProvider = new ClientProvider(system)

// Java API
def get(system: ActorSystem[_]): ClientProvider = apply(system)
}
class ClientProvider(system: ActorSystem[_]) extends Extension {
private val clients = new ConcurrentHashMap[String, DynamoDbAsyncClient]
private val clientSettings = new ConcurrentHashMap[String, ClientSettings]

CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "close DynamoDB clients") { () =>
// FIXME is this blocking, and should be run on blocking dispatcher?
clients.asScala.values.foreach(_.close())
Future.successful(Done)
}

def clientFor(configLocation: String): DynamoDbAsyncClient = {
clients.computeIfAbsent(
configLocation,
configLocation => {
val settings = clientSettingsFor(configLocation)
createClient(settings)
})
}

def clientSettingsFor(configLocation: String): ClientSettings = {
clientSettings.get(configLocation) match {
case null =>
val settings = ClientSettings(system.settings.config.getConfig(configLocation))
// it's just a cache so no need for guarding concurrent updates
clientSettings.put(configLocation, settings)
settings
case settings => settings
}
}

private def createClient(settings: ClientSettings): DynamoDbAsyncClient = {
// FIXME more config
DynamoDbAsyncClient.builder
.httpClientBuilder(NettyNioAsyncHttpClient.builder)
.endpointOverride(URI.create(s"http://${settings.host}:${settings.port}"))
.region(Region.US_WEST_2)
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummyKey", "dummySecret")))
.build()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb

import akka.annotation.InternalStableApi
import com.typesafe.config.Config

/**
* INTERNAL API
*/
@InternalStableApi
object DynamoDBSettings {

def apply(config: Config): DynamoDBSettings = {
val journalTable: String = config.getString("journal.table")
new DynamoDBSettings(journalTable)
}

}

/**
* INTERNAL API
*/
@InternalStableApi
final class DynamoDBSettings private (val journalTable: String) {

override def toString =
s"DynamoDBSettings($journalTable)"
}

object ClientSettings {
def apply(config: Config): ClientSettings =
new ClientSettings(host = config.getString("host"), port = config.getInt("port"))
}

final class ClientSettings(val host: String, val port: Int) {
override def toString: String =
s"ClientSettings($host, $port)"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.internal

import java.time.Instant
import java.time.temporal.ChronoUnit

import akka.annotation.InternalApi

/**
* INTERNAL API
*/
@InternalApi private[akka] object InstantFactory {

/**
* Current time truncated to microseconds.
*/
def now(): Instant =
Instant.now().truncatedTo(ChronoUnit.MICROS)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.internal

import java.util.{ HashMap => JHashMap }

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.dynamodb.DynamoDBSettings
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.Put
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest
import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest

/**
* INTERNAL API
*/
@InternalApi private[akka] object JournalDao {
private val log: Logger = LoggerFactory.getLogger(classOf[JournalDao])
}

/**
* INTERNAL API
*/
@InternalApi private[akka] class JournalDao(
system: ActorSystem[_],
settings: DynamoDBSettings,
client: DynamoDbAsyncClient) {
import JournalDao._

private val persistenceExt: Persistence = Persistence(system)

private implicit val ec: ExecutionContext = system.executionContext

def writeEvents(events: Seq[SerializedJournalItem]): Future[Done] = {
require(events.nonEmpty)

// it's always the same persistenceId for all events
val persistenceId = events.head.persistenceId
val slice = persistenceExt.sliceForPersistenceId(persistenceId)

def putItemAttributes(item: SerializedJournalItem) = {
import JournalAttributes._
val attributes = new JHashMap[String, AttributeValue]
attributes.put(Pid, AttributeValue.fromS(persistenceId))
attributes.put(SeqNr, AttributeValue.fromN(item.seqNr.toString))
attributes.put(Slice, AttributeValue.fromN(slice.toString))
attributes.put(EventSerId, AttributeValue.fromN(item.serId.toString))
attributes.put(EventSerManifest, AttributeValue.fromS(item.serManifest))
attributes.put(EventPayload, AttributeValue.fromB(SdkBytes.fromByteArray(item.payload.get)))
attributes.put(Writer, AttributeValue.fromS(item.writerUuid))
attributes
}

val totalEvents = events.size
if (totalEvents == 1) {
val req = PutItemRequest
.builder()
.item(putItemAttributes(events.head))
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.tableName(settings.journalTable)
.build()
val result = client.putItem(req).asScala

if (log.isDebugEnabled()) {
result.foreach { response =>
log.debug(
"Wrote [{}] events for persistenceId [{}], consumed [{}] WCU",
1,
persistenceId,
response.consumedCapacity.capacityUnits)
}
}
result.map(_ => Done)(ExecutionContexts.parasitic)
} else {
val writeItems =
events.map { item =>
TransactWriteItem
.builder()
.put(Put.builder().item(putItemAttributes(item)).tableName(settings.journalTable).build())
.build()
}.asJava

val req = TransactWriteItemsRequest
.builder()
.transactItems(writeItems)
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.transactWriteItems(req).asScala

result.failed.foreach { exc => println(exc) }

if (log.isDebugEnabled()) {
result.foreach { response =>
log.debug(
"Wrote [{}] events for persistenceId [{}], consumed [{}] WCU",
events.size,
persistenceId,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
}
}
result.map(_ => Done)(ExecutionContexts.parasitic)
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.internal

import java.time.Instant

import scala.jdk.CollectionConverters._

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.dynamodb.DynamoDBSettings
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.QueryRequest

/**
* INTERNAL API
*/
@InternalApi private[akka] class QueryDao(
system: ActorSystem[_],
settings: DynamoDBSettings,
client: DynamoDbAsyncClient) {
def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[SerializedJournalItem, NotUsed] = {

val expressionAttributeValues =
Map(
":pid" -> AttributeValue.fromS(persistenceId),
":from" -> AttributeValue.fromN(fromSequenceNr.toString),
":to" -> AttributeValue.fromN(toSequenceNr.toString)).asJava

import JournalAttributes._
val req = QueryRequest.builder
.tableName(settings.journalTable)
.keyConditionExpression(s"$Pid = :pid AND $SeqNr BETWEEN :from AND :to")
.expressionAttributeValues(expressionAttributeValues)
.build()

val publisher = client.queryPaginator(req)

Source.fromPublisher(publisher).mapConcat { response =>
response.items().iterator().asScala.map { item =>
// FIXME read all attributes
SerializedJournalItem(
slice = item.get(Slice).n().toInt,
entityType = "",
persistenceId = item.get(Pid).s(),
seqNr = item.get(SeqNr).n().toLong,
writeTimestamp = Instant.EPOCH,
payload = Some(item.get(EventPayload).b().asByteArray()),
serId = item.get(EventSerId).n().toInt,
serManifest = item.get(EventSerManifest).s(),
writerUuid = item.get(Writer).s(),
tags = Set.empty,
metadata = None)
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.internal

import java.time.Instant

import akka.annotation.InternalApi

final case class SerializedJournalItem(
slice: Int,
entityType: String,
persistenceId: String,
seqNr: Long,
writeTimestamp: Instant,
payload: Option[Array[Byte]],
serId: Int,
serManifest: String,
writerUuid: String,
tags: Set[String],
metadata: Option[SerializedEventMetadata])

final case class SerializedEventMetadata(serId: Int, serManifest: String, payload: Array[Byte])

/**
* INTERNAL API
*/
@InternalApi private[akka] object JournalAttributes {
// FIXME should attribute names be shorter?
val Pid = "pid"
val SeqNr = "seq_nr"
val Slice = "slice"
val EventSerId = "event_ser_id"
val EventSerManifest = "event_ser_manifest"
val EventPayload = "event_payload"
val Writer = "writer"
}
Loading

0 comments on commit e89efb0

Please sign in to comment.