Skip to content

Commit

Permalink
Migrate from Akka to Pekko
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Jun 5, 2023
1 parent 18279a0 commit db4f3fe
Show file tree
Hide file tree
Showing 83 changed files with 608 additions and 517 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ jobs:

- name: Build project
env:
ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }}
ALPAKKA_S3_REGION_DEFAULT_REGION: us-west-2
ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }}
ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER: static
ALPAKKA_S3_REGION_PROVIDER: static
PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_PROVIDER: static
PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION: us-west-2
PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }}
PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }}
PEKKO_CONNECTORS_S3_REGION_PROVIDER: static
run: sbt ++${{ matrix.scala }} clean coverage test

- name: Compile docs
Expand Down
8 changes: 0 additions & 8 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
@@ -1,9 +1 @@
updatePullRequests = "always"

# https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
updates.pin = [{ groupId = "com.typesafe.akka", version = "2.6." }]
updates.ignore = [
{ groupId = "com.typesafe.akka" },
{ groupId = "com.lightbend.akka" },
{ groupId = "com.lightbend.akka.grpc" }
]
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
package io.aiven.guardian.kafka.backup.gcs

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.alpakka.google.GoogleAttributes
import akka.stream.alpakka.google.GoogleSettings
import akka.stream.alpakka.googlecloud.storage.StorageObject
import akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.KafkaConsumerInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
import org.apache.pekko

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import pekko.actor.ActorSystem
import pekko.http.scaladsl.model.ContentTypes
import pekko.stream.connectors.google.GoogleAttributes
import pekko.stream.connectors.google.GoogleSettings
import pekko.stream.connectors.googlecloud.storage.StorageObject
import pekko.stream.connectors.googlecloud.storage.scaladsl.GCStorage
import pekko.stream.scaladsl.Sink
import pekko.util.ByteString

// TODO: GCS implementation currently does not work correctly because of inability of current GCS implementation in
// Alpakka to allow us to commit Kafka cursor whenever chunks are uploaded
// Pekko Connectors to allow us to commit Kafka cursor whenever chunks are uploaded
class BackupClient[T <: KafkaConsumerInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.SinkShape
import akka.stream.alpakka.s3._
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.KafkaConsumerInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.models.BackupObjectMetadata
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.apache.pekko

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import java.time.Instant

import pekko.Done
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.stream.SinkShape
import pekko.stream.connectors.s3._
import pekko.stream.connectors.s3.scaladsl.S3
import pekko.stream.scaladsl._
import pekko.util.ByteString

class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
Expand Down Expand Up @@ -126,7 +128,7 @@ class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settin
maybeParts <- getPartsFromUpload(key, uploadId)
} yield maybeParts.map { parts =>
val finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
case Some(part) if part.size >= pekko.stream.connectors.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since its broken
Expand Down Expand Up @@ -199,7 +201,7 @@ class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settin

private[s3] def kafkaBatchSink
: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] =
// See https://doc.akka.io/docs/akka/current/stream/operators/Partition.html for an explanation on Partition
// See https://pekko.apache.org/docs/pekko/current/stream/operators/Partition.html for an explanation on Partition
Sink.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partition = builder.add(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.SuccessfulUploadPart
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import io.aiven.guardian.kafka.backup.KafkaConsumerInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.apache.pekko

import scala.collection.immutable
import scala.concurrent.Future

import java.util.concurrent.ConcurrentLinkedQueue

import pekko.Done
import pekko.actor.ActorSystem
import pekko.stream.connectors.s3.S3Headers
import pekko.stream.connectors.s3.S3Settings
import pekko.stream.connectors.s3.SuccessfulUploadPart
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Sink

class BackupClientChunkState[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package io.aiven.guardian.kafka.backup.s3

import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import com.softwaremill.diffx.generic.auto._
import com.softwaremill.diffx.scalatest.DiffMustMatcher._
import com.typesafe.scalalogging.LazyLogging
Expand All @@ -14,7 +10,8 @@ import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Generators._
import io.aiven.guardian.kafka.s3.S3Spec
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.mdedetrich.akka.stream.support.CirceStreamSupport
import org.apache.pekko
import org.mdedetrich.pekko.stream.support.CirceStreamSupport
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.must.Matchers

Expand All @@ -25,6 +22,11 @@ import scala.language.postfixOps

import java.time.OffsetDateTime

import pekko.stream.connectors.s3.scaladsl.S3
import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source

trait BackupClientSpec extends S3Spec with Matchers with BeforeAndAfterAll with LazyLogging {

val ThrottleElements: Int = 100
Expand All @@ -50,7 +52,7 @@ trait BackupClientSpec extends S3Spec with Matchers with BeforeAndAfterAll with
val calculatedFuture = for {
_ <- createBucket(s3Config.dataBucket)
_ <- backupClient.backup.run()
_ <- akka.pattern.after(delay)(Future.successful(()))
_ <- pekko.pattern.after(delay)(Future.successful(()))
bucketContents <-
S3.listBucket(s3Config.dataBucket, None, s3Headers)
.withAttributes(s3Attrs)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.SourceWithContext
import io.aiven.guardian.kafka.backup.KafkaConsumer
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import org.apache.pekko

import pekko.actor.ActorSystem
import pekko.kafka.CommitterSettings
import pekko.kafka.ConsumerMessage
import pekko.kafka.ConsumerSettings
import pekko.kafka.scaladsl.Consumer
import pekko.stream.SharedKillSwitch
import pekko.stream.scaladsl.SourceWithContext

class KafkaConsumerWithKillSwitch(
configureConsumer: Option[
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.s3.MinioS3Test
import io.aiven.guardian.pekko.AnyPropTestKit
import org.apache.pekko.actor.ActorSystem

class MinioBackupClientSpec
extends AnyPropTestKit(ActorSystem("MinioS3BackupClientSpec"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import com.softwaremill.diffx.scalatest.DiffMustMatcher._
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.Generators._
import io.aiven.guardian.kafka.Utils
import io.aiven.guardian.kafka.backup.MockedBackupClientInterface
Expand All @@ -18,7 +12,9 @@ import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Generators._
import io.aiven.guardian.kafka.s3.S3Spec
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.mdedetrich.akka.stream.support.CirceStreamSupport
import io.aiven.guardian.pekko.AnyPropTestKit
import org.apache.pekko
import org.mdedetrich.pekko.stream.support.CirceStreamSupport
import org.scalatest.matchers.must.Matchers

import scala.concurrent.ExecutionContext
Expand All @@ -27,6 +23,12 @@ import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.postfixOps

import pekko.actor.ActorSystem
import pekko.stream.connectors.s3.S3Settings
import pekko.stream.connectors.s3.scaladsl.S3
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source

class MockedKafkaClientBackupConsumerSpec
extends AnyPropTestKit(ActorSystem("MockedKafkaClientBackupClientSpec"))
with S3Spec
Expand Down Expand Up @@ -70,7 +72,7 @@ class MockedKafkaClientBackupConsumerSpec
val calculatedFuture = for {
_ <- createBucket(s3Config.dataBucket)
_ = backupClient.backup.run()
bucketContents <- akka.pattern.after(10 seconds)(
bucketContents <- pekko.pattern.after(10 seconds)(
S3.listBucket(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq)
)
keysSorted = bucketContents.map(_.key).sortBy(Utils.keyToOffsetDateTime)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package io.aiven.guardian.kafka.backup.s3

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.scaladsl.Source
import io.aiven.guardian.kafka.backup.MockedBackupClientInterface
import io.aiven.guardian.kafka.backup.MockedKafkaConsumerInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.TimeConfiguration
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.apache.pekko

import scala.concurrent.duration._
import scala.language.postfixOps

import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.stream.connectors.s3.S3Headers
import pekko.stream.connectors.s3.S3Settings
import pekko.stream.scaladsl.Source

class MockedS3BackupClientInterface(
kafkaData: Source[ReducedConsumerRecord, NotUsed],
timeConfiguration: TimeConfiguration,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.backup.configs.Compression
import io.aiven.guardian.pekko.AnyPropTestKit
import org.apache.pekko.actor.ActorSystem

class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with RealS3BackupClientTest {
override val compression: Option[Compression] = None
Expand Down
Loading

0 comments on commit db4f3fe

Please sign in to comment.